Tasque (Tasque v1.0.0)
An asynchronous, bounded-concurrency task queue for Elixir.
Tasque lets you enqueue work — anonymous functions or MFA tuples — and
execute it asynchronously under a supervised Task.Supervisor, with a
configurable ceiling on simultaneous tasks. Results are delivered back to
the caller via plain OTP messages, so you can choose between blocking
(await/2) and non-blocking (selective receive) consumption styles.
Quick Start
Add a Tasque instance to your application's supervision tree:
children = [
{Tasque, name: MyApp.Queue, max_concurrency: 10}
]
Supervisor.start_link(children, strategy: :one_for_one)
Then enqueue work from anywhere:
{:ok, ref} = Tasque.queue_task(MyApp.Queue, fn -> expensive_work() end)
# Block until the result arrives (or timeout):
{:ok, result} = Tasque.await(ref)
Result Format
Every task result is delivered as a {:tasque_result, ref, outcome} message
to the process that called queue_task/3. The outcome is one of:
| Outcome | Meaning |
|---|---|
| {:ok, result} | Task completed successfully and returned result |
| {:exit, reason} | Task crashed, was killed, or timed out |
When using await/2, the outcome tuple is returned directly.
Task Formats
Tasks can be provided as either a zero-arity anonymous function or an
MFA tuple. MFA tuples are normalized internally to fn -> apply(m, f, a) end.
# Anonymous function
Tasque.queue_task(queue, fn -> String.upcase("hello") end)
# MFA tuple
Tasque.queue_task(queue, {String, :upcase, ["hello"]})
Supervision Architecture
Each Tasque instance starts a small supervision subtree:
Tasque.Supervisor (:one_for_all)
├── Task.Supervisor (runs tasks via async_nolink)
└── Tasque.Queue (GenServer managing the FIFO queue)
The :one_for_all strategy ensures that if either child crashes, both
are restarted together, keeping the system in a consistent state. The
Task.Supervisor starts first so its name is available to Tasque.Queue
at init time.
Concurrency Control
When the number of in-flight tasks is below :max_concurrency, enqueued
work is dispatched immediately. Once the limit is reached, tasks wait in
a FIFO queue and are dispatched as running tasks complete, crash, or
time out — each of these events frees a concurrency slot.
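The dispatch rule described above can be sketched as a pure state transition. This is only an illustration under stated assumptions, not Tasque's actual implementation: the module name DispatchSketch, its map-based state, and its function names are all hypothetical.

```elixir
defmodule DispatchSketch do
  # Hypothetical state: a FIFO of waiting tasks, a count of running
  # tasks, and the concurrency limit.
  def new(max_concurrency) do
    %{queue: :queue.new(), running: 0, max: max_concurrency}
  end

  # Enqueue a task, then start as many waiting tasks as free slots allow.
  # Returns {tasks_to_start, new_state}.
  def enqueue(state, task) do
    dispatch(%{state | queue: :queue.in(task, state.queue)}, [])
  end

  # A running task completed, crashed, or timed out: free its slot and
  # start the next waiting task, if any.
  def slot_freed(%{running: running} = state) when running > 0 do
    dispatch(%{state | running: running - 1}, [])
  end

  # Below the limit: pop the oldest waiting task and mark it as running.
  defp dispatch(%{running: running, max: max} = state, started) when running < max do
    case :queue.out(state.queue) do
      {{:value, task}, rest} ->
        dispatch(%{state | queue: rest, running: running + 1}, [task | started])

      {:empty, _queue} ->
        {Enum.reverse(started), state}
    end
  end

  # At the limit: nothing more can start until a slot is freed.
  defp dispatch(state, started), do: {Enum.reverse(started), state}
end
```

With a limit of 2, the first two enqueued tasks would start immediately, a third would wait, and a call to slot_freed/1 would release it.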
Fault Isolation
Tasks run under Task.Supervisor.async_nolink/2, so a crashing task:
- Does not crash the queue GenServer
- Does not affect other concurrently running tasks
- Produces an {:exit, reason} result delivered to the original caller
- Frees its concurrency slot, allowing the next queued task to dispatch
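The isolation comes from how Task.Supervisor.async_nolink/2 reports failures: the task is monitored rather than linked, so a crash surfaces as a :DOWN message instead of an exit signal. The snippet below is a minimal standalone sketch using only the standard library; how Tasque itself translates these messages is an assumption here.

```elixir
# Start a throwaway Task.Supervisor for the demonstration.
{:ok, sup} = Task.Supervisor.start_link()

# async_nolink monitors the task without linking to it, so a crash
# cannot take the calling process down.
%Task{ref: ref} = Task.Supervisor.async_nolink(sup, fn -> exit(:boom) end)

outcome =
  receive do
    # Normal completion: the task replies with {ref, result}.
    {^ref, result} ->
      Process.demonitor(ref, [:flush])
      {:ok, result}

    # Crash or kill: only the monitor's :DOWN message arrives.
    {:DOWN, ^ref, :process, _pid, reason} ->
      {:exit, reason}
  after
    1_000 -> {:exit, :timeout}
  end

# The caller is still alive and can inspect the outcome.
IO.inspect(outcome)
```

The crashing task also logs an error report, which is expected for abnormal exits.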
Per-Task Timeouts
You can set a timeout on individual tasks at enqueue time:
Tasque.queue_task(queue, fn -> slow_work() end, timeout: 5_000)
The timeout starts when the task is enqueued, so it includes time spent waiting in the queue as well as time spent running.
If the timeout fires while the task is still queued, the queue tombstones
it, delivers {:exit, :timeout} to the caller, and skips it during
dispatch.
If the timeout fires while the task is already running:
- The task process is killed (Task.Supervisor.terminate_child/2)
- {:exit, :timeout} is delivered to the caller
- The concurrency slot is freed
Pass timeout: :infinity to disable the per-task timeout. Otherwise,
:timeout must be a positive integer in milliseconds. If no :timeout
option is given, the task runs without a timeout.
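The running-task case can be sketched with a timer message and Task.Supervisor.terminate_child/2. This is a standalone approximation, not the queue's real code: the {:task_timeout, ref} message shape is hypothetical, and the timer here is armed at task start because this sketch has no queue.

```elixir
{:ok, sup} = Task.Supervisor.start_link()

# A task that would never finish on its own.
task = Task.Supervisor.async_nolink(sup, fn -> Process.sleep(:infinity) end)
ref = task.ref

# Arm a 50 ms deadline. The message shape is a hypothetical choice.
timer = Process.send_after(self(), {:task_timeout, ref}, 50)

outcome =
  receive do
    {^ref, result} ->
      # Finished in time: cancel the timer and drop the monitor.
      Process.cancel_timer(timer)
      Process.demonitor(ref, [:flush])
      {:ok, result}

    {:DOWN, ^ref, :process, _pid, reason} ->
      # Crashed before the deadline.
      Process.cancel_timer(timer)
      {:exit, reason}

    {:task_timeout, ^ref} ->
      # Deadline passed: stop monitoring, then terminate the task process.
      Process.demonitor(ref, [:flush])
      :ok = Task.Supervisor.terminate_child(sup, task.pid)
      {:exit, :timeout}
  end
```

Dropping the monitor before terminating keeps the resulting :DOWN message out of the mailbox, so the timeout is reported exactly once.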
Await timeout vs. task timeout
Tasque.await/2's timeout controls how long the caller is willing
to wait. It does not cancel the task — the task will continue
running, its result will still be delivered to the caller's mailbox,
and it will still occupy a concurrency slot.
The :timeout option on queue_task/3 is enforced by the queue
itself. If it fires while the task is already running, the task
process is terminated. If it fires while the task is still queued,
the task is dropped before dispatch.
Multiple Queues
You can run independent queues with different concurrency limits for different workload categories:
children = [
{Tasque, name: MyApp.CpuQueue, max_concurrency: System.schedulers_online()},
{Tasque, name: MyApp.IoQueue, max_concurrency: 50}
]
Each queue maintains its own task supervisor, dispatch loop, and concurrency budget.
Caller Refs
The reference returned by queue_task/3 is a caller ref — a
make_ref() created by the queue GenServer and decoupled from the
internal Task ref used by the task supervisor. This means:
- The caller never holds a direct reference to the underlying task process
- Refs are unique per enqueue call, even if the same function is submitted twice
- The queue maintains the mapping from internal task refs to caller refs
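One way to picture the mapping is a map keyed by the internal task ref. The sketch below is an assumption about the bookkeeping, not Tasque's actual data structure; RefMapSketch and its functions are hypothetical names, and only the {:tasque_result, ref, outcome} message shape comes from this documentation.

```elixir
defmodule RefMapSketch do
  # Hypothetical bookkeeping: internal task ref => {caller pid, caller ref}.

  # Record which caller ref belongs to a newly started task.
  def track(refs, task_ref, caller_pid, caller_ref) do
    Map.put(refs, task_ref, {caller_pid, caller_ref})
  end

  # Translate a finished task's internal ref back to the caller ref and
  # forward the outcome in the documented message shape.
  def deliver(refs, task_ref, outcome) do
    case Map.pop(refs, task_ref) do
      {{caller_pid, caller_ref}, rest} ->
        send(caller_pid, {:tasque_result, caller_ref, outcome})
        rest

      {nil, rest} ->
        # Unknown or already-delivered ref: nothing to do.
        rest
    end
  end
end

# The caller ref is created independently of the task's own ref.
caller_ref = make_ref()
task_ref = make_ref()

refs = RefMapSketch.track(%{}, task_ref, self(), caller_ref)
refs = RefMapSketch.deliver(refs, task_ref, {:ok, :done})
```

After delivery the entry is removed, so the caller only ever sees caller_ref and never the internal task ref.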
Edge Cases
- Caller dies before task completes: the result message is sent to a dead PID and silently dropped. The task still runs to completion (or timeout) and the concurrency slot is freed normally.
- Await times out: await/2 returns {:error, :timeout} but the task keeps running. The {:tasque_result, ref, outcome} message will arrive in the caller's mailbox once the task finishes. You can call await/2 again with the same ref to retrieve it, or use a selective receive.
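Retrying works because an await of this kind is just a selective receive on the result message. The following is a minimal sketch of that idea, assuming await/2 behaves as described here; AwaitSketch is a hypothetical stand-in, and the late result is simulated with send/2 rather than a real queue.

```elixir
defmodule AwaitSketch do
  # Selective receive on the documented message shape. A timeout leaves
  # any later result untouched in the mailbox.
  def await(ref, timeout \\ 5_000) do
    receive do
      {:tasque_result, ^ref, outcome} -> outcome
    after
      timeout -> {:error, :timeout}
    end
  end
end

ref = make_ref()

# Nothing has arrived yet, so the first await gives up after 10 ms.
first = AwaitSketch.await(ref, 10)

# Simulate the result arriving late.
send(self(), {:tasque_result, ref, {:ok, 42}})

# A second await with the same ref picks the message up.
second = AwaitSketch.await(ref, 10)
```

Here first is {:error, :timeout} and second is {:ok, 42}.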
Summary
Functions
await/2: Block the calling process until the task identified by ref completes
or the timeout expires.
child_spec/1: Returns a child specification for starting a Tasque instance under a supervisor.
queue_task/3: Enqueue a task for asynchronous execution. Returns immediately with
{:ok, ref}, where ref is a unique reference for tracking the result.
start_link/1: Starts a Tasque instance directly.
Types
Functions
await/2
@spec await(ref :: reference(), timeout :: timeout()) :: {:ok, result :: any()} | {:exit, reason :: any()} | {:error, :timeout}
Block the calling process until the task identified by ref completes
or the timeout expires.
Returns the task outcome directly:
- {:ok, result} - task completed successfully
- {:exit, reason} - task crashed or was killed (including :timeout)
- {:error, :timeout} - the await timed out (the task may still be queued or running)
The default timeout is 5000 ms.
Important
An await timeout does not cancel the task. The task continues
running, occupies its concurrency slot, and its result message will
still arrive in the caller's mailbox. If you need to enforce a hard
deadline, use the :timeout option on queue_task/3 instead.
Examples
iex> {:ok, _} = Tasque.start_link(name: Tasque.AwaitDoc, max_concurrency: 5)
iex> {:ok, ref} = Tasque.queue_task(Tasque.AwaitDoc, fn -> 42 end)
iex> Tasque.await(ref)
{:ok, 42}
With a custom timeout:
iex> {:ok, _} = Tasque.start_link(name: Tasque.AwaitTimeoutDoc, max_concurrency: 5)
iex> {:ok, ref} = Tasque.queue_task(Tasque.AwaitTimeoutDoc, fn -> Process.sleep(:infinity) end)
iex> Tasque.await(ref, 50)
{:error, :timeout}
child_spec/1
Returns a child specification for starting a Tasque instance under a supervisor.
This is invoked automatically when you use the {Tasque, opts} tuple syntax in
a supervision tree.
Options
- :name (required) - the name used to register the queue. Can be an atom, a {:global, term} tuple, or a {:via, module, term} tuple. Internal processes derive their names from this value. For example, if name: MyApp.Queue is provided, the derived names are MyApp.Queue.TaskSupervisor and MyApp.Queue.Supervisor. For :global and :via names, Tasque derives matching companion names using the same naming strategy.
- :max_concurrency - the maximum number of tasks that may execute simultaneously. Defaults to 10.
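For atom names, the derived names in the example above are what Module.concat/2 produces. The helper below is a hypothetical sketch of that derivation; NameSketch is not part of Tasque, and the :global and :via cases are deliberately left out because their exact derivation is not spelled out here.

```elixir
defmodule NameSketch do
  # Atom names only; :global and :via names are not covered by this sketch.
  def companions(name) when is_atom(name) do
    %{
      task_supervisor: Module.concat(name, TaskSupervisor),
      supervisor: Module.concat(name, Supervisor)
    }
  end
end

names = NameSketch.companions(MyApp.Queue)
```

For MyApp.Queue this yields MyApp.Queue.TaskSupervisor and MyApp.Queue.Supervisor.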
Examples
As part of a supervision tree:
children = [
{Tasque, name: MyApp.Queue, max_concurrency: 5}
]
Or build the spec manually:
Tasque.child_spec(name: MyApp.Queue, max_concurrency: 5)
#=> %{id: MyApp.Queue, start: {Tasque.Supervisor, :start_link, [[name: MyApp.Queue, max_concurrency: 5]]}, type: :supervisor}
queue_task/3
@spec queue_task(GenServer.server(), task(), keyword()) :: {:ok, ref :: reference()} | {:error, :invalid_task}
Enqueue a task for asynchronous execution. Returns immediately with
{:ok, ref}, where ref is a unique reference for tracking the result.
The result will be delivered as a {:tasque_result, ref, outcome} message
to the calling process. Use await/2 or a selective receive to consume it.
Task Formats
- Zero-arity function - fn -> :work end
- MFA tuple - {Module, :function, [args]}
Options
- :timeout - per-task timeout as a positive integer in milliseconds, or :infinity. The timeout starts when the task is enqueued, so it includes time spent waiting in the queue as well as time spent running. If it fires while queued, the task is dropped before dispatch; if it fires while running, the task process is killed. In either case, {:exit, :timeout} is delivered. If omitted, no timeout is applied.
Returns
- {:ok, ref} on success.
- {:error, :invalid_task} if the task is neither a zero-arity function nor an MFA tuple.
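The validation and MFA normalization described above could look roughly like the following. This is a sketch under assumptions: NormalizeSketch and normalize/1 are hypothetical names, and the real checks inside Tasque may differ in detail.

```elixir
defmodule NormalizeSketch do
  # Zero-arity functions pass through unchanged.
  def normalize(fun) when is_function(fun, 0), do: {:ok, fun}

  # MFA tuples are wrapped in a zero-arity closure.
  def normalize({m, f, a}) when is_atom(m) and is_atom(f) and is_list(a) do
    {:ok, fn -> apply(m, f, a) end}
  end

  # Anything else, including functions of other arities, is rejected.
  def normalize(_other), do: {:error, :invalid_task}
end

{:ok, fun} = NormalizeSketch.normalize({String, :upcase, ["hello"]})
upcased = fun.()
rejected = NormalizeSketch.normalize(fn x -> x end)
```

Normalizing up front means the rest of the queue only ever has to deal with one task shape.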
Examples
iex> {:ok, _} = Tasque.start_link(name: Tasque.QueueTaskDoc, max_concurrency: 5)
iex> {:ok, ref} = Tasque.queue_task(Tasque.QueueTaskDoc, fn -> 1 + 1 end)
iex> is_reference(ref)
true
iex> Tasque.await(ref)
{:ok, 2}
With an MFA tuple:
iex> {:ok, _} = Tasque.start_link(name: Tasque.QueueTaskMfaDoc, max_concurrency: 5)
iex> {:ok, ref} = Tasque.queue_task(Tasque.QueueTaskMfaDoc, {String, :upcase, ["hello"]})
iex> Tasque.await(ref)
{:ok, "HELLO"}
With a per-task timeout:
iex> {:ok, _} = Tasque.start_link(name: Tasque.QueueTaskTimeoutDoc, max_concurrency: 5)
iex> {:ok, ref} = Tasque.queue_task(Tasque.QueueTaskTimeoutDoc, fn -> Process.sleep(:infinity) end, timeout: 50)
iex> receive do
...> {:tasque_result, ^ref, outcome} -> outcome
...> after
...> 1_000 -> :never_arrived
...> end
{:exit, :timeout}
start_link/1
Starts a Tasque instance directly.
This is primarily useful for testing or starting a queue dynamically.
In production applications, it is strongly recommended to start Tasque
as part of your application's supervision tree instead of calling
start_link/1 directly.
See child_spec/1 for supported options.