Telemetry
The Tinkex telemetry system provides session-scoped event capture and reporting to the Tinker backend. It automatically tracks HTTP requests, queue state changes, exceptions, and custom application events with built-in batching, retries, and graceful shutdown semantics.
ServiceClient, TrainingClient, and SamplingClient wrap public entrypoints with the telemetry capture layer, so unhandled exceptions are logged automatically and fatal paths emit session-end events without additional instrumentation.
Overview
Tinkex telemetry consists of two main components:
- Telemetry.Reporter - A GenServer that batches events and ships them to /api/v1/telemetry
- Telemetry.Capture - Macros for capturing exceptions in synchronous and async code
Key features:
- Automatic session lifecycle tracking (SESSION_START, SESSION_END)
- Batched event delivery (up to 100 events per request)
- Exponential backoff retry logic (up to 3 retries)
- Periodic and threshold-based flushing
- Wait-until-drained semantics for graceful shutdown
- Environment-based enable/disable (TINKER_TELEMETRY)
Event Types
The telemetry reporter captures several event types:
- SESSION_START - Emitted when the reporter starts
- SESSION_END - Emitted during graceful shutdown
- GENERIC_EVENT - Custom application events with arbitrary data
- UNHANDLED_EXCEPTION - Captured exceptions with stacktraces
- HTTP telemetry - Automatic capture of [:tinkex, :http, :request, :*] events
- Queue telemetry - Automatic capture of [:tinkex, :queue, :state_change] events
- Future telemetry - Automatic capture of [:tinkex, :future, :timeout | :api_error | :connection_error | :request_failed | :validation_error] events with request_id, request_type, status, and elapsed time metadata
Setting Up the Reporter
The reporter is typically started automatically by ServiceClient, but you can start it manually:
config = Tinkex.Config.new(api_key: System.fetch_env!("TINKER_API_KEY"))
{:ok, reporter} =
  Tinkex.Telemetry.Reporter.start_link(
    config: config,
    session_id: "session-123",
    flush_interval_ms: 10_000,   # Flush every 10 seconds
    flush_threshold: 100,        # Flush when queue reaches 100 events
    http_timeout_ms: 5_000,      # HTTP request timeout
    max_retries: 3,              # Retry failed sends up to 3 times
    retry_base_delay_ms: 1_000   # Base delay for exponential backoff
  )
Configuration Options
| Option | Default | Description |
|---|---|---|
| :config | required | Tinkex.Config.t() with API credentials |
| :session_id | required | Tinker session identifier |
| :handler_id | auto-generated | Telemetry handler ID |
| :events | HTTP + queue | List of telemetry events to capture |
| :attach_events? | true | Whether to attach telemetry handlers |
| :flush_interval_ms | 10_000 | Periodic flush interval (10 seconds) |
| :flush_threshold | 100 | Flush when queue reaches this size |
| :flush_timeout_ms | 30_000 | Max wait time for drain operations |
| :http_timeout_ms | 5_000 | HTTP request timeout |
| :max_retries | 3 | Max retries per batch |
| :retry_base_delay_ms | 1_000 | Base delay for exponential backoff |
| :max_queue_size | 10_000 | Drop events beyond this size |
| :max_batch_size | 100 | Events per POST request |
| :enabled | from env | Override TINKER_TELEMETRY env flag |
Logging Custom Events
Use Reporter.log/4 to emit generic application events:
# The examples in this guide refer to the reporter module through this alias
alias Tinkex.Telemetry.Reporter
# Basic event
Reporter.log(reporter, "user.action", %{"button" => "submit"})
# With severity level
Reporter.log(reporter, "cache.miss", %{"key" => "user:123"}, :warning)
# Multiple data fields
Reporter.log(reporter, "request.processed", %{
  "duration_ms" => 250,
  "cache_hit" => true,
  "user_id" => 42
}, :info)
Severity Levels
Supported severity levels (atoms or strings):
- :debug / "DEBUG"
- :info / "INFO"
- :warning / "WARNING"
- :error / "ERROR"
- :critical / "CRITICAL"
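The mapping between the atom and string forms can be pictured with a small sketch. `SeveritySketch` is a hypothetical module, not part of Tinkex, and its handling of unknown or lowercase string levels is an assumption; the guide does not say how the reporter treats them.

```elixir
defmodule SeveritySketch do
  # Hypothetical helper (not Tinkex API): maps atoms or strings to the
  # uppercase string form listed above.
  @levels ~w(DEBUG INFO WARNING ERROR CRITICAL)

  def normalize(level) when is_atom(level) do
    level |> Atom.to_string() |> normalize()
  end

  def normalize(level) when is_binary(level) do
    upper = String.upcase(level)

    # Assumption: anything outside the five listed levels is rejected.
    if upper in @levels do
      {:ok, upper}
    else
      {:error, {:unknown_severity, level}}
    end
  end
end

IO.inspect(SeveritySketch.normalize(:warning))
IO.inspect(SeveritySketch.normalize("INFO"))
```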
Capturing Exceptions
Non-Fatal Exceptions
Use log_exception/3 to capture exceptions without blocking:
try do
  risky_operation()
rescue
  exception ->
    Reporter.log_exception(reporter, exception, :error)
    # Exception is logged, async flush triggered
    reraise exception, __STACKTRACE__
end
Fatal Exceptions
Use log_fatal_exception/3 for fatal errors that require immediate flushing:
try do
  critical_operation()
rescue
  exception ->
    Reporter.log_fatal_exception(reporter, exception, :critical)
    # Emits SESSION_END, flushes synchronously, waits until drained
    reraise exception, __STACKTRACE__
end
Fatal exception behavior:
- Logs the exception as UNHANDLED_EXCEPTION
- Enqueues a SESSION_END event
- Flushes all pending events synchronously
- Waits until all batches are sent (up to flush_timeout_ms)
- Logs the session ID for debugging
Using Capture Macros
The Tinkex.Telemetry.Capture module provides macros for automatic exception capture.
Synchronous Code
Wrap synchronous code with capture_exceptions/2:
import Tinkex.Telemetry.Capture
capture_exceptions reporter: reporter do
  perform_risky_operation()
end
# With options
capture_exceptions reporter: reporter, fatal?: true, severity: :critical do
  critical_operation()
end
The macro automatically:
- Catches all exceptions and errors
- Logs them to the reporter
- Re-raises the original exception
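The catch, log, re-raise sequence above can be sketched as a plain function. `CaptureSketch.capture/2` is a hypothetical stand-in for the macro, and `log_fun` stands in for a call such as `Reporter.log_exception/3`. This sketch only rescues raised exceptions; how the real macro treats throws and exits is not shown here.

```elixir
defmodule CaptureSketch do
  # Hypothetical function-based analogue of capture_exceptions/2.
  # `log_fun` receives the exception; the original stacktrace is preserved
  # when the exception is re-raised.
  def capture(log_fun, fun) when is_function(log_fun, 1) and is_function(fun, 0) do
    try do
      fun.()
    rescue
      exception ->
        log_fun.(exception)
        reraise exception, __STACKTRACE__
    end
  end
end

# Usage: the wrapped function's result passes through untouched on success.
result = CaptureSketch.capture(fn e -> IO.inspect(e, label: "captured") end, fn -> 1 + 1 end)
IO.inspect(result)
```

Because the exception is re-raised, callers still see the original failure; the wrapper only adds the logging side effect.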
Async Tasks
Wrap async operations with async_capture/2:
import Tinkex.Telemetry.Capture
task = async_capture reporter: reporter do
  expensive_computation()
end
result = Task.await(task, 10_000)
This wraps Task.async/1 and captures any exceptions that occur in the task process.
Options for both macros:
- :reporter - Reporter pid or nil (no-op when nil)
- :fatal? - When true, calls log_fatal_exception/3 (default: false)
- :severity - Severity level for the exception (default: :error)
Automatic Telemetry Events
The reporter automatically captures standard :telemetry events when :attach_events? is true (default).
HTTP Request Events
Each HTTP request emits a :start event, followed by either a :stop event on completion or an :exception event on failure:
# Start of request
[:tinkex, :http, :request, :start]
# measurements: %{system_time: integer()}
# metadata: %{session_id: string(), method: atom(), path: string()}
# Successful completion
[:tinkex, :http, :request, :stop]
# measurements: %{duration: integer()}
# metadata: %{session_id: string(), status: integer()}
# Exception during request
[:tinkex, :http, :request, :exception]
# measurements: %{duration: integer()}
# metadata: %{session_id: string(), kind: atom(), reason: term()}
Queue State Changes
Queue state transitions emit:
[:tinkex, :queue, :state_change]
# measurements: %{queue_depth: integer()}
# metadata: %{session_id: string(), from: atom(), to: atom()}
Session-Scoped Filtering
Only events with matching :session_id metadata are captured and forwarded to the backend. This prevents cross-session contamination when multiple sessions run concurrently.
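As a rough illustration of this filter, the check amounts to comparing the event's :session_id metadata with the reporter's own session ID. `SessionFilterSketch` is a hypothetical module, and treating events without a :session_id as non-matching is an assumption.

```elixir
defmodule SessionFilterSketch do
  # Hypothetical predicate: keep an event only when its metadata carries
  # the reporter's own session id. Events without :session_id are dropped.
  def forward?(metadata, session_id) when is_map(metadata) and is_binary(session_id) do
    Map.get(metadata, :session_id) == session_id
  end
end

events = [
  %{session_id: "session-a", status: 200},
  %{session_id: "session-b", status: 500},
  %{status: 204}
]

kept = Enum.filter(events, &SessionFilterSketch.forward?(&1, "session-a"))
IO.inspect(kept)
```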
Batching and Flush Semantics
The reporter batches events to reduce the number of network requests:
Automatic Flushing
Events are flushed automatically when:
- Periodic interval - Every flush_interval_ms (default: 10 seconds)
- Threshold reached - When queue size reaches flush_threshold (default: 100 events)
- Exception logged - Non-fatal exceptions trigger an async flush
- Fatal exception - Synchronous flush with wait-until-drained
Manual Flushing
Force a flush with Reporter.flush/2:
# Async flush (returns immediately)
Reporter.flush(reporter)
# Sync flush (waits for batches to be sent)
Reporter.flush(reporter, sync?: true)
# Sync flush with drain wait (waits until all events are acknowledged)
Reporter.flush(reporter, sync?: true, wait_drained?: true)
Batching Behavior
- Events are grouped into batches of up to max_batch_size (default: 100)
- Multiple batches are sent sequentially
- Each batch includes session metadata (session_id, platform, SDK version)
Example batch structure:
%{
  session_id: "session-abc123",
  platform: "unix/linux",
  sdk_version: "0.1.2",
  events: [
    %{
      event: "GENERIC_EVENT",
      event_id: "a1b2c3...",
      event_session_index: 5,
      severity: "INFO",
      timestamp: "2025-11-26T10:30:45Z",
      event_name: "user.action",
      event_data: %{"button" => "submit"}
    },
    # ... up to 100 events
  ]
}
Retry Behavior
Failed sends are retried with exponential backoff:
- First attempt - Immediate send
- Retry 1 - Wait ~1000ms (base_delay)
- Retry 2 - Wait ~2000ms (base_delay * 2)
- Retry 3 - Wait ~4000ms (base_delay * 4)
- Give up - Log warning and drop batch
The backoff delay includes 10% random jitter to prevent thundering herd effects.
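The schedule above works out to base_delay * 2^(retry - 1). The sketch below reproduces that arithmetic; `BackoffSketch` is a hypothetical module, and applying the jitter as an addition of up to 10% is an assumption, since the guide does not say whether the jitter is additive or symmetric. `Integer.pow/2` requires Elixir 1.12 or later.

```elixir
defmodule BackoffSketch do
  # Delay before retry number `retry` (1-based): base * 2^(retry - 1).
  def base_delay(retry, base_ms \\ 1_000) when is_integer(retry) and retry >= 1 do
    base_ms * Integer.pow(2, retry - 1)
  end

  # Assumption: jitter adds between 1 ms and 10% of the delay on top.
  def delay_with_jitter(retry, base_ms \\ 1_000) do
    delay = base_delay(retry, base_ms)
    delay + :rand.uniform(max(div(delay, 10), 1))
  end
end

IO.inspect(Enum.map(1..3, &BackoffSketch.base_delay/1))
# → [1000, 2000, 4000]
```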
Example:
{:ok, reporter} =
  Tinkex.Telemetry.Reporter.start_link(
    config: config,
    session_id: session_id,
    max_retries: 5,            # Retry up to 5 times after the first attempt
    retry_base_delay_ms: 500   # Start with 500ms delay
  )
Wait-Until-Drained
For graceful shutdown, use wait_until_drained/2 to block until all queued events are sent:
# Queue some events
Reporter.log(reporter, "shutdown.starting", %{})
# Wait up to 30 seconds for all events to be sent
drained = Reporter.wait_until_drained(reporter, 30_000)
case drained do
  true -> IO.puts("All telemetry sent successfully")
  false -> IO.puts("Timeout waiting for telemetry drain")
end
Internal counters:
- push_counter - Incremented when events are enqueued
- flush_counter - Incremented when events are sent
- Drained when flush_counter >= push_counter
This is automatically used during graceful shutdown with Reporter.stop/2.
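The counter comparison can be sketched as a polling loop with a deadline. `DrainSketch` is a hypothetical module; `read_counters` is a stand-in for however the counters are read, and the polling approach itself is an assumption about how such a wait could be built, not a description of the reporter's internals.

```elixir
defmodule DrainSketch do
  # Drained once everything that was pushed has also been flushed.
  def drained?(%{push_counter: push, flush_counter: flush}), do: flush >= push

  # Polls `read_counters` (a zero-arity function returning the counters)
  # until drained or until `timeout_ms` has elapsed. Returns true or false.
  def wait_until_drained(read_counters, timeout_ms, poll_ms \\ 50) do
    deadline = System.monotonic_time(:millisecond) + timeout_ms
    do_wait(read_counters, deadline, poll_ms)
  end

  defp do_wait(read_counters, deadline, poll_ms) do
    cond do
      drained?(read_counters.()) ->
        true

      System.monotonic_time(:millisecond) >= deadline ->
        false

      true ->
        Process.sleep(poll_ms)
        do_wait(read_counters, deadline, poll_ms)
    end
  end
end

IO.inspect(DrainSketch.drained?(%{push_counter: 5, flush_counter: 5}))
```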
Graceful Shutdown
Stop the reporter gracefully with Reporter.stop/2:
Reporter.stop(reporter, 5_000)
This performs the following steps:
- Enqueues a SESSION_END event (if not already emitted)
- Flushes all pending events synchronously
- Waits until all batches are sent (up to timeout)
- Stops the GenServer
The reporter also implements terminate/2, so if the process is terminated normally, it will perform the same graceful shutdown sequence.
Disabling Telemetry
Telemetry can be disabled via the TINKER_TELEMETRY environment variable:
# Disable telemetry
export TINKER_TELEMETRY=0
# or
export TINKER_TELEMETRY=false
# or
export TINKER_TELEMETRY=no
# Enable telemetry (default)
export TINKER_TELEMETRY=1
# or
export TINKER_TELEMETRY=true
# or
export TINKER_TELEMETRY=yes
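A sketch of how these values could be interpreted. `TelemetryEnvSketch` is a hypothetical module; treating an unset or unrecognized value as enabled, and matching case-insensitively, are assumptions based only on the note that telemetry is enabled by default.

```elixir
defmodule TelemetryEnvSketch do
  @falsy ~w(0 false no)

  # Hypothetical parser for the TINKER_TELEMETRY flag.
  def enabled?(value \\ System.get_env("TINKER_TELEMETRY"))

  # Assumption: an unset variable means telemetry stays enabled.
  def enabled?(nil), do: true

  # Assumption: only the three documented "off" values disable telemetry.
  def enabled?(value) when is_binary(value) do
    String.downcase(String.trim(value)) not in @falsy
  end
end

IO.inspect(TelemetryEnvSketch.enabled?("no"))
IO.inspect(TelemetryEnvSketch.enabled?("1"))
```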
When disabled:
- Reporter.start_link/1 returns :ignore
- All reporter functions become no-ops (return false or :ok)
- No network requests are made
- No events are logged
You can also override the environment variable per-reporter:
# Force enable, even if TINKER_TELEMETRY=0
{:ok, reporter} =
  Tinkex.Telemetry.Reporter.start_link(
    config: config,
    session_id: session_id,
    enabled: true
  )
# Force disable, even if TINKER_TELEMETRY=1
reporter =
  case Tinkex.Telemetry.Reporter.start_link(
         config: config,
         session_id: session_id,
         enabled: false
       ) do
    {:ok, pid} -> pid
    :ignore -> nil
  end
Complete Example
Here's a complete example showing all major features:
import Tinkex.Telemetry.Capture
# Alias so the Reporter.* calls below resolve
alias Tinkex.Telemetry.Reporter
{:ok, _} = Application.ensure_all_started(:tinkex)
config = Tinkex.Config.new(
  api_key: System.fetch_env!("TINKER_API_KEY"),
  base_url: "https://tinker.thinkingmachines.dev/services/tinker-prod"
)
# Start service client (includes telemetry reporter)
{:ok, service} = Tinkex.ServiceClient.start_link(config: config)
# Get the reporter
{:ok, reporter} =
  case Tinkex.ServiceClient.telemetry_reporter(service) do
    {:ok, pid} ->
      {:ok, pid}
    {:error, :disabled} ->
      IO.puts("Telemetry disabled, exiting")
      System.halt(0)
  end
# Log application start
Reporter.log(reporter, "app.started", %{
  "environment" => "production",
  "version" => "1.0.0"
})
# Perform sampling with automatic HTTP telemetry
{:ok, sampler} =
  Tinkex.ServiceClient.create_sampling_client(service,
    base_model: "meta-llama/Llama-3.1-8B"
  )
{:ok, prompt} =
  Tinkex.Types.ModelInput.from_text("Hello there",
    model_name: "meta-llama/Llama-3.1-8B"
  )
params = %Tinkex.Types.SamplingParams{max_tokens: 32, temperature: 0.7}
# Wrap in exception capture
capture_exceptions reporter: reporter do
  {:ok, task} =
    Tinkex.SamplingClient.sample(sampler, prompt, params, num_samples: 1)
  {:ok, response} = Task.await(task, 10_000)
  Reporter.log(reporter, "sampling.complete", %{
    "tokens" => length(List.first(response.sequences, %{tokens: []}).tokens)
  })
end
# Async task with exception capture
task = async_capture reporter: reporter do
  Process.sleep(1_000)
  :expensive_result
end
result = Task.await(task)
# Log completion and shutdown gracefully
Reporter.log(reporter, "app.stopping", %{"result" => inspect(result)})
Reporter.stop(reporter, 10_000) # Wait up to 10s for drain
IO.puts("All telemetry sent, goodbye!")
Debugging
Enable Logger
Attach a logger to see telemetry events in real-time:
logger_id = Tinkex.Telemetry.attach_logger(level: :info)
# ... run your code ...
Tinkex.Telemetry.detach(logger_id)
Check Reporter State
Inspect the reporter state for debugging:
state = :sys.get_state(reporter)
IO.inspect(state.queue_size, label: "Queue size")
IO.inspect(state.push_counter, label: "Pushed events")
IO.inspect(state.flush_counter, label: "Flushed events")
IO.inspect(state.session_ended?, label: "Session ended")
Common Issues
Queue full warnings:
Telemetry queue full (10000), dropping event
Solution: Increase max_queue_size or reduce flush_threshold for more frequent flushing.
Retry failures:
Telemetry send failed after 3 retries: {:error, :timeout}
Solution: Increase http_timeout_ms or max_retries, and check network connectivity.
Drain timeout:
false = Reporter.wait_until_drained(reporter, 5_000)
Solution: Increase timeout or check for network issues preventing batch delivery.
Best Practices
- Use ServiceClient's built-in reporter - It's automatically scoped to the session
- Capture exceptions at boundaries - Wrap top-level operations with
capture_exceptions - Use severity levels appropriately - Reserve
:criticalfor fatal errors - Flush before shutdown - Always call
Reporter.stop/2orwait_until_drained/2 - Monitor queue size - Watch for "queue full" warnings in production
- Test with telemetry disabled - Ensure your app works with
TINKER_TELEMETRY=0 - Structure event data consistently - Use the same keys across similar events
What to Read Next
- API overview: docs/guides/api_reference.md
- Getting started: docs/guides/getting_started.md
- Troubleshooting tips: docs/guides/troubleshooting.md