y_process v0.2.2 YProcess behaviour
A behaviour module for implementing the server of a client-server relation
with publisher-subscriber capabilities. It is a generalized
publisher-subscriber implementation built on the :pg2 library and GenServer.
This project is heavily inspired by Connection.
Solution using PG2
defmodule Consumer do
  # Joins the channel and blocks in a receive loop.
  def subscribe(channel, callback) do
    :pg2.join(channel, self())
    loop(channel, callback)
  end

  # Keeps looping while the callback returns :ok.
  defp loop(channel, callback) do
    receive do
      :stop -> :stopped
      message ->
        try do
          callback.(message)
        catch
          _, _ -> :stopped
        else
          :ok -> loop(channel, callback)
          :stop -> :stopped
        end
    end
  end
end

defmodule Publisher do
  def create(channel) do
    :pg2.create(channel)
  end

  # Sends the message to every member of the channel.
  def publish(channel, message) do
    channel
    |> :pg2.get_members()
    |> Enum.each(&send(&1, message))
    :ok
  end
end
When testing these modules in iex:
iex(1)> Publisher.create("channel")
:ok
iex(2)> callback = fn message -> IO.inspect({self(), message}); :ok end
#Function<6.54118792/1 in :erl_eval.expr/5>
iex(3)> spawn fn -> Consumer.subscribe("channel", callback) end
#PID<0.168.0>
iex(4)> spawn fn -> Consumer.subscribe("channel", callback) end
#PID<0.170.0>
iex(5)> Publisher.publish("channel", "hello")
{#PID<0.168.0>, "hello"}
{#PID<0.170.0>, "hello"}
:ok
iex(6)>
Wrapping this in a GenServer gives a better way to handle messages in the
Consumers. Depending on the approach, the messages could be received in
handle_call/3 (GenServer.call/3), handle_cast/2 (GenServer.cast/2) or
handle_info/2 (send/2). YProcess wraps the GenServer behaviour and adds a new
callback, handle_event/3, to handle the messages coming from the channels the
current YProcess is subscribed to.
Callbacks
There are eight callbacks required to be implemented in a YProcess. Six of
them are the same as in GenServer, but with additional possible return values
to join, leave, create, delete and emit messages (acknowledged or not) to
other processes. The remaining two are the following:
- handle_event/3: Handles the messages coming from other processes when the current process is subscribed to the channel they are broadcasting on.
- ready/3: Called after the :join or :create actions requested in the init/1 callback have been executed.
Example
Let’s define a Producer that every second generates an integer between 0 and
9 and sends it to a Consumer that prints the number.
The Producer is as follows:
defmodule Producer do
  use YProcess

  def start_link(channel) do
    YProcess.start_link(__MODULE__, channel)
  end

  def stop(producer) do
    YProcess.stop(producer)
  end

  # Returns an integer between 0 and 9.
  defp get_number do
    (:random.uniform * 10) |> :erlang.trunc
  end

  # Creates the channel and schedules the first :timeout after 1000 ms.
  def init(channel) do
    _ = :os.system_time(:seconds) |> :random.seed
    {:create, [channel], [channel], 1000}
  end

  # Emits a new number on every timeout and schedules the next one.
  def handle_info(:timeout, channels) do
    number = get_number()
    {:emit, channels, number, channels, 1000}
  end
end
And the Consumer is as follows:
defmodule Consumer do
  use YProcess

  def start_link(channel) do
    YProcess.start_link(__MODULE__, channel)
  end

  def stop(consumer) do
    YProcess.stop(consumer)
  end

  # Joins the channel; the channel itself is kept as the state.
  def init(channel) do
    {:join, [channel], channel}
  end

  # Matches only events coming from the channel stored in the state.
  def handle_event(channel, number, channel) when is_integer(number) do
    IO.puts(inspect({self(), number}))
    {:noreply, channel}
  end
end
So when we start a Producer and several Consumers we would have the
following output:
iex(1)> {:ok, producer} = Producer.start_link(:channel)
{:ok, #PID<0.189.0>}
iex(2)> {:ok, consumer0} = Consumer.start_link(:channel)
{:ok, #PID<0.192.0>}
{#PID<0.192.0>, 7}
{#PID<0.192.0>, 8}
iex(3)> {:ok, consumer1} = Consumer.start_link(:channel)
{:ok, #PID<0.194.0>}
{#PID<0.192.0>, 0}
{#PID<0.194.0>, 0}
{#PID<0.192.0>, 2}
{#PID<0.194.0>, 2}
{#PID<0.192.0>, 7}
{#PID<0.194.0>, 7}
iex(4)> Consumer.stop(consumer0)
{#PID<0.194.0>, 2}
{#PID<0.194.0>, 6}
{#PID<0.194.0>, 1}
iex(5)>
Backends
The backend behaviour defines how processes create, delete, join and leave
channels, and how they emit messages. By default, processes use the
YProcess.Backend.PG2 backend, which relies on :pg2. It is also possible to use
YProcess.Backend.PhoenixPubSub, which relies on Phoenix PubSub and works with
any of its adapters.
A backend module needs to implement the following callbacks:
# Callback to create a `channel`.
@callback create(channel) :: :ok | {:error, reason}
  when channel: YProcess.channel, reason: term

# Callback to delete a `channel`.
@callback delete(channel) :: :ok | {:error, reason}
  when channel: YProcess.channel, reason: term

# Callback used to make a process with `pid` join a `channel`.
@callback join(channel, pid) :: :ok | {:error, reason}
  when channel: YProcess.channel, reason: term

# Callback used to make a process with `pid` leave a `channel`.
@callback leave(channel, pid) :: :ok | {:error, reason}
  when channel: YProcess.channel, reason: term

# Callback used to send a `message` to a `channel`.
@callback emit(channel, message) :: :ok | {:error, reason}
  when channel: YProcess.channel, message: term, reason: term
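As an illustration of the five callbacks above, here is a minimal sketch of a custom backend that keeps channel membership in an Agent. The module name InMemoryBackend, its start_link/0 helper, the :no_such_channel error reason and the {channel, message} envelope sent to members are all assumptions made for this example, not part of the library. The line declaring the backend behaviour is left out so the sketch runs with plain Elixir.

```elixir
defmodule InMemoryBackend do
  # Hypothetical backend: membership lives in a named Agent holding a map
  # of channel => MapSet of pids. Not part of YProcess.

  def start_link do
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  # Creates `channel` with no members; an existing channel is left untouched.
  def create(channel) do
    Agent.update(__MODULE__, &Map.put_new(&1, channel, MapSet.new()))
  end

  # Deletes `channel` together with its member list.
  def delete(channel) do
    Agent.update(__MODULE__, &Map.delete(&1, channel))
  end

  # Adds `pid` to the members of `channel`.
  def join(channel, pid), do: update_members(channel, &MapSet.put(&1, pid))

  # Removes `pid` from the members of `channel`.
  def leave(channel, pid), do: update_members(channel, &MapSet.delete(&1, pid))

  # Sends `message` to every member of `channel`.
  # Assumption: the `{channel, message}` envelope is invented for this sketch.
  def emit(channel, message) do
    case Agent.get(__MODULE__, &Map.fetch(&1, channel)) do
      {:ok, members} ->
        Enum.each(members, &send(&1, {channel, message}))
        :ok

      :error ->
        {:error, :no_such_channel}
    end
  end

  # Applies `fun` to the member set of `channel` if the channel exists.
  defp update_members(channel, fun) do
    Agent.get_and_update(__MODULE__, fn channels ->
      case Map.fetch(channels, channel) do
        {:ok, members} -> {:ok, Map.put(channels, channel, fun.(members))}
        :error -> {{:error, :no_such_channel}, channels}
      end
    end)
  end
end
```

Every function returns :ok or {:error, reason}, which matches the return types listed in the callback specifications.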
To use a backend, modify the configuration of the application (explained in the next sections) or pass the backend in the module definition, e.g.:
defmodule Test do
  use YProcess, backend: YProcess.Backend.PhoenixPubSub
  (...)
end
For the backends provided, there are two aliases for the modules:
- :pg2 for YProcess.Backend.PG2
- :phoenix_pub_sub for YProcess.Backend.PhoenixPubSub
The shorter version of the module Test defined above would be:
defmodule Test do
  use YProcess, backend: :phoenix_pub_sub
  (...)
end
Backend Configuration
To configure the backend globally for all the YProcesses, set the
following:
- For YProcess.Backend.PG2:
    config :y_process,
      backend: YProcess.Backend.PG2
- For YProcess.Backend.PhoenixPubSub:
    use Mix.Config
    config :y_process,
      backend: YProcess.Backend.PhoenixPubSub,
      name: MyApp.PubSub,
      adapter: Phoenix.PubSub.PG2,
      options: [pool_size: 1]
  and then add the Phoenix.PubSub supervisor to your supervision tree:
    def start(_type, _args) do
      import Supervisor.Spec, warn: false
      children = [
        supervisor(YProcess.PhoenixPubSub, []),
        (...)
      ]
      opts = (...)
      Supervisor.start_link(children, opts)
    end
Installation
Add YProcess as a dependency in your mix.exs file.
def deps do
  [{:y_process, "~> 0.2.1"}]
end
After you’re done, run this in your shell to fetch the new dependency:
$ mix deps.get
Types
async_response ::
  {:noreply, new_state} |
  {:noreply, new_state, timeout | :hibernate} |
  {:create, channels, new_state} |
  {:create, channels, new_state, timeout | :hibernate} |
  {:emit, channels, new_message, new_state} |
  {:emit, channels, new_message, new_state, timeout | :hibernate} |
  {:emit_ack, channels, new_message, new_state} |
  {:emit_ack, channels, new_message, new_state, timeout | :hibernate} |
  {:join, channels, new_state} |
  {:join, channels, new_state, timeout | :hibernate} |
  {:leave, channels, new_state} |
  {:leave, channels, new_state, timeout | :hibernate} |
  {:delete, channels, new_state} |
  {:delete, channels, new_state, timeout | :hibernate} |
  {:stop, reason :: term, new_state}
Asynchronous callback response.
channel
Channel. Shouldn’t be a list.
channels
List of channels.
new_message
New message to be emitted by the YProcess.
new_state
New module state.
reply
Reply to the caller.
sync_response ::
  {:reply, reply, new_state} |
  {:reply, reply, new_state, timeout | :hibernate} |
  {:rcreate, channels, reply, new_state} |
  {:rcreate, channels, reply, new_state, timeout | :hibernate} |
  {:remit, channels, new_message, reply, new_state} |
  {:remit, channels, new_message, reply, new_state, timeout | :hibernate} |
  {:remit_ack, channels, new_message, reply, new_state} |
  {:remit_ack, channels, new_message, reply, new_state, timeout | :hibernate} |
  {:rjoin, channels, reply, new_state} |
  {:rjoin, channels, reply, new_state, timeout | :hibernate} |
  {:rleave, channels, reply, new_state} |
  {:rleave, channels, reply, new_state, timeout | :hibernate} |
  {:rdelete, channels, reply, new_state} |
  {:rdelete, channels, reply, new_state, timeout | :hibernate} |
  {:stop, reason :: term, reply, new_state} |
  async_response
Synchronous callback response.
Functions
Sends a synchronous request to the YProcess process identified by conn
and waits for a reply. Times out after 5000 milliseconds if no response is
received.
See GenServer.call/2 for more information.
Sends a synchronous request to the YProcess process identified by conn
and waits for a reply unless the call times out after timeout milliseconds.
A possible value for timeout is :infinity, in which case the call blocks
indefinitely.
See GenServer.call/3 for more information.
Sends an asynchronous request to the YProcess process identified by
conn.
See GenServer.cast/2 for more information.
Checks whether the YProcess process identified by conn is ready. A
YProcess process is ready once it has joined or created the channels
requested in the init/1 callback. This call is blocking and accepts a
timeout, which defaults to 5000.
Sends a reply to a request sent by call/3.
It receives a from tuple and the response to a request.
See GenServer.reply/2 for more information.
Starts a YProcess process without links (outside of a supervision tree).
It receives the module where the callbacks are implemented, the argument
args that is passed to the module's init/1 callback and the options for the
YProcess. These options are the same as those expected by a GenServer.
See start_link/3 for more information.
start_link(module, any, GenServer.options) :: GenServer.on_start
Starts a YProcess process linked to the current process.
This function is used to start a YProcess process in a supervision tree.
The process will be started by calling init/1 in the callback module with
the given argument.
This function will return after init/1 has returned in the spawned process.
The return values are controlled by the init/1 callback.
It receives the module where the callbacks are implemented, the argument
args that is passed to the module's init/1 callback and the options for the
YProcess. These options are the same as those expected by a GenServer.
See GenServer.start_link/3 for more information.
stop(name :: pid, reason :: term, timeout) :: :ok
Stops the YProcess.
It receives the pid or name of the server and, optionally, the reason for
termination (:normal by default) and the timeout to wait for the
termination.
See GenServer.stop/3 for more information.
Waits until the YProcess process identified by conn is ready. This call
is non-blocking and accepts a timeout, which defaults to 5000.
Callbacks
code_change(old_vsn, state :: term, extra :: term) ::
{:ok, new_state :: term} |
{:error, reason :: term} when old_vsn: term | {:down, term}
Invoked to change the state of the YProcess when a different version of a
module is loaded (hot code swapping) and the state’s term structure should be
changed.
old_vsn is the previous version of the module (defined by the @vsn
attribute) when upgrading. When downgrading the previous version is wrapped
in a 2-tuple with first element :down. state is the current state of the
YProcess and extra is any extra data required to change the state.
Returning {:ok, new_state} changes the state to new_state and the code is
changed successfully.
Returning {:error, reason} fails the code change with reason and the
state remains as the previous state.
If code_change/3 raises an exception the code change fails and the loop
will continue with its previous state. Therefore this callback does not
usually contain side effects.
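A minimal sketch of such a state migration follows. The module name CounterUpgrade, the version number 1 and both state layouts are hypothetical. It is written as a plain module so it can be tried without the library; in a real YProcess module the same clauses would sit next to the other callbacks.

```elixir
defmodule CounterUpgrade do
  # Hypothetical layouts: version 1 kept a bare integer as state, the
  # current version keeps a map with a :count key.

  # Upgrade from version 1: wrap the integer in the new map layout.
  def code_change(1, count, _extra) when is_integer(count) do
    {:ok, %{count: count}}
  end

  # Downgrade involving version 1: restore the bare integer layout.
  def code_change({:down, 1}, %{count: count}, _extra) do
    {:ok, count}
  end

  # Any other version: fail the code change so the previous state is kept.
  def code_change(_old_vsn, _state, _extra) do
    {:error, :unsupported_version}
  end
end
```

The clauses are pure functions of the old state, which fits the advice above about avoiding side effects in this callback.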
handle_call(request :: term, from :: {pid, term}, state :: term) :: sync_response
Invoked to handle synchronous call/3 messages. call/3 will block until a
reply is received (unless the call times out or nodes are disconnected).
request is the request message sent by a call/3, from is a 2-tuple
containing the caller’s pid and a term that uniquely identifies the call, and
state is the current state of the YProcess.
Returning {:reply, reply, new_state} sends the response reply to the
caller and continues the loop with new state new_state.
Returning {:reply, reply, new_state, timeout} is similar to
{:reply, reply, new_state} except handle_info(:timeout, new_state) will
be called after timeout milliseconds if no messages are received.
Returning {:reply, reply, new_state, :hibernate} is similar to
{:reply, reply, new_state} except the process is hibernated and will
continue the loop once a message is in its message queue. If a message is
already in the message queue this will happen immediately. Hibernating a
YProcess causes garbage collection and leaves a continuous heap that
minimises the memory used by the process.
Hibernating should not be used aggressively as too much time could be spent garbage collecting. Normally, it should only be used when a message is not expected soon and minimising the memory of the process is shown to be beneficial.
Returning {:noreply, new_state} does not send a response to the caller and
continues the loop with new state new_state. The response must be sent with
reply/2.
There are three main use cases for not replying using the return value:
- To reply before returning from the callback because the response is known before calling a slow function.
- To reply after returning from the callback because the response is not yet available.
- To reply from another process, such as a Task.
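The third use case can be sketched with a plain GenServer, on the assumption that YProcess behaves the same way here, since its reply/2 is documented as mirroring GenServer.reply/2. The module SlowSquare and its square/2 helper are hypothetical names invented for this example.

```elixir
defmodule SlowSquare do
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, nil)

  # Blocks the caller until some process replies to the request.
  def square(server, n), do: GenServer.call(server, {:square, n})

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call({:square, n}, from, state) do
    # Hand the work to another process, which replies using `from`.
    # The server loop continues immediately and can serve other requests.
    Task.start(fn -> GenServer.reply(from, n * n) end)
    {:noreply, state}
  end
end
```

The caller of square/2 still sees an ordinary synchronous call; only the process that produces the reply changes.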
Returning {:noreply, new_state, timeout | :hibernate} is similar to
{:noreply, new_state} except a timeout or hibernation occurs as with a
:reply tuple.
Returning {:rcreate, channels, reply, new_state} will create the channels
and reply back to call/3 caller.
Returning {:rcreate, channels, reply, new_state, timeout | :hibernate} is
similar to {:rcreate, channels, reply, new_state} except a timeout or
hibernation occurs as with the :reply tuple.
Returning {:remit, channels, message, reply, new_state} will emit a
message in several channels while replying back to the call/3 caller.
The YProcess does not receive confirmations for the messages sent.
Returning
{:remit, channels, message, reply, new_state, timeout | :hibernate}
is similar to {:remit, channels, message, reply, new_state} except a timeout
or hibernation occurs as with the :reply tuple.
Returning {:remit_ack, channels, message, reply, new_state}
will emit a message in several channels while sending a reply back to
the call/3 caller. The YProcess receives confirmations from every
subscriber in handle_info/2. The confirmation message is
{:DELIVER, pid, channel, message} where pid is the PID of the subscriber.
Returning
{:remit_ack, channels, message, reply, new_state, timeout | :hibernate}
is similar to {:remit_ack, channels, message, reply, new_state}
except a timeout or hibernation occurs as with the :reply tuple.
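The acknowledged variant could be used along these lines. This is a sketch of the callbacks only: the module name AckingPublisher, the {:publish, message} request and the shape of the state are hypothetical, and the `use YProcess` line is omitted so the functions can be exercised as plain Elixir.

```elixir
defmodule AckingPublisher do
  # Hypothetical state: %{channel: term, acks: [pid]}.

  # Emits `message` on the channel, replies :ok to the caller and starts
  # collecting confirmations from scratch.
  def handle_call({:publish, message}, _from, %{channel: channel} = state) do
    {:remit_ack, [channel], message, :ok, %{state | acks: []}}
  end

  # Each subscriber confirmation arrives as a :DELIVER message; remember
  # which subscriber confirmed.
  def handle_info({:DELIVER, pid, _channel, _message}, state) do
    {:noreply, %{state | acks: [pid | state.acks]}}
  end

  # Ignore any other message.
  def handle_info(_message, state), do: {:noreply, state}
end
```

Keeping the confirming PIDs in the state makes it possible to compare them later against the expected subscribers.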
Returning {:rjoin, channels, reply, new_state} will join the YProcess
to the channels and send a reply back to the call/3 caller.
Returning {:rjoin, channels, reply, new_state, timeout | :hibernate} is
similar to {:rjoin, channels, reply, new_state} except a timeout or
hibernation occurs as with the :reply tuple.
Returning {:rleave, channels, reply, new_state} the YProcess will
leave the channels and send a reply back to the call/3 caller.
Returning {:rleave, channels, reply, new_state, timeout | :hibernate} is
similar to {:rleave, channels, reply, new_state} except a timeout or
hibernation occurs as with the :reply tuple.
Returning {:rdelete, channels, reply, new_state} will delete the channels
and reply back to call/3 caller.
Returning {:rdelete, channels, reply, new_state, timeout | :hibernate}
is similar to {:rdelete, channels, reply, new_state} except a timeout or
hibernation occurs as with the :reply tuple.
Returning {:stop, reason, reply, new_state} stops the loop and
terminate/2 is called with reason reason and new_state. Then the
reply is sent as response to the call and the process exits with reason.
Returning {:stop, reason, new_state} is similar to
{:stop, reason, reply, new_state} except a reply is not sent.
It can also return the same tuples as handle_cast/2.
handle_cast(request :: term, state :: term) :: async_response
Invoked to handle asynchronous cast/2 messages.
request is the request message sent by a cast/2 and state is the current
state of the YProcess.
Returning {:noreply, new_state} updates the current state to the
new_state.
Returning {:noreply, new_state, timeout} is similar to
{:noreply, new_state} except handle_info(:timeout, new_state) will
be called after timeout milliseconds if no messages are received.
Returning {:noreply, new_state, :hibernate} is similar to
{:noreply, new_state} except the process is hibernated and will
continue the loop once a message is in its message queue. If a message is
already in the message queue this will happen immediately. Hibernating a
YProcess causes garbage collection and leaves a continuous heap that
minimises the memory used by the process.
Hibernating should not be used aggressively as too much time could be spent garbage collecting. Normally, it should only be used when a message is not expected soon and minimising the memory of the process is shown to be beneficial.
Returning {:create, channels, new_state} is similar to
{:rcreate, channels, reply, new_state} in handle_call/3, but with no
reply to the caller.
Returning {:create, channels, new_state, timeout | :hibernate} is similar to
{:rcreate, channels, reply, new_state, timeout | :hibernate} in
handle_call/3, but with no reply to the caller.
Returning {:delete, channels, new_state} is similar to
{:rdelete, channels, reply, new_state} in handle_call/3, but with no
reply to the caller.
Returning {:delete, channels, new_state, timeout | :hibernate} is similar
to {:rdelete, channels, reply, new_state, timeout | :hibernate} in
handle_call/3, but with no reply to the caller.
Returning {:join, channels, new_state} is similar to
{:rjoin, channels, reply, new_state} in handle_call/3, but with no
reply to the caller.
Returning {:join, channels, new_state, timeout | :hibernate} is similar to
{:rjoin, channels, reply, new_state, timeout | :hibernate} in
handle_call/3, but with no reply to the caller.
Returning {:leave, channels, new_state} is similar to
{:rleave, channels, reply, new_state} in handle_call/3, but with no
reply to the caller.
Returning {:leave, channels, new_state, timeout | :hibernate} is similar to
{:rleave, channels, reply, new_state, timeout | :hibernate} in
handle_call/3, but with no reply to the caller.
Returning {:emit, channels, message, new_state} is similar to
{:remit, channels, message, reply, new_state} in handle_call/3, but
with no reply to the caller.
Returning {:emit, channels, message, new_state, timeout | :hibernate} is
similar to
{:remit, channels, message, reply, new_state, timeout | :hibernate}
in handle_call/3, but with no reply to the caller.
Returning {:emit_ack, channels, message, new_state} is similar to
{:remit_ack, channels, message, reply, new_state} in handle_call/3,
but with no reply to the caller.
Returning {:emit_ack, channels, message, new_state, timeout | :hibernate} is
similar to
{:remit_ack, channels, message, reply, new_state, timeout | :hibernate}
in handle_call/3, but with no reply to the caller.
Returning {:stop, reason, new_state} stops the YProcess with reason.
handle_event(channel :: term, message :: term, state :: term) :: async_response
Invoked to handle messages coming from a registered channel.
channel is the channel from which the message is being received, message
is the received message and state is the current state of the YProcess.
Returns the same output as handle_cast/2.
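Because handle_event/3 may return any asynchronous response, it can also change the subscriptions of the process. The sketch below sums the integers it receives and leaves the channel when a :done message arrives. The module name Tally and the :done message are hypothetical, and the `use YProcess` line is omitted so the clauses can be tried as plain functions.

```elixir
defmodule Tally do
  # Hypothetical state: the running total of the numbers received so far.

  # An integer event is added to the running total.
  def handle_event(_channel, number, total) when is_integer(number) do
    {:noreply, total + number}
  end

  # A :done event makes the process leave the channel it came from.
  def handle_event(channel, :done, total) do
    {:leave, [channel], total}
  end
end
```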
handle_info(message :: term | :timeout, state :: term) :: async_response
Invoked to handle all other messages.
Receives the message and the current YProcess state. When a timeout
occurs the message is :timeout.
Returns the same output as handle_cast/2.
init(args :: term) ::
{:ok, state} |
{:ok, state, timeout | :hibernate} |
{:create, channels, state} |
{:join, channels, state} |
:ignore |
{:stop, reason :: any} when state: any, channels: list
Invoked when the server is started. start_link/3 (start/3) will block
until it returns.
args is the argument term (second argument) passed to start_link/3.
Returning {:ok, state} will cause start_link/3 to return {:ok, pid}
and the process to enter a loop.
Returning {:ok, state, timeout} is similar to {:ok, state} except
handle_info(:timeout, state) will be called after timeout milliseconds if
no messages are received within the timeout.
Returning {:ok, state, :hibernate} is similar to {:ok, state} except the
process is hibernated before entering the loop. See handle_call/3 for more
information on hibernation.
Returning {:create, channels, state} will create the channels listed in
channels where every channel can be an arbitrary term.
Returning {:join, channels, state} will subscribe to the channels
listed in channels where every channel can be an arbitrary term.
Returning :ignore will cause start_link/3 to return :ignore and the
process will exit normally without entering the loop or calling
terminate/2. If used when part of a supervision tree the parent supervisor
will not fail to start nor immediately try to restart the YProcess. The
remainder of the supervision tree will be (re)started and so the YProcess
should not be required by other processes. It can be restarted later with
Supervisor.restart_child/2 as the child specification is saved in the
parent supervisor. The main use cases for this are:
- The YProcess is disabled by configuration but might be enabled later.
- An error occurred and it will be handled by a different mechanism than the Supervisor. Likely this approach involves calling Supervisor.restart_child/2 after a delay to attempt a restart.
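The first use case might look like the sketch below. It uses a plain GenServer, on the assumption that YProcess handles :ignore the same way, as the description above suggests. The module name OptionalWorker and the :enabled option are hypothetical.

```elixir
defmodule OptionalWorker do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    # Hypothetical :enabled flag; the worker runs unless it is set to false.
    if Keyword.get(opts, :enabled, true) do
      {:ok, opts}
    else
      # The process exits normally and start_link/1 returns :ignore.
      :ignore
    end
  end
end
```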
Returning {:stop, reason} will cause start_link/3 to return
{:error, reason} and the process to exit with reason without entering the
loop or calling terminate/2.
ready(action, channels, state :: term) :: async_response when action: :joined | :created
Invoked when the server joined or created the channels after executing
init/1.
action can be either :joined or :created depending on which action was
executed to join or create the channels, respectively. channels is the list
of channels and state is the current state of the YProcess.
Returns the same output as handle_cast/2.
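As a sketch of how ready/3 pairs with init/1, the callbacks below create a channel and announce themselves on it once the channel exists. The module name Announcer, the :online message and the shape of the state are hypothetical, and the `use YProcess` line is omitted so the functions can be tried as plain Elixir.

```elixir
defmodule Announcer do
  # Hypothetical state: %{online: boolean}.

  # Requests the creation of the channel at start-up.
  def init(channel), do: {:create, [channel], %{online: false}}

  # Runs once the channels requested in init/1 have been created:
  # emit an :online message on them and record it in the state.
  def ready(:created, channels, state) do
    {:emit, channels, :online, %{state | online: true}}
  end

  # Nothing to announce when the process only joined the channels.
  def ready(:joined, _channels, state), do: {:noreply, state}
end
```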
terminate(reason, state :: term) :: term when reason: :normal | :shutdown | {:shutdown, term} | term
Invoked when the server is about to exit. It should do any cleanup required.
reason is the exit reason and state is the current state of the
YProcess. The return value is ignored.
terminate/2 is called if a callback except init/1 returns a :stop
tuple, raises an exception, calls Kernel.exit/1 or returns an invalid
value. It may also be called if the YProcess traps exits using
Process.flag/2 and the parent process sends an exit signal.
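A cleanup hook can be sketched with a plain GenServer, on the assumption that YProcess calls terminate/2 under the same conditions. The module name Cleaner and the {:terminated, reason} message are hypothetical; the message stands in for real cleanup work so the effect is observable.

```elixir
defmodule Cleaner do
  use GenServer

  # `report_to` is the pid that should be told when cleanup runs.
  def start_link(report_to), do: GenServer.start_link(__MODULE__, report_to)

  @impl true
  def init(report_to) do
    # Trap exits so that terminate/2 also runs when the parent process
    # sends an exit signal.
    Process.flag(:trap_exit, true)
    {:ok, report_to}
  end

  @impl true
  def terminate(reason, report_to) do
    # Stand-in for real cleanup; the return value is ignored.
    send(report_to, {:terminated, reason})
  end
end
```

Stopping the server with GenServer.stop/1 delivers {:terminated, :normal} to the reporting process before the server exits.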