Elixir MOM v0.5.3 MOM.Channel.PointToPoint

Special channel on which only one of the competing consumers consumes each message.

Each consumer is called in order and MUST return :ok or :nok. If it returns :nok, the next consumer in line will try. If no consumer consumes the message, it is sent to :deadletter. If any consumer throws an exception, the message is sent to :invalid.

If the message is sent to :deadletter or :invalid, send returns :nok; otherwise it returns :ok. This eases composability.
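The routing rules above can be sketched in plain Elixir. This is only an illustration of the described behaviour, not MOM's implementation: the module PointToPointSketch and its deliver/2 function are hypothetical names, consumers are assumed to be one-argument functions, and the second tuple element is added here only to make the destination visible.

```elixir
defmodule PointToPointSketch do
  # Hypothetical helper, not part of MOM.
  # Tries each consumer in order and stops at the first one returning :ok.
  # Returns {result, destination}: destination is :consumed, :deadletter or :invalid.
  def deliver(consumers, msg) do
    # Enum.any?/2 stops at the first truthy result, so later consumers are not called.
    if Enum.any?(consumers, fn consumer -> consumer.(msg) == :ok end) do
      {:ok, :consumed}
    else
      # Every consumer returned :nok (or the list was empty).
      {:nok, :deadletter}
    end
  rescue
    # A consumer raised an exception while handling the message.
    _error -> {:nok, :invalid}
  end
end

ping = fn msg -> if msg == "ping", do: :ok, else: :nok end
pong = fn msg -> if msg == "pong", do: :ok, else: :nok end

PointToPointSketch.deliver([ping, pong], "pong")
# => {:ok, :consumed}
PointToPointSketch.deliver([ping, pong], "other")
# => {:nok, :deadletter}
PointToPointSketch.deliver([fn _ -> raise "boom" end], "ping")
# => {:nok, :invalid}
```

Note that rescue only catches raised exceptions; values thrown with throw/1 or process exits would need a catch clause.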

Example

To use it, call the normal Channel functions (send, subscribe, unsubscribe) on a channel created with PointToPoint.start_link.

iex> require Logger
iex> alias MOM.{Channel, Message, RPC}
iex> {:ok, ch} = Channel.PointToPoint.start_link
iex> Channel.subscribe(ch, fn msg ->
...>  case msg.payload do
...>    %RPC.Message{ method: "ping", params: [_] } ->
...>      Logger.info("Consumed ping message")
...>      :ok
...>    _ ->
...>      :nok
...>  end
...> end)
iex> Channel.subscribe(ch, fn msg ->
...>  case msg.payload do
...>    %RPC.Message{ method: "pong", params: [_] } ->
...>      Logger.info("Consumed pong message")
...>      :ok
...>    _ ->
...>      :nok
...>  end
...> end)
iex> Channel.send(ch, %Message{ id: 0, payload: %RPC.Message{ method: "ping", params: ["Hello"]}} )
:ok
iex> Channel.send(ch, %Message{ id: 0, payload: %RPC.Message{ method: "pong", params: ["Hello"]}} )
:ok
iex> Channel.send(ch, %Message{ id: 0, payload: %RPC.Message{ method: "other", params: ["Hello"]}} )
:nok

Subscribers can unsubscribe themselves by returning :unsubscribe from the called function.

iex> require Logger
iex> alias MOM.{Channel, Message}
iex> {:ok, a} = Channel.PointToPoint.start_link
iex> {:ok, data} = Agent.start_link(fn -> 0 end)
iex> Channel.subscribe(a, fn _ ->
...>   Logger.info("Called")
...>   Agent.update(data, &(&1 + 1))
...>   :unsubscribe
...> end)
iex> Channel.send(a, %Message{})
:unsubscribe
iex> Channel.send(a, %Message{})
:empty
iex> :timer.sleep(100) # send is async, wait for it
iex> Agent.get(data, &(&1))
1
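As a rough model of the return values in the example above, the following sketch walks a subscriber list and drops a subscriber that answers :unsubscribe. It is an assumption about how such bookkeeping could work, not MOM's code; SelfUnsubscribeSketch and its dispatch/2 are hypothetical names.

```elixir
defmodule SelfUnsubscribeSketch do
  # Hypothetical helper, not part of MOM.
  # Returns {reply, remaining_subscribers}.
  def dispatch([], _msg), do: {:empty, []}
  def dispatch(subscribers, msg), do: walk(subscribers, msg, [])

  # No subscriber accepted the message; keep them all, in the original order.
  defp walk([], _msg, kept), do: {:nok, Enum.reverse(kept)}

  defp walk([sub | rest], msg, kept) do
    case sub.(msg) do
      # Consumed: stop here and keep every subscriber.
      :ok -> {:ok, Enum.reverse(kept, [sub | rest])}
      # The subscriber asked to be removed: stop here and drop it from the list.
      :unsubscribe -> {:unsubscribe, Enum.reverse(kept, rest)}
      # Anything else is treated as :nok, so the next subscriber gets a try.
      _ -> walk(rest, msg, [sub | kept])
    end
  end
end

once = fn _msg -> :unsubscribe end
{reply, remaining} = SelfUnsubscribeSketch.dispatch([once], :hello)
# reply is :unsubscribe and remaining is []
SelfUnsubscribeSketch.dispatch(remaining, :hello)
# => {:empty, []}
```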

Summary

Functions

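dispatch(list, msg)

Dispatches a message to a list of subscribers

handle_call(msg, from, state)

Calls all subscribers in order until one returns :ok. If none returns :ok, returns :nok

init(args)

The state is just the list of subscriber callbacks; each function is called with the message

send(channel, message, options, timeout)

Sends a message to the channel

start_link()

Starts the link

subscribe(channel, subscriber, options)

Subscribes to a channel

unsubscribe(channel, subscriber)

Unsubscribes from a channel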

Functions

dispatch(list, msg)

Dispatches a message to a list of subscribers.

Returns a list of subscribers that must be removed as they have exited.
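One way to picture the "must be removed" return value is the sketch below. It assumes, purely for illustration, that each subscriber is a {pid, callback} pair and that liveness is checked with Process.alive?/1; the documentation above does not say how MOM represents subscribers or detects exits, and DispatchSketch is a hypothetical name.

```elixir
defmodule DispatchSketch do
  # Hypothetical helper, not part of MOM.
  # Calls the callback of every subscriber whose owning process is still alive
  # and returns the subscribers whose process has exited.
  def dispatch(subscribers, msg) do
    subscribers
    |> Enum.reduce([], fn {pid, callback} = subscriber, to_remove ->
      if Process.alive?(pid) do
        callback.(msg)
        to_remove
      else
        [subscriber | to_remove]
      end
    end)
    |> Enum.reverse()
  end
end
```

The caller would then subtract the returned list from its own state, for example with `subscribers -- to_remove`.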

handle_call(msg, from, state)

Calls all subscribers in order until one returns :ok. If none returns :ok, returns :nok.

Returns :empty if there are no subscribers.

init(args)

The state is just the list of subscriber callbacks; each function is called with the message.
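To make the role of the state concrete, here is a minimal GenServer sketch whose state is nothing but a list of callbacks, with a handle_call that replies :empty, :ok or :nok as described above. The module ChannelStateSketch, the send_message/2 name and the message tuples are assumptions for illustration and do not reflect MOM's internals.

```elixir
defmodule ChannelStateSketch do
  use GenServer

  # Hypothetical client API, not part of MOM.
  def start_link, do: GenServer.start_link(__MODULE__, [])
  def subscribe(pid, callback), do: GenServer.call(pid, {:subscribe, callback})
  def send_message(pid, msg), do: GenServer.call(pid, {:send, msg})

  @impl true
  # The state starts as an empty list of subscriber callbacks.
  def init(_args), do: {:ok, []}

  @impl true
  def handle_call({:subscribe, callback}, _from, subscribers) do
    # Append so that subscribers are tried in subscription order.
    {:reply, :ok, subscribers ++ [callback]}
  end

  # No subscribers at all: reply :empty.
  def handle_call({:send, _msg}, _from, []), do: {:reply, :empty, []}

  def handle_call({:send, msg}, _from, subscribers) do
    # Call each callback in order until one returns :ok.
    result = if Enum.any?(subscribers, &(&1.(msg) == :ok)), do: :ok, else: :nok
    {:reply, result, subscribers}
  end
end
```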

send(channel, message, options, timeout)

Sends a message to the channel.

Available options are:

  • :all - Sends to all channels, behaving as a broadcast channel.

start_link()

Starts the link.

stop(pid)
stop(pid, reason)

subscribe(channel, subscriber, options)

Subscribes to a channel.

unsubscribe(channel, subscriber)

Unsubscribes from a channel.