Snakepit.Telemetry.GrpcStream (Snakepit v0.6.10)

Manages gRPC telemetry streams from Python workers.

This GenServer maintains bidirectional telemetry streams with Python workers, translating Python telemetry events into Elixir :telemetry events.

Features:

  • Automatic stream registration when workers connect
  • Dynamic sampling rate adjustments
  • Event filtering
  • Graceful handling of worker disconnections
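Because the translated events arrive as ordinary :telemetry events, consuming them on the Elixir side should look like attaching any other handler. The sketch below is a minimal illustration under stated assumptions: the event name [:snakepit, :python, :call, :stop], the measurement and metadata keys, and the DemoHandler module are all hypothetical, and the event is emitted by hand here rather than by a real worker. Check the events Snakepit actually emits before relying on any of these names.

```elixir
# Assumes a standalone script; Mix.install/1 fetches and starts :telemetry.
Mix.install([{:telemetry, "~> 1.0"}])

defmodule DemoHandler do
  # :telemetry handlers receive the event name, measurements, metadata and config.
  def handle_event(event, measurements, metadata, %{reply_to: pid}) do
    send(pid, {:telemetry_event, event, measurements, metadata})
  end
end

# Hypothetical event name, chosen only for illustration.
event = [:snakepit, :python, :call, :stop]

:ok =
  :telemetry.attach(
    "demo-python-call",
    event,
    &DemoHandler.handle_event/4,
    %{reply_to: self()}
  )

# Emit the event by hand to stand in for a translated Python event.
:telemetry.execute(event, %{duration: 1_200}, %{worker_id: "worker_1"})

received =
  receive do
    {:telemetry_event, ^event, measurements, metadata} -> {measurements, metadata}
  after
    100 -> :timeout
  end

:telemetry.detach("demo-python-call")
```

A module function capture is used for the handler because :telemetry warns when a local anonymous function is attached.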

Summary

Functions

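child_spec(init_arg)
Returns a specification to start this module under a supervisor.

list_streams()
Gets the current state of all registered streams.

register_worker(channel, worker_ctx)
Registers a worker for telemetry streaming.

start_link(opts)
Starts the telemetry stream manager.

toggle(worker_id, enabled)
Enables or disables telemetry for a specific worker.

unregister_worker(worker_id)
Removes a worker from telemetry streaming.

update_filter(worker_id, opts)
Updates event filters for a specific worker.

update_sampling(worker_id, rate, patterns \\ [])
Updates the sampling rate for a specific worker.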

Types

worker_ctx()

@type worker_ctx() :: %{
  worker_id: String.t(),
  pool_name: atom(),
  python_pid: integer() | nil
}
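As a minimal sketch of what a conforming value looks like, the hypothetical WorkerCtxDemo module below checks a map against the three fields of worker_ctx(). The module and its valid?/1 function are illustrative only and are not part of Snakepit.

```elixir
defmodule WorkerCtxDemo do
  # Mirrors the worker_ctx() shape: a binary worker_id, an atom pool_name,
  # and a python_pid that is either an integer or nil.
  @spec valid?(term()) :: boolean()
  def valid?(%{worker_id: id, pool_name: pool, python_pid: pid})
      when is_binary(id) and is_atom(pool) and (is_integer(pid) or is_nil(pid)),
      do: true

  def valid?(_other), do: false
end

# python_pid may be nil, for example before the OS process ID is known.
ctx = %{worker_id: "worker_1", pool_name: :default, python_pid: nil}
```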

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.
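For orientation, this is what the default child_spec/1 generated by `use GenServer` looks like and how a supervisor consumes it. DemoStreamManager is a hypothetical stand-in, not the real module; whether Snakepit.Telemetry.GrpcStream needs to be supervised manually, or is already started by Snakepit's own application, is not stated here and should be checked first.

```elixir
defmodule DemoStreamManager do
  # Stand-in GenServer; `use GenServer` generates child_spec/1 automatically.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(opts), do: {:ok, opts}
end

# The generated spec tells a supervisor how to start the process.
spec = DemoStreamManager.child_spec([])

# Listing the bare module is shorthand for {DemoStreamManager, []}.
{:ok, _sup} = Supervisor.start_link([DemoStreamManager], strategy: :one_for_one)
```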

list_streams()

Gets the current state of all registered streams.

register_worker(channel, worker_ctx)

Registers a worker for telemetry streaming.

Automatically initiates a telemetry stream with the worker and starts consuming events.

Examples

iex> channel = connect_to_worker()
iex> Snakepit.Telemetry.GrpcStream.register_worker(channel, %{
...>   worker_id: "worker_1",
...>   pool_name: :default,
...>   python_pid: 12345
...> })
:ok

start_link(opts)

Starts the telemetry stream manager.

toggle(worker_id, enabled)

Enables or disables telemetry for a specific worker.

unregister_worker(worker_id)

Removes a worker from telemetry streaming.

Called when a worker disconnects or terminates.
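One way such cleanup could be wired up is with a process monitor that runs the unregister step once the worker process goes down. This is a sketch under assumptions, not a description of how Snakepit does it internally: DemoCleanup is hypothetical, and the `unregister` callback stands in for Snakepit.Telemetry.GrpcStream.unregister_worker/1 so the example runs without the library.

```elixir
defmodule DemoCleanup do
  # Blocks until `worker_pid` exits, then invokes `unregister` with the worker ID.
  # `unregister` is a stand-in for the real unregister_worker/1 call.
  def watch(worker_pid, worker_id, unregister) when is_function(unregister, 1) do
    ref = Process.monitor(worker_pid)

    receive do
      {:DOWN, ^ref, :process, ^worker_pid, _reason} -> unregister.(worker_id)
    end
  end
end

parent = self()

# A fake worker that lives until it is told to stop.
worker = spawn(fn -> receive do: (:stop -> :ok) end)

# Run the watcher in its own process so the caller is not blocked.
spawn(fn ->
  DemoCleanup.watch(worker, "worker_1", fn id -> send(parent, {:unregistered, id}) end)
end)

send(worker, :stop)

result =
  receive do
    {:unregistered, id} -> id
  after
    1_000 -> :timeout
  end
```

If the worker has already exited by the time the monitor is set up, the :DOWN message is still delivered immediately, so the cleanup step is not skipped.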

update_filter(worker_id, opts)

Updates event filters for a specific worker.

update_sampling(worker_id, rate, patterns \\ [])

Updates the sampling rate for a specific worker.

Examples

iex> Snakepit.Telemetry.GrpcStream.update_sampling("worker_1", 0.1)
:ok

iex> Snakepit.Telemetry.GrpcStream.update_sampling("worker_1", 0.5, ["python.call.*"])
:ok
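The semantics of `rate` and `patterns` are not spelled out above. A plausible reading, sketched here purely as an assumption, is that `rate` is the fraction of events kept (0.0 to 1.0) and that `patterns` are glob-style names limiting which events the rate applies to, with an empty list meaning all events. DemoSampler is hypothetical and does not reflect the worker's actual sampling logic.

```elixir
defmodule DemoSampler do
  # Returns true when `event_name` matches a glob such as "python.call.*".
  def matches?(event_name, pattern) do
    regex_source =
      pattern
      |> Regex.escape()
      |> String.replace("\\*", ".*")

    Regex.match?(Regex.compile!("^" <> regex_source <> "$"), event_name)
  end

  # Assumed semantics: events outside `patterns` are always kept; events the
  # rate applies to are kept with probability `rate`. `random` is injectable
  # so the decision can be made deterministic in tests.
  def keep?(event_name, rate, patterns \\ [], random \\ &:rand.uniform/0) do
    applies? = patterns == [] or Enum.any?(patterns, &matches?(event_name, &1))
    not applies? or random.() < rate
  end
end
```

Under this reading, the second example above would keep roughly half of the events whose names start with "python.call." and leave other events untouched.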