Tasque (Tasque v1.0.0)

View Source

An asynchronous, bounded-concurrency task queue for Elixir.

Tasque lets you enqueue work — anonymous functions or MFA tuples — and execute it asynchronously under a supervised Task.Supervisor, with a configurable ceiling on simultaneous tasks. Results are delivered back to the caller via plain OTP messages, so you can choose between blocking (await/2) and non-blocking (selective receive) consumption styles.

Quick Start

Add a Tasque instance to your application's supervision tree:

children = [
  {Tasque, name: MyApp.Queue, max_concurrency: 10}
]

Supervisor.start_link(children, strategy: :one_for_one)

Then enqueue work from anywhere:

{:ok, ref} = Tasque.queue_task(MyApp.Queue, fn -> expensive_work() end)

# Block until the result arrives (or timeout):
{:ok, result} = Tasque.await(ref)

Result Format

Every task result is delivered as a {:tasque_result, ref, outcome} message to the process that called queue_task/3. The outcome is one of:

OutcomeMeaning
{:ok, result}Task completed successfully and returned result
{:exit, reason}Task crashed, was killed, or timed out

When using await/2, the outcome tuple is returned directly.

Task Formats

Tasks can be provided as either a zero-arity anonymous function or an MFA tuple. MFA tuples are normalized internally to fn -> apply(m, f, a) end.

# Anonymous function
Tasque.queue_task(queue, fn -> String.upcase("hello") end)

# MFA tuple
Tasque.queue_task(queue, {String, :upcase, ["hello"]})

Supervision Architecture

Each Tasque instance starts a small supervision subtree:

Tasque.Supervisor (:one_for_all)
 Task.Supervisor    runs tasks via async_nolink
 Tasque.Queue       GenServer managing the FIFO queue

The :one_for_all strategy ensures that if either child crashes, both are restarted together, keeping the system in a consistent state. The Task.Supervisor starts first so its name is available to Tasque.Queue at init time.

Concurrency Control

When the number of in-flight tasks is below :max_concurrency, enqueued work is dispatched immediately. Once the limit is reached, tasks wait in a FIFO queue and are dispatched as running tasks complete, crash, or time out — each of these events frees a concurrency slot.

Fault Isolation

Tasks run under Task.Supervisor.async_nolink/2, so a crashing task:

  • Does not crash the queue GenServer
  • Does not affect other concurrently running tasks
  • Produces an {:exit, reason} result delivered to the original caller
  • Frees its concurrency slot, allowing the next queued task to dispatch

Per-Task Timeouts

You can set a timeout on individual tasks at enqueue time:

Tasque.queue_task(queue, fn -> slow_work() end, timeout: 5_000)

The timeout starts when the task is enqueued, so it includes time spent waiting in the queue as well as time spent running.

If the timeout fires while the task is still queued, the queue tombstones it, delivers {:exit, :timeout} to the caller, and skips it during dispatch.

If the timeout fires while the task is already running:

  1. The task process is killed (Task.Supervisor.terminate_child/2)
  2. {:exit, :timeout} is delivered to the caller
  3. The concurrency slot is freed

Pass timeout: :infinity to disable the per-task timeout. Otherwise, :timeout must be a positive integer in milliseconds. If no :timeout option is given, the task runs without a timeout.

Await timeout vs. task timeout

Tasque.await/2's timeout controls how long the caller is willing to wait. It does not cancel the task — the task will continue running, its result will still be delivered to the caller's mailbox, and it will still occupy a concurrency slot.

The :timeout option on queue_task/3 is enforced by the queue itself. If it fires while the task is already running, the task process is terminated. If it fires while the task is still queued, the task is dropped before dispatch.

Multiple Queues

You can run independent queues with different concurrency limits for different workload categories:

children = [
  {Tasque, name: MyApp.CpuQueue, max_concurrency: System.schedulers_online()},
  {Tasque, name: MyApp.IoQueue,  max_concurrency: 50}
]

Each queue maintains its own task supervisor, dispatch loop, and concurrency budget.

Caller Refs

The reference returned by queue_task/3 is a caller ref — a make_ref() created by the queue GenServer and decoupled from the internal Task ref used by the task supervisor. This means:

  • The caller never holds a direct reference to the underlying task process
  • Refs are unique per enqueue call, even if the same function is submitted twice
  • The queue maintains the mapping from internal task refs to caller refs

Edge Cases

  • Caller dies before task completes — the result message is sent to a dead PID and silently dropped. The task still runs to completion (or timeout) and the concurrency slot is freed normally.

  • Await times outawait/2 returns {:error, :timeout} but the task keeps running. The {:tasque_result, ref, outcome} message will eventually arrive in the caller's mailbox. You can call await/2 again with the same ref to retrieve it, or use a selective receive.

Summary

Functions

Block the calling process until the task identified by ref completes or the timeout expires.

Returns a child specification for starting a Tasque instance under a supervisor.

Enqueue a task for asynchronous execution. Returns immediately with {:ok, ref}, where ref is a unique reference for tracking the result.

