Futures and Async Operations
This guide covers asynchronous request handling in Tinkex using the Future module. Learn how to start polling operations, await results, handle queue states, and implement custom observers for production workloads.
Overview
Tinkex follows an async-by-default design where long-running operations (sampling, training, checkpoint creation) return immediately with a request ID, then poll the server until completion. The Future module provides the client-side polling abstraction that:
- Returns `Task.t()` so you can integrate with your concurrency model
- Implements exponential backoff with configurable timeouts
- Emits telemetry events for queue state transitions
- Supports custom observers via the `QueueStateObserver` behaviour
This approach decouples request initiation from result retrieval, enabling concurrent operations without blocking your application.
How Futures Work
When you call a Tinkex API that creates a long-running request (e.g., SamplingClient.sample/4 or TrainingClient.forward_backward/3), the server returns a request ID immediately. The client then polls /api/v1/retrieve_future until the request completes, fails, or times out.
Client-side polling flow:
1. Client calls `Future.poll/2` with a request ID and config
2. `Future.poll/2` returns a `Task.t()` that repeatedly calls `retrieve_future`
3. Server responds with one of:
   - `%FutureCompletedResponse{}` → poll returns `{:ok, result}`
   - `%FuturePendingResponse{}` → sleep with exponential backoff, retry
   - `%FutureFailedResponse{}` → categorize error, retry or fail
   - `%TryAgainResponse{}` → emit queue state telemetry, sleep, retry
4. Client awaits the task to get the final result
Backoff strategy:
- Initial backoff: 1 second
- Max backoff: 30 seconds
- Formula: `min(2^iteration * 1000ms, 30000ms)`
- `TryAgainResponse` may provide `retry_after_ms` to override the computed backoff
Starting a Poll Operation
Use Future.poll/2 to begin polling a server-side future. It returns a Task.t() immediately, allowing you to await the result later or run multiple polls concurrently.
alias Tinkex.Future
# Poll a request by ID
task = Future.poll("req-abc123", config: config)
# Poll from a response map
response = %{"request_id" => "req-abc123", "status" => "pending"}
task = Future.poll(response, config: config)
# With custom timeouts
task = Future.poll("req-abc123",
config: config,
timeout: 60_000, # overall polling deadline (60s)
http_timeout: 5_000 # per-request HTTP timeout (5s)
)

Options:

- `:config` (required) — `Tinkex.Config.t()` with API credentials
- `:timeout` — polling deadline in milliseconds (default: `:infinity`)
- `:http_timeout` — per-request timeout (default: `config.timeout`)
- `:queue_state_observer` — module implementing the `QueueStateObserver` behaviour
- `:telemetry_metadata` — additional metadata for telemetry events
- `:sleep_fun` — custom sleep function for testing (default: `&Process.sleep/1`)
Awaiting a Single Future
Use Future.await/2 to block until a polling task completes. This wraps Task.await/2 but converts exits and timeouts into {:error, %Tinkex.Error{type: :api_timeout}} tuples instead of raising.
# Basic await
task = Future.poll("req-abc123", config: config)
case Future.await(task, 30_000) do
{:ok, result} ->
IO.inspect(result, label: "success")
{:error, %Tinkex.Error{type: :api_timeout} = error} ->
IO.puts("Timed out: #{error.message}")
{:error, %Tinkex.Error{} = error} ->
IO.puts("Failed: #{error.message}")
end

Timeout semantics:

- `Future.poll/2`'s `:timeout` — controls how long the polling loop runs
- `Future.await/2`'s timeout — controls how long the caller waits on the task
- These are independent: set both if you want strict deadlines
# Poll for max 60s, but caller will wait max 70s
task = Future.poll("req-abc123", config: config, timeout: 60_000)
Future.await(task, 70_000)

Awaiting Multiple Futures
Use Future.await_many/2 to await multiple polling tasks in parallel. Results are returned in input order, with each entry being {:ok, result} or {:error, %Tinkex.Error{}}.
# Start multiple polls concurrently
tasks = [
Future.poll("req-1", config: config),
Future.poll("req-2", config: config),
Future.poll("req-3", config: config)
]
# Await all (blocks until all complete or time out)
results = Future.await_many(tasks, 30_000)
# Process results in order
Enum.zip(["req-1", "req-2", "req-3"], results)
|> Enum.each(fn {id, result} ->
case result do
{:ok, data} -> IO.puts("#{id} succeeded: #{inspect(data)}")
{:error, error} -> IO.puts("#{id} failed: #{error.message}")
end
end)

Key behavior:

