NLdoc.Stream.Message (NLdoc.Stream v1.0.0)
Stream Consumer
Types
@type t() :: map()
Functions
@spec get_stream_filter_value(t() | [NLdoc.Stream.Message.Header.t()]) :: String.t() | nil
Retrieves the stream filter value from the provided headers.
Examples
iex> headers = [{"x-stream-filter-value", :longstr, "value"}]
iex> NLdoc.Stream.Message.get_stream_filter_value(%{headers: headers})
"value"
iex> NLdoc.Stream.Message.get_stream_filter_value(%{headers: []})
nil
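A lookup like this could be sketched as follows. This assumes headers are `{name, type, value}` tuples, as in the examples above, and that the argument is either a message map with a `:headers` key or a bare header list, as the spec suggests. The module `HeaderLookupSketch` and the function `header_value/2` are hypothetical names for illustration and are not part of NLdoc.Stream.

```elixir
defmodule HeaderLookupSketch do
  # Hypothetical sketch, not the library's implementation.

  # Message map with a :headers key: unwrap and look up in the list.
  def header_value(%{headers: headers}, name), do: header_value(headers, name)

  # Bare list of {name, type, value} header tuples.
  def header_value(headers, name) when is_list(headers) do
    case List.keyfind(headers, name, 0) do
      {_name, _type, value} -> value
      _ -> nil
    end
  end

  # Any other shape (for example, headers that are not a list) yields nil.
  def header_value(_other, _name), do: nil
end
```

With this sketch, `HeaderLookupSketch.header_value(%{headers: []}, "x-stream-filter-value")` returns `nil` because `List.keyfind/3` finds no tuple whose first element is the header name.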
@spec get_stream_offset(t() | [NLdoc.Stream.Message.Header.t()]) :: integer() | nil
Retrieves the stream offset from the provided headers.
Examples
iex> headers = [{"x-stream-offset", :long, 42}]
iex> NLdoc.Stream.Message.get_stream_offset(%{headers: headers})
42
iex> NLdoc.Stream.Message.get_stream_offset(%{headers: []})
nil
Even though we set x-stream-filter to id, we still need to check the
x-stream-filter-value header to be sure, because:
... server-side filtering is probabilistic: messages that do not match the filter value
can still be sent to the consumer. The server uses a Bloom filter, a space-efficient
probabilistic data structure, where false positives are possible.
See: RabbitMQ Docs on Streams Filtering
Returns true if the stream filter value in the message equals the given value.
Examples
iex> msg = %{headers: [{"x-stream-filter-value", :longstr, "uuid"}]}
iex> NLdoc.Stream.Message.matches_stream_filter?(msg, "uuid")
true
iex> NLdoc.Stream.Message.matches_stream_filter?(msg, "wrong-id")
false
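Because the server-side Bloom filter can let non-matching messages through, a consumer may want to re-check each delivery before handling it. The following is a minimal sketch of that second check, assuming messages are maps with a `:headers` list of `{name, type, value}` tuples as in the examples above. `ClientSideFilterSketch`, `matches?/2`, and `keep_matching/2` are hypothetical stand-ins, not the library's code.

```elixir
defmodule ClientSideFilterSketch do
  @filter_header "x-stream-filter-value"

  # Hypothetical stand-in for matches_stream_filter?/2:
  # true only when the header is present and its value equals `expected`.
  def matches?(%{headers: headers}, expected) when is_list(headers) do
    case List.keyfind(headers, @filter_header, 0) do
      {_name, _type, ^expected} -> true
      _ -> false
    end
  end

  # Messages without a usable header list never match.
  def matches?(_message, _expected), do: false

  # Keeps only the deliveries whose filter value equals `expected`,
  # dropping any false positives the server let through.
  def keep_matching(messages, expected) do
    Enum.filter(messages, &matches?(&1, expected))
  end
end
```

Given one message tagged `"uuid"` and one tagged `"other"`, `keep_matching/2` called with `"uuid"` keeps only the first, which is the behavior the exact-match check is meant to guarantee.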