DurableStreams (Streamkeeper v0.3.0)
Elixir implementation of the Durable Streams protocol.
Durable Streams provides append-only, URL-addressable byte logs with support for long-polling and Server-Sent Events (SSE).
Getting Started
The library starts automatically as part of your OTP application. You can use the programmatic API immediately:
# Create a stream
{:ok, "events"} = DurableStreams.create("events", content_type: "text/plain")
# Append data
{:ok, offset} = DurableStreams.append("events", "user logged in")
# Read all data from the beginning
{:ok, result} = DurableStreams.read("events", "-1")
result.data # => "user logged in"
Phoenix Integration
Forward requests from your Phoenix router to expose the HTTP API:
# In your router.ex
forward "/v1/stream", DurableStreams.Protocol.Plug
Standalone HTTP Server
For standalone usage with Cowboy, create a router that forwards to Protocol.Plug:
defmodule MyApp.Router do
  use Plug.Router
  plug :match
  plug :dispatch
  forward "/v1/stream", to: DurableStreams.Protocol.Plug
end
{:ok, _} = Plug.Cowboy.http(MyApp.Router, [], port: 4000)
JSON Mode
Streams with content_type: "application/json" operate in JSON mode,
where arrays are flattened and messages are stored separately:
{:ok, _} = DurableStreams.create("json-events", content_type: "application/json")
{:ok, _} = DurableStreams.append("json-events", ~s([{"event": "a"}, {"event": "b"}]))
# Use read_messages/3 via StreamManager for JSON streams
{:ok, result} = DurableStreams.StreamManager.read_messages("json-events", "-1")
# result.messages => [%{data: "{\"event\":\"a\"}", offset: "..."}, ...]
HTTP API Reference
| Method | Path | Purpose |
|---|---|---|
| PUT | /:id | Create stream |
| POST | /:id | Append data |
| GET | /:id?offset=X | Read from offset |
| GET | /:id?offset=X&live=true | Long-poll for new data |
| GET | /:id?offset=X&live=sse | Server-Sent Events |
| DELETE | /:id | Delete stream |
| HEAD | /:id | Get metadata |
See the Durable Streams specification for full protocol details.
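As a rough illustration of how the read routes in the table compose, the sketch below builds request URLs for a mounted plug. StreamUrl and read_url/4 are hypothetical names that are not part of DurableStreams, and the base URL is whatever path you forwarded to the plug. It also assumes the stream ID may need percent-encoding as a path segment.

```elixir
defmodule StreamUrl do
  # Hypothetical helper, not part of DurableStreams.
  # base is the mount point of the plug, e.g. "http://localhost:4000/v1/stream".
  # live is nil for a plain read, or "true" / "sse" as listed in the table.
  def read_url(base, stream_id, offset \\ "-1", live \\ nil) do
    query =
      [offset: offset, live: live]
      # Drop parameters that were not supplied.
      |> Enum.reject(fn {_key, value} -> is_nil(value) end)
      |> URI.encode_query()

    # Percent-encode everything except unreserved characters in the ID.
    path = URI.encode(stream_id, &URI.char_unreserved?/1)
    "#{base}/#{path}?#{query}"
  end
end
```

The resulting string can be handed to any HTTP client; for example, StreamUrl.read_url("http://localhost:4000/v1/stream", "events", "-1", "sse") yields the URL for an SSE subscription from the beginning of the stream.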
Summary
Types
offset() - An opaque, lexicographically sortable offset string
read_result() - Result of a read operation
stream_id() - A stream identifier (any string)
Functions
append - Appends data to a stream.
close - Closes a stream, preventing further appends.
create - Creates a new stream with the given ID and options.
delete - Deletes a stream and all its data.
get_metadata - Gets the metadata for a stream.
read - Reads data from a stream starting after the given offset.
Functions
@spec append(stream_id(), binary()) :: {:ok, offset()} | {:error, :not_found | :closed | :seq_conflict}
Appends data to a stream.
Returns {:ok, offset} on success, where offset is the position
of the appended data that can be used for subsequent reads.
Examples
{:ok, offset} = DurableStreams.append("my-stream", "Hello!")
{:error, :not_found} = DurableStreams.append("nonexistent", "data")
{:error, :closed} = DurableStreams.append("closed-stream", "data")
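One way to act on the {:error, :not_found} result is to create the stream and retry the append once. The sketch below is only an illustration under stated assumptions: AppendHelper and append_or_create/5 are hypothetical names, and the append and create functions are passed in as arguments (standing in for DurableStreams.append/2 and DurableStreams.create/2) so the logic can be run without the library loaded.

```elixir
defmodule AppendHelper do
  # Hypothetical wrapper, not part of DurableStreams.
  # append_fun: (stream_id, data) -> {:ok, offset} | {:error, reason}
  # create_fun: (stream_id, opts) -> {:ok, stream_id} | {:error, reason}
  def append_or_create(stream_id, data, append_fun, create_fun, create_opts \\ []) do
    case append_fun.(stream_id, data) do
      {:error, :not_found} ->
        case create_fun.(stream_id, create_opts) do
          {:ok, _id} ->
            append_fun.(stream_id, data)

          # Another process may have created the stream in the meantime.
          {:error, :already_exists} ->
            append_fun.(stream_id, data)

          {:error, _reason} = error ->
            error
        end

      # Success and all other errors (:closed, :seq_conflict) pass through.
      other ->
        other
    end
  end
end
```

With the library available, a call might look like AppendHelper.append_or_create("events", "user logged in", &DurableStreams.append/2, &DurableStreams.create/2, content_type: "text/plain").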
@spec close(stream_id()) :: :ok | {:error, :not_found}
Closes a stream, preventing further appends.
Reading from a closed stream still works, but the result
will include closed: true.
Examples
:ok = DurableStreams.close("my-stream")
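The closed flag gives a consumer a natural stopping point. The following sketch tails a stream with live reads and stops once the result reports closed: true with nothing left to read. StreamFollower and follow/4 are hypothetical names; the read function is injected (standing in for DurableStreams.read/3), and the sketch assumes that a live read which times out returns an empty data binary rather than an error.

```elixir
defmodule StreamFollower do
  # Hypothetical helper, not part of DurableStreams.
  # read_fun: (stream_id, offset, opts) -> {:ok, result} | {:error, reason}
  # handle:   called with each non-empty chunk of data
  def follow(stream_id, offset, read_fun, handle) do
    case read_fun.(stream_id, offset, live: true) do
      {:ok, result} ->
        # Skip empty results (assumed to mean the long-poll timed out).
        if is_binary(result.data) and result.data != "" do
          handle.(result.data)
        end

        if result.closed and not result.has_more do
          # The stream is closed and fully consumed.
          {:closed, result.offset}
        else
          # Continue from the offset the server handed back.
          follow(stream_id, result.offset, read_fun, handle)
        end

      {:error, _reason} = error ->
        error
    end
  end
end
```

A call such as StreamFollower.follow("my-stream", "-1", &DurableStreams.read/3, &IO.puts/1) would print each chunk until the stream is closed.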
Creates a new stream with the given ID and options.
Options
:content_type - The content type (default: "application/octet-stream")
:ttl - Time-to-live in seconds (default: nil)
Examples
{:ok, "my-stream"} = DurableStreams.create("my-stream")
{:ok, "json-stream"} = DurableStreams.create("json-stream", content_type: "application/json")
{:error, :already_exists} = DurableStreams.create("my-stream")
@spec delete(stream_id()) :: :ok | {:error, :not_found}
Deletes a stream and all its data.
Examples
:ok = DurableStreams.delete("my-stream")
{:error, :not_found} = DurableStreams.delete("nonexistent")
@spec get_metadata(stream_id()) :: {:ok, DurableStreams.Stream.t()} | {:error, :not_found}
Gets the metadata for a stream.
Returns stream metadata including content type, creation time, and TTL settings.
Examples
{:ok, meta} = DurableStreams.get_metadata("my-stream")
# meta.id => "my-stream"
# meta.content_type => "text/plain"
# meta.created_at => DateTime
# meta.closed => false
# meta.ttl => nil
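The created_at and ttl fields can be combined to estimate when a stream expires. This is a sketch only: StreamExpiry and expires_at/1 are hypothetical names, and it assumes the TTL is counted in seconds from created_at, which the documentation above does not state explicitly.

```elixir
defmodule StreamExpiry do
  # Hypothetical helper, not part of DurableStreams.
  # Assumes the TTL (in seconds) is measured from the creation time.

  # A nil TTL is treated as "no expiry".
  def expires_at(%{ttl: nil}), do: :never

  def expires_at(%{created_at: %DateTime{} = created_at, ttl: ttl}) when is_integer(ttl) do
    {:ok, DateTime.add(created_at, ttl, :second)}
  end
end
```

Since the function only pattern-matches on the created_at and ttl keys, it accepts the metadata returned by get_metadata/1 as well as a plain map with the same fields.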
@spec read(stream_id(), offset(), keyword()) :: {:ok, read_result()} | {:error, :not_found}
Reads data from a stream starting after the given offset.
Use "-1" as the offset to read from the beginning.
Options
:live - If true, long-polls for new data (default: false)
:timeout - Timeout in milliseconds for live reads (default: 30_000)
Examples
{:ok, result} = DurableStreams.read("my-stream", "-1")
# result.data => binary data
# result.offset => next offset to use
# result.has_more => boolean
# result.closed => boolean
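Because each result carries the next offset and a has_more flag, a caller can page through a stream by feeding result.offset back into the next read. The following is a minimal sketch rather than library code: StreamReader and read_all/2 are hypothetical names, the read function is passed in (standing in for DurableStreams.read/3) so the loop can be run on its own, and it assumes result.data is always a binary.

```elixir
defmodule StreamReader do
  # Hypothetical helper, not part of DurableStreams.
  # read_fun: (stream_id, offset, opts) -> {:ok, result} | {:error, reason}
  # Returns {:ok, all_data, last_offset} once has_more is false.
  def read_all(stream_id, read_fun) do
    # "-1" means "from the beginning".
    do_read(stream_id, "-1", read_fun, [])
  end

  defp do_read(stream_id, offset, read_fun, acc) do
    case read_fun.(stream_id, offset, []) do
      {:ok, %{has_more: true} = result} ->
        # More data is waiting: accumulate as iodata and keep reading.
        do_read(stream_id, result.offset, read_fun, [acc, result.data])

      {:ok, result} ->
        {:ok, IO.iodata_to_binary([acc, result.data]), result.offset}

      {:error, _reason} = error ->
        error
    end
  end
end
```

With the library running, StreamReader.read_all("my-stream", &DurableStreams.read/3) would return the concatenated data together with the offset to resume from.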