DurableStreams.StreamManager (Streamkeeper v0.3.0)

View Source

High-level API for managing streams.

This module provides a clean interface for stream operations, handling process lookup and error cases gracefully.

Summary

Functions

Appends data to a stream.

Closes a stream, preventing further appends.

Creates a new stream with the given ID and options.

Deletes a stream and all its data.

Gets the metadata for a stream.

Reads data from a stream starting after the given offset.

Reads messages from a stream as a list (for JSON mode). Each message is returned separately instead of concatenated.

Functions

append(stream_id, data, opts \\ [])

Appends data to a stream.

Returns {:ok, offset} on success, where offset is the position of the appended data that can be used for subsequent reads.

Options

  • :seq - Optional sequence string for ordering enforcement (default: nil)

close(stream_id)

Closes a stream, preventing further appends.

Reading from a closed stream will still work, but the response will include closed: true in the result.

create(stream_id, opts \\ [])

Creates a new stream with the given ID and options.

Options

  • :content_type - The content type (default: "application/octet-stream")
  • :ttl - Time-to-live in seconds (default: nil)
  • :storage - Storage backend module (default: DurableStreams.Storage.ETS)

Examples

{:ok, "my-stream"} = DurableStreams.StreamManager.create("my-stream")
{:ok, "json-stream"} = DurableStreams.StreamManager.create("json-stream", content_type: "application/json")

delete(stream_id)

Deletes a stream and all its data.

get_metadata(stream_id)

Gets the metadata for a stream.

read(stream_id, offset, opts \\ [])

Reads data from a stream starting after the given offset.

Options

  • :live - If true, long-polls for new data (default: false)
  • :timeout - Timeout in milliseconds for live reads (default: 30_000)

Examples

{:ok, result} = DurableStreams.StreamManager.read("my-stream", "-1")
# result.data, result.offset, result.has_more, result.closed

read_messages(stream_id, offset, opts \\ [])

Reads messages from a stream as a list (for JSON mode). Each message is returned separately instead of concatenated.

Options

  • :live - If true, long-polls for new messages (default: false)
  • :timeout - Timeout in milliseconds for live reads (default: 30_000)