Forge.Telemetry (Forge v0.1.1)
Telemetry instrumentation for Forge pipelines.
This module provides helper functions for emitting structured telemetry events
throughout pipeline execution. All events follow the naming convention:
[:forge, :component, :action]
Event Categories
Pipeline Events
[:forge, :pipeline, :start] - Pipeline execution started
[:forge, :pipeline, :stop] - Pipeline execution completed
[:forge, :pipeline, :exception] - Pipeline execution failed with exception
Stage Events
[:forge, :stage, :start] - Stage processing started for a sample
[:forge, :stage, :stop] - Stage processing completed (success or error)
[:forge, :stage, :retry] - Stage retry attempt
Measurement Events
[:forge, :measurement, :start] - Measurement computation started
[:forge, :measurement, :stop] - Measurement computation completed
[:forge, :measurement, :batch_complete] - Batch measurement completed
Storage Events
[:forge, :storage, :sample_write] - Sample written to storage
[:forge, :storage, :artifact_upload] - Artifact uploaded
[:forge, :storage, :artifact_download] - Artifact downloaded
DLQ Events
[:forge, :dlq, :enqueue] - Sample moved to dead-letter queue
Usage
# The examples call the module through its short name
alias Forge.Telemetry
# Emit events directly
Telemetry.pipeline_start(pipeline_id, run_id, :my_pipeline)
# Or use the span helper for automatic timing
Telemetry.span([:forge, :stage], %{stage: "MyStage"}, fn ->
  # ... processing logic
  {:ok, result}
end)
Attaching Handlers
:telemetry.attach(
  "my-handler",
  [:forge, :pipeline, :start],
  fn _event, _measurements, _metadata, _config ->
    # Handle event
    :ok
  end,
  nil
)
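As a complement to the single-event example, the sketch below attaches one handler to all three pipeline events with :telemetry.attach_many/4. The module name ForgeLoggerSketch, the handler ID, and the output format are hypothetical and not part of Forge. Calling attach/0 assumes the :telemetry package is available; the handler itself is an ordinary four-argument function and can be exercised directly.

```elixir
defmodule ForgeLoggerSketch do
  # Hypothetical handler module (not part of Forge); names are illustrative.

  @events [
    [:forge, :pipeline, :start],
    [:forge, :pipeline, :stop],
    [:forge, :pipeline, :exception]
  ]

  # Attaches a single handler to every pipeline event listed above.
  # Requires the :telemetry package to be loaded at runtime.
  def attach do
    :telemetry.attach_many(
      "forge-logger-sketch",
      @events,
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # Invoked by :telemetry with the event name, measurements, metadata, and config.
  def handle_event([:forge, :pipeline, action], measurements, metadata, _config) do
    IO.puts(format(action, measurements, metadata))
  end

  # Builds a one-line summary from the documented :pipeline_id and :run_id keys.
  def format(action, measurements, metadata) do
    "pipeline=#{inspect(metadata[:pipeline_id])} run=#{inspect(metadata[:run_id])} " <>
      "#{action} #{inspect(measurements)}"
  end
end
```

Passing a captured module function (&Module.function/4) rather than an anonymous function is the form the :telemetry documentation recommends, since local functions as handlers incur a performance penalty.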
Functions
Attaches telemetry handlers for logging and metrics aggregation.
This function should be called once during application startup to attach handlers that will process telemetry events.
Options
:reporters - List of reporter modules to attach (default: [])
Emits a DLQ enqueue event.
Measurements
(empty map)
Metadata
:sample_id - ID of the sample being moved to DLQ
:stage - Stage where failure occurred
:error - Error reason or message
Emits a batch measurement complete event.
Measurements
:duration - Duration in native time units
:batch_size - Number of samples in batch
Metadata
:measurement_key - Measurement key/name
Emits a measurement start event.
Measurements
:system_time - System time when measurement started (native units)
Metadata
:sample_id - ID of the sample being measured
:measurement_key - Measurement key/name
:version - Measurement version
Emits a measurement stop event.
Measurements
:duration - Duration in native time units
Metadata
:sample_id - ID of the sample being measured
:measurement_key - Measurement key/name
:outcome - :computed or :cached
Emits a pipeline exception event.
Measurements
:duration - Duration before exception (native time units)
Metadata
:pipeline_id - Unique identifier for the pipeline definition
:run_id - Unique identifier for this execution run
:exception - Exception module (e.g., RuntimeError)
Emits a pipeline start event.
Measurements
:system_time - System time when pipeline started (native units)
Metadata
:pipeline_id - Unique identifier for the pipeline definition
:run_id - Unique identifier for this execution run
:name - Pipeline name (atom or string)
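A handler usually wants the start time as a readable timestamp instead of a raw native-unit integer. The sketch below assumes :system_time is a Unix-epoch value in native units, as returned by System.system_time/0; the module name StartTimeSketch is hypothetical.

```elixir
defmodule StartTimeSketch do
  # Hypothetical helper (not part of Forge).
  # Assumes :system_time is a Unix-epoch timestamp in native time units.
  def to_datetime(%{system_time: system_time}) do
    system_time
    |> System.convert_time_unit(:native, :microsecond)
    |> DateTime.from_unix!(:microsecond)
  end
end

# Usage: build a measurements map like the one a start event would carry.
measurements = %{system_time: System.system_time()}
IO.inspect(StartTimeSketch.to_datetime(measurements), label: "pipeline started at")
```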
Emits a pipeline stop event.
Measurements
:duration - Duration in native time units
:samples_processed - Number of samples successfully processed
Metadata
:pipeline_id - Unique identifier for the pipeline definition
:run_id - Unique identifier for this execution run
:outcome - :completed or :failed
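Because :duration is reported in native time units, it has to be converted before it is logged or compared. The following sketch converts the duration to milliseconds and derives a samples-per-second rate from the two documented measurements; the module name ThroughputSketch and the shape of the returned map are assumptions for illustration.

```elixir
defmodule ThroughputSketch do
  # Hypothetical helper (not part of Forge): summarizes the measurements of a
  # pipeline stop event.
  def summarize(%{duration: duration, samples_processed: count}) do
    duration_ms = System.convert_time_unit(duration, :native, :millisecond)

    # Number of native units in one second, used to get fractional seconds.
    native_per_second = System.convert_time_unit(1, :second, :native)
    seconds = duration / native_per_second

    rate = if seconds > 0, do: count / seconds, else: 0.0

    %{duration_ms: duration_ms, samples_per_second: rate}
  end
end

# Usage: a two-second run that processed 100 samples.
two_seconds = System.convert_time_unit(2, :second, :native)
IO.inspect(ThroughputSketch.summarize(%{duration: two_seconds, samples_processed: 100}))
```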
Executes a function and emits telemetry span events.
This is a convenience function for wrapping operations with automatic start/stop timing. It emits:
event ++ [:start] before execution
event ++ [:stop] after successful execution
event ++ [:exception] if an exception is raised
Parameters
event - Base event name (e.g., [:forge, :stage])
metadata - Metadata to include in all events
fun - Function to execute
Returns
The return value of the function.
Examples
result = Telemetry.span([:forge, :stage], %{stage: "MyStage"}, fn ->
  # ... processing logic
  {:ok, processed_sample}
end)
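To make the start/stop/exception behavior concrete, here is a minimal sketch of how such a span helper could work. It is not Forge's implementation: the module SpanSketch and the extra emit argument (a three-argument callback standing in for :telemetry.execute/3) are hypothetical, and the measurement and metadata keys are assumptions modeled on the pipeline events documented above.

```elixir
defmodule SpanSketch do
  # Illustrative only, not Forge's implementation.
  # `emit` is a hypothetical callback with the shape of :telemetry.execute/3.
  def span(event, metadata, fun, emit)
      when is_list(event) and is_function(fun, 0) and is_function(emit, 3) do
    start = System.monotonic_time()
    emit.(event ++ [:start], %{system_time: System.system_time()}, metadata)

    try do
      fun.()
    rescue
      exception ->
        duration = System.monotonic_time() - start
        # Assumption: record the exception module, as the pipeline exception event does.
        failure = Map.put(metadata, :exception, exception.__struct__)
        emit.(event ++ [:exception], %{duration: duration}, failure)
        reraise exception, __STACKTRACE__
    else
      result ->
        duration = System.monotonic_time() - start
        emit.(event ++ [:stop], %{duration: duration}, metadata)
        # The function's own return value is passed through unchanged.
        result
    end
  end
end

# Usage: print each emitted event instead of sending it to :telemetry.
printer = fn event, measurements, meta -> IO.inspect({event, measurements, meta}) end
{:ok, 42} = SpanSketch.span([:forge, :stage], %{stage: "MyStage"}, fn -> {:ok, 42} end, printer)
```

Measuring with System.monotonic_time/0 rather than system time keeps the duration unaffected by wall-clock adjustments.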
Emits a stage retry event.
Measurements
:attempt - Attempt number (1-based)
:delay_ms - Delay before next retry in milliseconds
Metadata
:sample_id - ID of the sample being processed
:stage - Stage module name or identifier
:error - Error reason or message
Emits a stage start event.
Measurements
:system_time - System time when stage started (native units)
Metadata
:sample_id - ID of the sample being processed
:stage - Stage module name or identifier
:pipeline_id - Pipeline identifier (optional)
:run_id - Run identifier (optional)
Emits a stage stop event.
Measurements
:duration - Duration in native time units
Metadata
:sample_id - ID of the sample being processed
:stage - Stage module name or identifier
:outcome - :success, :error, or :skip
:error_type - Error classification (optional, only for errors)
Emits a storage artifact download event.
Measurements
:duration - Duration in native time units
:size_bytes - Size of artifact in bytes
Metadata
:artifact_key - Artifact key/path
Emits a storage artifact upload event.
Measurements
:duration - Duration in native time units
:size_bytes - Size of artifact in bytes
Metadata
:artifact_key - Artifact key/path
:deduplication - Whether deduplication was used (boolean)
Emits a storage sample write event.
Measurements
:duration - Duration in native time units
:size_bytes - Size of data written in bytes
Metadata
:sample_id - ID of the sample being stored
:storage_backend - Storage backend identifier (e.g., :postgres, :ets)