- All tasks are awaited independently (no short-circuit on first failure)
- Order is preserved: `results[i]` corresponds to `tasks[i]`
- Exits or timeouts are converted to `{:error, %Tinkex.Error{type: :api_timeout}}`
- Non-raising: you always get a list of results, never an exception
Practical example: concurrent client creation
# Create multiple sampling clients in parallel
checkpoint_paths = [
"tinker://run-1/weights/0010",
"tinker://run-2/weights/0020",
"tinker://run-3/weights/0030"
]
{:ok, service} = Tinkex.ServiceClient.start_link(config: config)
tasks =
Enum.map(checkpoint_paths, fn path ->
Tinkex.SamplingClient.create_async(service, model_path: path)
end)
results = Task.await_many(tasks, 60_000)
clients =
results
|> Enum.zip(checkpoint_paths)
|> Enum.map(fn {result, path} ->
case result do
{:ok, pid} ->
IO.puts("✓ #{path} -> #{inspect(pid)}")
pid
{:error, error} ->
IO.puts("✗ #{path} failed: #{error.message}")
nil
end
end)
|> Enum.reject(&is_nil/1)

Training client async creation:
# Create training clients asynchronously
{:ok, service} = Tinkex.ServiceClient.start_link(config: config)
# Async LoRA training client
task = Tinkex.ServiceClient.create_lora_training_client_async(
service,
"meta-llama/Llama-3.1-8B",
rank: 32
)
{:ok, training_client} = Task.await(task, 30_000)
# Async training client from checkpoint
task = Tinkex.ServiceClient.create_training_client_from_state_async(
service,
"tinker://run-123/weights/0001"
)
{:ok, restored_client} = Task.await(task, 60_000)
# Async training client from checkpoint (weights + optimizer)
task = Tinkex.ServiceClient.create_training_client_from_state_with_optimizer_async(
service,
"tinker://run-123/weights/0001"
)
{:ok, restored_with_opt} = Task.await(task, 60_000)

See examples/async_client_creation.exs for a complete runnable example.
Queue State Telemetry
The polling loop emits [:tinkex, :queue, :state_change] telemetry events whenever the server sends a TryAgainResponse with a queue state transition. Metadata always includes %{queue_state: atom, request_id: binary}.
Queue states:
- `:active` — request is actively processing
- `:paused_rate_limit` — server is rate-limiting the request
- `:paused_capacity` — server lacks capacity (GPU slots full)
- `:unknown` — unrecognized state from server
Example telemetry handler:
:telemetry.attach(
"tinkex-queue-logger",
[:tinkex, :queue, :state_change],
fn _event, _measurements, metadata, _config ->
IO.puts("Queue state: #{metadata.queue_state} (request: #{metadata.request_id})")
end,
nil
)
# Now poll a request
task = Future.poll("req-abc123", config: config)
Future.await(task, 30_000)
# Telemetry handler will print state changes during polling

Custom metadata:
Add your own metadata via :telemetry_metadata to correlate events with your application context:
task = Future.poll("req-abc123",
config: config,
telemetry_metadata: %{
user_id: "user-123",
experiment_id: "exp-456"
}
)

Telemetry events will include both your custom metadata and request_id.
QueueStateObserver Behaviour
For more control than telemetry, implement the Tinkex.QueueStateObserver behaviour to receive direct callbacks on queue state transitions. This is useful for backpressure tracking or adaptive request scheduling.
Behaviour definition:
@callback on_queue_state_change(QueueState.t()) :: any()

Example observer:
defmodule MyApp.QueueObserver do
@behaviour Tinkex.QueueStateObserver
require Logger
@impl true
def on_queue_state_change(queue_state) do
case queue_state do
:paused_rate_limit ->
Logger.warning("Rate limited, backing off new requests")
MyApp.RequestScheduler.pause()
:paused_capacity ->
Logger.warning("Capacity exhausted, waiting for GPU slots")
MyApp.RequestScheduler.pause()
:active ->
Logger.info("Queue active again, resuming requests")
MyApp.RequestScheduler.resume()
_ ->
:ok
end
end
end

Using the observer:
task = Future.poll("req-abc123",
config: config,
queue_state_observer: MyApp.QueueObserver
)
Future.await(task, 30_000)

