DurableStreams.Stream (Streamkeeper v0.3.0)

Represents a durable stream's metadata and state.

A stream is a URL-addressable, append-only byte log with:

  • A unique ID
  • A content type (defaults to application/octet-stream)
  • Creation timestamp
  • Optional TTL (time-to-live in seconds)
  • Closed state (once closed, no more appends allowed)
  • Optional retention policy for automatic compaction

Retention Policy

Streams can have a retention policy that automatically removes old messages:

DurableStreams.create("my-stream",
  retention: [
    max_age: :timer.hours(24),     # Remove messages older than 24h
    max_messages: 100_000,          # Keep at most 100k messages
    max_bytes: 50 * 1024 * 1024    # Keep at most 50MB
  ]
)

Compaction is triggered as soon as any one of these limits is exceeded. Once messages have been removed, reads at their offsets return HTTP 410 Gone.
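As an illustration of the "any limit triggers compaction" rule, the sketch below reports which limits of a policy are currently exceeded. It is not Streamkeeper's implementation: the module `RetentionSketch`, the function `exceeded_limits/3`, and the `oldest_age_ms` argument are hypothetical, and it assumes `:max_age` is expressed in milliseconds, as `:timer.hours/1` in the example above suggests.

```elixir
defmodule RetentionSketch do
  # Hypothetical helper, not part of DurableStreams.
  # `policy` is a map shaped like retention_policy().
  # `stats` carries :message_count and :total_bytes, mirroring the stream struct.
  # `oldest_age_ms` (assumed input) is the age of the oldest retained message.
  def exceeded_limits(policy, stats, oldest_age_ms) do
    checks = [
      max_age: oldest_age_ms,
      max_messages: stats.message_count,
      max_bytes: stats.total_bytes
    ]

    checks
    |> Enum.filter(fn {key, current} ->
      case Map.get(policy, key) do
        # A limit that is not configured can never be exceeded.
        nil -> false
        limit -> current > limit
      end
    end)
    |> Keyword.keys()
  end
end

policy = %{max_messages: 2, max_bytes: 100}
stats = %{message_count: 3, total_bytes: 10}

# Only :max_messages is over its limit here, so compaction would be due.
IO.inspect(RetentionSketch.exceeded_limits(policy, stats, 0))
```

A non-empty result means at least one limit is exceeded, which under the rule above is enough to start compaction.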

Summary

Functions

content_type_matches?(type1, type2)

Returns true if two content-types are equivalent (same base type, ignoring charset and case).

json_mode?(arg1)

Returns true if the stream is in JSON mode (content-type is application/json).

new(id, opts \\ [])

Creates a new stream with the given ID and options.

normalize_content_type(content_type)

Normalizes a content-type for comparison (strips charset, lowercases).

Types

retention_policy()

@type retention_policy() :: %{
  optional(:max_age) => non_neg_integer(),
  optional(:max_messages) => non_neg_integer(),
  optional(:max_bytes) => non_neg_integer()
}

t()

@type t() :: %DurableStreams.Stream{
  closed: boolean(),
  content_type: String.t(),
  created_at: DateTime.t(),
  current_offset: String.t() | nil,
  earliest_offset: String.t() | nil,
  expires_at: DateTime.t() | nil,
  id: String.t(),
  message_count: non_neg_integer(),
  retention_policy: retention_policy() | nil,
  total_bytes: non_neg_integer(),
  ttl: non_neg_integer() | nil
}

Functions

content_type_matches?(type1, type2)

@spec content_type_matches?(String.t(), String.t()) :: boolean()

Returns true if two content-types are equivalent (same base type, ignoring charset and case).

json_mode?(arg1)

@spec json_mode?(t()) :: boolean()

Returns true if the stream is in JSON mode (content-type is application/json).

In JSON mode:

  • Each POST stores messages as distinct units
  • GET returns a JSON array of all messages in the requested range
  • Array POSTs are flattened one level
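One reading of "flattened one level" is that each element of a posted array becomes its own message, while nested arrays stay intact. The sketch below shows that interpretation on already-decoded JSON terms. `JsonModeSketch` and `messages_from_body/1` are hypothetical names, not part of the library, and the actual server behavior may differ in detail.

```elixir
defmodule JsonModeSketch do
  # Hypothetical helper: turns one decoded POST body into the list of
  # messages to store. A top-level array is unwrapped exactly one level;
  # any other value is stored as a single message.
  def messages_from_body(body) when is_list(body), do: body
  def messages_from_body(body), do: [body]
end

# A posted array of two objects yields two messages.
IO.inspect(JsonModeSketch.messages_from_body([%{"n" => 1}, %{"n" => 2}]))

# Nested arrays are preserved: only the outer level is unwrapped.
IO.inspect(JsonModeSketch.messages_from_body([[1, 2], [3]]))

# A single object yields one message.
IO.inspect(JsonModeSketch.messages_from_body(%{"n" => 1}))
```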

new(id, opts \\ [])

@spec new(
  String.t(),
  keyword()
) :: t()

Creates a new stream with the given ID and options.

Options

  • :content_type - The content type of the stream (default: "application/octet-stream")
  • :ttl - Time-to-live in seconds (default: nil, meaning no expiration)
  • :expires_at - Absolute expiration DateTime (default: nil)
  • :retention - Retention policy keyword list with :max_age, :max_messages, :max_bytes
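A usage sketch built only from the options listed above. It assumes the Streamkeeper package is available in the project. The stream ID and option values are made up for illustration, and how `:ttl` and `:expires_at` interact is not covered here.

```elixir
# Assumes the DurableStreams.Stream module from Streamkeeper is loaded.
stream =
  DurableStreams.Stream.new("orders",
    content_type: "application/json",
    ttl: 3600,
    retention: [max_messages: 100_000]
  )

# The result is a %DurableStreams.Stream{} struct; see t() for its fields.
IO.inspect(stream.id)
IO.inspect(stream.content_type)
```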

normalize_content_type(content_type)

@spec normalize_content_type(String.t()) :: String.t()

Normalizes a content-type for comparison (strips charset, lowercases).
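The sketch below is one way such normalization and comparison could work. It is a standalone illustration, not the library's source: `ContentTypeSketch` is a hypothetical module, and it drops every parameter after the first `;`, which goes slightly further than the charset stripping described above.

```elixir
defmodule ContentTypeSketch do
  # Hypothetical re-implementation for illustration only.

  # Keeps the base type: drops everything after the first ";",
  # trims surrounding whitespace, and lowercases the result.
  def normalize(content_type) do
    content_type
    |> String.split(";", parts: 2)
    |> hd()
    |> String.trim()
    |> String.downcase()
  end

  # Two content-types match when their normalized base types are equal.
  def matches?(type1, type2), do: normalize(type1) == normalize(type2)
end

IO.inspect(ContentTypeSketch.normalize("Application/JSON; charset=UTF-8"))
IO.inspect(ContentTypeSketch.matches?("text/plain", "TEXT/PLAIN;charset=utf-8"))
```

Comparing normalized values rather than raw strings means a client that sends `application/json; charset=utf-8` can append to a stream created as `application/json`.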