DurableStreams.Storage.Behaviour behaviour (Streamkeeper v0.3.0)

View Source

Behaviour defining the storage interface for durable streams.

This behaviour allows for different storage backends:

  • ETS-based (single node)
  • Distributed (multi-node with :pg)
  • Custom implementations

Summary

Callbacks

Appends data to a stream with optional sequence enforcement. The seq parameter, if provided, must be lexicographically greater than the previous seq. Returns {:ok, offset} on success, or an error if the stream doesn't exist, is closed, or the sequence is out of order.

Closes a stream, preventing further appends. Returns :ok on success, or {:error, :not_found} if the stream doesn't exist.

Creates a new stream with the given metadata. Returns :ok on success, or {:error, :already_exists} if the stream already exists.

Gets the current (latest) offset for a stream. Returns {:ok, offset} on success, or {:error, :not_found} if the stream doesn't exist.

Deletes a stream and all its data. Returns :ok on success, or {:error, :not_found} if the stream doesn't exist.

Retrieves the metadata for a stream. Returns {:ok, stream} on success, or {:error, :not_found} if the stream doesn't exist.

Reads data from a stream starting after the given offset. Returns {:ok, read_result} on success, or {:error, :not_found} if the stream doesn't exist.

Reads messages from a stream as a list (for JSON mode). Each message is returned separately instead of concatenated.

Subscribes the calling process to notifications for a stream. Returns :ok on success.

Types

message()

@type message() :: %{data: binary(), offset: offset()}

offset()

@type offset() :: DurableStreams.Offset.t()

read_messages_result()

@type read_messages_result() :: %{
  messages: [message()],
  offset: offset(),
  has_more: boolean(),
  closed: boolean()
}

read_result()

@type read_result() :: %{
  data: binary(),
  offset: offset(),
  has_more: boolean(),
  closed: boolean()
}

stream_id()

@type stream_id() :: String.t()

Callbacks

append(stream_id, binary, arg3)

@callback append(stream_id(), binary(), String.t() | nil) ::
  {:ok, offset()} | {:error, :not_found | :closed | :seq_conflict}

Appends data to a stream with optional sequence enforcement. The seq parameter, if provided, must be lexicographically greater than the previous seq. Returns {:ok, offset} on success, or an error if the stream doesn't exist, is closed, or the sequence is out of order.

close(stream_id)

@callback close(stream_id()) :: :ok | {:error, :not_found}

Closes a stream, preventing further appends. Returns :ok on success, or {:error, :not_found} if the stream doesn't exist.

create(stream_id, t)

@callback create(stream_id(), DurableStreams.Stream.t()) ::
  :ok | {:error, :already_exists}

Creates a new stream with the given metadata. Returns :ok on success, or {:error, :already_exists} if the stream already exists.

current_offset(stream_id)

@callback current_offset(stream_id()) :: {:ok, offset()} | {:error, :not_found}

Gets the current (latest) offset for a stream. Returns {:ok, offset} on success, or {:error, :not_found} if the stream doesn't exist.

delete(stream_id)

@callback delete(stream_id()) :: :ok | {:error, :not_found}

Deletes a stream and all its data. Returns :ok on success, or {:error, :not_found} if the stream doesn't exist.

get_metadata(stream_id)

@callback get_metadata(stream_id()) ::
  {:ok, DurableStreams.Stream.t()} | {:error, :not_found}

Retrieves the metadata for a stream. Returns {:ok, stream} on success, or {:error, :not_found} if the stream doesn't exist.

read(stream_id, offset)

@callback read(stream_id(), offset()) :: {:ok, read_result()} | {:error, :not_found}

Reads data from a stream starting after the given offset. Returns {:ok, read_result} on success, or {:error, :not_found} if the stream doesn't exist.

read_messages(stream_id, offset)

@callback read_messages(stream_id(), offset()) ::
  {:ok, read_messages_result()} | {:error, :not_found}

Reads messages from a stream as a list (for JSON mode). Each message is returned separately instead of concatenated.

subscribe(stream_id)

@callback subscribe(stream_id()) :: :ok

Subscribes the calling process to notifications for a stream. Returns :ok on success.