Streamkeeper

An Elixir/OTP implementation of the Durable Streams protocol - a specification for append-only, URL-addressable byte logs.

Features

  • Full HTTP protocol compliance (PUT, POST, GET, DELETE, HEAD)
  • JSON mode with array flattening
  • Long-polling and Server-Sent Events (SSE) for live updates
  • Phoenix LiveView helper module for easy integration
  • Stream TTL and expiration
  • Sequence ordering enforcement
  • ETag-based caching
  • OTP supervision tree for fault tolerance

Requirements

  • Elixir 1.19+
  • Erlang/OTP 27+

Installation

Add streamkeeper to your dependencies in mix.exs:

def deps do
  [
    {:streamkeeper, "~> 0.3.0"},
    # Choose an HTTP server adapter:
    {:plug_cowboy, "~> 2.7"}  # or {:bandit, "~> 1.0"}
  ]
end

Quick Start

Standalone Server

Start the HTTP server on a specific port:

# Create a router module that mounts the protocol at /v1/stream
defmodule MyApp.StreamRouter do
  use Plug.Router
  plug :match
  plug :dispatch
  forward "/v1/stream", to: DurableStreams.Protocol.Plug
end

# In your application.ex or IEx
{:ok, _} = Plug.Cowboy.http(MyApp.StreamRouter, [], port: 4000)

Phoenix Integration

DurableStreams runs inside a Phoenix application without extra setup. The library starts its own supervision tree when added as a dependency, so the only change you need to make is to add a route.

Step 1: Add the dependency

# mix.exs
def deps do
  [
    {:phoenix, "~> 1.7"},
    {:streamkeeper, "~> 0.3.0"},
    # ... other deps
  ]
end

Step 2: Add the route

# lib/my_app_web/router.ex
defmodule MyAppWeb.Router do
  use MyAppWeb, :router

  # Your existing pipelines...

  # Forward durable streams requests (no pipeline needed - it handles its own parsing)
  forward "/v1/stream", DurableStreams.Protocol.Plug
end

Step 3: Use it!

# Create a stream
curl -X PUT http://localhost:4000/v1/stream/my-stream \
  -H "Content-Type: text/plain"

# Append data
curl -X POST http://localhost:4000/v1/stream/my-stream \
  -H "Content-Type: text/plain" \
  -d "Hello, Phoenix!"

# Read data
curl "http://localhost:4000/v1/stream/my-stream?offset=-1"

Using the programmatic API in Phoenix contexts

You can also use the DurableStreams module directly in your Phoenix controllers, channels, or LiveViews:

defmodule MyAppWeb.StreamController do
  use MyAppWeb, :controller

  def create(conn, %{"id" => stream_id}) do
    case DurableStreams.create(stream_id, content_type: "application/json") do
      {:ok, _} -> json(conn, %{status: "created", stream_id: stream_id})
      {:error, :already_exists} -> json(conn, %{status: "exists", stream_id: stream_id})
    end
  end

  def append(conn, %{"id" => stream_id, "data" => data}) do
    {:ok, offset} = DurableStreams.append(stream_id, Jason.encode!(data))
    json(conn, %{offset: offset})
  end
end

Note: DurableStreams uses its own Phoenix.PubSub instance (DurableStreams.PubSub) which does not conflict with your application's PubSub.

Phoenix LiveView Integration

For LiveView applications, use the DurableStreams.LiveView helper module to handle long-polling with automatic offset tracking and reconnection:

defmodule MyAppWeb.EventsLive do
  use Phoenix.LiveView
  alias DurableStreams.LiveView, as: DSLive

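  def mount(_params, _session, socket) do
    # Assign :events up front: update/3 in process_messages/2 raises on a missing key
    {:ok, socket |> assign(:events, []) |> DSLive.init()}
  end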

  def handle_event("subscribe", %{"stream_id" => stream_id}, socket) do
    {:noreply, DSLive.listen(socket, stream_id)}
  end

  def handle_event("unsubscribe", _, socket) do
    {:noreply, DSLive.stop(socket)}
  end

  # Handle stream messages
  def handle_info(msg, socket) do
    if DSLive.stream_message?(msg) do
      case DSLive.handle_message(socket, msg) do
        {:data, messages, socket} ->
          {:noreply, process_messages(socket, messages)}

        {:status, _status, socket} ->
          {:noreply, socket}

        {:complete, socket} ->
          {:noreply, assign(socket, :finished, true)}

        {:error, reason, socket} ->
          {:noreply, assign(socket, :error, reason)}
      end
    else
      {:noreply, socket}
    end
  end

  defp process_messages(socket, messages) do
    Enum.reduce(messages, socket, fn msg, acc ->
      update(acc, :events, &[msg.data | &1])
    end)
  end
end

Available functions:

| Function | Description |
| --- | --- |
| init/2 | Initialize stream assigns on socket |
| listen/3 | Start listening to a stream with offset tracking |
| stop/1 | Stop the listener, preserve stream ID and offset |
| reset/1 | Stop and clear all stream state |
| stream_message?/1 | Check if a message is from the stream listener |
| handle_message/2 | Process stream messages, returns {:data, messages, socket}, {:status, status, socket}, {:complete, socket}, or {:error, reason, socket} |
| status/1 | Get current status (:idle, :connecting, :streaming, :disconnected) |
| stream_id/1 | Get current stream ID |
| offset/1 | Get current offset |
| listening?/1 | Check if actively listening |

The module uses ds_ prefixed assigns (e.g., @ds_status, @ds_stream_id, @ds_offset) to avoid conflicts with your application's assigns.

Note: This module requires phoenix_live_view, which is an optional dependency. The module is only compiled when Phoenix.LiveView is available.

Programmatic API

Use the DurableStreams.StreamManager module directly:

# Create a stream
{:ok, "my-stream"} = DurableStreams.StreamManager.create("my-stream",
  content_type: "text/plain",
  ttl: 3600  # expires in 1 hour
)

# Append data
{:ok, offset} = DurableStreams.StreamManager.append("my-stream", "Hello, World!")

# Read data
{:ok, result} = DurableStreams.StreamManager.read("my-stream", "-1")
# result.data => "Hello, World!"
# result.offset => "0006478b4bce37b5-0001-98ee"

# Long-poll for new data
{:ok, result} = DurableStreams.StreamManager.read("my-stream", offset,
  live: true,
  timeout: 30_000
)

# Delete stream
:ok = DurableStreams.StreamManager.delete("my-stream")
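
A common pattern on top of these calls is a catch-up loop that keeps reading from the last returned offset until nothing new comes back. The sketch below is not part of the library: ReadLoop and its read_fun argument are hypothetical, and it assumes a read result is a map with :data and :offset keys (as in the example above) and that an empty binary means the reader has caught up.

```elixir
defmodule ReadLoop do
  @moduledoc "Hypothetical helper for illustration; not part of Streamkeeper."

  # read_fun is any 1-arity function that takes an offset and returns
  # {:ok, %{data: binary, offset: binary}} or {:error, reason}.
  # Returns the chunks read (oldest first) and the offset to resume from.
  def drain(read_fun, offset \\ "-1", acc \\ []) do
    case read_fun.(offset) do
      {:ok, %{data: <<>>}} ->
        # Assumed meaning: no data past this offset, so we are caught up.
        {:ok, Enum.reverse(acc), offset}

      {:ok, %{data: data, offset: next}} ->
        drain(read_fun, next, [data | acc])

      {:error, reason} ->
        # Hand back the last good offset so the caller can retry from it.
        {:error, reason, offset}
    end
  end
end
```

With the library loaded, the loop could be driven by `&DurableStreams.StreamManager.read("my-stream", &1)`, provided read/2 returns the shape assumed here.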

Retention Policies

Streams can have automatic retention policies that compact old messages:

# Create a stream with retention policy
{:ok, _} = DurableStreams.StreamManager.create("log-stream",
  content_type: "text/plain",
  retention: [
    max_age: :timer.hours(24),     # Remove messages older than 24h
    max_messages: 100_000,          # Keep at most 100k messages
    max_bytes: 50 * 1024 * 1024    # Keep at most 50MB
  ]
)

# When messages are compacted, reading old offsets returns 410 Gone
# The response includes the earliest valid offset

JSON Mode

When a stream is created with a content type of application/json, it operates in JSON mode:

# Create JSON stream
{:ok, _} = DurableStreams.StreamManager.create("json-stream",
  content_type: "application/json"
)

# Arrays are flattened one level
# POST [{"a": 1}, {"b": 2}] stores two messages
{:ok, _} = DurableStreams.StreamManager.append("json-stream",
  DurableStreams.JSON.encode!([%{a: 1}, %{b: 2}])
)

# Read returns array of messages
{:ok, result} = DurableStreams.StreamManager.read_messages("json-stream", "-1")
# result.messages => [%{data: "{\"a\":1}", offset: "..."}, ...]
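
To make "flattened one level" concrete, here is a minimal sketch of the rule applied to an already-decoded JSON value. FlattenSketch is a hypothetical module and does not reflect the library's internals; it only shows that a top-level array becomes one message per element while nested arrays stay intact.

```elixir
defmodule FlattenSketch do
  @moduledoc "Hypothetical illustration of one-level array flattening."

  # A top-level array contributes each element as its own message.
  # Nested arrays are left as they are: only one level is unwrapped.
  def to_messages(value) when is_list(value), do: value

  # Any other JSON value (object, string, number, ...) is a single message.
  def to_messages(value), do: [value]
end
```

For example, `FlattenSketch.to_messages([[1, 2], [3]])` yields two messages, `[1, 2]` and `[3]`, rather than three numbers. How the library treats an empty array is not covered by this sketch.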

HTTP API

| Method | Path | Description |
| --- | --- | --- |
| PUT | /:stream_id | Create a stream |
| POST | /:stream_id | Append data |
| GET | /:stream_id | Read from offset |
| DELETE | /:stream_id | Delete stream |
| HEAD | /:stream_id | Get metadata |

Request Headers

| Header | Description |
| --- | --- |
| Content-Type | Stream content type (required for POST, optional for PUT) |
| Stream-TTL | Time-to-live in seconds |
| Stream-Expires-At | ISO 8601 expiration timestamp |
| Stream-Seq | Sequence value for ordering |
| If-None-Match | ETag for conditional GET |
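
Stream-TTL is relative (seconds) while Stream-Expires-At is absolute (an ISO 8601 timestamp). As a rough sketch of how a client might turn one into the other, the hypothetical helper below uses only the standard library's DateTime functions. It says nothing about how the server itself computes expiry.

```elixir
defmodule ExpirySketch do
  @moduledoc "Hypothetical client-side helper; not part of Streamkeeper."

  # Converts a TTL in seconds into the ISO 8601 timestamp that names
  # the same moment, measured from the given `now`.
  def expires_at(%DateTime{} = now, ttl_seconds)
      when is_integer(ttl_seconds) and ttl_seconds >= 0 do
    now
    |> DateTime.add(ttl_seconds, :second)
    |> DateTime.to_iso8601()
  end
end
```

Passing `now` explicitly keeps the function deterministic. In application code you would typically call it with `DateTime.utc_now()`.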

Response Headers

| Header | Description |
| --- | --- |
| Stream-Next-Offset | Offset for resuming reads |
| Stream-Up-To-Date | True when no more data available |
| Stream-Cursor | Cursor for jitter handling |
| ETag | Entity tag for caching |
| Location | Stream URL (on 201) |

Query Parameters

| Parameter | Description |
| --- | --- |
| offset | Start reading after this offset (-1 for beginning) |
| live | Enable long-polling (true) or SSE (sse) |
| timeout | Long-poll timeout in seconds |
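
These parameters combine into an ordinary query string. The sketch below builds a read URL with the standard library's URI.encode_query/1. ReadUrlSketch is a hypothetical helper, the base URL mirrors the curl examples earlier in this README, and the stream ID is assumed to be URL-safe already.

```elixir
defmodule ReadUrlSketch do
  @moduledoc "Hypothetical helper for building read URLs."

  # params is a keyword list such as [offset: "-1", live: "true", timeout: 30].
  # Keys and values are converted to strings and percent-encoded as needed.
  def build(base_url, stream_id, params \\ []) do
    url = base_url <> "/" <> stream_id

    case URI.encode_query(params) do
      "" -> url
      query -> url <> "?" <> query
    end
  end
end
```

For instance, `ReadUrlSketch.build("http://localhost:4000/v1/stream", "my-stream", offset: "-1", live: "sse")` produces a URL suitable for an SSE subscription from the beginning of the stream.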

Examples

The examples/ directory contains runnable demonstrations:

Simple Demo

A command-line script showing core API usage:

elixir examples/simple_demo.exs

Shows stream creation, appending, reading, long-polling, and JSON mode.

LLM Token Streaming

The flagship example demonstrates resumable AI token streaming, the primary use case from the Durable Streams announcement:

iex examples/llm_streaming.exs
# Open http://localhost:4000

# Optional: Set API keys for real AI responses (works without them in Demo mode)
export ANTHROPIC_API_KEY=your-key-here
export OPENAI_API_KEY=your-key-here

Demonstrates:

  • Resumable streaming — disconnect mid-response and resume without losing tokens
  • Multi-client broadcast — multiple tabs watch the same AI response
  • Replay capability — re-watch responses from the beginning
  • Multi-provider support — works with Claude, GPT, or Demo mode (no API keys needed)

See examples/README.md for more details.

Configuration

The library uses sensible defaults but can be configured:

# config/config.exs
config :streamkeeper,
  storage: DurableStreams.Storage.ETS,
  default_timeout: 30_000

Development

Setup

# Clone the repository
git clone https://github.com/errantsky/streamkeeper.git
cd streamkeeper

# Install dependencies
mix deps.get

# Compile
mix compile

Running Tests

# Run unit tests
mix test

# Run with coverage
mix test --cover

Conformance Testing

This library passes 100% of the official Durable Streams conformance tests.

Prerequisites:

  • Node.js 18+ (for the conformance test suite)

Running conformance tests:

# Use the mix task (recommended)
mix durable_streams.conformance

Current conformance: 131/131 tests passing (100%)

Code Quality

# Format code
mix format

# Run static analysis (if credo is installed)
mix credo

Architecture

Component Overview

graph TB
    subgraph "HTTP Layer"
        Plug[Protocol.Plug]
        Handlers[HTTP Handlers]
    end

    subgraph "Business Logic"
        SM[StreamManager]
        SS[StreamServer GenServer]
    end

    subgraph "Storage"
        ETS[ETS Storage]
        PubSub[Phoenix.PubSub]
    end

    subgraph "OTP Supervision"
        App[Application]
        Sup[StreamSupervisor]
        Reg[Registry]
    end

    Plug --> Handlers
    Handlers --> SM
    SM --> SS
    SS --> ETS
    SS --> PubSub
    App --> Sup
    App --> Reg
    App --> ETS
    Sup --> SS

OTP Supervision Tree

graph TB
    App[DurableStreams.Application]
    Sup[DurableStreams.StreamSupervisor<br/>DynamicSupervisor]
    Reg[DurableStreams.Registry<br/>Registry]
    ETS[DurableStreams.Storage.ETS<br/>GenServer]
    PS[Phoenix.PubSub]
    SS1[StreamServer 1]
    SS2[StreamServer 2]
    SS3[StreamServer N]

    App --> Sup
    App --> Reg
    App --> ETS
    App --> PS
    Sup --> SS1
    Sup --> SS2
    Sup --> SS3

Each stream is managed by its own GenServer process, providing:

  • Process isolation
  • Independent failure handling
  • Concurrent access
  • Automatic cleanup on TTL expiration
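
The one-process-per-stream layout can be sketched with plain OTP building blocks: a Registry for name lookup, a DynamicSupervisor for starting streams on demand, and a GenServer holding each stream's data. All module names below (PerStreamSketch and friends) are hypothetical and this is not the library's code. It omits storage, PubSub, and TTL handling entirely.

```elixir
defmodule PerStreamSketch.Server do
  @moduledoc "Hypothetical per-stream process holding data in its own state."
  use GenServer

  def start_link(stream_id) do
    GenServer.start_link(__MODULE__, stream_id, name: via(stream_id))
  end

  def append(stream_id, data), do: GenServer.call(via(stream_id), {:append, data})
  def read(stream_id), do: GenServer.call(via(stream_id), :read)

  # Look the process up by stream ID through the Registry.
  defp via(stream_id), do: {:via, Registry, {PerStreamSketch.Registry, stream_id}}

  @impl true
  def init(stream_id), do: {:ok, %{id: stream_id, chunks: []}}

  @impl true
  def handle_call({:append, data}, _from, state) do
    chunks = [data | state.chunks]
    {:reply, {:ok, length(chunks)}, %{state | chunks: chunks}}
  end

  def handle_call(:read, _from, state) do
    data = state.chunks |> Enum.reverse() |> IO.iodata_to_binary()
    {:reply, {:ok, data}, state}
  end
end

defmodule PerStreamSketch do
  @moduledoc "Hypothetical top-level API: starts the tree and creates streams."

  def start do
    children = [
      {Registry, keys: :unique, name: PerStreamSketch.Registry},
      {DynamicSupervisor, name: PerStreamSketch.Supervisor, strategy: :one_for_one}
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end

  # Each stream gets its own supervised process; a duplicate ID returns
  # {:error, {:already_started, pid}} because Registry keys are unique.
  def create(stream_id) do
    DynamicSupervisor.start_child(
      PerStreamSketch.Supervisor,
      {PerStreamSketch.Server, stream_id}
    )
  end
end
```

Because every stream is a separate process, a crash in one stream's server leaves the others untouched. In this sketch the crashed stream restarts empty, which is one reason a real implementation keeps data in a separate storage layer.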

Request Flow

sequenceDiagram
    participant C as Client
    participant P as Protocol.Plug
    participant H as Handler
    participant SM as StreamManager
    participant SS as StreamServer
    participant S as Storage.ETS

    C->>P: PUT /v1/stream/:id
    P->>H: Create Handler
    H->>SM: create(id, opts)
    SM->>SS: start_link
    SS->>S: create(id, stream)
    S-->>SS: :ok
    SS-->>SM: {:ok, pid}
    SM-->>H: {:ok, id}
    H-->>C: 201 Created

    C->>P: POST /v1/stream/:id
    P->>H: Append Handler
    H->>SM: append(id, data)
    SM->>SS: GenServer.call(:append)
    SS->>S: append(id, data)
    S-->>SS: {:ok, offset}
    SS-->>SM: {:ok, offset}
    SM-->>H: {:ok, offset}
    H-->>C: 200 OK + offset

Long-Polling Flow

sequenceDiagram
    participant C as Client
    participant H as Read Handler
    participant SS as StreamServer
    participant S as Storage
    participant PS as PubSub

    C->>H: GET ?offset=X&live=true
    H->>SS: read(offset, live: true)
    SS->>S: read(offset)
    S-->>SS: {:ok, %{data: <<>>}}
    Note over SS: No data, register waiter
    SS->>SS: Add to waiters list

    Note over C,PS: ... time passes ...

    C->>SS: Another client appends
    SS->>S: append(data)
    S-->>SS: {:ok, new_offset}
    SS->>PS: broadcast(:stream_append)
    PS-->>SS: Notify waiters
    SS->>S: read(offset)
    S-->>SS: {:ok, %{data: ...}}
    SS-->>H: {:ok, result}
    H-->>C: 200 OK + data
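
The "register waiter" step in the diagram relies on a standard GenServer technique: return {:noreply, state} from handle_call/3 to park the caller, then answer later with GenServer.reply/2. The sketch below shows that technique in isolation. WaiterSketch is hypothetical, it uses a message count in place of real offsets, and it replies to waiters directly rather than going through PubSub as the diagram does.

```elixir
defmodule WaiterSketch do
  @moduledoc "Hypothetical long-poll sketch using parked GenServer callers."
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # `seen` is how many messages the caller already has. The call blocks
  # until newer messages exist or `timeout` milliseconds have passed.
  def read(server, seen, timeout \\ 1_000) do
    GenServer.call(server, {:read, seen, timeout}, timeout + 1_000)
  end

  def append(server, data), do: GenServer.call(server, {:append, data})

  @impl true
  def init(:ok), do: {:ok, %{messages: [], waiters: []}}

  @impl true
  def handle_call({:read, seen, timeout}, from, state) do
    case Enum.drop(state.messages, seen) do
      [] ->
        # Nothing new: park the caller and arm a timeout for it.
        ref = Process.send_after(self(), {:timeout, from}, timeout)
        {:noreply, %{state | waiters: [{from, seen, ref} | state.waiters]}}

      new ->
        {:reply, {:ok, new}, state}
    end
  end

  def handle_call({:append, data}, _from, state) do
    messages = state.messages ++ [data]

    # Wake every parked reader with the messages it has not seen yet.
    Enum.each(state.waiters, fn {from, seen, ref} ->
      Process.cancel_timer(ref)
      GenServer.reply(from, {:ok, Enum.drop(messages, seen)})
    end)

    {:reply, {:ok, length(messages)}, %{state | messages: messages, waiters: []}}
  end

  @impl true
  def handle_info({:timeout, from}, state) do
    {expired, rest} = Enum.split_with(state.waiters, fn {f, _, _} -> f == from end)
    # Reply only if the caller is still parked; an append may have won the race.
    Enum.each(expired, fn {f, _, _} -> GenServer.reply(f, {:ok, []}) end)
    {:noreply, %{state | waiters: rest}}
  end
end
```

A timed-out read returns an empty list here. That is a choice made for this sketch and not a statement about what the library returns.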

Protocol Implementation Notes

This library implements the Durable Streams protocol specification. Below are implementation-specific behaviors and minor differences from the reference specification.

Cursor Format

The protocol suggests using 20-second time intervals from a fixed epoch for cursor values. This implementation uses millisecond timestamps instead, which still ensures:

  • Monotonically increasing values
  • Uniqueness across requests
  • Proper jitter handling when the client echoes the cursor back
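
The difference between the two cursor styles is easiest to see side by side. In the sketch below, CursorSketch is a hypothetical module and the epoch is an arbitrary value picked for the example. The next_cursor/2 function is only one conceivable way to keep values increasing when a client echoes a cursor back. It is an assumption, not the library's algorithm.

```elixir
defmodule CursorSketch do
  @moduledoc "Hypothetical comparison of interval and millisecond cursors."

  # Arbitrary epoch chosen for this example only.
  @epoch_ms DateTime.to_unix(~U[2024-01-01 00:00:00Z], :millisecond)
  @interval_ms 20_000

  # Interval style: the number of whole 20-second windows since the epoch.
  # Requests inside the same window share a cursor value.
  def interval_cursor(now_ms), do: div(now_ms - @epoch_ms, @interval_ms)

  # Millisecond style: the timestamp itself, so values change every millisecond.
  def millisecond_cursor(now_ms), do: now_ms

  # Assumed strategy: never return a cursor that is not strictly greater
  # than the one the client echoed back.
  def next_cursor(now_ms, echoed \\ nil)
  def next_cursor(now_ms, nil), do: now_ms
  def next_cursor(now_ms, echoed) when is_integer(echoed), do: max(now_ms, echoed + 1)
end
```

The interval style collapses nearby requests onto one value, while the millisecond style distinguishes almost every request.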

Retention Policy Implementation

Streams can have automatic retention policies that remove old messages based on:

  • max_age: Remove messages older than a specified duration (in milliseconds)
  • max_messages: Keep at most N messages in the stream
  • max_bytes: Keep at most N bytes of data in the stream

When messages are compacted, requests for those offsets return 410 Gone with a Stream-Earliest-Offset header indicating where valid data begins.
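
As a rough illustration of how the three limits might combine, the sketch below applies them in sequence to an in-memory list. RetentionSketch, the message shape (:offset, :data, :inserted_at), and the order of application are all assumptions made for the example. The library's actual compaction logic may differ.

```elixir
defmodule RetentionSketch do
  @moduledoc "Hypothetical compaction over a list of messages, oldest first."

  # policy is a keyword list with optional :max_age (ms), :max_messages, :max_bytes.
  def compact(messages, policy, now_ms) do
    messages
    |> drop_older_than(policy[:max_age], now_ms)
    |> keep_last(policy[:max_messages])
    |> keep_within_bytes(policy[:max_bytes])
  end

  # The first surviving offset: what a "410 Gone" reply would point the client to.
  def earliest_offset([]), do: nil
  def earliest_offset([first | _]), do: first.offset

  defp drop_older_than(messages, nil, _now), do: messages

  defp drop_older_than(messages, max_age, now) do
    Enum.drop_while(messages, fn m -> now - m.inserted_at > max_age end)
  end

  defp keep_last(messages, nil), do: messages
  defp keep_last(messages, n), do: Enum.take(messages, -n)

  defp keep_within_bytes(messages, nil), do: messages

  defp keep_within_bytes(messages, max_bytes) do
    # Walk from newest to oldest, keeping messages while the byte budget allows.
    {kept, _total} =
      messages
      |> Enum.reverse()
      |> Enum.reduce_while({[], 0}, fn m, {acc, total} ->
        size = total + byte_size(m.data)
        if size <= max_bytes, do: {:cont, {[m | acc], size}}, else: {:halt, {acc, total}}
      end)

    kept
  end
end
```

All three limits remove messages from the old end of the stream, so the surviving messages always form a contiguous tail and a single earliest offset is enough to describe what remains.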

Features Not Implemented

| Feature | Status | Notes |
| --- | --- | --- |
| 429 Too Many Requests | Not implemented | Rate limiting is left to infrastructure (reverse proxy, load balancer) |
| 501 Not Implemented | Not needed | All protocol operations are supported |

Storage Backend

Only ETS (in-memory) storage is currently provided, so stream data does not survive a VM restart. If you need persistence in production, supply a custom storage backend that implements DurableStreams.Storage.Behaviour.

License

MIT License - see LICENSE for details.