Starts a Tasque instance directly.

Types

task()

@type task() :: (-> any()) | {module(), atom(), [any()]}

Functions

await(ref, timeout \\ 5000)

@spec await(ref :: reference(), timeout :: timeout()) ::
  {:ok, result :: any()} | {:exit, reason :: any()} | {:error, :timeout}

Block the calling process until the task identified by ref completes or the timeout expires.

Returns the task outcome directly:

  • {:ok, result} — task completed successfully
  • {:exit, reason} — task crashed or was killed (including :timeout)
  • {:error, :timeout} — the await timed out (task is still running)

The default timeout is 5000 ms.

Important

An await timeout does not cancel the task. The task continues running, occupies its concurrency slot, and its result message will still arrive in the caller's mailbox. If you need to enforce a hard deadline, use the :timeout option on queue_task/3 instead.

Examples

iex> {:ok, _} = Tasque.start_link(name: Tasque.AwaitDoc, max_concurrency: 5)
iex> {:ok, ref} = Tasque.queue_task(Tasque.AwaitDoc, fn -> 42 end)
iex> Tasque.await(ref)
{:ok, 42}

With a custom timeout:

iex> {:ok, _} = Tasque.start_link(name: Tasque.AwaitTimeoutDoc, max_concurrency: 5)
iex> {:ok, ref} = Tasque.queue_task(Tasque.AwaitTimeoutDoc, fn -> Process.sleep(:infinity) end)
iex> Tasque.await(ref, 50)
{:error, :timeout}

child_spec(opts)

Returns a child specification for starting a Tasque instance under a supervisor.

This is invoked automatically when you use {Tasque, opts} tuple syntax in a supervision tree.

Options

  • :name (required) — the name used to register the queue. Can be an atom, a {:global, term} tuple, or a {:via, module, term} tuple. Internal processes derive their names from this value. For example, if name: MyApp.Queue is provided, the derived names are MyApp.Queue.TaskSupervisor and MyApp.Queue.Supervisor. For :global and :via names, Tasque derives matching companion names using the same naming strategy.

  • :max_concurrency — the maximum number of tasks that may execute simultaneously. Defaults to 10.

Examples

As part of a supervision tree:

children = [
  {Tasque, name: MyApp.Queue, max_concurrency: 5}
]

Or build the spec manually:

Tasque.child_spec(name: MyApp.Queue, max_concurrency: 5)
#=> %{id: MyApp.Queue, start: {Tasque.Supervisor, :start_link, [[name: MyApp.Queue, max_concurrency: 5]]}, type: :supervisor}

queue_task(queue, task, opts \\ [])

@spec queue_task(GenServer.server(), task(), keyword()) ::
  {:ok, ref :: reference()} | {:error, :invalid_task}

Enqueue a task for asynchronous execution. Returns immediately with {:ok, ref}, where ref is a unique reference for tracking the result.

The result will be delivered as a {:tasque_result, ref, outcome} message to the calling process. Use await/2 or a selective receive to consume it.

Task Formats

  • Zero-arity functionfn -> :work end
  • MFA tuple{Module, :function, [args]}

Options

  • :timeout — per-task timeout as a positive integer in milliseconds, or :infinity. The timeout starts when the task is enqueued, so it includes time spent waiting in the queue as well as time spent running. If it fires while queued, the task is dropped before dispatch; if it fires while running, the task process is killed. In either case, {:exit, :timeout} is delivered. If omitted, no timeout is applied.

Returns

  • {:ok, ref} on success.
  • {:error, :invalid_task} if the task is neither a zero-arity function nor an MFA tuple.

Examples

iex> {:ok, _} = Tasque.start_link(name: Tasque.QueueTaskDoc, max_concurrency: 5)
iex> {:ok, ref} = Tasque.queue_task(Tasque.QueueTaskDoc, fn -> 1 + 1 end)
iex> is_reference(ref)
true
iex> Tasque.await(ref)
{:ok, 2}

With an MFA tuple:

iex> {:ok, _} = Tasque.start_link(name: Tasque.QueueTaskMfaDoc, max_concurrency: 5)
iex> {:ok, ref} = Tasque.queue_task(Tasque.QueueTaskMfaDoc, {String, :upcase, ["hello"]})
iex> Tasque.await(ref)
{:ok, "HELLO"}

With a per-task timeout:

iex> {:ok, _} = Tasque.start_link(name: Tasque.QueueTaskTimeoutDoc, max_concurrency: 5)
iex> {:ok, ref} = Tasque.queue_task(Tasque.QueueTaskTimeoutDoc, fn -> Process.sleep(:infinity) end, timeout: 50)
iex> receive do
...>   {:tasque_result, ^ref, outcome} -> outcome
...> after
...>   1_000 -> :never_arrived
...> end
{:exit, :timeout}

start_link(opts)

Starts a Tasque instance directly.

This is primarily useful for testing or starting a queue dynamically. In production applications, it is heavily recommended to start Tasque as part of your application's supervision tree instead of calling start_link/1 directly.

See child_spec/1 for supported options.