Tinkex.Future (Tinkex v0.3.4)

View Source

Client-side future abstraction responsible for polling server-side futures.

poll/2 returns Task.t({:ok, map()} | {:error, Tinkex.Error.t()}). Callers can Task.await/2 or supervise the task to integrate with their concurrency model.

Queue state telemetry

The polling loop emits [:tinkex, :queue, :state_change] events whenever the queue state transitions (e.g., :active -> :paused_rate_limit). Telemetry metadata always includes %{queue_state: atom, request_id: binary} so observers can react:

:telemetry.attach(
  "tinkex-queue-state-logger",
  [:tinkex, :queue, :state_change],
  fn _event, _measurements, %{queue_state: queue_state}, _config ->
    Logger.info("Queue state changed: #{inspect(queue_state)}")
  end,
  nil
)

Provide opts[:queue_state_observer] with a module that implements Tinkex.QueueStateObserver to receive direct callbacks when transitions occur. TrainingClient/SamplingClient will implement this behaviour downstream.

Summary

Functions

Await the result of a polling task.

Await multiple polling tasks, returning the underlying results in input order.

Begin polling a future request.

Types

poll_backoff_policy()

@type poll_backoff_policy() ::
  :none
  | {:exponential, pos_integer(), pos_integer()}
  | (non_neg_integer() -> non_neg_integer())

poll_result()

@type poll_result() :: {:ok, map()} | {:error, Tinkex.Error.t()}

poll_task()

@type poll_task() :: Task.t()

sleep_fun()

@type sleep_fun() :: (non_neg_integer() -> any())

Functions

await(task, timeout \\ :infinity)

@spec await(poll_task(), timeout()) :: poll_result()

Await the result of a polling task.

Wraps Task.await/2, converting exits or timeouts into {:error, %Tinkex.Error{}} tuples with type :api_timeout. The timeout here controls how long the caller is willing to wait on the task process and is independent from the polling timeout configured in poll/2.

await_many(tasks, timeout \\ :infinity)

@spec await_many([poll_task()], timeout()) :: [poll_result()]

Await multiple polling tasks, returning the underlying results in input order.

Each entry mirrors the Task's return value ({:ok, result} or {:error, %Tinkex.Error{}}). When a task exits or times out we convert it to {:error, %Tinkex.Error{type: :api_timeout}} rather than raising.

poll(request_or_payload, opts \\ [])

@spec poll(
  String.t()
  | %{request_id: String.t()}
  | %{required(String.t()) => String.t()},
  keyword()
) :: poll_task()

Begin polling a future request.

Accepts either the request id string or a map that contains "request_id" / :request_id. Per-request HTTP timeouts can be supplied via :http_timeout, while :timeout controls the overall polling deadline (:infinity by default). Tests can inject a custom :sleep_fun (defaults to &Process.sleep/1).

Use :poll_backoff to control backoff for 408/5xx polling retries. Supported values: :exponential, {:exponential, initial_ms, max_ms}, or a 1-arity function that returns a non-negative delay in milliseconds. Defaults to no backoff unless configured.