Langfuse.Ingestion (Langfuse v0.1.0)
Asynchronous event batching and ingestion to Langfuse.
This GenServer manages the queue of tracing events and sends them to Langfuse in batches. Events are queued immediately and sent asynchronously to avoid blocking application code.
Batching Behavior
Events are sent in batches when any of these conditions occur:
- Batch size threshold is reached (configured via :batch_size)
- Flush interval timer fires (configured via :flush_interval)
- flush/1 is called explicitly
- Application shutdown is initiated
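The two thresholds above might be set in application config along these lines. This is a sketch under assumptions: the `:langfuse` application name, the file location, the values, and the millisecond unit for `:flush_interval` are not confirmed by this page; only the option names `:batch_size` and `:flush_interval` come from the text.

```elixir
# config/runtime.exs (assumed location)
import Config

# Assumption: the SDK reads its options from the :langfuse application
# environment. The values are illustrative, not documented defaults.
config :langfuse,
  # Flush as soon as this many events are queued.
  batch_size: 20,
  # Flush on a timer even if the batch is not full (assumed milliseconds).
  flush_interval: 5_000
```

A smaller batch size or shorter interval sends events sooner at the cost of more HTTP requests.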
Graceful Shutdown
The ingestion process traps exits and flushes all pending events during termination. This ensures no events are lost during application shutdown.
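The trap-exit-and-flush pattern described above can be sketched with a plain GenServer. This is an illustration of the general OTP technique, not the source of Langfuse.Ingestion; the module name `FlushOnExitSketch` and the `sink` function are hypothetical.

```elixir
defmodule FlushOnExitSketch do
  @moduledoc "Illustrative only; not the Langfuse.Ingestion source."
  use GenServer

  # `sink` is a hypothetical 1-arity function that receives a list of events.
  def start_link(sink), do: GenServer.start_link(__MODULE__, sink)

  def enqueue(pid, event), do: GenServer.cast(pid, {:enqueue, event})

  @impl true
  def init(sink) do
    # Trapping exits makes terminate/2 run when a supervisor shuts the process down.
    Process.flag(:trap_exit, true)
    {:ok, %{sink: sink, queue: []}}
  end

  @impl true
  def handle_cast({:enqueue, event}, state) do
    # Prepend for O(1) inserts; the order is restored when flushing.
    {:noreply, %{state | queue: [event | state.queue]}}
  end

  @impl true
  def terminate(_reason, %{sink: sink, queue: queue}) do
    # Send whatever is still queued, oldest first, before the process exits.
    if queue != [], do: sink.(Enum.reverse(queue))
    :ok
  end
end
```

Without `Process.flag(:trap_exit, true)`, a supervisor's shutdown signal would end the process without calling `terminate/2`, and any queued events would be dropped.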
Telemetry
Batch flushes emit telemetry events. See Langfuse.Telemetry for details.
Usage
This module is used internally by the SDK. Events are enqueued automatically when creating traces, spans, generations, events, and scores.
Langfuse.trace(name: "request")
For explicit control:
Langfuse.flush()
Langfuse.shutdown()
Summary
Functions
child_spec/1 - Returns a specification to start this module under a supervisor.
enqueue/1 - Enqueues an event for batched ingestion.
flush/1 - Flushes all pending events synchronously.
queue_size/0 - Returns the current number of queued events.
shutdown/0 - Shuts down the ingestion process gracefully.
Functions
child_spec/1
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec enqueue(map()) :: :ok
Enqueues an event for batched ingestion.
Events are queued asynchronously and sent in batches. If tracing is disabled via configuration, this is a no-op.
If an :event_handler function is configured, it will be called
with the event instead of queueing. This is useful for testing.
This function is called internally by trace, span, generation, event, and score modules.
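The dispatch order described above (disabled, then handler, then queue) can be sketched as follows. This is a hypothetical stand-in, not the SDK's implementation: the module `EnqueueSketch`, the `opts` keyword list, and the `:enabled` and `:queue` keys are assumptions made for illustration; only `:event_handler` is named in the text.

```elixir
defmodule EnqueueSketch do
  @moduledoc "Illustrative dispatch order only; not the SDK's implementation."

  # `opts` stands in for SDK configuration. The :enabled and :queue keys are
  # hypothetical; :event_handler is the option named in the text above.
  def enqueue(event, opts \\ []) when is_map(event) do
    handler = Keyword.get(opts, :event_handler)

    cond do
      Keyword.get(opts, :enabled, true) == false ->
        # Tracing disabled: drop the event.
        :ok

      is_function(handler, 1) ->
        # Handler configured: call it instead of queueing.
        handler.(event)
        :ok

      true ->
        # Default path: hand the event to the queue process.
        send(Keyword.get(opts, :queue, self()), {:enqueue, event})
        :ok
    end
  end
end

# In a test, the handler can forward events to the test process.
test_pid = self()
handler = fn event -> send(test_pid, {:captured, event}) end
:ok = EnqueueSketch.enqueue(%{name: "request"}, event_handler: handler)
```

Forwarding events to the test process lets a test check them with `receive` or ExUnit's `assert_receive` without any network traffic.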
@spec flush(keyword()) :: :ok | {:error, :timeout}
Flushes all pending events synchronously.
Blocks until all queued events are sent or the timeout is reached.
Options
:timeout - Maximum wait time in milliseconds. Defaults to 5,000.
Examples
Langfuse.Ingestion.flush()
# => :ok
Langfuse.Ingestion.flush(timeout: 10_000)
# => :ok
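Because the spec allows `{:error, :timeout}`, callers may want to handle that case rather than match on `:ok` alone. A minimal sketch follows; `FlushHelper` and `flush_or_warn/2` are hypothetical names, and the flush function is passed as a parameter only so the sketch can be exercised with a stub.

```elixir
defmodule FlushHelper do
  require Logger

  # `flush_fun` defaults to the documented Langfuse.Ingestion.flush/1; it is a
  # parameter only so the sketch can be run against a stub.
  def flush_or_warn(timeout_ms, flush_fun \\ &Langfuse.Ingestion.flush/1) do
    case flush_fun.(timeout: timeout_ms) do
      :ok ->
        :ok

      {:error, :timeout} = error ->
        # Events may still be queued; log it and return the error to the caller.
        Logger.warning("Langfuse flush did not finish within #{timeout_ms} ms")
        error
    end
  end
end
```

A caller such as a release task could then run `FlushHelper.flush_or_warn(10_000)` before exiting and decide whether a timeout should fail the task.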
@spec queue_size() :: non_neg_integer()
Returns the current number of queued events.
Useful for monitoring and debugging.
Examples
Langfuse.Ingestion.queue_size()
# => 5
@spec shutdown() :: :ok
Shuts down the ingestion process gracefully.
Flushes all pending events before stopping. Uses a 30-second timeout.