Forge.Telemetry (Forge v0.1.1)

Telemetry instrumentation for Forge pipelines.

This module provides helper functions for emitting structured telemetry events throughout pipeline execution. All events follow the naming convention: [:forge, :component, :action]

Event Categories

Pipeline Events

  • [:forge, :pipeline, :start] - Pipeline execution started
  • [:forge, :pipeline, :stop] - Pipeline execution completed
  • [:forge, :pipeline, :exception] - Pipeline execution failed with exception

Stage Events

  • [:forge, :stage, :start] - Stage processing started for a sample
  • [:forge, :stage, :stop] - Stage processing completed (success, error, or skip)
  • [:forge, :stage, :retry] - Stage retry attempt

Measurement Events

  • [:forge, :measurement, :start] - Measurement computation started
  • [:forge, :measurement, :stop] - Measurement computation completed
  • [:forge, :measurement, :batch_complete] - Batch measurement completed

Storage Events

  • [:forge, :storage, :sample_write] - Sample written to storage
  • [:forge, :storage, :artifact_upload] - Artifact uploaded
  • [:forge, :storage, :artifact_download] - Artifact downloaded

DLQ Events

  • [:forge, :dlq, :enqueue] - Sample moved to dead-letter queue

Usage

alias Forge.Telemetry

# Emit events directly
Telemetry.pipeline_start(pipeline_id, run_id, :my_pipeline)

# Or use the span helper for automatic timing
Telemetry.span([:forge, :stage], %{stage: "MyStage"}, fn ->
  # ... processing logic
  {:ok, result}
end)

Attaching Handlers

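:telemetry.attach(
  "my-handler",
  [:forge, :pipeline, :start],
  # In production, prefer a remote capture such as
  # &MyApp.Handler.handle_event/4 (hypothetical module); :telemetry warns
  # that local anonymous functions carry a performance penalty.
  fn _event, _measurements, _metadata, _config ->
    # Handle event
    :ok
  end,
  nil
)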

Functions

attach_handlers(opts \\ [])

Attaches telemetry handlers for logging and metrics aggregation.

This function should be called once during application startup to attach handlers that will process telemetry events.

Options

  • :reporters - List of reporter modules to attach (default: [])
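
As a rough illustration of the "once during application startup" advice, the call could sit in an application callback module. This is a minimal sketch: `MyApp.Application` and `MyApp.Supervisor` are hypothetical names, the return value of `attach_handlers/1` is ignored because it is not documented here, and the example relies on the documented default of an empty `:reporters` list.

```elixir
defmodule MyApp.Application do
  # Hypothetical application module; only the attach_handlers/1 call
  # comes from the Forge.Telemetry documentation.
  use Application

  @impl true
  def start(_type, _args) do
    # Attach handlers once, before any pipeline can emit events.
    # Pass `reporters: [...]` here to attach reporter modules as well.
    Forge.Telemetry.attach_handlers()

    children = []
    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
```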

dlq_enqueue(sample_id, stage, error)

Emits a DLQ enqueue event.

Measurements

(empty map)

Metadata

  • :sample_id - ID of the sample being moved to DLQ
  • :stage - Stage where failure occurred
  • :error - Error reason or message
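
A handler for this event might log a warning built from the three metadata keys listed above. The sketch below assumes the `:telemetry` library is available; `DlqAlert` and the handler ID `"dlq-alert"` are hypothetical names chosen for illustration.

```elixir
defmodule DlqAlert do
  # Hypothetical handler module, not part of Forge.
  require Logger

  # Registers the handler; call once, e.g. during application startup.
  def attach do
    :telemetry.attach("dlq-alert", [:forge, :dlq, :enqueue], &__MODULE__.handle_event/4, nil)
  end

  # Measurements are documented as an empty map, so only metadata is used.
  def handle_event([:forge, :dlq, :enqueue], _measurements, metadata, _config) do
    Logger.warning(message(metadata))
  end

  # Builds the log line from the documented metadata keys.
  def message(%{sample_id: id, stage: stage, error: error}) do
    "sample #{inspect(id)} sent to DLQ at stage #{inspect(stage)}: #{inspect(error)}"
  end
end
```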

measurement_batch_complete(measurement_key, duration, batch_size)

Emits a batch measurement complete event.

Measurements

  • :duration - Duration in native time units
  • :batch_size - Number of samples in batch

Metadata

  • :measurement_key - Measurement key/name
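
Because `:duration` is reported in native time units, a consumer usually converts it before displaying it. The following sketch uses the standard `System.convert_time_unit/3` to derive an average per-sample time from the two measurements; `BatchStats` is a hypothetical helper, not part of Forge.

```elixir
defmodule BatchStats do
  # Hypothetical helper: average milliseconds per sample for one
  # batch_complete event, given its measurements map.
  def avg_ms_per_sample(%{duration: duration, batch_size: batch_size}) when batch_size > 0 do
    # Convert to microseconds first so sub-millisecond precision survives.
    total_us = System.convert_time_unit(duration, :native, :microsecond)
    total_us / 1000 / batch_size
  end

  # Empty batches (or unexpected measurements) have no meaningful average.
  def avg_ms_per_sample(_measurements), do: nil
end
```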

measurement_start(sample_id, measurement_key, version)

Emits a measurement start event.

Measurements

  • :system_time - System time when measurement started (native units)

Metadata

  • :sample_id - ID of the sample being measured
  • :measurement_key - Measurement key/name
  • :version - Measurement version

measurement_stop(sample_id, measurement_key, duration, outcome)

Emits a measurement stop event.

Measurements

  • :duration - Duration in native time units

Metadata

  • :sample_id - ID of the sample being measured
  • :measurement_key - Measurement key/name
  • :outcome - :computed or :cached

pipeline_exception(pipeline_id, run_id, duration, exception)

Emits a pipeline exception event.

Measurements

  • :duration - Duration before exception (native time units)

Metadata

  • :pipeline_id - Unique identifier for the pipeline definition
  • :run_id - Unique identifier for this execution run
  • :exception - Exception module (e.g., RuntimeError)

pipeline_start(pipeline_id, run_id, name)

Emits a pipeline start event.

Measurements

  • :system_time - System time when pipeline started (native units)

Metadata

  • :pipeline_id - Unique identifier for the pipeline definition
  • :run_id - Unique identifier for this execution run
  • :name - Pipeline name (atom or string)

pipeline_stop(pipeline_id, run_id, duration, outcome, samples_processed \\ 0)

Emits a pipeline stop event.

Measurements

  • :duration - Duration in native time units
  • :samples_processed - Number of samples successfully processed

Metadata

  • :pipeline_id - Unique identifier for the pipeline definition
  • :run_id - Unique identifier for this execution run
  • :outcome - :completed or :failed
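
The three pipeline functions can be combined around a unit of work roughly as follows. This is a sketch under stated assumptions: `PipelineRunner` is hypothetical, the wrapped function is assumed to return `{outcome, samples_processed}`, and the trailing `telemetry` argument exists only so the emitter can be swapped out in tests. Durations are taken from `System.monotonic_time/0`, which returns native time units.

```elixir
defmodule PipelineRunner do
  # Hypothetical wrapper around the documented pipeline_* functions.
  # `fun` is assumed to return {outcome, samples_processed},
  # e.g. {:completed, 42} or {:failed, 0}.
  def run(pipeline_id, run_id, name, fun, telemetry \\ Forge.Telemetry) do
    telemetry.pipeline_start(pipeline_id, run_id, name)
    started = System.monotonic_time()

    try do
      fun.()
    rescue
      exception ->
        # The :exception metadata is documented as the exception module.
        telemetry.pipeline_exception(pipeline_id, run_id, elapsed(started), exception.__struct__)
        reraise exception, __STACKTRACE__
    else
      {outcome, count} = result ->
        telemetry.pipeline_stop(pipeline_id, run_id, elapsed(started), outcome, count)
        result
    end
  end

  # Elapsed time in native units since `started`.
  defp elapsed(started), do: System.monotonic_time() - started
end
```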

span(event, metadata, fun)

Executes a function and emits telemetry span events.

This is a convenience function for wrapping operations with automatic start/stop timing. It emits:

  • event ++ [:start] before execution
  • event ++ [:stop] after successful execution
  • event ++ [:exception] if an exception is raised

Parameters

  • event - Base event name (e.g., [:forge, :stage])
  • metadata - Metadata to include in all events
  • fun - Function to execute

Returns

The return value of the function.

Examples

result = Telemetry.span([:forge, :stage], %{stage: "MyStage"}, fn ->
  # ... processing logic
  {:ok, processed_sample}
end)
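
To observe all three events that a span emits, one handler can be attached to the start, stop, and exception names with `:telemetry.attach_many/4`. The sketch below is illustrative only: `SpanLogger` is a hypothetical module, and it assumes (without confirmation from this page) that the stop and exception events carry a `:duration` measurement in native units.

```elixir
defmodule SpanLogger do
  # Hypothetical handler module, not part of Forge.
  require Logger

  # Subscribes one handler to every event a [:forge, :stage] span emits.
  def attach do
    :telemetry.attach_many(
      "span-logger",
      [[:forge, :stage, :start], [:forge, :stage, :stop], [:forge, :stage, :exception]],
      &__MODULE__.handle_event/4,
      nil
    )
  end

  def handle_event(event, measurements, metadata, _config) do
    Logger.info(describe(event, measurements, metadata))
  end

  # Formats an event as "forge.stage <phase> stage=... [duration_ms=...]".
  def describe(event, measurements, metadata) do
    {phase, scope} = List.pop_at(event, -1)

    timing =
      case measurements do
        # Assumption: :duration, when present, is in native time units.
        %{duration: d} -> " duration_ms=#{System.convert_time_unit(d, :native, :millisecond)}"
        _ -> ""
      end

    "#{Enum.join(scope, ".")} #{phase} stage=#{inspect(metadata[:stage])}#{timing}"
  end
end
```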

stage_retry(sample_id, stage, attempt, delay_ms, error)

Emits a stage retry event.

Measurements

  • :attempt - Attempt number (1-based)
  • :delay_ms - Delay before next retry in milliseconds

Metadata

  • :sample_id - ID of the sample being processed
  • :stage - Stage module name or identifier
  • :error - Error reason or message

stage_start(sample_id, stage, pipeline_id \\ nil, run_id \\ nil)

Emits a stage start event.

Measurements

  • :system_time - System time when stage started (native units)

Metadata

  • :sample_id - ID of the sample being processed
  • :stage - Stage module name or identifier
  • :pipeline_id - Pipeline identifier (optional)
  • :run_id - Run identifier (optional)

stage_stop(sample_id, stage, duration, outcome, error_type \\ nil)

Emits a stage stop event.

Measurements

  • :duration - Duration in native time units

Metadata

  • :sample_id - ID of the sample being processed
  • :stage - Stage module name or identifier
  • :outcome - :success, :error, or :skip
  • :error_type - Error classification (optional, only for errors)
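
One possible use of the `:stage` and `:outcome` metadata is a running tally of outcomes per stage. The sketch below keeps the counts in an `Agent`; `StageTally` and the handler ID are hypothetical, and the agent must be started before the handler is attached, since `:telemetry` detaches a handler that raises or exits.

```elixir
defmodule StageTally do
  # Hypothetical aggregator, not part of Forge.
  use Agent

  def start_link(_opts \\ []) do
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  # Registers the handler; start the agent first.
  def attach do
    :telemetry.attach("stage-tally", [:forge, :stage, :stop], &__MODULE__.handle_event/4, nil)
  end

  # Increments the counter for this {stage, outcome} pair.
  def handle_event([:forge, :stage, :stop], _measurements, %{stage: stage, outcome: outcome}, _config) do
    Agent.update(__MODULE__, fn tally ->
      Map.update(tally, {stage, outcome}, 1, &(&1 + 1))
    end)
  end

  # Returns the current counts as a map of {stage, outcome} => count.
  def snapshot, do: Agent.get(__MODULE__, & &1)
end
```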

storage_artifact_download(artifact_key, duration, size_bytes)

Emits a storage artifact download event.

Measurements

  • :duration - Duration in native time units
  • :size_bytes - Size of artifact in bytes

Metadata

  • :artifact_key - Artifact key/path

storage_artifact_upload(artifact_key, duration, size_bytes, deduplication)

Emits a storage artifact upload event.

Measurements

  • :duration - Duration in native time units
  • :size_bytes - Size of artifact in bytes

Metadata

  • :artifact_key - Artifact key/path
  • :deduplication - Whether deduplication was used (boolean)

storage_sample_write(sample_id, duration, size_bytes, storage_backend)

Emits a storage sample write event.

Measurements

  • :duration - Duration in native time units
  • :size_bytes - Size of data written in bytes

Metadata

  • :sample_id - ID of the sample being stored
  • :storage_backend - Storage backend identifier (e.g., :postgres, :ets)