Gralkor.CaptureBuffer (jido_gralkor v3.0.0)

Copy Markdown View Source

In-flight conversation buffer keyed by session_id.

Holds turns until an explicit flush — session lifetime is owned by the consumer; there is no idle-flush policy. On flush/1 (or shutdown via flush_all/0 from terminate/2), the buffered turns are handed to the configured flush_callback with retry: server-internal failures get the configured backoff (default 1s/2s/4s); contract errors (4xx) and upstream-LLM errors drop without retry.

See ex-capture-buffer in gralkor/TEST_TREES.md.

Summary

Functions

Append one turn (a list of Gralkor.Message) to the session's buffer.

Returns a specification to start this module under a supervisor.

Schedule a retry-backed flush of the session's turns. Returns :ok immediately.

Flush every buffered session and await each. Used at shutdown.

Synchronously flush the session's turns and wait for completion.

Return the buffered turns for session_id, or [] if none.

Functions

append(session_id, group_id, agent_name, user_name, msgs)

Append one turn (a list of Gralkor.Message) to the session's buffer.

agent_name is required and non-blank — it is bound on first append for the session and any later append with a different agent_name, user_name, or group_id raises ArgumentError.

user_name is required and non-blank — used at flush time to label user lines in the rendered transcript so graphiti's entity extraction produces a named user node rather than collapsing every user into a generic "User" entity.

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

flush(session_id)

Schedule a retry-backed flush of the session's turns. Returns :ok immediately.

flush_all()

Flush every buffered session and await each. Used at shutdown.

flush_and_await(session_id, timeout_ms)

Synchronously flush the session's turns and wait for completion.

Returns :ok only after the flush callback has finished — for the Native adapter this means the episode is queryable via recall/4 (graphiti's add_episode is sync through embed + persist). Returns {:error, :timeout} if the configured retry budget (1s/2s/4s plus the flush's own latency) exceeds timeout_ms; the buffered turns are still available to flush again.

start_link(opts)

turns_for(session_id)

Return the buffered turns for session_id, or [] if none.