Langfuse.Ingestion (Langfuse v0.1.0)

View Source

Asynchronous event batching and ingestion to Langfuse.

This GenServer manages the queue of tracing events and sends them to Langfuse in batches. Events are queued immediately and sent asynchronously to avoid blocking application code.

Batching Behavior

Events are sent in batches when any of these conditions occur:

  • Batch size threshold is reached (configured via :batch_size)
  • Flush interval timer fires (configured via :flush_interval)
  • flush/1 is called explicitly
  • Application shutdown is initiated

Graceful Shutdown

The ingestion process traps exits and flushes all pending events during termination. This ensures no events are lost during application shutdown.

Telemetry

Batch flushes emit telemetry events. See Langfuse.Telemetry for details.

Usage

This module is used internally by the SDK. Events are enqueued automatically when creating traces, spans, generations, events, and scores.

Langfuse.trace(name: "request")

For explicit control:

Langfuse.flush()
Langfuse.shutdown()

Summary

Functions

Returns a specification to start this module under a supervisor.

Enqueues an event for batched ingestion.

Flushes all pending events synchronously.

Returns the current number of queued events.

Shuts down the ingestion process gracefully.

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

enqueue(event)

@spec enqueue(map()) :: :ok

Enqueues an event for batched ingestion.

Events are queued asynchronously and sent in batches. If tracing is disabled via configuration, this is a no-op.

If an :event_handler function is configured, it will be called with the event instead of queueing. This is useful for testing.

This function is called internally by trace, span, generation, event, and score modules.

flush(opts \\ [])

@spec flush(keyword()) :: :ok | {:error, :timeout}

Flushes all pending events synchronously.

Blocks until all queued events are sent or the timeout is reached.

Options

  • :timeout - Maximum wait time in milliseconds. Defaults to 5,000.

Examples

Langfuse.Ingestion.flush()
# => :ok

Langfuse.Ingestion.flush(timeout: 10_000)
# => :ok

queue_size()

@spec queue_size() :: non_neg_integer()

Returns the current number of queued events.

Useful for monitoring and debugging.

Examples

Langfuse.Ingestion.queue_size()
# => 5

shutdown()

@spec shutdown() :: :ok

Shuts down the ingestion process gracefully.

Flushes all pending events before stopping. Uses a 30-second timeout.