PhoenixMicro.Utils.WorkerPool (PhoenixMicro v1.0.0)

Copy Markdown View Source

A lightweight bounded worker pool built on Task.Supervisor.

Wraps Task.Supervisor with a semaphore to enforce a maximum concurrency limit, preventing message handlers from spawning unbounded processes when the downstream is slow.

Architecture

WorkerPool (GenServer)
 Task.Supervisor (owned)
 semaphore: :counters ref (atomic, lock-free)
 pending queue: :queue (messages waiting for a slot)

When all max_concurrency slots are taken:

  • New tasks are queued internally.
  • As workers complete, queued tasks are dispatched automatically.
  • If the queue exceeds max_queue_size, back-pressure is applied by returning {:error, :overloaded}.

Usage

# Start as part of a supervisor
children = [
  {PhoenixMicro.Utils.WorkerPool, name: :payment_pool, max_concurrency: 10}
]

# Submit work
case WorkerPool.submit(:payment_pool, fn -> process_payment(msg) end) do
  {:ok, task} -> :ok
  {:error, :overloaded} -> nack_message(msg)
end

# Submit and wait for result
{:ok, result} = WorkerPool.submit_await(:payment_pool, fn -> compute() end, 5_000)

Summary

Functions

Returns a specification to start this module under a supervisor.

Returns current pool status: active workers, pending queue size.

Submits a function to the pool for async execution. Returns {:ok, task} or {:error, :overloaded} if the queue is full.

Submits a function and waits for its result. Returns {:ok, result} or {:error, :timeout | :overloaded}.

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

start_link(opts \\ [])

@spec start_link(keyword()) :: GenServer.on_start()

status(pool \\ __MODULE__)

@spec status(GenServer.name()) :: %{
  active: integer(),
  pending: integer(),
  max_concurrency: integer()
}

Returns current pool status: active workers, pending queue size.

submit(pool \\ __MODULE__, fun)

@spec submit(GenServer.name(), (-> term())) :: {:ok, Task.t()} | {:error, :overloaded}

Submits a function to the pool for async execution. Returns {:ok, task} or {:error, :overloaded} if the queue is full.

submit_await(pool \\ __MODULE__, fun, timeout \\ 5000)

@spec submit_await(GenServer.name(), (-> term()), timeout()) ::
  {:ok, term()} | {:error, :timeout | :overloaded}

Submits a function and waits for its result. Returns {:ok, result} or {:error, :timeout | :overloaded}.