NLdoc.Stream.Message (NLdoc.Stream v1.0.0)

Stream Consumer

Summary

Functions

get_stream_filter_value(map)

Retrieves the stream filter value from the provided headers.

get_stream_offset(map)

Retrieves the stream offset from the provided headers.

matches_stream_filter?(msg, value)

Checks the x-stream-filter-value header of a message against the given value, because server-side stream filtering is probabilistic.

Types

t()

@type t() :: map()

Functions

get_stream_filter_value(map)

@spec get_stream_filter_value(t() | [NLdoc.Stream.Message.Header.t()]) ::
  String.t() | nil

Retrieves the stream filter value (the x-stream-filter-value header) from the provided headers. Returns nil if the header is absent.

Examples

iex> headers = [{"x-stream-filter-value", :longstr, "value"}]
iex> NLdoc.Stream.Message.get_stream_filter_value(%{headers: headers})
"value"
iex> NLdoc.Stream.Message.get_stream_filter_value(%{headers: []})
nil

get_stream_offset(map)

@spec get_stream_offset(t() | [NLdoc.Stream.Message.Header.t()]) :: integer() | nil

Retrieves the stream offset (the x-stream-offset header) from the provided headers. Returns nil if the header is absent.

Examples

iex> headers = [{"x-stream-offset", :long, 42}]
iex> NLdoc.Stream.Message.get_stream_offset(%{headers: headers})
42
iex> NLdoc.Stream.Message.get_stream_offset(%{headers: []})
nil
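The specs for both getters accept either a message map or a bare list of headers, while the examples only show the map form. The following is a minimal sketch of how a lookup could handle both shapes. It is not the library's implementation: the module name HeaderSketch and its get/2 function are hypothetical, and headers are assumed to be {name, type, value} tuples as in the examples above.

```elixir
defmodule HeaderSketch do
  # Hypothetical module for illustration; not part of NLdoc.Stream.
  # Headers are assumed to be {name, type, value} tuples.

  # Message map form: delegate to the header list the map carries.
  def get(%{headers: headers}, name) when is_list(headers), do: get(headers, name)

  # Bare header list form: find the tuple whose first element is `name`.
  def get(headers, name) when is_list(headers) do
    case List.keyfind(headers, name, 0) do
      {^name, _type, value} -> value
      nil -> nil
    end
  end
end

headers = [
  {"x-stream-offset", :long, 42},
  {"x-stream-filter-value", :longstr, "value"}
]

IO.inspect(HeaderSketch.get(%{headers: headers}, "x-stream-offset"))
IO.inspect(HeaderSketch.get(headers, "x-stream-filter-value"))
IO.inspect(HeaderSketch.get([], "x-stream-offset"))
```

In this sketch a missing header yields nil in both forms, which mirrors the nil results in the examples above.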

matches_stream_filter?(msg, value)

@spec matches_stream_filter?(t(), String.t()) :: boolean()

Although x-stream-filter is set to id, the x-stream-filter-value header of each received message must still be checked, because:

... server-side filtering is probabilistic: messages that do not match the filter value
can still be sent to the consumer. The server uses a Bloom filter, a space-efficient
probabilistic data structure, where false positives are possible.

See: RabbitMQ Docs on Streams Filtering

Returns true if the stream filter value in the message equals the given value.

Examples

iex> msg = %{headers: [{"x-stream-filter-value", :longstr, "uuid"}]}
iex> NLdoc.Stream.Message.matches_stream_filter?(msg, "uuid")
true
iex> NLdoc.Stream.Message.matches_stream_filter?(msg, "wrong-id")
false
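Because the server may deliver messages whose filter value does not match, a consumer has to discard those on the client side. The following is a minimal sketch of that second check over a batch of messages. It is not the library's implementation: the module name FilterSketch, its functions, and the :payload key are hypothetical, and messages are assumed to be maps with a :headers list of {name, type, value} tuples as in the examples above.

```elixir
defmodule FilterSketch do
  # Hypothetical helper for illustration; not part of NLdoc.Stream.

  # Looks up the x-stream-filter-value header; returns nil when absent.
  def filter_value(%{headers: headers}) when is_list(headers) do
    case List.keyfind(headers, "x-stream-filter-value", 0) do
      {_name, _type, value} -> value
      nil -> nil
    end
  end

  # Keeps only the messages whose filter value equals `wanted`,
  # dropping any false positives that passed the server-side filter.
  def keep_matching(messages, wanted) when is_binary(wanted) do
    Enum.filter(messages, fn msg -> filter_value(msg) == wanted end)
  end
end

messages = [
  %{headers: [{"x-stream-filter-value", :longstr, "uuid"}], payload: "a"},
  %{headers: [{"x-stream-filter-value", :longstr, "other"}], payload: "b"},
  %{headers: [], payload: "c"}
]

kept = FilterSketch.keep_matching(messages, "uuid")
IO.inspect(Enum.map(kept, & &1.payload))
```

Only the first message survives here: the second carries a different filter value and the third has no filter value at all.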