crew/task_pool

The task_pool module implements a simple task pool that runs arbitrary functions passed to it, built on top of the generic crew pool.

Types

A builder for configuring a worker pool before starting it.

pub opaque type Builder
pub type PoolMsg

Values

pub fn fixed_size(builder: Builder, size: Int) -> Builder

Set the number of worker processes in the pool to a fixed number.

If the size is set to less than 1, starting the pool will fail.

Example

task_pool.new(pool_name)
|> task_pool.fixed_size(8)  // Use 8 workers regardless of CPU count

pub fn max_queue_length(
  builder: Builder,
  max_queue_length: Int,
) -> Builder

To avoid overloading the system, the internal work queue is limited.

If this limit is reached, callers submitting work have to wait until enough queued work has been completed before continuing.

By default, the max_queue_length depends on the number of workers in the pool.

Example

task_pool.new(pool_name)
|> task_pool.max_queue_length(100)

pub fn new(name: process.Name(PoolMsg)) -> Builder

Create a new worker pool builder with the given name.

The name is used to register the pool so that work can be sent to it.

By default, the pool will have a number of workers equal to the number of scheduler threads available on the system (typically the number of CPU cores).

Example

let pool_name = process.new_name("pool")
let builder = task_pool.new(pool_name)

pub fn parallel_map(
  over list: List(a),
  in pool: process.Name(PoolMsg),
  timeout timeout: Int,
  with fun: fn(a) -> b,
) -> List(b)

Apply a function to each element of a list concurrently using the worker pool.

This is similar to list.map but executes the mapping function concurrently across all workers. Results are returned in the same order as the input list.

Spawning one work item per list element and restoring the original order adds overhead. Depending on your workload, it might make sense to split your list into chunks first to reduce pressure on the work queue.
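
The chunking advice above could be sketched as follows. This is a minimal sketch, not part of the library: `chunked_parallel_map` and its `chunk_size` parameter are hypothetical names introduced for illustration. It assumes `list.sized_chunk` and `list.flatten` from `gleam/list`, and that the pool is already running.

```gleam
import crew/task_pool.{type PoolMsg}
import gleam/erlang/process
import gleam/list

/// Hypothetical helper (not part of crew): maps `fun` over `items`,
/// but enqueues one work item per chunk instead of one per element.
pub fn chunked_parallel_map(
  items: List(a),
  pool: process.Name(PoolMsg),
  chunk_size: Int,
  timeout: Int,
  fun: fn(a) -> b,
) -> List(b) {
  // Split the input into sub-lists of at most `chunk_size` elements.
  let chunks = list.sized_chunk(items, chunk_size)

  // Each worker maps a whole chunk sequentially. parallel_map keeps the
  // chunks in input order, so flattening restores the element order.
  task_pool.parallel_map(chunks, pool, timeout, fn(chunk) {
    list.map(chunk, fun)
  })
  |> list.flatten
}
```

A larger `chunk_size` means fewer queue entries but coarser load balancing, so the right value depends on how expensive each element is.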

Parameters

  • list - The list of items to map over
  • pool - The name of the pool to execute work on
  • timeout - Maximum time to wait for all mappings to complete in milliseconds
  • fun - The function to apply to each item

Panics

  • If the pool does not complete all mappings within the specified timeout
  • If the pool is not running
  • If any worker crashes while executing a mapping

Example

let user_ids = [1, 2, 3, 4, 5]
let users = task_pool.parallel_map(user_ids, pool_name, 5000, fetch_user_data)

pub fn run(
  in pool: process.Name(PoolMsg),
  timeout timeout: Int,
  do work: fn() -> any,
) -> any

Execute a single piece of work on the pool and wait for the result.

This function blocks until the work is completed or the timeout is reached. The work function is executed on one of the worker processes in the pool.

Parameters

  • pool - The name of the pool to execute work on
  • timeout - Maximum time to wait for completion in milliseconds
  • work - A function containing the work to be executed

Panics

  • If the pool does not complete the work within the specified timeout
  • If the pool is not running
  • If the worker crashes while executing the work
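
Because the panics above cover timeouts and crashes only, expected failures can be returned as ordinary values from the work function. The sketch below assumes a running pool; `safe_divide` is a hypothetical name, and `int.divide` from `gleam/int` is used purely as an example of a function that returns a `Result`.

```gleam
import crew/task_pool.{type PoolMsg}
import gleam/erlang/process
import gleam/int

/// Hypothetical example: the work function returns a Result, so a
/// division by zero comes back as Error(Nil) instead of crashing the worker.
pub fn safe_divide(
  pool: process.Name(PoolMsg),
  dividend: Int,
  divisor: Int,
) -> Result(Int, Nil) {
  task_pool.run(pool, 5000, fn() { int.divide(dividend, divisor) })
}
```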

Example

let response = task_pool.run(pool_name, 5000, fn() {
  httpc.send(...)
})

pub fn run_all(
  in pool: process.Name(PoolMsg),
  timeout timeout: Int,
  do work: List(fn() -> any),
) -> List(any)

Execute multiple pieces of work concurrently on the pool, without ordering guarantees.

Work is distributed among the available workers and executed concurrently. Results are returned in the order they complete, not the order they were submitted.
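
Since results arrive in completion order, one way to keep track of which result belongs to which input is to have each work function return its input alongside its result. The following is a minimal sketch under that assumption; `run_keyed` is a hypothetical helper, not part of the library.

```gleam
import crew/task_pool.{type PoolMsg}
import gleam/dict.{type Dict}
import gleam/erlang/process
import gleam/list

/// Hypothetical helper: runs `work` for every key on the pool and
/// returns the results indexed by key, independent of completion order.
pub fn run_keyed(
  pool: process.Name(PoolMsg),
  keys: List(k),
  timeout: Int,
  work: fn(k) -> v,
) -> Dict(k, v) {
  // Wrap each key in a zero-argument closure returning #(key, result).
  let jobs = list.map(keys, fn(key) { fn() { #(key, work(key)) } })

  task_pool.run_all(pool, timeout, jobs)
  |> dict.from_list
}
```

If you need a list in submission order rather than a lookup table, run_sorted is the simpler choice.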

Parameters

  • pool - The name of the pool to execute work on
  • timeout - Maximum time to wait for all work to complete in milliseconds
  • work - A list of functions containing work to be executed

Panics

  • If the pool does not complete all work within the specified timeout
  • If the pool is not running
  • If any worker crashes while executing work

Example

let users = task_pool.run_all(pool_name, 10000, [
  fn() { fetch_user_data(user1) },
  fn() { fetch_user_data(user2) },
  fn() { fetch_user_data(user3) },
])

pub fn run_sorted(
  in pool: process.Name(PoolMsg),
  timeout timeout: Int,
  do work: List(fn() -> any),
) -> List(any)

Execute multiple pieces of work concurrently on the pool and return results in submission order.

Work functions are distributed across available workers and executed concurrently, but results are reordered to match the original submission order before being returned.

Parameters

  • pool - The name of the pool to execute work on
  • timeout - Maximum time to wait for all work to complete in milliseconds
  • work - A list of functions containing work to be executed

Panics

  • If the pool does not complete all work within the specified timeout
  • If the pool is not running
  • If any worker crashes while executing work

Example

let assert [user1, user2, user3] = task_pool.run_sorted(pool_name, 10000, [
  fn() { fetch_user_data(user1) },
  fn() { fetch_user_data(user2) },
  fn() { fetch_user_data(user3) },
])

pub fn start(
  builder: Builder,
) -> Result(static_supervisor.Supervisor, actor.StartError)

Start an unsupervised worker pool from the given builder.

Returns a supervisor that manages the pool and its workers. In most cases, you should use supervised instead to get a child specification that can be added to your application’s supervision tree.

Panics

This function will exit the process if any workers fail to start, similar to static_supervisor.start.
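
For quick experiments or tests, an unsupervised pool can be started and used directly. The sketch below combines the functions documented on this page; the pool name "demo_pool" and the pool size are arbitrary choices made for illustration.

```gleam
import crew/task_pool
import gleam/erlang/process
import gleam/int
import gleam/io

pub fn main() {
  // The name is how callers address the pool later on.
  let pool_name = process.new_name("demo_pool")

  // Start the pool without attaching it to a supervision tree.
  let assert Ok(_pool_supervisor) =
    task_pool.new(pool_name)
    |> task_pool.fixed_size(2)
    |> task_pool.start

  // Run one piece of work on a worker and wait up to 5 seconds for it.
  let sum = task_pool.run(pool_name, 5000, fn() { 1 + 2 })
  io.println(int.to_string(sum))
}
```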

Example

let assert Ok(pool_supervisor) =
  task_pool.new(pool_name)
  |> task_pool.start

pub fn submit(
  pool: process.Name(PoolMsg),
  timeout: Int,
  receive: process.Subject(Result(result, process.ExitReason)),
  work: fn() -> result,
) -> Nil

Submit a single piece of work to the pool using a subscription channel. The work will be done asynchronously and the result will be sent back to the provided subject. The timeout controls how long to wait for the work to be added to the queue successfully.

This is a lower-level function for submitting work. It is the caller's responsibility to handle timeouts, submission order, and failures. Most users should prefer the run* functions.

The first time work is submitted from a process, the pool sets up a monitor on that process so that pending work is cancelled if the process no longer exists to receive a result. You can clean up this monitor early by using cancel.
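
A minimal sketch of the submit flow, assuming a running pool and that the submit timeout is given in milliseconds like the other timeouts on this page. `submit_and_wait` is a hypothetical name, and the 5 second receive timeout is an arbitrary choice; `process.new_subject` and `process.receive` come from `gleam/erlang/process`.

```gleam
import crew/task_pool.{type PoolMsg}
import gleam/erlang/process

/// Hypothetical example: submit one job, then wait for its result.
pub fn submit_and_wait(pool: process.Name(PoolMsg)) -> Result(Int, Nil) {
  // The pool sends Ok(result) or Error(exit_reason) to this subject.
  let results = process.new_subject()

  // Wait up to 1 second for the work to be accepted onto the queue.
  task_pool.submit(pool, 1000, results, fn() { 1 + 1 })

  // The caller is free to do other things here before collecting.

  // Handling the wait for the result is the caller's job.
  case process.receive(results, 5000) {
    Ok(Ok(value)) -> Ok(value)
    // The work function crashed on the worker.
    Ok(Error(_exit_reason)) -> Error(Nil)
    // No message arrived within 5 seconds.
    Error(Nil) -> Error(Nil)
  }
}
```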

pub fn submit_all(
  pool: process.Name(PoolMsg),
  timeout: Int,
  receive: process.Subject(Result(result, process.ExitReason)),
  work: List(fn() -> result),
) -> Nil

Submit multiple pieces of work to the pool using a subscription channel. The work will be done asynchronously and the result will be sent back to the provided subject. The timeout controls how long to wait for the work to be added to the queue successfully.

This is a lower-level function for submitting work. It is the caller's responsibility to handle timeouts, submission order, and failures. Most users should prefer the run* functions.

The first time work is submitted from a process, the pool sets up a monitor on that process so that pending work is cancelled if the process no longer exists to receive a result. You can clean up this monitor early by using cancel.
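
Collecting the results of submit_all could look like the sketch below. It assumes that the pool sends one message per work item to the subject, which this page implies but does not state explicitly. `squares` and `collect` are hypothetical names, and the timeouts are arbitrary.

```gleam
import crew/task_pool.{type PoolMsg}
import gleam/erlang/process
import gleam/list

/// Hypothetical example: square every input on the pool and gather
/// the results. The output order is not the submission order.
pub fn squares(
  pool: process.Name(PoolMsg),
  inputs: List(Int),
) -> Result(List(Int), Nil) {
  let results = process.new_subject()
  let jobs = list.map(inputs, fn(n) { fn() { n * n } })

  // Wait up to 1 second for the jobs to be accepted onto the queue.
  task_pool.submit_all(pool, 1000, results, jobs)

  collect(results, list.length(jobs), [])
}

// Receive exactly `remaining` messages, failing on the first crash or
// on the first receive that takes longer than 5 seconds.
fn collect(
  results: process.Subject(Result(Int, process.ExitReason)),
  remaining: Int,
  acc: List(Int),
) -> Result(List(Int), Nil) {
  case remaining {
    0 -> Ok(acc)
    _ ->
      case process.receive(results, 5000) {
        Ok(Ok(value)) -> collect(results, remaining - 1, [value, ..acc])
        Ok(Error(_exit_reason)) -> Error(Nil)
        Error(Nil) -> Error(Nil)
      }
  }
}
```

To recover submission order with this approach, each work function would need to return its own index or key along with the value.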

pub fn supervised(
  builder: Builder,
) -> supervision.ChildSpecification(static_supervisor.Supervisor)

Create a child specification for a supervised worker pool.

This is the recommended way to start a worker pool as part of your application’s supervision tree. The returned child specification can be added to a supervisor using static_supervisor.add.

Example

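// Assumes: import gleam/otp/static_supervisor as supervisor
let pool_spec =
  task_pool.new(pool_name)
  |> task_pool.fixed_size(4)
  |> task_pool.supervised

// Add the pool to a supervisor that restarts failed children individually
let assert Ok(_) =
  supervisor.new(supervisor.OneForOne)
  |> supervisor.add(pool_spec)
  |> supervisor.start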