The observer receives callbacks alongside telemetry events. If the callback crashes, a warning is logged but polling continues.
Internal usage:
SamplingClient and TrainingClient can accept a :queue_state_observer option and forward it to Future.poll/2. This allows downstream applications to react to queue state changes without modifying client code.
Timeout Handling
Tinkex provides two levels of timeout control for fine-grained deadline management:
Poll Timeout
The :timeout option in Future.poll/2 controls the overall polling deadline — how long the polling loop will run before giving up. When exceeded, the task returns {:error, %Tinkex.Error{type: :api_timeout}}.
# Poll for max 60 seconds
task = Future.poll("req-abc123", config: config, timeout: 60_000)
# If polling exceeds 60s, you get an error
case Future.await(task, :infinity) do
{:error, %Tinkex.Error{type: :api_timeout, message: msg}} ->
IO.puts("Poll timeout: #{msg}")
end

Default: `:infinity` (poll forever)
Await Timeout
The timeout argument to Future.await/2 controls how long the caller is willing to wait on the task process. This is independent from the polling timeout and useful for request prioritization.
task = Future.poll("req-abc123", config: config)
# Caller waits max 10 seconds
case Future.await(task, 10_000) do
{:error, %Tinkex.Error{type: :api_timeout}} ->
IO.puts("Caller gave up after 10s")
end

When the await timeout is exceeded, the task is killed with `:brutal_kill` and the caller receives `{:error, %Tinkex.Error{type: :api_timeout}}`.
Default: :infinity (wait forever)
HTTP Timeout
The :http_timeout option controls the timeout for each individual HTTP request to retrieve_future. Defaults to config.timeout (typically 60 seconds).
task = Future.poll("req-abc123",
config: config,
http_timeout: 5_000 # Each HTTP call times out after 5s
)

Combining Timeouts
For production workloads, set all three to enforce strict SLAs:
task = Future.poll("req-abc123",
config: config,
timeout: 120_000, # Poll for max 2 minutes
http_timeout: 10_000 # Each HTTP request times out after 10s
)
case Future.await(task, 150_000) do # Caller waits max 2.5 minutes
{:ok, result} -> handle_success(result)
{:error, error} -> handle_timeout(error)
end

Error Handling
Future.poll/2 categorizes errors and applies appropriate retry logic:
User Errors (fail immediately)
When the server returns %FutureFailedResponse{} with category: "user", the polling loop fails immediately without retrying. This indicates a permanent error like invalid input.
task = Future.poll("req-bad-input", config: config)
case Future.await(task, 30_000) do
{:error, %Tinkex.Error{type: :request_failed, category: :user, message: msg}} ->
IO.puts("User error: #{msg}")
# Don't retry — fix the input and resubmit
end

Server Errors (retry until timeout)
When category is "server" or "provider", the polling loop retries with exponential backoff until the poll timeout is exceeded. The last error encountered is returned.
task = Future.poll("req-flaky", config: config, timeout: 30_000)
case Future.await(task, :infinity) do
{:error, %Tinkex.Error{type: :request_failed, category: :server, message: msg}} ->
IO.puts("Server error after retries: #{msg}")
# Consider exponential backoff before resubmitting
end

Network Errors
HTTP-level errors (connection refused, DNS failure, etc.) are returned as {:error, %Tinkex.Error{type: :network}} and do not automatically retry. Handle these at the call site.
Task Exits
If the polling task crashes or exits unexpectedly, Future.await/2 converts the exit into {:error, %Tinkex.Error{type: :api_timeout}} with the exit reason in the data field.
case Future.await(task, 30_000) do
{:error, %Tinkex.Error{type: :api_timeout, data: %{exit_reason: reason}}} ->
IO.puts("Task crashed: #{Exception.format_exit(reason)}")
end

Best Practices
1. Always Use Futures for Concurrent Operations
Don't block on each request sequentially — start multiple polls and await them in parallel:
# Bad: sequential awaits
results =
Enum.map(request_ids, fn id ->
task = Future.poll(id, config: config)
Future.await(task, 30_000)
end)
# Good: parallel awaits
tasks = Enum.map(request_ids, &Future.poll(&1, config: config))
results = Future.await_many(tasks, 30_000)

2. Set Explicit Timeouts in Production
Default :infinity timeouts are fine for development but can cause resource leaks in production. Always set explicit deadlines:
task = Future.poll(request_id,
config: config,
timeout: Application.get_env(:my_app, :poll_timeout, 120_000),
http_timeout: Application.get_env(:my_app, :http_timeout, 10_000)
)
Future.await(task, Application.get_env(:my_app, :await_timeout, 150_000))

3. Monitor Queue States for Capacity Planning
Use telemetry or custom observers to track :paused_capacity events. Frequent capacity pauses indicate you need more GPU slots or should reduce request rate:
:telemetry.attach(
"capacity-alerter",
[:tinkex, :queue, :state_change],
  fn _event, _measurements, metadata, _config ->
    # Branch inside the handler: a single-clause fn matching only
    # %{queue_state: :paused_capacity} raises FunctionClauseError on other
    # states, and :telemetry detaches handlers that crash.
    if metadata.queue_state == :paused_capacity do
      MyApp.Metrics.increment("tinkex.capacity_pause")
      # Alert if pauses exceed threshold
    end
  end,
nil
)

