PhoenixGenApi.WorkerPool (PhoenixGenApi v1.1.2)

View Source

Generic worker pool for executing tasks asynchronously.

This module provides a pooling mechanism for executing async and stream tasks without spawning unlimited processes. Workers are supervised and reused across multiple requests.

Architecture

The worker pool consists of:

  • A pool of worker processes (GenServers)
  • A queue for pending tasks when all workers are busy
  • A supervisor managing the worker processes

Configuration

Configure pool sizes in your config.exs:

config :phoenix_gen_api, :worker_pool,
  async_pool_size: 100,
  stream_pool_size: 50,
  max_queue_size: 1000

Usage

# Execute a task asynchronously
WorkerPool.execute_async(:async_pool, fn ->
  # Your async work here
  result = do_work()
  send(caller_pid, {:result, result})
end)

# Execute a stream task
WorkerPool.execute_async(:stream_pool, fn ->
  # Your stream work here
  StreamCall.handle_stream(request, config)
end)

Worker States

Each worker can be in one of two states:

  • :idle - Available to accept new work
  • :busy - Currently executing a task

Queue Management

When all workers are busy, tasks are queued. If the queue exceeds the max size, new tasks will be rejected to prevent memory exhaustion.

Supervision

Workers are supervised and automatically restarted on failure. Failed tasks are not retried automatically - the caller should handle failures.

Summary

Functions

Returns a specification to start this module under a supervisor.

Executes a task asynchronously using the specified pool.

Starts the worker pool.

Gets the current status of the worker pool.

Types

pool_name()

@type pool_name() :: :async_pool | :stream_pool

task()

@type task() :: (-> any())

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

execute_async(pool_name, task)

@spec execute_async(pool_name(), task()) :: :ok | {:error, :queue_full}

Executes a task asynchronously using the specified pool.

Parameters

  • pool_name - The name of the worker pool
  • task - A zero-arity function to execute

Returns

  • :ok - Task was accepted (either started or queued)
  • {:error, :queue_full} - Queue is at maximum capacity

Examples

WorkerPool.execute_async(:async_pool, fn ->
  # Do async work
  process_data()
end)

start_link(opts)

Starts the worker pool.

Parameters

  • opts - Keyword list with:
    • :name - The pool name (required)
    • :pool_size - Number of workers (default: 10)
    • :max_queue_size - Maximum queued tasks (default: 1000)

status(pool_name)

@spec status(pool_name()) :: map()

Gets the current status of the worker pool.

Returns

A map containing:

  • :idle_workers - Number of idle workers
  • :busy_workers - Number of busy workers
  • :queued_tasks - Number of tasks in queue