Hephaestus.Telemetry (hephaestus v0.3.1)


Telemetry event emission for Hephaestus workflow and step lifecycle.

This module provides helper functions that Runner implementations call to emit :telemetry events at every significant lifecycle point. Using these helpers (rather than calling :telemetry.execute/3 directly) ensures consistent event names, measurements, and metadata across all runners (Local, Oban, custom).
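As a rough illustration of how a custom runner might call these helpers around a single step, the sketch below wraps one step invocation. `MyApp.InstrumentedStep`, `run/4`, and the injectable `telemetry` argument are hypothetical names introduced here. The mapping of step results to helpers, including passing `nil` as the stacktrace for an `{:error, reason}` return, is an assumption based on the function descriptions on this page, not code taken from Hephaestus.

```elixir
defmodule MyApp.InstrumentedStep do
  # Hypothetical wrapper for a custom runner; not part of Hephaestus.
  # `fun` stands in for the call to the step's execute/3 callback.
  # `telemetry` defaults to the documented helpers and can be stubbed in tests.
  def run(instance, step, fun, telemetry \\ Hephaestus.Telemetry) when is_function(fun, 0) do
    telemetry.step_start(instance, step, %{})
    started_at = System.monotonic_time()

    try do
      fun.()
    catch
      # Covers raised exceptions (:error), throws, and exits.
      kind, reason ->
        stacktrace = __STACKTRACE__
        telemetry.step_exception(instance, step, elapsed(started_at), kind, reason, stacktrace, %{})
        :erlang.raise(kind, reason, stacktrace)
    else
      {:ok, _event} = result ->
        telemetry.step_stop(instance, step, elapsed(started_at), %{})
        result

      {:ok, _event, _updates} = result ->
        telemetry.step_stop(instance, step, elapsed(started_at), %{})
        result

      {:async} = result ->
        telemetry.step_async(instance, step, elapsed(started_at), %{})
        result

      {:error, reason} = result ->
        # Assumption: a returned error carries no stacktrace, so nil is passed.
        telemetry.step_exception(instance, step, elapsed(started_at), :error, reason, nil, %{})
        result
    end
  end

  # Elapsed time in native units, matching the documented :duration measurement.
  defp elapsed(started_at), do: System.monotonic_time() - started_at
end
```

Routing every outcome through one wrapper keeps the start event paired with exactly one of stop, async, or exception, which is the consistency this module is meant to provide.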

Events

Hephaestus emits 11 events: two spans of three events each (workflow and step) plus five standalone events.

Workflow Span

  • [:hephaestus, :workflow, :start] --- workflow instance created
  • [:hephaestus, :workflow, :stop] --- workflow completed successfully
  • [:hephaestus, :workflow, :exception] --- workflow failed

Step Span

  • [:hephaestus, :step, :start] --- step execution begins
  • [:hephaestus, :step, :stop] --- step completed successfully
  • [:hephaestus, :step, :exception] --- step failed or raised

Standalone

  • [:hephaestus, :step, :async] --- step returned {:async}
  • [:hephaestus, :step, :resume] --- async step resumed
  • [:hephaestus, :workflow, :transition] --- engine activated next steps
  • [:hephaestus, :engine, :advance] --- engine advance tick
  • [:hephaestus, :runner, :init] --- runner supervision tree started

API Stability

Event names and metadata structure are public API. Patch versions may add fields but never remove or rename existing ones. See the Telemetry guide for full details.

Summary

Functions

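engine_advance(instance, duration, extra_metadata)

Emits [:hephaestus, :engine, :advance] on each Engine.advance/1 call by the runner.

engine_advance_event()

Returns the event name [:hephaestus, :engine, :advance].

events()

Returns all 11 Hephaestus telemetry event names. Used by LogHandler to attach to all events.

runner_init(extra_metadata)

Emits [:hephaestus, :runner, :init] when the Hephaestus supervision tree starts.

runner_init_event()

Returns the event name [:hephaestus, :runner, :init].

step_async(instance, step, duration, extra_metadata)

Emits [:hephaestus, :step, :async] when a step returns {:async}.

step_async_event()

Returns the event name [:hephaestus, :step, :async].

step_exception(instance, step, duration, kind, reason, stacktrace, extra_metadata)

Emits [:hephaestus, :step, :exception] when a step returns {:error, reason} or raises.

step_exception_event()

Returns the event name [:hephaestus, :step, :exception].

step_resume(instance, step, extra_metadata)

Emits [:hephaestus, :step, :resume] when an async step receives an external event or timeout.

step_resume_event()

Returns the event name [:hephaestus, :step, :resume].

step_start(instance, step, extra_metadata)

Emits [:hephaestus, :step, :start] before a step's execute/3 callback is invoked.

step_start_event()

Returns the event name [:hephaestus, :step, :start].

step_stop(instance, step, duration, extra_metadata)

Emits [:hephaestus, :step, :stop] when a step returns {:ok, event} or {:ok, event, updates}.

step_stop_event()

Returns the event name [:hephaestus, :step, :stop].

workflow_exception(instance, kind, reason, stacktrace, extra_metadata)

Emits [:hephaestus, :workflow, :exception] when a workflow reaches :failed status.

workflow_exception_event()

Returns the event name [:hephaestus, :workflow, :exception].

workflow_start(instance, extra_metadata)

Emits [:hephaestus, :workflow, :start] when a runner creates a new workflow instance.

workflow_start_event()

Returns the event name [:hephaestus, :workflow, :start].

workflow_stop(instance, extra_metadata)

Emits [:hephaestus, :workflow, :stop] when a workflow reaches :completed status.

workflow_stop_event()

Returns the event name [:hephaestus, :workflow, :stop].

workflow_transition(instance, from_step, event, targets, extra_metadata)

Emits [:hephaestus, :workflow, :transition] when the engine activates next steps after a step completion.

workflow_transition_event()

Returns the event name [:hephaestus, :workflow, :transition].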

Types

event_name()

@type event_name() :: [atom()]

Functions

engine_advance(instance, duration, extra_metadata)

@spec engine_advance(Hephaestus.Core.Instance.t(), integer(), map()) :: :ok

Emits [:hephaestus, :engine, :advance] on each Engine.advance/1 call by the runner.

Measurements

  • :duration --- advance tick execution time in native units
  • :active_steps_count --- number of currently active steps (extracted from extra_metadata)
  • :completed_in_advance --- steps completed in this advance tick (extracted from extra_metadata)

Metadata

  • :instance_id --- unique identifier for the workflow instance
  • :workflow --- the workflow module
  • Plus any additional fields in extra_metadata (e.g., :status_before, :status_after, :iteration)
  • Plus caller-supplied telemetry_metadata from the instance
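To make "extracted from extra_metadata" concrete, the sketch below shows one way such a split could look: the two measurement keys are pulled out of the map and the remaining keys stay as metadata. `MyApp.AdvancePayload` and `build/2` are hypothetical names, and this is an illustration of the documented contract, not the library's actual implementation. It omits `:instance_id`, `:workflow`, and the instance's `telemetry_metadata`.

```elixir
defmodule MyApp.AdvancePayload do
  # Keys the documentation lists as measurements taken from extra_metadata.
  @measurement_keys [:active_steps_count, :completed_in_advance]

  # Returns {measurements, remaining_metadata} for an advance tick.
  def build(duration, extra_metadata) when is_integer(duration) and is_map(extra_metadata) do
    {extracted, rest} = Map.split(extra_metadata, @measurement_keys)
    {Map.put(extracted, :duration, duration), rest}
  end
end
```

Under this reading, a caller passing `%{active_steps_count: 2, completed_in_advance: 1, iteration: 3}` would see the two counts among the measurements and `:iteration` in the metadata.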

engine_advance_event()

@spec engine_advance_event() :: event_name()

Returns the event name [:hephaestus, :engine, :advance].

events()

@spec events() :: [event_name()]

Returns all 11 Hephaestus telemetry event names. Used by LogHandler to attach to all events.
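A custom handler can attach to the same list. The sketch below is a minimal example, assuming `:telemetry` and `:hephaestus` are dependencies of the application. `MyApp.HephaestusLogger`, its handler id, and `format/3` are hypothetical names introduced for illustration.

```elixir
defmodule MyApp.HephaestusLogger do
  require Logger

  # Attaches one handler function to every event returned by events/0.
  # Returns :ok, or {:error, :already_exists} if the handler id is taken.
  def attach do
    :telemetry.attach_many(
      "my-app-hephaestus-logger",
      Hephaestus.Telemetry.events(),
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # Standard :telemetry handler signature: event, measurements, metadata, config.
  def handle_event(event, measurements, metadata, _config) do
    Logger.info(format(event, measurements, metadata))
  end

  # Builds a single log line; kept separate so it can be tested without :telemetry.
  def format(event, measurements, metadata) do
    name = Enum.map_join(event, ".", &Atom.to_string/1)
    "#{name} instance=#{inspect(metadata[:instance_id])} measurements=#{inspect(measurements)}"
  end
end
```

Passing a captured remote function (`&__MODULE__.handle_event/4`) rather than an anonymous function follows the `:telemetry` recommendation for handler functions.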

runner_init(extra_metadata)

@spec runner_init(map()) :: :ok

Emits [:hephaestus, :runner, :init] when the Hephaestus supervision tree starts.

Measurements

Metadata

  • :name --- registered name of the runner
  • :runner --- the runner module
  • :storage --- the storage module
  • :pid --- PID of the runner process

runner_init_event()

@spec runner_init_event() :: event_name()

Returns the event name [:hephaestus, :runner, :init].

step_async(instance, step, duration, extra_metadata)

@spec step_async(Hephaestus.Core.Instance.t(), module(), integer(), map()) :: :ok

Emits [:hephaestus, :step, :async] when a step returns {:async}.

Measurements

  • :duration --- step execution time in native units before going async

Metadata

  • :instance_id --- unique identifier for the workflow instance
  • :workflow --- the workflow module
  • :step --- the step module that went async
  • Plus any additional fields in extra_metadata (e.g., :step_key, :instance_status)
  • Plus caller-supplied telemetry_metadata from the instance

step_async_event()

@spec step_async_event() :: event_name()

Returns the event name [:hephaestus, :step, :async].

step_exception(instance, step, duration, kind, reason, stacktrace, extra_metadata)

@spec step_exception(
  Hephaestus.Core.Instance.t(),
  module(),
  integer(),
  atom(),
  term(),
  list() | nil,
  map()
) :: :ok

Emits [:hephaestus, :step, :exception] when a step returns {:error, reason} or raises.

Measurements

  • :duration --- step execution time in native units

Metadata

  • :instance_id --- unique identifier for the workflow instance
  • :workflow --- the workflow module
  • :step --- the step module that failed
  • :kind --- :error, :throw, or :exit
  • :reason --- the error term
  • :stacktrace --- stacktrace list or nil
  • Plus any additional fields in extra_metadata (e.g., :step_key)
  • Plus caller-supplied telemetry_metadata from the instance
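Because `:stacktrace` may be `nil`, a handler for this event has to cover both shapes. The following sketch formats the failure with the standard `Exception` helpers. `MyApp.StepFailureHandler` and `describe/1` are hypothetical names, and the handler only reads the metadata keys listed above.

```elixir
defmodule MyApp.StepFailureHandler do
  def handle_event([:hephaestus, :step, :exception], _measurements, metadata, _config) do
    IO.puts(:stderr, describe(metadata))
  end

  # Includes the stacktrace when one was captured, otherwise only the banner.
  def describe(%{step: step, kind: kind, reason: reason, stacktrace: stacktrace}) do
    details =
      if is_list(stacktrace) do
        Exception.format(kind, reason, stacktrace)
      else
        Exception.format_banner(kind, reason)
      end

    "step #{inspect(step)} failed: #{details}"
  end
end
```

The handler could be attached with `:telemetry.attach/4`, using `Hephaestus.Telemetry.step_exception_event()` as the event name.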

step_exception_event()

@spec step_exception_event() :: event_name()

Returns the event name [:hephaestus, :step, :exception].

step_resume(instance, step, extra_metadata)

@spec step_resume(Hephaestus.Core.Instance.t(), module(), map()) :: :ok

Emits [:hephaestus, :step, :resume] when an async step receives an external event or timeout.

Measurements

  • :system_time --- wall-clock time from System.system_time/0
  • :wait_duration --- time spent waiting in native units (extracted from extra_metadata)

Metadata

  • :instance_id --- unique identifier for the workflow instance
  • :workflow --- the workflow module
  • :step --- the step module being resumed
  • Plus any additional fields in extra_metadata (e.g., :step_key, :resume_event, :source)
  • Plus caller-supplied telemetry_metadata from the instance

step_resume_event()

@spec step_resume_event() :: event_name()

Returns the event name [:hephaestus, :step, :resume].

step_start(instance, step, extra_metadata)

@spec step_start(Hephaestus.Core.Instance.t(), module(), map()) :: :ok

Emits [:hephaestus, :step, :start] before a step's execute/3 callback is invoked.

Measurements

Metadata

  • :instance_id --- unique identifier for the workflow instance
  • :workflow --- the workflow module
  • :step --- the step module being executed
  • Plus any additional fields in extra_metadata (e.g., :step_key, :concurrent, :active_steps_count)
  • Plus caller-supplied telemetry_metadata from the instance

step_start_event()

@spec step_start_event() :: event_name()

Returns the event name [:hephaestus, :step, :start].

step_stop(instance, step, duration, extra_metadata)

@spec step_stop(Hephaestus.Core.Instance.t(), module(), integer(), map()) :: :ok

Emits [:hephaestus, :step, :stop] when a step returns {:ok, event} or {:ok, event, updates}.

Measurements

  • :duration --- step execution time in native units

Metadata

  • :instance_id --- unique identifier for the workflow instance
  • :workflow --- the workflow module
  • :step --- the step module that completed
  • Plus any additional fields in extra_metadata (e.g., :step_key, :event, :has_context_updates, :transitions_to)
  • Plus caller-supplied telemetry_metadata from the instance
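Since `:duration` is reported in native time units, handlers usually convert it before displaying or exporting it. A minimal sketch, assuming `:telemetry` and `:hephaestus` are dependencies. `MyApp.StepTimingHandler`, its handler id, and `to_ms/1` are hypothetical names.

```elixir
defmodule MyApp.StepTimingHandler do
  # Attaches to the single step-stop event via its documented name helper.
  def attach do
    :telemetry.attach(
      "my-app-step-timing",
      Hephaestus.Telemetry.step_stop_event(),
      &__MODULE__.handle_event/4,
      nil
    )
  end

  def handle_event(_event, %{duration: duration}, %{step: step}, _config) do
    IO.puts("#{inspect(step)} finished in #{to_ms(duration)} ms")
  end

  # Converts native time units to whole milliseconds.
  def to_ms(native), do: System.convert_time_unit(native, :native, :millisecond)
end
```

Using `step_stop_event/0` instead of a literal list avoids typos in the event name.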

step_stop_event()

@spec step_stop_event() :: event_name()

Returns the event name [:hephaestus, :step, :stop].

workflow_exception(instance, kind, reason, stacktrace, extra_metadata)

@spec workflow_exception(
  Hephaestus.Core.Instance.t(),
  atom(),
  term(),
  list() | nil,
  map()
) :: :ok

Emits [:hephaestus, :workflow, :exception] when a workflow reaches :failed status.

Measurements

  • :duration --- elapsed time in native units since workflow start (or nil after VM restart)

Metadata

  • :instance_id --- unique identifier for the workflow instance
  • :workflow --- the workflow module
  • :kind --- :error, :throw, or :exit
  • :reason --- the error term
  • :stacktrace --- stacktrace list, or nil when the failure came from an {:error, reason} return
  • :status --- always :failed
  • Plus any additional fields in extra_metadata (e.g., :failed_step, :runner)
  • Plus caller-supplied telemetry_metadata from the instance

workflow_exception_event()

@spec workflow_exception_event() :: event_name()

Returns the event name [:hephaestus, :workflow, :exception].

workflow_start(instance, extra_metadata)

@spec workflow_start(Hephaestus.Core.Instance.t(), map()) :: :ok

Emits [:hephaestus, :workflow, :start] when a runner creates a new workflow instance.

Measurements

Metadata

  • :instance_id --- unique identifier for the workflow instance
  • :workflow --- the workflow module
  • Plus any additional fields in extra_metadata (e.g., :initial_step, :context_keys, :runner)
  • Plus caller-supplied telemetry_metadata from the instance

workflow_start_event()

@spec workflow_start_event() :: event_name()

Returns the event name [:hephaestus, :workflow, :start].

workflow_stop(instance, extra_metadata)

@spec workflow_stop(Hephaestus.Core.Instance.t(), map()) :: :ok

Emits [:hephaestus, :workflow, :stop] when a workflow reaches :completed status.

Measurements

  • :duration --- elapsed time in native units since workflow start (or nil after VM restart)
  • :step_count --- number of steps completed (extracted from extra_metadata)
  • :advance_count --- number of engine advance ticks (extracted from extra_metadata)

Metadata

  • :instance_id --- unique identifier for the workflow instance
  • :workflow --- the workflow module
  • :status --- always :completed
  • Plus any additional fields in extra_metadata (e.g., :completed_steps, :runner)
  • Plus caller-supplied telemetry_metadata from the instance
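Handlers for this event should not assume `:duration` is an integer, because it can be `nil`. The sketch below handles both cases with separate function clauses. `MyApp.WorkflowSummary` and `summarize/1` are hypothetical names used only for illustration.

```elixir
defmodule MyApp.WorkflowSummary do
  def handle_event([:hephaestus, :workflow, :stop], measurements, metadata, _config) do
    IO.puts("#{inspect(metadata.workflow)} #{summarize(measurements)}")
  end

  # No elapsed time is available, for example after a VM restart.
  def summarize(%{duration: nil}), do: "completed (duration unknown)"

  # Duration is in native units; convert it for display.
  def summarize(%{duration: native}) when is_integer(native) do
    ms = System.convert_time_unit(native, :native, :millisecond)
    "completed in #{ms} ms"
  end
end
```

Matching on `nil` in its own clause keeps arithmetic on the duration from raising inside the handler, which matters because `:telemetry` detaches a handler that fails.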

workflow_stop_event()

@spec workflow_stop_event() :: event_name()

Returns the event name [:hephaestus, :workflow, :stop].

workflow_transition(instance, from_step, event, targets, extra_metadata)

@spec workflow_transition(
  Hephaestus.Core.Instance.t(),
  module(),
  atom(),
  [module()],
  map()
) :: :ok

Emits [:hephaestus, :workflow, :transition] when the engine activates next steps after a step completion.

Measurements

  • :targets_count --- number of activated target steps

Metadata

  • :instance_id --- unique identifier for the workflow instance
  • :workflow --- the workflow module
  • :from_step --- the step module that triggered the transition
  • :event --- the event atom returned by the step
  • :targets --- list of activated step modules
  • :fan_out --- true when more than one target step was activated (:targets_count > 1)
  • Plus any additional fields in extra_metadata
  • Plus caller-supplied telemetry_metadata from the instance
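The `:fan_out` flag makes it easy to single out branching transitions in a handler. The sketch below does this by pattern matching in the function head. `MyApp.FanOutHandler` and `describe/2` are hypothetical names, and the handler relies only on the measurement and metadata keys listed above.

```elixir
defmodule MyApp.FanOutHandler do
  # Matches only transitions that activated more than one target step.
  def handle_event(
        [:hephaestus, :workflow, :transition],
        %{targets_count: count},
        %{fan_out: true} = metadata,
        _config
      ) do
    IO.puts(describe(count, metadata))
  end

  # Single-target transitions are ignored.
  def handle_event([:hephaestus, :workflow, :transition], _measurements, _metadata, _config) do
    :ok
  end

  def describe(count, %{from_step: from, event: event, targets: targets}) do
    "#{inspect(from)} on #{inspect(event)} fanned out to #{count} steps: #{inspect(targets)}"
  end
end
```

The handler could be attached with `:telemetry.attach/4`, using `Hephaestus.Telemetry.workflow_transition_event()` as the event name.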

workflow_transition_event()

@spec workflow_transition_event() :: event_name()

Returns the event name [:hephaestus, :workflow, :transition].