crew
A worker pool for Gleam that distributes work across a limited number of worker processes. Workers are pooled to avoid the overhead of spawning and killing processes for each task.
The pool manages a queue of work items and distributes them to idle workers. When no workers are available, work is queued until a worker becomes free. The pool handles worker crashes gracefully and automatically manages the worker lifecycle.
Example
import crew
import gleam/erlang/process
pub fn main() {
  // Create a pool name
  let pool_name = process.new_name("image_downloader")

  // Start an unsupervised pool
  let assert Ok(_) =
    crew.new(pool_name, download_image)
    |> crew.fixed_size(4)
    |> crew.start

  // Execute work on the pool and wait for the result
  let result = crew.call(pool_name, 5000, "tasteful-ramen.jpeg")
}
Types
A builder for configuring a worker pool before starting it.
pub opaque type Builder(state, work, result)
Values
pub fn call(
in pool: process.Name(PoolMsg(work, result)),
timeout timeout: Int,
msg work: work,
) -> result
Send a single piece of work to one of the workers and wait for the result.
This function blocks until the work is completed or the timeout is reached. If no worker is available, the message will be queued and sent to the next free worker.
Parameters
- pool - The name of the pool to execute work on
- timeout - Maximum time to wait for completion in milliseconds
- work - The message to send to the worker function
Panics
- If the pool does not complete the work within the specified timeout
- If the pool is not running
- If the worker crashes while executing the work
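Example

A minimal sketch, assuming a running pool registered under `pool_name` as in the introductory example:

```gleam
// Blocks the calling process until a worker returns the result,
// panicking if the work takes longer than 5000 milliseconds.
let image = crew.call(pool_name, 5000, "tasteful-ramen.jpeg")
```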
pub fn call_parallel(
in pool: process.Name(PoolMsg(work, result)),
timeout timeout: Int,
msg work: List(work),
) -> List(result)
Send multiple pieces of work concurrently to the pool workers, without ordering guarantees.
Work is distributed among the available workers and executed concurrently. Results are returned in the order they complete, not the order they were submitted.
Parameters
- pool - The name of the pool to execute work on
- timeout - Maximum time to wait for all work to complete in milliseconds
- work - A list of messages describing the work to be done
Panics
- If the pool does not complete all work within the specified timeout
- If the pool is not running
- If any worker crashes while executing work
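Example

A sketch assuming the same `pool_name` pool; the three downloads run concurrently across the workers:

```gleam
// Results are returned in completion order, not submission order.
let images =
  crew.call_parallel(pool_name, 10_000, [
    "one.jpeg",
    "two.jpeg",
    "three.jpeg",
  ])
```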
pub fn cancel(
pool: process.Name(PoolMsg(work, result)),
receive: process.Subject(Result(result, process.ExitReason)),
) -> Nil
The first time submit or submit_all is called from a process, the pool starts to monitor that process and cancels all of its ongoing work if it goes down.
Sometimes it is useful to manually unsubscribe and cancel all ongoing work for a subject. Doing so also removes the monitor added by the pool once the last receiving subject has been cancelled.
Note that already-finished work might still arrive on the subject after
cancel has been called.
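Example

A sketch assuming a pool registered under `pool_name`:

```gleam
let receive = process.new_subject()
crew.submit(pool_name, 1000, receive, "large-download.jpeg")

// The result is no longer needed: cancel all ongoing work for
// this subject and remove the pool's monitor on this process.
crew.cancel(pool_name, receive)
```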
pub fn fixed_size(
builder: Builder(state, work, result),
size: Int,
) -> Builder(state, work, result)
Set the number of worker processes in the pool to a fixed number.
If set to a value less than 1, starting the pool will fail.
Example
crew.new(pool_name, worker)
|> crew.fixed_size(8) // Use 8 workers regardless of CPU count
pub fn max_queue_length(
builder: Builder(state, work, result),
max_queue_length: Int,
) -> Builder(state, work, result)
To avoid overloading the system, the internal work queue is limited.
If this limit is reached, callers of submit have to wait until enough
work has been completed before continuing.
By default, the maximum queue length depends on the number of workers in the pool.
Example
crew.new(pool_name, worker)
|> crew.max_queue_length(100)
pub fn new(
name: process.Name(PoolMsg(work, result)),
work: fn(work) -> result,
) -> Builder(Nil, work, result)
Create a new worker pool builder with the given name.
The name is used to register the pool so that work can be sent to it.
By default, the pool will have a number of workers equal to the number of scheduler threads available on the system (typically the number of CPU cores).
Example
let pool_name = process.new_name("pool")
let builder = crew.new(pool_name, worker)
pub fn new_with_initialiser(
name: process.Name(PoolMsg(work, result)),
timeout init_timeout: Int,
init init: fn() -> state,
run work: fn(state, work) -> result,
) -> Builder(state, work, result)
Create a new worker pool builder with the given name and initialiser.
The name is used to register the pool so that work can be sent to it. The initialiser will run on the worker process before it registers itself as a worker.
By default, the pool will have a number of workers equal to the number of scheduler threads available on the system (typically the number of CPU cores).
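Example

A sketch where each worker opens its own connection before accepting work; `open_connection` and `run_query` are hypothetical helpers:

```gleam
let pool_name = process.new_name("query_runner")
let builder =
  crew.new_with_initialiser(
    pool_name,
    timeout: 5000,
    init: fn() { open_connection() },
    run: fn(connection, query) { run_query(connection, query) },
  )
```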
pub fn new_with_state(
name: process.Name(PoolMsg(work, result)),
state: state,
work: fn(state, work) -> result,
) -> Builder(state, work, result)
Create a new worker pool builder with the given name and state.
The name is used to register the pool so that work can be sent to it.
By default, the pool will have a number of workers equal to the number of scheduler threads available on the system (typically the number of CPU cores).
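Example

A sketch where every worker shares the same immutable value; `Config` and `fetch` are hypothetical:

```gleam
let pool_name = process.new_name("fetcher")
let config = Config(base_url: "https://example.com")
let builder =
  crew.new_with_state(pool_name, config, fn(config, path) {
    fetch(config.base_url, path)
  })
```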
pub fn start(
builder: Builder(state, work, result),
) -> Result(static_supervisor.Supervisor, actor.StartError)
Start an unsupervised worker pool from the given builder.
Returns a supervisor that manages the pool and its workers. In most cases,
you should use supervised instead to get a child specification that can
be added to your application’s supervision tree.
Panics
This function will exit the process if any workers fail to start, similar
to static_supervisor.start.
Example
let assert Ok(pool_supervisor) =
  crew.new(pool_name, worker)
  |> crew.start
pub fn submit(
pool: process.Name(PoolMsg(work, result)),
timeout: Int,
receive: process.Subject(Result(result, process.ExitReason)),
work: work,
) -> Nil
Submit a single piece of work to the pool using a subscription channel. The work will be done asynchronously and the result will be sent back to the provided subject. The timeout controls how long to wait for the work to be added to the queue successfully.
This is a lower-level function for submitting work. It is the caller's
responsibility to handle timeouts, submission order, and failures.
Most users should prefer call.
The first time submit is called from a process, the pool sets up
a monitor making sure work is cancelled when the process no longer exists
to receive a result. You can remove this monitor early by using cancel.
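Example

A sketch assuming a pool registered under `pool_name`; the work runs in the background while the caller does something else:

```gleam
let receive = process.new_subject()
crew.submit(pool_name, 1000, receive, "tasteful-ramen.jpeg")

// ...do other work...

// Each completed piece of work arrives as one message on the subject.
let assert Ok(Ok(image)) = process.receive(receive, within: 5000)
```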
pub fn submit_all(
pool: process.Name(PoolMsg(work, result)),
timeout: Int,
receive: process.Subject(Result(result, process.ExitReason)),
work: List(work),
) -> Nil
Submit multiple pieces of work to the pool using a subscription channel. The work will be done asynchronously and the result will be sent back to the provided subject. The timeout controls how long to wait for the work to be added to the queue successfully.
This is a lower-level function for submitting work. It is the caller's
responsibility to handle timeouts, submission order, and failures.
Most users should prefer the call_parallel function.
The first time submit_all is called from a process, the pool sets up
a monitor making sure work is cancelled when the process no longer exists
to receive a result. You can remove this monitor early by using cancel.
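Example

A sketch assuming a pool registered under `pool_name`; one message arrives per piece of work, in completion order:

```gleam
let receive = process.new_subject()
crew.submit_all(pool_name, 1000, receive, ["one.jpeg", "two.jpeg"])

let assert Ok(Ok(first_done)) = process.receive(receive, within: 5000)
let assert Ok(Ok(second_done)) = process.receive(receive, within: 5000)
```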
pub fn supervised(
builder: Builder(state, work, result),
) -> supervision.ChildSpecification(static_supervisor.Supervisor)
Create a child specification for a supervised worker pool.
This is the recommended way to start a worker pool as part of your
application’s supervision tree. The returned child specification can be
added to a supervisor using static_supervisor.add.
Example
let pool_spec =
  crew.new(pool_name, worker)
  |> crew.fixed_size(4)
  |> crew.supervised

let assert Ok(_) =
  supervisor.new(supervisor.OneForOne)
  |> supervisor.add(pool_spec)
  |> supervisor.start