Tasque.Queue (Tasque v1.0.0)

Internal GenServer that manages the FIFO task queue, concurrency gating, and dispatch loop for a single Tasque instance.

Internal module

This module is not part of the public API. Use the functions in Tasque to interact with the queue. The implementation details documented here are provided for contributors and the curious.

State

The GenServer maintains the following fields:

  • :queue — an Erlang :queue of task entries waiting to be dispatched
  • :pending_refs — a map of internal task_ref => entry for running tasks
  • :queued_refs — a map of caller_ref => pid for waiting tasks
  • :cancelled_refs — a map of caller_ref => true for tombstoned tasks
  • :caller_to_task_ref — a map of caller_ref => task_ref for fast timeout lookups
  • :max_concurrency — the upper bound on map_size(pending_refs)
  • :task_supervisor — the registered name of the companion Task.Supervisor
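The field list above could be modelled as a struct. This is a minimal sketch only: the module name QueueStateSketch, the default values, and the slot_available?/1 helper are assumptions made for illustration, not the library's actual definition.

```elixir
defmodule QueueStateSketch do
  # Hypothetical struct mirroring the documented fields.
  # Defaults are illustrative assumptions.
  defstruct queue: :queue.new(),
            pending_refs: %{},
            queued_refs: %{},
            cancelled_refs: %{},
            caller_to_task_ref: %{},
            max_concurrency: 1,
            task_supervisor: nil

  # True when map_size(pending_refs) is still below the concurrency bound.
  def slot_available?(%__MODULE__{} = state) do
    map_size(state.pending_refs) < state.max_concurrency
  end
end
```

Keeping running tasks in a map keyed by task_ref means the concurrency check is a single map_size/1 call.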

Dispatch Algorithm

The private dispatch/1 function is called after every state-changing event (enqueue, completion, crash, timeout). It greedily fills available concurrency slots:

  1. If map_size(pending_refs) >= max_concurrency, return immediately
  2. If the queue is empty, return immediately
  3. Otherwise, dequeue the next entry, start it via Task.Supervisor.async_nolink/2, record it in :pending_refs, and recurse to fill any remaining slots
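The three steps above can be sketched as a recursive function. This is an illustration under stated assumptions, not the module's real code: the state is a plain map with the documented keys, each queue entry is assumed to be a map with a hypothetical :fun field, and tombstone skipping is left out for brevity.

```elixir
defmodule DispatchSketch do
  # Step 1: every concurrency slot is taken, so return the state unchanged.
  def dispatch(%{pending_refs: pending, max_concurrency: max} = state)
      when map_size(pending) >= max,
      do: state

  def dispatch(state) do
    case :queue.out(state.queue) do
      # Step 2: nothing is waiting.
      {:empty, _queue} ->
        state

      # Step 3: start the next entry, record it, and recurse.
      {{:value, entry}, rest} ->
        # entry.fun is a hypothetical field holding a zero-arity function.
        task = Task.Supervisor.async_nolink(state.task_supervisor, entry.fun)
        pending = Map.put(state.pending_refs, task.ref, entry)
        dispatch(%{state | queue: rest, pending_refs: pending})
    end
  end
end
```

The recursion stops as soon as either the slot limit is reached or the queue runs dry, so one call after each state change is enough to keep all slots busy.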

Message Protocol

Tasks are started with async_nolink, so results arrive as handle_info messages:

| Message | Meaning | Action |
| --- | --- | --- |
| {task_ref, result} | Successful completion | Deliver {:ok, result}, demonitor, free slot |
| {:DOWN, task_ref, :process, _, reason} | Task crashed | Deliver {:exit, reason}, free slot |
| {:tasque_timeout, caller_ref} | Per-task timeout fired | If queued, tombstone and skip during dispatch; if running, terminate task; in both cases deliver {:exit, :timeout} |

A catch-all handle_info/2 clause silently discards unexpected messages (e.g., a late :DOWN arriving after a timeout has already handled the task).
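The completion, crash, and catch-all clauses might look roughly like the following. MessageSketch is a hypothetical, stripped-down server written for illustration: it starts each function immediately (no queue, no timeouts) and, as an assumption, delivers results by sending a plain message to the submitting pid.

```elixir
defmodule MessageSketch do
  use GenServer

  def start_link(task_supervisor), do: GenServer.start_link(__MODULE__, task_supervisor)

  # Hypothetical client call: run fun and send the outcome back to the caller.
  def run(server, fun), do: GenServer.cast(server, {:run, fun, self()})

  @impl true
  def init(task_supervisor), do: {:ok, %{task_supervisor: task_supervisor, pending_refs: %{}}}

  @impl true
  def handle_cast({:run, fun, caller}, state) do
    task = Task.Supervisor.async_nolink(state.task_supervisor, fun)
    {:noreply, %{state | pending_refs: Map.put(state.pending_refs, task.ref, caller)}}
  end

  @impl true
  def handle_info({ref, result}, %{pending_refs: pending} = state)
      when is_reference(ref) and is_map_key(pending, ref) do
    # Success: drop the monitor and flush any :DOWN already in the mailbox.
    Process.demonitor(ref, [:flush])
    {caller, pending} = Map.pop(pending, ref)
    send(caller, {:ok, result})
    {:noreply, %{state | pending_refs: pending}}
  end

  def handle_info({:DOWN, ref, :process, _pid, reason}, %{pending_refs: pending} = state)
      when is_map_key(pending, ref) do
    # Crash: the task died before replying.
    {caller, pending} = Map.pop(pending, ref)
    send(caller, {:exit, reason})
    {:noreply, %{state | pending_refs: pending}}
  end

  # Catch-all: ignore anything unexpected, such as a late :DOWN.
  def handle_info(_message, state), do: {:noreply, state}
end
```

Because the crash clause only matches refs still present in pending_refs, a :DOWN for a task that was already handled falls through to the catch-all and is dropped.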

Timeout Implementation

When a task with a :timeout option is enqueued, the queue schedules a {:tasque_timeout, caller_ref} message to itself via Process.send_after/3. If the task completes before the timer fires, the timer is cancelled with Process.cancel_timer/1. Otherwise, handling depends on where the task is when the message arrives. If the task is still in the queue, it is tombstoned and skipped during dispatch. If it is currently running, the queue attempts to terminate it. In both cases the caller receives a timeout result. If the task has already completed by the time the timeout message is handled, the normal completion or crash message wins, and the late timeout message is a harmless no-op.
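As a rough illustration of the timer handling, the sketch below arms a timer with Process.send_after/3 and classifies a fired timeout against the documented state maps. The TimeoutSketch module, its function names, and the returned atoms are hypothetical; the real module acts on each case rather than returning a tag.

```elixir
defmodule TimeoutSketch do
  # Arms a timer that sends {:tasque_timeout, caller_ref} to the calling process.
  # Returns the timer reference, which Process.cancel_timer/1 accepts later.
  def schedule(caller_ref, timeout_ms) do
    Process.send_after(self(), {:tasque_timeout, caller_ref}, timeout_ms)
  end

  # Decides what a fired timeout means for the given caller_ref.
  def classify(caller_ref, state) do
    cond do
      # Still waiting in the queue: mark it so dispatch skips it.
      is_map_key(state.queued_refs, caller_ref) -> :tombstone
      # Currently running: the task should be terminated.
      is_map_key(state.caller_to_task_ref, caller_ref) -> :terminate
      # Already finished: the late message is ignored.
      true -> :noop
    end
  end
end
```

Process.cancel_timer/1 returns false when the timer has already fired, which is why a late timeout message still has to be tolerated as a no-op.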

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.
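For context, a child_spec/1 like this one is what `use GenServer` generates by default. The sketch below uses a stand-in module, ChildSpecSketch, with a hypothetical max_concurrency option; the start arguments that Tasque.Queue itself expects are not documented here, so they are not shown.

```elixir
defmodule ChildSpecSketch do
  # `use GenServer` defines child_spec/1 for this module.
  use GenServer

  def start_link(init_arg), do: GenServer.start_link(__MODULE__, init_arg)

  @impl true
  def init(init_arg), do: {:ok, init_arg}
end

# The generated spec carries the module as :id and a start tuple wrapping init_arg.
spec = ChildSpecSketch.child_spec(max_concurrency: 4)

# A supervisor accepts the {module, init_arg} shorthand and calls child_spec/1 itself.
{:ok, _supervisor} =
  Supervisor.start_link([{ChildSpecSketch, [max_concurrency: 4]}], strategy: :one_for_one)
```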