crew/task_pool
The task_pool module implements a simple, generic task pool that runs
arbitrary functions passed to it, using the generic crew pool under the hood.
Types
A builder for configuring a worker pool before starting it.
pub opaque type Builder
Values
pub fn fixed_size(builder: Builder, size: Int) -> Builder
Set the number of worker processes in the pool to a fixed number.
When set to less than 1, starting the pool will fail.
Example
task_pool.new(pool_name)
|> task_pool.fixed_size(8) // Use 8 workers regardless of CPU count
pub fn max_queue_length(
builder: Builder,
max_queue_length: Int,
) -> Builder
To avoid overloading the system, the internal work queue is limited.
If this limit is reached, callers of enqueue have to wait until enough
work has been done before continuing.
By default, the maximum queue length depends on the number of workers in the pool.
Example
task_pool.new(pool_name)
|> task_pool.max_queue_length(100)
pub fn new(name: process.Name(PoolMsg)) -> Builder
Create a new worker pool builder with the given name.
The name is used to register the pool so that work can be sent to it.
By default, the pool will have a number of workers equal to the number of scheduler threads available on the system (typically the number of CPU cores).
Example
let pool_name = process.new_name("pool")
let builder = task_pool.new(pool_name)
pub fn parallel_map(
over list: List(a),
in pool: process.Name(PoolMsg),
timeout timeout: Int,
with fun: fn(a) -> b,
) -> List(b)
Apply a function to each element of a list concurrently using the worker pool.
This is similar to list.map but executes the mapping function
concurrently across all workers. Results are returned in the same
order as the input list.
Spawning a work item per list element and keeping the results in order adds extra overhead. Depending on your workload, it may make sense to split your list into chunks first to reduce work queue pressure.
Parameters
- list - The list of items to map over
- pool - The name of the pool to execute work on
- timeout - Maximum time to wait for all mappings to complete, in milliseconds
- fun - The function to apply to each item
Panics
- If the pool does not complete all mappings within the specified timeout
- If the pool is not running
- If any worker crashes while executing a mapping
Example
let user_ids = [1, 2, 3, 4, 5]
let users = task_pool.parallel_map(user_ids, pool_name, 5000, fetch_user_data)
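A sketch of the chunking approach mentioned above, assuming an `items` list, a `process_item` function, and `import gleam/list` (none of which are part of this module):

```gleam
// Sketch: map over chunks of 100 items instead of individual items,
// then flatten the per-chunk results back into a single list.
let results =
  items
  |> list.sized_chunk(100)
  |> task_pool.parallel_map(pool_name, 30_000, list.map(_, process_item))
  |> list.flatten
```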
pub fn run(
in pool: process.Name(PoolMsg),
timeout timeout: Int,
do work: fn() -> any,
) -> any
Execute a single piece of work on the pool and wait for the result.
This function blocks until the work is completed or the timeout is reached. The work function is executed on one of the worker processes in the pool.
Parameters
- pool - The name of the pool to execute work on
- timeout - Maximum time to wait for completion, in milliseconds
- work - A function containing the work to be executed
Panics
- If the pool does not complete the work within the specified timeout
- If the pool is not running
- If the worker crashes while executing the work
Example
let response = task_pool.run(pool_name, 5000, fn() {
httpc.send(...)
})
pub fn run_all(
in pool: process.Name(PoolMsg),
timeout timeout: Int,
do work: List(fn() -> any),
) -> List(any)
Execute multiple pieces of work concurrently on the pool, without ordering guarantees.
Work is distributed among the available workers and executed concurrently. Results are returned in the order they complete, not the order they were submitted.
Parameters
- pool - The name of the pool to execute work on
- timeout - Maximum time to wait for all work to complete, in milliseconds
- work - A list of functions containing work to be executed
Panics
- If the pool does not complete all work within the specified timeout
- If the pool is not running
- If any worker crashes while executing work
Example
let users = task_pool.run_all(pool_name, 10000, [
fn() { fetch_user_data(user1) },
fn() { fetch_user_data(user2) },
fn() { fetch_user_data(user3) },
])
pub fn run_sorted(
in pool: process.Name(PoolMsg),
timeout timeout: Int,
do work: List(fn() -> any),
) -> List(any)
Execute multiple pieces of work concurrently on the pool and return results in submission order.
Work functions are distributed across available workers and executed concurrently, but results are reordered to match the original submission order before being returned.
Parameters
- pool - The name of the pool to execute work on
- timeout - Maximum time to wait for all work to complete, in milliseconds
- work - A list of functions containing work to be executed
Panics
- If the pool does not complete all work within the specified timeout
- If the pool is not running
- If any worker crashes while executing work
Example
let assert [user1, user2, user3] = task_pool.run_sorted(pool_name, 10000, [
fn() { fetch_user_data(user1) },
fn() { fetch_user_data(user2) },
fn() { fetch_user_data(user3) },
])
pub fn start(
builder: Builder,
) -> Result(static_supervisor.Supervisor, actor.StartError)
Start an unsupervised worker pool from the given builder.
Returns a supervisor that manages the pool and its workers. In most cases,
you should use supervised instead to get a child specification that can
be added to your application’s supervision tree.
Panics
This function will exit the process if any workers fail to start, similar
to static_supervisor.start.
Example
let assert Ok(pool_supervisor) =
task_pool.new(pool_name)
|> task_pool.start
pub fn submit(
pool: process.Name(PoolMsg),
timeout: Int,
receive: process.Subject(Result(result, process.ExitReason)),
work: fn() -> result,
) -> Nil
Submit a single piece of work to the pool using a subscription channel. The work will be done asynchronously and the result will be sent back to the provided subject. The timeout controls how long to wait for the work to be added to the queue successfully.
This is a lower-level function for submitting work. It is the caller's
responsibility to handle timeouts, submission order, and failures.
Most users should prefer the run* functions.
The first time enqueue is called from a process, the pool sets up
a monitor to make sure work is cancelled when the process no longer exists
to receive a result. You can clean up this monitor early by using cancel.
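Example
A minimal sketch of submitting work directly and receiving the result on a subject, assuming a `fetch_user_data` function as in the examples above:

```gleam
let reply = process.new_subject()
task_pool.submit(pool_name, 1000, reply, fn() { fetch_user_data(1) })
// Do other work here, then collect the result.
// The subject carries Result(result, process.ExitReason), and
// process.receive itself returns a Result, hence the nested Ok.
let assert Ok(Ok(user)) = process.receive(reply, 5000)
```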
pub fn submit_all(
pool: process.Name(PoolMsg),
timeout: Int,
receive: process.Subject(Result(result, process.ExitReason)),
work: List(fn() -> result),
) -> Nil
Submit multiple pieces of work to the pool using a subscription channel. The work will be done asynchronously and the result will be sent back to the provided subject. The timeout controls how long to wait for the work to be added to the queue successfully.
This is a lower-level function for submitting work. It is the caller's
responsibility to handle timeouts, submission order, and failures.
Most users should prefer the run* functions.
The first time enqueue is called from a process, the pool sets up
a monitor to make sure work is cancelled when the process no longer exists
to receive a result. You can clean up this monitor early by using cancel.
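Example
A minimal sketch of submitting several pieces of work at once and collecting the results as they arrive, assuming a `fetch_user_data` function as in the examples above:

```gleam
let reply = process.new_subject()
task_pool.submit_all(pool_name, 1000, reply, [
  fn() { fetch_user_data(1) },
  fn() { fetch_user_data(2) },
])
// One message arrives per work item, in completion order,
// not submission order.
let assert Ok(Ok(first)) = process.receive(reply, 5000)
let assert Ok(Ok(second)) = process.receive(reply, 5000)
```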
pub fn supervised(
builder: Builder,
) -> supervision.ChildSpecification(static_supervisor.Supervisor)
Create a child specification for a supervised worker pool.
This is the recommended way to start a worker pool as part of your
application’s supervision tree. The returned child specification can be
added to a supervisor using static_supervisor.add.
Example
let pool_spec =
task_pool.new(pool_name)
|> task_pool.fixed_size(4)
|> task_pool.supervised
let assert Ok(_) =
supervisor.new(supervisor.OneForOne)
|> supervisor.add(pool_spec)
|> supervisor.start