4. Handle Both Success and Failure
Always pattern match on both {:ok, result} and {:error, error} — network failures and server errors are common in distributed systems:
case Future.await(task, 30_000) do
{:ok, result} ->
handle_success(result)
{:error, %Tinkex.Error{type: :request_failed, category: :user} = error} ->
handle_user_error(error)
{:error, %Tinkex.Error{} = error} ->
handle_transient_error(error)
end

5. Use Task Supervision for Long-Lived Polls
If you need to poll for minutes or hours, supervise the task to ensure it doesn't leak on process crashes:
{:ok, task_supervisor} = Task.Supervisor.start_link()
task =
Task.Supervisor.async_nolink(task_supervisor, fn ->
poll_task = Future.poll(request_id, config: config, timeout: 3_600_000)
Future.await(poll_task, :infinity)
end)
# Task is supervised — if the parent crashes, cleanup happens automatically

6. Test with Custom Sleep Functions
Inject a no-op sleep function in tests to avoid actual delays:
# In test
sleep_fun = fn _ms -> :ok end
task = Future.poll(request_id, config: config, sleep_fun: sleep_fun)
# Polling completes instantly without sleeping

Complete Example
Here's a full example demonstrating futures, async operations, queue state monitoring, and error handling:
defmodule MyApp.AsyncSampling do
alias Tinkex.{ServiceClient, SamplingClient, Future, Config}
require Logger
defmodule QueueMonitor do
@behaviour Tinkex.QueueStateObserver
@impl true
def on_queue_state_change(queue_state) do
Logger.metadata(tinker_queue_state: queue_state)
case queue_state do
:paused_rate_limit ->
Logger.warning("Rate limited — consider reducing request rate")
:paused_capacity ->
Logger.warning("Capacity exhausted — GPU slots full")
:active ->
Logger.info("Queue active")
_ ->
:ok
end
end
end
def run_concurrent_samples(prompts, opts \\ []) do
config = Config.new(api_key: System.fetch_env!("TINKER_API_KEY"))
{:ok, service} = ServiceClient.start_link(config: config)
{:ok, sampler} =
ServiceClient.create_sampling_client(service,
base_model: "meta-llama/Llama-3.1-8B"
)
# Attach telemetry
:telemetry.attach(
"queue-logger",
[:tinkex, :queue, :state_change],
&log_queue_event/4,
nil
)
try do
# Start all samples concurrently
tasks =
Enum.map(prompts, fn prompt ->
{:ok, model_input} =
Tinkex.Types.ModelInput.from_text(prompt,
model_name: "meta-llama/Llama-3.1-8B"
)
params = %Tinkex.Types.SamplingParams{max_tokens: 64}
# Returns Task.t() immediately
{:ok, task} =
SamplingClient.sample(sampler, model_input, params,
num_samples: 1,
queue_state_observer: QueueMonitor,
timeout: Keyword.get(opts, :timeout, 120_000),
await_timeout: Keyword.get(opts, :await_timeout, 150_000)
)
task
end)
# Await all in parallel
results = Task.await_many(tasks, Keyword.get(opts, :await_timeout, 150_000))
# Process results
Enum.zip(prompts, results)
|> Enum.map(fn {prompt, result} ->
case result do
{:ok, response} ->
text = hd(response.sequences).tokens
Logger.info("Prompt: #{prompt}\nResponse: #{text}")
{:ok, text}
{:error, error} ->
Logger.error("Failed: #{prompt}\nError: #{error.message}")
{:error, error}
end
end)
after
:telemetry.detach("queue-logger")
GenServer.stop(sampler)
GenServer.stop(service)
end
end
defp log_queue_event(_event, _measurements, metadata, _config) do
Logger.info("Queue state changed",
request_id: metadata.request_id,
queue_state: metadata.queue_state
)
end
end
# Run it
prompts = [
"Explain async programming in Elixir",
"What are the benefits of OTP?",
"How does Task.async work?"
]
MyApp.AsyncSampling.run_concurrent_samples(prompts,
timeout: 60_000,
await_timeout: 90_000
)

What to Read Next
- API overview: docs/guides/api_reference.md
- Training loop patterns: docs/guides/training_loop.md
- Troubleshooting timeout issues: docs/guides/troubleshooting.md
- Getting started with the SDK: docs/guides/getting_started.md