PhoenixGenApi.WorkerPool (PhoenixGenApi v2.6.1)

Copy Markdown View Source

Generic worker pool for executing tasks asynchronously.

This module provides a pooling mechanism for executing async and stream tasks without spawning unlimited processes. Workers are supervised and reused across multiple requests.

Architecture

The worker pool consists of:

  • A pool of worker processes (GenServers)
  • A queue for pending tasks when all workers are busy
  • A supervisor managing the worker processes
  • Pool-level circuit breaker for fault tolerance

Configuration

Configure pool sizes in your config.exs:

config :phoenix_gen_api, :worker_pool,
  async_pool_size: 100,
  stream_pool_size: 50,
  max_queue_size: 1000,
  task_timeout: 30_000

Usage

# Execute a task asynchronously
WorkerPool.execute_async(:async_pool, fn ->
  # Your async work here
  result = process_data()
  send(caller_pid, {:result, result})
end)

# Execute a stream task
WorkerPool.execute_async(:stream_pool, fn ->
  # Your stream work here
  StreamCall.handle_stream(request, config)
end)

Worker States

Each worker can be in one of two states:

  • :idle - Available to accept new work
  • :busy - Currently executing a task

Queue Management

When all workers are busy, tasks are queued. If the queue exceeds the max size, new tasks will be rejected to prevent memory exhaustion.

Circuit Breaker

The pool tracks consecutive failures across all workers. If the failure rate exceeds the threshold, the pool enters a degraded state and rejects new tasks for a cooldown period.

Supervision

Workers are supervised and automatically restarted on failure. Failed tasks are not retried automatically - the caller should handle failures.

Summary

Functions

Returns a specification to start this module under a supervisor.

Executes a task asynchronously using the specified pool.

Starts the worker pool.

Gets the current status of the worker pool.

Types

pool_name()

@type pool_name() :: :async_pool | :stream_pool

task()

@type task() :: (-> any())

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

execute_async(pool_name, task)

@spec execute_async(pool_name(), task()) :: :ok | {:error, :queue_full}

Executes a task asynchronously using the specified pool.

Parameters

  • pool_name - The name of the worker pool
  • task - A zero-arity function to execute

Returns

  • :ok - Task was accepted (either started or queued)
  • {:error, :queue_full} - Queue is at maximum capacity

Examples

WorkerPool.execute_async(:async_pool, fn ->
  # Do async work
  process_data()
end)

start_link(opts)

Starts the worker pool.

Parameters

  • opts - Keyword list with:
    • :name - The pool name (required)
    • :pool_size - Number of workers (default: 10)
    • :max_queue_size - Maximum queued tasks (default: 1000)

status(pool_name)

@spec status(pool_name()) :: map()

Gets the current status of the worker pool.

Returns

A map containing:

  • :idle_workers - Number of idle workers
  • :busy_workers - Number of busy workers
  • :queued_tasks - Number of tasks in queue
  • :circuit_open - Whether circuit breaker is open
  • :total_tasks_executed - Total tasks executed since start
  • :total_tasks_failed - Total tasks failed since start