crew

A worker pool for Gleam that distributes work across a limited number of worker processes. Workers are pooled to avoid the overhead of spawning and killing processes for each task.

The pool manages a queue of work items and distributes them to idle workers. When no workers are available, work is queued until a worker becomes free. The pool handles worker crashes gracefully and automatically manages the worker lifecycle.

Example

import crew
import gleam/erlang/process

pub fn main() {
  // Create a pool name
  let pool_name = process.new_name("my_crew")

  // Start an unsupervised pool
  let assert Ok(_) =
      crew.new(pool_name)
      |> crew.fixed_size(4)
      |> crew.start

  // Execute work on the pool
  let _result = crew.work(pool_name, 5000, fn() {
    // Some expensive computation
    expensive_computation()
  })
}

Types

A builder for configuring a worker pool before starting it.

pub opaque type Builder

A channel represents a typed subscription to a worker pool for receiving work results. This allows you to send work and receive results asynchronously.

pub opaque type Channel(a)
pub opaque type PoolMsg

Values

pub fn enqueue(channel: Channel(a), work: fn() -> a) -> Nil

Submit a single piece of work to the pool using a subscription channel.

This is a lower-level function for submitting work. Results must be handled using a selector with select_map. It is the caller's responsibility to handle timeouts and submission order. Most users should prefer the work* functions.
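A minimal sketch of the full enqueue flow, assuming `pool_name` names a running pool and that `process.selector_receive` from gleam/erlang/process is used to wait for the result:

```gleam
import crew
import gleam/erlang/process

// Sketch: submit one job over a channel and wait for its result.
// Assumes `pool_name` refers to a pool that is already running.
pub fn run_one(pool_name: process.Name(crew.PoolMsg)) -> Int {
  let channel = crew.subscribe(pool_name)
  crew.enqueue(channel, fn() { 2 + 2 })

  // Build a selector that receives results from this channel.
  let selector =
    process.new_selector()
    |> crew.select_map(channel, fn(result) { result })

  // Wait up to 5 seconds for the result; timeouts are ours to handle.
  let assert Ok(result) = process.selector_receive(selector, 5000)

  // Clean up the subscription when done.
  let _selector = crew.unsubscribe(channel, selector)
  result
}
```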

pub fn enqueue_many(
  channel: Channel(a),
  work: List(fn() -> a),
) -> Nil

Submit multiple pieces of work to the pool using a subscription channel.

This is a lower-level function for submitting work. Results must be handled using a selector with select_map. It is the caller's responsibility to handle timeouts and submission order. Most users should prefer the work* functions.
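A sketch of submitting a batch and collecting one result per job, assuming `pool_name` names a running pool:

```gleam
import crew
import gleam/erlang/process
import gleam/list

// Sketch: submit a batch of jobs and collect one result per job, in
// completion order. Assumes `pool_name` refers to a running pool.
pub fn run_batch(
  pool_name: process.Name(crew.PoolMsg),
  jobs: List(fn() -> a),
) -> List(a) {
  let channel = crew.subscribe(pool_name)
  crew.enqueue_many(channel, jobs)

  let selector =
    process.new_selector()
    |> crew.select_map(channel, fn(result) { result })

  // One message arrives per job, in whatever order the jobs finish.
  let results =
    list.map(jobs, fn(_) {
      let assert Ok(result) = process.selector_receive(selector, 5000)
      result
    })

  let _selector = crew.unsubscribe(channel, selector)
  results
}
```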

pub fn fixed_size(builder: Builder, size: Int) -> Builder

Set the number of worker processes in the pool to a fixed number.

If the size is less than 1, starting the pool will fail.

Example

crew.new(pool_name)
|> crew.fixed_size(8)  // Use 8 workers regardless of CPU count

pub fn new(name: process.Name(PoolMsg)) -> Builder

Create a new worker pool builder with the given name.

The name is used to register the pool so that work can be sent to it.

By default, the pool will have a number of workers equal to the number of scheduler threads available on the system (typically the number of CPU cores).

Example

let pool_name = process.new_name("pool")
let builder = crew.new(pool_name)

pub fn parallel_map(
  over list: List(a),
  in pool: process.Name(PoolMsg),
  timeout timeout: Int,
  with fun: fn(a) -> b,
) -> List(b)

Apply a function to each element of a list concurrently using the worker pool.

This is similar to list.map but executes the mapping function concurrently across all workers. Results are returned in the same order as the input list.

Spawning a work item per list element and restoring the original order adds overhead. Depending on your workload, it may make sense to split your list into chunks first to reduce work queue pressure.

Parameters

  • list - The list of items to map over
  • pool - The name of the pool to execute work on
  • timeout - Maximum time to wait for all mappings to complete in milliseconds
  • fun - The function to apply to each item

Panics

  • If the pool does not complete all mappings within the specified timeout
  • If the pool is not running
  • If any worker crashes while executing a mapping

Example

let user_ids = [1, 2, 3, 4, 5]
let users = crew.parallel_map(user_ids, pool_name, 5000, fetch_user_data)
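
The chunking advice above might look like the following sketch, assuming `pool_name` names a running pool and a chunk size of 100:

```gleam
import crew
import gleam/erlang/process
import gleam/list

// Sketch: map over chunks rather than single elements, so each work
// item covers up to 100 elements and queue pressure drops accordingly.
pub fn chunked_map(
  items: List(a),
  pool_name: process.Name(crew.PoolMsg),
  fun: fn(a) -> b,
) -> List(b) {
  items
  |> list.sized_chunk(into: 100)
  |> crew.parallel_map(pool_name, 10_000, list.map(_, fun))
  |> list.flatten
}
```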

pub fn select_map(
  selector: process.Selector(msg),
  channel: Channel(a),
  tagger: fn(a) -> msg,
) -> process.Selector(msg)

Add a worker pool channel to a selector for receiving work results.

This allows you to receive work results as part of the larger message-handling loop. Used in conjunction with subscribe and enqueue for lower-level pool usage.

Work results arrive in completion order. It is your responsibility to handle the lifecycle and ordering.

This adds a specific monitor to the selector to handle the case where the pool goes down, so it’s better to prefer few channels over many.

Panics

  • If a worker or the pool crashes while executing work, the selector will panic with details about the crash.
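
A sketch of folding pool results into a larger message-handling loop; `Msg`, `WorkDone`, and `Tick` are hypothetical names for your own message type:

```gleam
import crew
import gleam/erlang/process

// Hypothetical message type for a process that handles pool results
// alongside other messages.
pub type Msg {
  WorkDone(Int)
  Tick
}

// Add the channel to a selector; results arrive tagged as WorkDone.
pub fn build_selector(channel: crew.Channel(Int)) -> process.Selector(Msg) {
  process.new_selector()
  |> crew.select_map(channel, WorkDone)
}
```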

pub fn start(
  builder: Builder,
) -> Result(static_supervisor.Supervisor, actor.StartError)

Start an unsupervised worker pool from the given builder.

Returns a supervisor that manages the pool and its workers. In most cases, you should use supervised instead to get a child specification that can be added to your application’s supervision tree.

Panics

This function will exit the process if any workers fail to start, similar to static_supervisor.start.

Example

let assert Ok(pool_supervisor) =
  crew.new(pool_name)
  |> crew.start

pub fn subscribe(pool: process.Name(PoolMsg)) -> Channel(a)

Create a typed subscription channel to the worker pool for work submission.

This function provides a lower-level interface for submitting work to the pool. In many cases calling one of the work* functions from a separate process will be easier.

Subscribing to the pool creates a reference to the current process in the pool that must be cleaned up using unsubscribe. The returned channel can be used with select_map and enqueue for custom message handling.

Panics

  • If the pool is not running

Example

let channel = crew.subscribe(pool_name)
crew.enqueue(channel, fn() { some_work() })
// Handle results with selector...
crew.unsubscribe(channel, selector)

pub fn supervised(
  builder: Builder,
) -> supervision.ChildSpecification(static_supervisor.Supervisor)

Create a child specification for a supervised worker pool.

This is the recommended way to start a worker pool as part of your application’s supervision tree. The returned child specification can be added to a supervisor using static_supervisor.add.

Example

let pool_spec =
  crew.new(pool_name)
  |> crew.fixed_size(4)
  |> crew.supervised

let assert Ok(_) =
  supervisor.new(supervisor.OneForOne)
  |> supervisor.add(pool_spec)
  |> supervisor.start

pub fn unsubscribe(
  channel: Channel(a),
  selector: process.Selector(msg),
) -> process.Selector(msg)

Remove a subscription channel from the pool and clean up resources.

This function should be called when you’re done using a channel obtained from subscribe. It stops any in-progress work for this channel and removes the channel’s handlers from the selector.

Any results from finished work that are still in flight are dropped.

pub fn work(
  in pool: process.Name(PoolMsg),
  timeout timeout: Int,
  do work: fn() -> any,
) -> any

Execute a single piece of work on the pool and wait for the result.

This function blocks until the work is completed or the timeout is reached. The work function is executed on one of the worker processes in the pool.

Parameters

  • pool - The name of the pool to execute work on
  • timeout - Maximum time to wait for completion in milliseconds
  • work - A function containing the work to be executed

Panics

  • If the pool does not complete the work within the specified timeout
  • If the pool is not running
  • If the worker crashes while executing the work

Example

let response = crew.work(pool_name, 5000, fn() {
  httpc.send(...)
})

pub fn work_many(
  in pool: process.Name(PoolMsg),
  timeout timeout: Int,
  do work: List(fn() -> any),
) -> List(any)

Execute multiple pieces of work concurrently on the pool, without ordering guarantees.

Work is distributed among the available workers and executed concurrently. Results are returned in the order they complete, not the order they were submitted.

Parameters

  • pool - The name of the pool to execute work on
  • timeout - Maximum time to wait for all work to complete in milliseconds
  • work - A list of functions containing work to be executed

Panics

  • If the pool does not complete all work within the specified timeout
  • If the pool is not running
  • If any worker crashes while executing work

Example

let users = crew.work_many(pool_name, 10000, [
  fn() { fetch_user_data(user1) },
  fn() { fetch_user_data(user2) },
  fn() { fetch_user_data(user3) },
])

pub fn work_ordered(
  in pool: process.Name(PoolMsg),
  timeout timeout: Int,
  do work: List(fn() -> any),
) -> List(any)

Execute multiple pieces of work concurrently on the pool and return results in submission order.

Work functions are distributed across available workers and executed concurrently, but results are reordered to match the original submission order before being returned.

Parameters

  • pool - The name of the pool to execute work on
  • timeout - Maximum time to wait for all work to complete in milliseconds
  • work - A list of functions containing work to be executed

Panics

  • If the pool does not complete all work within the specified timeout
  • If the pool is not running
  • If any worker crashes while executing work

Example

let assert [user1, user2, user3] = crew.work_ordered(pool_name, 10000, [
  fn() { fetch_user_data(user1) },
  fn() { fetch_user_data(user2) },
  fn() { fetch_user_data(user3) },
])