DurableStreams (Streamkeeper v0.3.0)

View Source

Elixir implementation of the Durable Streams protocol.

Durable Streams provides append-only, URL-addressable byte logs with support for long-polling and Server-Sent Events (SSE).

Getting Started

The library starts automatically as part of your OTP application. You can use the programmatic API immediately:

# Create a stream
{:ok, "events"} = DurableStreams.create("events", content_type: "text/plain")

# Append data
{:ok, offset} = DurableStreams.append("events", "user logged in")

# Read all data from the beginning
{:ok, result} = DurableStreams.read("events", "-1")
result.data  # => "user logged in"

Phoenix Integration

Forward requests from your Phoenix router to expose the HTTP API:

# In your router.ex
forward "/v1/stream", DurableStreams.Protocol.Plug

Standalone HTTP Server

For standalone usage with Cowboy, create a router that forwards to Protocol.Plug:

defmodule MyApp.Router do
  use Plug.Router
  plug :match
  plug :dispatch
  forward "/v1/stream", to: DurableStreams.Protocol.Plug
end

{:ok, _} = Plug.Cowboy.http(MyApp.Router, [], port: 4000)

JSON Mode

Streams with content_type: "application/json" operate in JSON mode, where arrays are flattened and messages are stored separately:

{:ok, _} = DurableStreams.create("json-events", content_type: "application/json")
{:ok, _} = DurableStreams.append("json-events", ~s([{"event": "a"}, {"event": "b"}]))

# Use read_messages/3 via StreamManager for JSON streams
{:ok, result} = DurableStreams.StreamManager.read_messages("json-events", "-1")
# result.messages => [%{data: "{"event":"a"}", offset: "..."}, ...]

HTTP API Reference

MethodPathPurpose
PUT/:idCreate stream
POST/:idAppend data
GET/:id?offset=XRead from offset
GET/:id?offset=X&live=trueLong-poll for new data
GET/:id?offset=X&live=sseServer-Sent Events
DELETE/:idDelete stream
HEAD/:idGet metadata

See the Durable Streams specification for full protocol details.

Summary

Types

An opaque, lexicographically sortable offset string

Result of a read operation

A stream identifier (any string)

Functions

Appends data to a stream.

Closes a stream, preventing further appends.

Creates a new stream with the given ID and options.

Deletes a stream and all its data.

Gets the metadata for a stream.

Reads data from a stream starting after the given offset.

Types

offset()

@type offset() :: String.t()

An opaque, lexicographically sortable offset string

read_result()

@type read_result() :: %{
  data: binary(),
  offset: offset(),
  has_more: boolean(),
  closed: boolean()
}

Result of a read operation

stream_id()

@type stream_id() :: String.t()

A stream identifier (any string)

Functions

append(stream_id, data)

@spec append(stream_id(), binary()) ::
  {:ok, offset()} | {:error, :not_found | :closed | :seq_conflict}

Appends data to a stream.

Returns {:ok, offset} on success, where offset is the position of the appended data that can be used for subsequent reads.

Examples

{:ok, offset} = DurableStreams.append("my-stream", "Hello!")
{:error, :not_found} = DurableStreams.append("nonexistent", "data")
{:error, :closed} = DurableStreams.append("closed-stream", "data")

close(stream_id)

@spec close(stream_id()) :: :ok | {:error, :not_found}

Closes a stream, preventing further appends.

Reading from a closed stream will still work, but the response will include closed: true in the result.

Examples

:ok = DurableStreams.close("my-stream")

create(stream_id, opts \\ [])

@spec create(
  stream_id(),
  keyword()
) :: {:ok, stream_id()} | {:error, :already_exists}

Creates a new stream with the given ID and options.

Options

  • :content_type - The content type (default: "application/octet-stream")
  • :ttl - Time-to-live in seconds (default: nil)

Examples

{:ok, "my-stream"} = DurableStreams.create("my-stream")
{:ok, "json-stream"} = DurableStreams.create("json-stream", content_type: "application/json")
{:error, :already_exists} = DurableStreams.create("my-stream")

delete(stream_id)

@spec delete(stream_id()) :: :ok | {:error, :not_found}

Deletes a stream and all its data.

Examples

:ok = DurableStreams.delete("my-stream")
{:error, :not_found} = DurableStreams.delete("nonexistent")

get_metadata(stream_id)

@spec get_metadata(stream_id()) ::
  {:ok, DurableStreams.Stream.t()} | {:error, :not_found}

Gets the metadata for a stream.

Returns stream metadata including content type, creation time, and TTL settings.

Examples

{:ok, meta} = DurableStreams.get_metadata("my-stream")
# meta.id => "my-stream"
# meta.content_type => "text/plain"
# meta.created_at => DateTime
# meta.closed => false
# meta.ttl => nil

read(stream_id, offset, opts \\ [])

@spec read(stream_id(), offset(), keyword()) ::
  {:ok, read_result()} | {:error, :not_found}

Reads data from a stream starting after the given offset.

Use "-1" as the offset to read from the beginning.

Options

  • :live - If true, long-polls for new data (default: false)
  • :timeout - Timeout in milliseconds for live reads (default: 30_000)

Examples

{:ok, result} = DurableStreams.read("my-stream", "-1")
# result.data => binary data
# result.offset => next offset to use
# result.has_more => boolean
# result.closed => boolean