Anvil.Telemetry (Anvil v0.1.1)

Telemetry integration for the Anvil labeling system.

Provides instrumentation for all core operations. Event names follow the convention [:anvil, domain, action, lifecycle?], where the optional lifecycle segment (:start, :stop, or :exception) is present on span events.

Event Categories

  • Queue Events: queue creation, status changes
  • Assignment Events: dispatch, timeout, completion
  • Label Events: submission, validation
  • Agreement Events: computation, low score detection
  • Export Events: generation, progress tracking
  • Storage Events: query timing
  • Schema Events: validation, migration

Usage

Emitting Events

# Count event
Anvil.Telemetry.emit_queue_created(queue_id, metadata)

# Duration event (using span)
Anvil.Telemetry.span(:assignment_dispatch, metadata, fn ->
  result = perform_dispatch()
  {result, additional_metadata}
end)
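
To make the count-event case concrete, here is a minimal sketch of what an emitter such as emit_queue_created/2 might do internally. It is not Anvil's actual implementation: the EmitSketch module is hypothetical, and the event name, empty measurements map, and queue_id metadata key are inferred from the test example on this page. The only library call used is :telemetry.execute/3.

```elixir
# Standalone script; in a Mix project, list :telemetry in deps instead.
Mix.install([{:telemetry, "~> 1.0"}])

defmodule EmitSketch do
  # Hypothetical stand-in for Anvil.Telemetry.emit_queue_created/2.
  @spec emit_queue_created(binary(), map()) :: :ok
  def emit_queue_created(queue_id, metadata)
      when is_binary(queue_id) and is_map(metadata) do
    # Assumption: a count event carries no measurements, and the
    # identifier travels in the metadata under :queue_id.
    :telemetry.execute(
      [:anvil, :queue, :created],
      %{},
      Map.put(metadata, :queue_id, queue_id)
    )
  end
end
```

Handlers attached to [:anvil, :queue, :created] run synchronously in the calling process, so an emitter like this should stay cheap.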

Attaching Handlers

:telemetry.attach(
  "my-handler",
  [:anvil, :label, :submit, :stop],
  &MyModule.handle_event/4,
  nil
)
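
The handler passed to :telemetry.attach/4 receives four arguments: the event name, the measurements map, the metadata map, and the handler config. The sketch below shows one possible shape for the MyModule referenced above. The format/3 helper and the log message layout are illustrative assumptions, and the duration measurement is assumed to be in native time units, as it is for spans built on :telemetry.span/3.

```elixir
defmodule MyModule do
  require Logger

  # Only the [:anvil, :label, :submit, :stop] event is handled here.
  # A handler that raises is detached by :telemetry, so attach this
  # function only to events it has a clause for.
  def handle_event([:anvil, :label, :submit, :stop] = event, measurements, metadata, _config) do
    Logger.info(format(event, measurements, metadata))
  end

  # Hypothetical helper: renders the event as a single log line.
  def format(event, %{duration: duration}, metadata) do
    # Assumption: duration is in native time units.
    ms = System.convert_time_unit(duration, :native, :millisecond)
    "#{Enum.join(event, ".")} took #{ms}ms metadata=#{inspect(metadata)}"
  end
end
```

Passing the handler as a remote capture (&MyModule.handle_event/4) rather than an anonymous function avoids the performance warning that :telemetry logs for local functions.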

Testing

import Telemetry.Test

test "emits telemetry event" do
  # Any binary ID works; it is pinned in the assertion below.
  queue_id = "queue-123"
  attach_telemetry_handler([:anvil, :queue, :created])

  Anvil.Telemetry.emit_queue_created(queue_id, %{})

  assert_received {:telemetry, [:anvil, :queue, :created], %{}, %{queue_id: ^queue_id}}
end
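
The assertion above expects a {:telemetry, event, measurements, metadata} message in the test process mailbox. A helper that produces such messages could look like the sketch below. The TelemetryTestSketch module is hypothetical and the real Telemetry.Test helper may differ; only :telemetry.attach/4 and :telemetry.detach/1 are actual library calls.

```elixir
# Standalone script; in a Mix project, list :telemetry in deps instead.
Mix.install([{:telemetry, "~> 1.0"}])

defmodule TelemetryTestSketch do
  # Hypothetical helper: forwards one event to the calling process.
  # Returns the handler ID so the caller can detach it afterwards.
  def attach_telemetry_handler(event) when is_list(event) do
    test_pid = self()
    # A unique ID per call avoids {:error, :already_exists} across tests.
    handler_id = {__MODULE__, event, make_ref()}

    :ok = :telemetry.attach(handler_id, event, &__MODULE__.forward/4, test_pid)
    handler_id
  end

  # The handler config is the pid that should receive the message.
  def forward(event, measurements, metadata, test_pid) do
    send(test_pid, {:telemetry, event, measurements, metadata})
  end
end
```

In an ExUnit test, calling :telemetry.detach(handler_id) in an on_exit callback keeps handlers from leaking between tests.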

Summary

Functions

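  • emit_assignment_completed/2: Emits an assignment completed event.
  • emit_assignment_created/2: Emits an assignment created event.
  • emit_assignment_expired/2: Emits an assignment expired event.
  • emit_assignment_timed_out/2: Emits an assignment timeout event (batch).
  • emit_export_completed/2: Emits an export completed event.
  • emit_export_failed/3: Emits an export failed event.
  • emit_export_progress/2: Emits an export progress event.
  • emit_label_validation_failed/3: Emits a label validation failed event.
  • emit_low_agreement_score/2: Emits a low agreement score event.
  • emit_queue_created/2: Emits a queue created event.
  • emit_queue_status_changed/4: Emits a queue status changed event.
  • emit_schema_migration/3: Emits a schema migration event.
  • emit_schema_validation/3: Emits a schema validation event.
  • span/3: Generic telemetry span wrapper.
  • span_agreement_batch_recompute/2: Wraps batch agreement recomputation in a telemetry span.
  • span_agreement_compute/2: Wraps agreement computation in a telemetry span.
  • span_assignment_dispatch/2: Wraps assignment dispatch in a telemetry span.
  • span_export_generate/2: Wraps export generation in a telemetry span.
  • span_label_submit/2: Wraps label submission in a telemetry span.
  • span_storage_query/3: Wraps storage query in a telemetry span.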

Functions

emit_assignment_completed(assignment_id, metadata)

@spec emit_assignment_completed(binary(), map()) :: :ok

Emits an assignment completed event.

emit_assignment_created(assignment_id, metadata)

@spec emit_assignment_created(binary(), map()) :: :ok

Emits an assignment created event.

emit_assignment_expired(assignment_id, metadata)

@spec emit_assignment_expired(binary(), map()) :: :ok

Emits an assignment expired event.

emit_assignment_timed_out(count, metadata)

@spec emit_assignment_timed_out(integer(), map()) :: :ok

Emits an assignment timeout event (batch).

emit_export_completed(export_id, metadata)

@spec emit_export_completed(binary(), map()) :: :ok

Emits an export completed event.

emit_export_failed(export_id, reason, metadata)

@spec emit_export_failed(binary(), term(), map()) :: :ok

Emits an export failed event.

emit_export_progress(rows_processed, metadata)

@spec emit_export_progress(integer(), map()) :: :ok

Emits an export progress event.

emit_label_validation_failed(assignment_id, errors, metadata)

@spec emit_label_validation_failed(binary(), list(), map()) :: :ok

Emits a label validation failed event.

emit_low_agreement_score(score, metadata)

@spec emit_low_agreement_score(float(), map()) :: :ok

Emits a low agreement score event.

emit_queue_created(queue_id, metadata)

@spec emit_queue_created(binary(), map()) :: :ok

Emits a queue created event.

emit_queue_status_changed(queue_id, from_status, to_status, metadata)

@spec emit_queue_status_changed(binary(), atom(), atom(), map()) :: :ok

Emits a queue status changed event.

emit_schema_migration(from_version, to_version, metadata)

@spec emit_schema_migration(binary(), binary(), map()) :: :ok

Emits a schema migration event.

emit_schema_validation(schema_id, valid?, metadata)

@spec emit_schema_validation(binary(), boolean(), map()) :: :ok

Emits a schema validation event.

span(operation, metadata, fun)

@spec span(atom(), map(), (-> {any(), map()})) :: any()

Generic telemetry span wrapper.

The function must return a {result, metadata} tuple. The returned metadata is merged with the initial metadata for the :stop and :exception events.

Examples

Anvil.Telemetry.span(:my_operation, %{queue_id: id}, fn ->
  result = do_work()
  {result, %{work_count: 42}}
end)

This emits:

  • [:anvil, :my_operation, :start] with initial metadata
  • [:anvil, :my_operation, :stop] with duration and merged metadata
  • [:anvil, :my_operation, :exception] if an error occurs
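
The three events listed above match what :telemetry.span/3 emits for an event prefix of [:anvil, operation]. As a rough sketch, and assuming (without confirmation from the source) that the wrapper delegates to :telemetry.span/3, the metadata merge could be implemented as follows. The SpanSketch module is hypothetical.

```elixir
# Standalone script; in a Mix project, list :telemetry in deps instead.
Mix.install([{:telemetry, "~> 1.0"}])

defmodule SpanSketch do
  # Hypothetical stand-in for Anvil.Telemetry.span/3.
  @spec span(atom(), map(), (-> {any(), map()})) :: any()
  def span(operation, metadata, fun)
      when is_atom(operation) and is_map(metadata) and is_function(fun, 0) do
    :telemetry.span([:anvil, operation], metadata, fn ->
      {result, extra} = fun.()
      # :telemetry.span/3 does not merge start metadata into the stop
      # event on its own, so the merge is done here.
      {result, Map.merge(metadata, extra)}
    end)
  end
end
```

In this sketch the call returns only the result element of the tuple, because that is what :telemetry.span/3 returns. If the function raises, the :exception event carries the initial metadata plus :kind, :reason, and :stacktrace, since no extra metadata was returned.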

span_agreement_batch_recompute(metadata, fun)

@spec span_agreement_batch_recompute(map(), (-> {any(), map()})) :: any()

Wraps batch agreement recomputation in a telemetry span.

span_agreement_compute(metadata, fun)

@spec span_agreement_compute(map(), (-> {any(), map()})) :: any()

Wraps agreement computation in a telemetry span.

span_assignment_dispatch(metadata, fun)

@spec span_assignment_dispatch(map(), (-> {any(), map()})) :: any()

Wraps assignment dispatch in a telemetry span.

Returns {result, metadata} tuple from the function.

span_export_generate(metadata, fun)

@spec span_export_generate(map(), (-> {any(), map()})) :: any()

Wraps export generation in a telemetry span.

span_label_submit(metadata, fun)

@spec span_label_submit(map(), (-> {any(), map()})) :: any()

Wraps label submission in a telemetry span.

span_storage_query(operation, metadata, fun)

@spec span_storage_query(binary(), map(), (-> {any(), map()})) :: any()

Wraps storage query in a telemetry span.