crew

A worker pool for Gleam that distributes work across a limited number of worker processes. Workers are pooled to avoid the overhead of spawning and killing processes for each task.

The pool manages a queue of work items and distributes them to idle workers. When no workers are available, work is queued until a worker becomes free. The pool handles worker crashes gracefully and automatically manages the worker lifecycle.

Example

import crew
import gleam/erlang/process

pub fn main() {
  // Create a pool name
  let pool_name = process.new_name("image_downloader")

  // Start an unsupervised pool.
  // download_image is the worker function, assumed to be defined elsewhere.
  let assert Ok(_) =
    crew.new(pool_name, download_image)
    |> crew.fixed_size(4)
    |> crew.start

  // Execute work on the pool
  let result = crew.call(pool_name, 5000, "tasteful-ramen.jpeg")
}

Types

A builder for configuring a worker pool before starting it.

pub opaque type Builder(state, work, result)
pub opaque type PoolMsg(work, result)

Values

pub fn call(
  in pool: process.Name(PoolMsg(work, result)),
  timeout timeout: Int,
  msg work: work,
) -> result

Send a single piece of work to one of the workers and wait for the result.

This function blocks until the work is completed or the timeout is reached. If no worker is available, the message will be queued and sent to the next free worker.

Parameters

  • pool - The name of the pool to execute work on
  • timeout - Maximum time to wait for completion in milliseconds
  • work - The message to send to the worker function

Panics

  • If the pool does not complete the work within the specified timeout
  • If the pool is not running
  • If the worker crashes while executing the work
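
As a minimal end-to-end sketch of the blocking behaviour described above, the following program starts a small pool and sends it one message. The worker function shout, the pool name and the timeout values are hypothetical and chosen only for illustration.

```gleam
import crew
import gleam/erlang/process
import gleam/io
import gleam/string

// Hypothetical worker function: runs on a pool worker for each message.
fn shout(text: String) -> String {
  string.uppercase(text)
}

pub fn main() {
  let pool_name = process.new_name("shouter")

  let assert Ok(_) =
    crew.new(pool_name, shout)
    |> crew.fixed_size(2)
    |> crew.start

  // Blocks for up to 1000 ms. Per the Panics list above, a timeout,
  // a stopped pool or a crashing worker panics instead of returning.
  let result = crew.call(pool_name, 1000, "hello")
  io.println(result)
}
```

Because call panics rather than returning an error, callers that need to recover from failures may prefer the lower-level submit function.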

pub fn call_parallel(
  in pool: process.Name(PoolMsg(work, result)),
  timeout timeout: Int,
  msg work: List(work),
) -> List(result)

Send multiple pieces of work concurrently to the pool workers, without ordering guarantees.

Work is distributed among the available workers and executed concurrently. Results are returned in the order they complete, not the order they were submitted.

Parameters

  • pool - The name of the pool to execute work on
  • timeout - Maximum time to wait for all work to complete in milliseconds
  • work - A list of messages describing the work to be done

Panics

  • If the pool does not complete all work within the specified timeout
  • If the pool is not running
  • If any worker crashes while executing work
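
Since results come back in completion order, one way to recover the submission order is to have the worker return its input alongside the result and sort afterwards. The sketch below assumes a hypothetical worker square and a helper squares_in_order; neither is part of crew.

```gleam
import crew
import gleam/erlang/process
import gleam/int
import gleam/list

// Hypothetical worker: returns the input together with its square so the
// caller can tell which result belongs to which piece of work.
fn square(n: Int) -> #(Int, Int) {
  #(n, n * n)
}

// Hypothetical helper: runs all inputs on the pool and returns the
// squares ordered by their input value.
pub fn squares_in_order(
  pool_name: process.Name(crew.PoolMsg(Int, #(Int, Int))),
  inputs: List(Int),
) -> List(Int) {
  crew.call_parallel(pool_name, 5000, inputs)
  |> list.sort(fn(a, b) { int.compare(a.0, b.0) })
  |> list.map(fn(pair) { pair.1 })
}

pub fn main() {
  let pool_name = process.new_name("squares")
  let assert Ok(_) = crew.new(pool_name, square) |> crew.start

  // Sorted by input (1, 2, 3), whatever order the workers finished in.
  let assert [1, 4, 9] = squares_in_order(pool_name, [3, 1, 2])
}
```

Sorting by input only restores the original order when the inputs are themselves sorted; to preserve an arbitrary submission order, attach an explicit index to each piece of work instead.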

pub fn cancel(
  pool: process.Name(PoolMsg(work, result)),
  receive: process.Subject(Result(result, process.ExitReason)),
) -> Nil

The first time submit or submit_all is called from a process, the pool starts monitoring that process and cancels all of its ongoing work if the process goes down.

Sometimes it is useful to unsubscribe manually and cancel all ongoing work for a subject. Doing so also removes the monitor added by the pool once the last receive subject has been cancelled.

Note that results of already finished work might still arrive on the subject after cancel has been called.

pub fn fixed_size(
  builder: Builder(state, work, result),
  size: Int,
) -> Builder(state, work, result)

Set the number of worker processes in the pool to a fixed number.

If the size is less than 1, starting the pool will fail.

Example

crew.new(pool_name, worker)
|> crew.fixed_size(8)  // Use 8 workers regardless of CPU count

pub fn max_queue_length(
  builder: Builder(state, work, result),
  max_queue_length: Int,
) -> Builder(state, work, result)

Limit the length of the internal work queue to avoid overloading the system.

If this limit is reached, callers submitting work have to wait until enough queued work has been processed before continuing.

By default, the maximum queue length depends on the number of workers in the pool.

Example

crew.new(pool_name, worker)
|> crew.max_queue_length(100)

pub fn new(
  name: process.Name(PoolMsg(work, result)),
  work: fn(work) -> result,
) -> Builder(Nil, work, result)

Create a new worker pool builder with the given name.

The name is used to register the pool so that work can be sent to it.

By default, the pool will have a number of workers equal to the number of scheduler threads available on the system (typically the number of CPU cores).

Example

let pool_name = process.new_name("pool")
let builder = crew.new(pool_name, worker)

pub fn new_with_initialiser(
  name: process.Name(PoolMsg(work, result)),
  timeout init_timeout: Int,
  init init: fn() -> state,
  run work: fn(state, work) -> result,
) -> Builder(state, work, result)

Create a new worker pool builder with the given name and initialiser.

The name is used to register the pool so that work can be sent to it. The initialiser will run on the worker process before it registers itself as a worker.

By default, the pool will have a number of workers equal to the number of scheduler threads available on the system (typically the number of CPU cores).
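
A sketch of how the initialiser might be used: each worker builds its own state once, and that state is then passed to the work function for every message. The functions build_table and lookup are hypothetical, and the initialiser timeout of 1000 is assumed here to be in milliseconds, like the other timeouts.

```gleam
import crew
import gleam/dict.{type Dict}
import gleam/erlang/process

// Hypothetical initialiser: builds the per-worker state on the worker process.
fn build_table() -> Dict(String, Int) {
  dict.from_list([#("small", 1), #("large", 10)])
}

// Hypothetical work function: receives the worker state and one message.
fn lookup(table: Dict(String, Int), key: String) -> Result(Int, Nil) {
  dict.get(table, key)
}

pub fn main() {
  let pool_name = process.new_name("lookup")

  let assert Ok(_) =
    crew.new_with_initialiser(
      pool_name,
      timeout: 1000,
      init: build_table,
      run: lookup,
    )
    |> crew.fixed_size(2)
    |> crew.start

  let assert Ok(10) = crew.call(pool_name, 1000, "large")
}
```

An initialiser is mainly useful when the state is expensive to build or has to be owned by the worker process itself, such as a connection or a handle.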

pub fn new_with_state(
  name: process.Name(PoolMsg(work, result)),
  state: state,
  work: fn(state, work) -> result,
) -> Builder(state, work, result)

Create a new worker pool builder with the given name and state.

The name is used to register the pool so that work can be sent to it.

By default, the pool will have a number of workers equal to the number of scheduler threads available on the system (typically the number of CPU cores).
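
When the state is a plain value that already exists, it can be handed to the builder directly. The sketch below assumes a hypothetical work function greet that combines the state with each message.

```gleam
import crew
import gleam/erlang/process
import gleam/io

// Hypothetical work function: receives the pool state and one message.
fn greet(greeting: String, name: String) -> String {
  greeting <> ", " <> name <> "!"
}

pub fn main() {
  let pool_name = process.new_name("greeter")

  let assert Ok(_) =
    crew.new_with_state(pool_name, "Hello", greet)
    |> crew.start

  io.println(crew.call(pool_name, 1000, "Joe"))
}
```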

pub fn start(
  builder: Builder(state, work, result),
) -> Result(static_supervisor.Supervisor, actor.StartError)

Start an unsupervised worker pool from the given builder.

Returns a supervisor that manages the pool and its workers. In most cases, you should use supervised instead to get a child specification that can be added to your application’s supervision tree.

Panics

This function will exit the process if any workers fail to start, similar to static_supervisor.start.

Example

let assert Ok(pool_supervisor) =
  crew.new(pool_name, worker)
  |> crew.start

pub fn submit(
  pool: process.Name(PoolMsg(work, result)),
  timeout: Int,
  receive: process.Subject(Result(result, process.ExitReason)),
  work: work,
) -> Nil

Submit a single piece of work to the pool using a subscription channel. The work will be done asynchronously and the result will be sent back to the provided subject. The timeout controls how long to wait for the work to be added to the queue successfully.

This is a lower-level function for submitting work. It is the caller's responsibility to handle timeouts, submission order and failures. Most users should prefer call.

The first time work is submitted from a process, the pool sets up a monitor to make sure the work is cancelled when the process no longer exists to receive a result. You can clean up this monitor early by using cancel.
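
The following sketch shows one way a caller might handle the result, the failure case and its own deadline when using submit. The helper submit_and_wait is hypothetical, and the sketch assumes that a failed piece of work arrives on the subject as an Error carrying the exit reason.

```gleam
import crew
import gleam/erlang/process

// Hypothetical helper: submits one piece of work and waits for its result.
pub fn submit_and_wait(
  pool_name: process.Name(crew.PoolMsg(Int, Int)),
  work: Int,
) -> Result(Int, Nil) {
  // The subject is owned by the calling process; results are sent to it.
  let reply_to = process.new_subject()

  // Wait up to 1000 ms for the work to be added to the queue.
  crew.submit(pool_name, 1000, reply_to, work)

  // Wait up to 5000 ms for the result. This deadline is the caller's own.
  case process.receive(reply_to, within: 5000) {
    Ok(Ok(value)) -> Ok(value)
    // Assumed: the work failed and the pool reported an exit reason.
    Ok(Error(_exit_reason)) -> Error(Nil)
    // Nothing arrived in time: cancel so the pool can drop the work.
    Error(Nil) -> {
      crew.cancel(pool_name, reply_to)
      Error(Nil)
    }
  }
}
```

Unlike call, this helper returns an Error instead of panicking when the result does not arrive in time.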

pub fn submit_all(
  pool: process.Name(PoolMsg(work, result)),
  timeout: Int,
  receive: process.Subject(Result(result, process.ExitReason)),
  work: List(work),
) -> Nil

Submit multiple pieces of work to the pool using a subscription channel. The work will be done asynchronously and each result will be sent back to the provided subject. The timeout controls how long to wait for the work to be added to the queue successfully.

This is a lower-level function for submitting work. It is the caller's responsibility to handle timeouts, submission order and failures. Most users should prefer the call_parallel function.

The first time work is submitted from a process, the pool sets up a monitor to make sure the work is cancelled when the process no longer exists to receive a result. You can clean up this monitor early by using cancel.
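
A sketch of collecting the results of submit_all by hand. It assumes the pool sends exactly one message to the subject per submitted piece of work; the helpers collect and submit_and_collect are hypothetical and not part of crew.

```gleam
import crew
import gleam/erlang/process
import gleam/list

// Receive up to `remaining` results, skipping failed work. Each receive
// waits at most 5000 ms and returns what has arrived so far on timeout.
fn collect(
  reply_to: process.Subject(Result(Int, process.ExitReason)),
  remaining: Int,
  acc: List(Int),
) -> List(Int) {
  case remaining <= 0 {
    True -> acc
    False ->
      case process.receive(reply_to, within: 5000) {
        Ok(Ok(value)) -> collect(reply_to, remaining - 1, [value, ..acc])
        Ok(Error(_exit_reason)) -> collect(reply_to, remaining - 1, acc)
        Error(Nil) -> acc
      }
  }
}

// Submit all work, gather the results in arrival order (reversed by the
// accumulator), then cancel to drop pending work and the pool's monitor.
pub fn submit_and_collect(
  pool_name: process.Name(crew.PoolMsg(Int, Int)),
  work: List(Int),
) -> List(Int) {
  let reply_to = process.new_subject()
  crew.submit_all(pool_name, 1000, reply_to, work)
  let results = collect(reply_to, list.length(work), [])
  crew.cancel(pool_name, reply_to)
  results
}
```

The returned list carries no ordering guarantee, so callers that need one should tag each piece of work with an index.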

pub fn supervised(
  builder: Builder(state, work, result),
) -> supervision.ChildSpecification(static_supervisor.Supervisor)

Create a child specification for a supervised worker pool.

This is the recommended way to start a worker pool as part of your application’s supervision tree. The returned child specification can be added to a supervisor using static_supervisor.add.

Example

// Assumes: import gleam/otp/static_supervisor as supervisor
let pool_spec =
  crew.new(pool_name, worker)
  |> crew.fixed_size(4)
  |> crew.supervised

let assert Ok(_) =
  supervisor.new(supervisor.OneForOne)
  |> supervisor.add(pool_spec)
  |> supervisor.start