y_process v0.2.2 YProcess behaviour
A behaviour module for implementing a server of a client-server relation with
publisher-subscriber capabilities. This is a generalized implementation of a
publisher subscriber using :pg2
library using a GenServer
.
This project is heavily inspired by
Connection
.
Solution using PG2
defmodule Consumer do
def subscribe(channel, callback) do
:pg2.join(channel, self)
loop(channel, callback)
end
defp loop(channel, callback) do
receive do
:stop -> :stopped
message ->
try do
callback.(message)
catch
_, _ -> :stopped
else
:ok -> loop(channel, callback)
:stop -> :stopped
end
end
end
end
defmodule Publisher do
def create(channel) do
:pg2.create(channel)
end
def publish(channel, message) do
channel
|> :pg2.get_members
|> Enum.map(&(send(&1, message)))
:ok
end
end
When testing this modules in iex
:
iex(1)> Publisher.create("channel")
:ok
iex(2)> callback = fn message -> IO.inspect {self, message} end
#Function<6.54118792/1 in :erl_eval.expr/5>
iex(3)> spawn fn -> Consumer.subscribe("channel", callback) end
#PID<0.168.0>
iex(4)> spawn fn -> Consumer.subscribe("channel", callback) end
#PID<0.170.0>
iex(5)> Publisher.publish("channel", "hello")
{#PID<0.168.0>, "hello"}
{#PID<0.170.0>, "hello"}
:ok
iex(6)>
If we wrap this around a GenServer
we would have a better way to handle
messages in the Consumers
. Depending on the approach, the messages could be
received either in handle_call/3
(GenServer.call/3
), handle_cast/2
(GenServer.cast/2
) or handle_info/2
(send/2
). YProcess
wraps around
GenServer
behaviour adding a new callback: handle_event/3
to handle the
messages coming from the processes the current YProcess
is subscribed.
Callbacks
There are eight callbacks required to be implemented in a YProcess
. Six of
them are the same as GenServer
but with some other possible return values
to join, leave, create, delete and emit messages (ackknowledged or not) to
other processes. The remaining two are the following:
handle_event/3
: It should be used to handle the messages coming from other process if the current process is subscribed to the channel they are broadcasting the messages.ready/3
: It is called after the:join
or:create
actions have been executed requested in theinit/1
callback.
Example
Let’s define a Producer
that every second generates a number between 1 and
10 and sends it to a Consumer
that prints the number.
So the Producer
is as follows:
defmodule Producer do
use YProcess
def start_link(channel) do
YProcess.start_link(__MODULE__, channel)
end
def stop(producer) do
YProcess.stop(producer)
end
defp get_number do
(:random.uniform * 10) |> :erlang.trunc
end
def init(channel) do
_ = :os.system_time(:seconds) |> :random.seed
{:create, [channel], [channel], 1000}
end
def handle_info(:timeout, channels) do
number = get_number
{:emit, channels, number, channels, 1000}
end
end
And the Consumer
is as follows:
defmodule Consumer do
use YProcess
def start_link(:channel) do
YProcess.start_link(__MODULE__, :channel)
end
def stop(consumer) do
YProcess.stop(consumer)
end
def init(channel) do
{:join, [channel], channel}
end
def handle_event(channel, number, channel) when is_integer(number) do
IO.puts inspect {self, number}
{:noreply, channel}
end
end
So when we start a Producer
and several Consumer
s we would have the
following output:
iex(1)> {:ok, producer} = Producer.start_link(:channel)
{:ok, #PID<0.189.0>}
iex(2)> {:ok, consumer0} = Consumer.start_link(:channel)
{:ok, #PID<0.192.0>}
{#PID<0.192.0>, 7}
{#PID<0.192.0>, 8}
iex(3)> {:ok, consumer1} = Consumer.start_link(:channel)
{:ok, #PID<0.194.0>}
{#PID<0.192.0>, 0}
{#PID<0.194.0>, 0}
{#PID<0.192.0>, 2}
{#PID<0.194.0>, 2}
{#PID<0.192.0>, 7}
{#PID<0.194.0>, 7}
iex(4)> Consumer.stop(consumer0)
{#PID<0.194.0>, 2}
{#PID<0.194.0>, 6}
{#PID<0.194.0>, 1}
iex(5)>
Backends
The backend behaviour defines the way the processes create, delete, join,
leave channels and emit messages. By default, the processes use the
YProcess.Backend.PG2
backend that uses :pg2
, but it is possible to use the
YProcess.Backend.PhoenixPubSub
that uses phoenix pubsub and also use any of
its adapters.
The backend behaviour needs to implement the following callbacks:
# Callback to create a `channel`.
@callback create(channel) :: :ok | {:error, reason}
when channel: YProcess.channel, reason: term
# Callback to delete a `channel`.
@callback delete(channel) :: :ok | {:error, reason}
when channel: YProcess.channel, reason: term
# Callback used to make a process with `pid` a `channel`.
@callback join(channel, pid) :: :ok | {:error, reason}
when channel: YProcess.channel, reason: term
# Callback used to make a process with `pid` leave a `channel`.
@callback leave(channel, pid) :: :ok | {:error, reason}
when channel: YProcess.channel, reason: term
# Callback used to send a `message` to a `channel`.
@callback emit(channel, message) :: :ok | {:error, reason}
when channel: YProcess.channel, message: term, reason: term
To use a backend, just modify the configuration of the application (explained in the next sections) or pass the backend in the module definition i.e:
defmodule Test do
use YProcess, backend: Backend.PhoenixPubSub
(...)
end
For the backends provided, there are two aliases for the modules:
:pg2
forYProcess.Backend.PG2
:phoenix_pub_sub
forYProcess.Backend.PhoenixPubSub
The shorter version of the module Test
defined above would be:
defmodule Test do
use YProcess, backend: :phoenix_pub_sub
(...)
end
Backend Configuration
To configure the backend globally for all the YProcess
es just set the
following:
-
config :y_process, backend: YProces.Backend.PG2
For
YProcess.Backend.PhoenixPubSub
use Mix.Config config :y_process, backend: YProcess.Backend.PhoenixPubSub name: MyApp.PubSub adapter: Phoenix.PubSub.PG2, options: [pool_size: 1]
and then add the
Phoenix.PubSub
supervisor to your supervision tree:def start(_type, _args) do import Supervisor.Spec, warn: false children = [ supervisor(YProcess.PhoenixPubSub, []), (...) ] opts = (...) Supervisor.start_link(children, opts) end
Installation
Add YProcess
as a dependency in your mix.exs
file.
def deps do
[{:y_process, "~> 0.2.1"}]
end
After you’re done, run this in your shell to fetch the new dependency:
$ mix deps.get
Summary
Types
Asynchronous callback response
Channel. Shouldn’t be a list
List of channels
New message to be emitted by the YProcess.
New module state
Reply to the caller
Synchronous callback response
Functions
Sends a synchronous request
to the YProcess
process identified by conn
and waits for a reply. Times out after 5000
milliseconds if no response is
received
Sends a synchronous request
ot the YProcess
process identified by conn
and waits for a reply unless the call times out after timeout
milliseconds.
A possible value for timeout
is :infinity
. The call will block
indefinitely
Sends an asynchronous request
to the YProcess
process identified by
conn
Sends a reply to a request sent by call/3
Starts a YProcess
process without links (outside of a supervision tree)
Starts a YProcess
process linked to the current process
Waits until the YProcess
process identified by conn
is ready. This call
is non-blocking and is possible to provided a timeout
for it. By default
is 5000
Callbacks
Invoked to change the state of the YProcess
when a different version of a
module is loaded (hot code swapping) and the state’s term structure should be
changed
Invoked to handle asynchronous cast/2
messages
Invoked to handle messages coming from a registered channel
Invoked to handle all other messages
Invoked when the server is started. start_link/3
(start/3
) will block
until it returns
Invoked when the server joined or created the channels after executing
init/1
Invoked when the server is about to exit. It should do any cleanup required
Types
async_response :: {:noreply, new_state} | {:noreply, new_state, timeout | :hibernate} | {:create, channels, new_state} | {:create, channels, new_state, timeout | :hibernate} | {:emit, channels, new_message, new_state} | {:emit, channels, new_message, new_state, timeout | :hibernate} | {:emit_ack, channels, new_message, new_state} | {:emit_ack, channels, new_message, new_state, timeout | :hibernate} | {:join, channels, new_state} | {:join, channels, new_state, timeout | :hibernate} | {:leave, channels, new_state} | {:leave, channels, new_state, timeout | :hibernate} | {:delete, channels, new_state} | {:delete, channels, new_state, timeout | :hibernate} | {:stop, reason :: term, new_state}
Asynchronous callback response.
Channel. Shouldn’t be a list.
New message to be emitted by the YProcess.
New module state.
Reply to the caller.
sync_response :: {:reply, reply, new_state} | {:reply, reply, new_state, timeout | :hibernate} | {:rcreate, channels, reply, new_state} | {:rcreate, channels, reply, new_state, timeout | :hibernate} | {:remit, channels, new_message, reply, new_state} | {:remit, channels, new_message, reply, new_state, timeout | :hibernate} | {:remit_ack, channels, new_message, reply, new_state} | {:remit_ack, channels, new_message, reply, new_state, timeout | :hibernate} | {:rjoin, channels, reply, new_state} | {:rjoin, channels, reply, new_state, timeout | :hibernate} | {:rleave, channels, reply, new_state} | {:rleave, channels, reply, new_state, timeout | :hibernate} | {:rdelete, channels, reply, new_state} | {:rdelete, channels, reply, new_state, timeout | :hibernate} | {:stop, reason :: term, reply, new_state} | async_response
Synchronous callback response.
Functions
Sends a synchronous request
to the YProcess
process identified by conn
and waits for a reply. Times out after 5000
milliseconds if no response is
received.
See GenServer.call/2
for more information.
Sends a synchronous request
ot the YProcess
process identified by conn
and waits for a reply unless the call times out after timeout
milliseconds.
A possible value for timeout
is :infinity
. The call will block
indefinitely.
See GenServer.call/3
for more information.
Sends an asynchronous request
to the YProcess
process identified by
conn
.
See GenServer.cast/2
for more information.
Whether a YProcess
process identified by conn
is ready or not. A
YProcess
process is ready
when it joined or created the channels that
requested to join or create in the init/1
callback. This call is blocking
and is possible to provide a timeout
for it. By default is 5000.
Sends a reply to a request sent by call/3
.
It receives a from
tuple and the response
to a request.
See GenServer.reply/2
for more information.
Starts a YProcess
process without links (outside of a supervision tree).
It receives the module
where the callbacks are implemented, the arguments
args
that will receive module.init/1
callback and the options
for the
YProcess
. This options
are the same expected by a GenServer
.
See start_link/3
for more information.
start_link(module, any, GenServer.options) :: GenServer.on_start
Starts a YProcess
process linked to the current process.
This function is used to start a YProcess
process in a supervision tree.
The process will be started by calling init/1
in the callback module with
the given argument.
This function will return after init/1
has returned in the spawned process.
The return values are controlled by the init/1
callback.
It receives the module
where the callbacks are implemented, the arguments
args
that will receive module.init/1
callback and the options
for the
YProcess
. This options
are the same expected by a GenServer
.
See GenServer.start_link/3
for more information.
stop(name :: pid, reason :: term, timeout) :: :ok
Stops the YProcess
.
It receives the name
or name of the server and optionally the reason
for
termination (by default is :normal
) and the timeout
to wait for the
termination.
See GenServer.stop/3
for more information.
Waits until the YProcess
process identified by conn
is ready. This call
is non-blocking and is possible to provided a timeout
for it. By default
is 5000.
Callbacks
code_change(old_vsn, state :: term, extra :: term) :: {:ok, new_state :: term} | {:error, reason :: term} when old_vsn: term | {:down, term}
Invoked to change the state of the YProcess
when a different version of a
module is loaded (hot code swapping) and the state’s term structure should be
changed.
old_vsn
is the previous version of the module (defined by the @vsn
attribute) when upgrading. When downgrading the previous version is wrapped
in a 2-tuple with first element :down
. state
is the current state of the
YProcess
and extra
is any extra data required to change the state.
Returning {:ok, state}
changes the state to new_state
and code is changed
succesfully.
Returning {:error, reason}
fails the code change with reason
and the
state remains as the previous state.
If code_change/3
raises an exception the code change fails and the loop
will continue with its previous state. Therefore this callback does not
usually contain side effects.
handle_call(request :: term, from :: {pid, term}, state :: term) :: sync_response
Invoked to handle synchronous call/3
messages. call/3
will block until a
reply is received (unless the call times out or nodes are disconnected).
request
is the request message sent by a call/3
, from
is a 2-tuple
containing the caller’s pid and a term that uniquely identifies the call, and
state
is the current state of the YProcess
.
Returning {:reply, reply, new_state}
send the response reply
to the
caller and continues the loop with new state new_state
.
Returning {:reply, reply, new_state, timeout}
is similar to
{:reply, reply, new_state}
except handle_info(:timeout, new_state)
will
be called after timeout
milliseconds if no messages are received.
Returning {:reply, reply, new_state, :hibernate}
is similar to
{:reply, reply, new_state}
except the process is hibernated and will
continue the loop once a messages is in its message queue. If a message is
already in the message queue this will immediately. Hibernating a YProcess
causes garbage collection and leaves a continuous heap that minimises the
memory used by the process.
Hibernating should not be used aggressively as too much time could be spent garbage collecting. Normally, it should be only be used when a message is not expected soon and minimising the memory of the process is shown to be beneficial.
Returning {:noreply, new_state}
does not send a response to the caller and
continues the loop with new_state new_state
. The response must be sent with
reply/2
.
There are three main use cases for not replying using the return value:
- To reply before returning from the callback because the response is known before calling a slow function.
- To reply after returning from the callback because the response is not yet available.
- To reply from another process, such as a
Task
.
Returning {:noreply, new_state, timeout | :hibernate}
is similar to
{:noreply, new_state}
except a timeout or hibernation occurs as with a
:reply
tuple.
Returning {:rcreate, channels, reply, new_state}
will create the channels
and reply
back to call/3
caller.
Returning {:rcreate, channels, reply, new_state, timeout | :hibernate}
is
similar to {:rcreate, channels, reply, new_state}
except a timeout or
hibernation occurs as with the :reply
tuple.
Returning {:remit, channels, message, reply, new_state}
will emit a
message
in several channels
while replying back to the call/3
caller.
The YProcess
does not receive confirmations for the messages sent.
Returning
{:remit, channels, message, reply, new_state, timeout | :hibernate}
is similar to {:remit, channels, message, new_state}
except a timeout
or hibernation occurs as with the :reply
tuple.
Returning {:remit_ack, channels, message, reply, new_state}
will emit a message
in several channels
while sending a reply
back to
the call/3
caller. The YProcess
receives confirmations from every
subscriber in handle_info/2
. The confirmation message is
{:DELIVER, pid, channel, message}
where pid
is the PID of the subscriber.
Returning
{:remit_ack, channels, message, reply, new_state, timeout | :hibernate}
is similar to {:remit_ack, channels, message, reply, new_state}
except a timeout or hibernation occurs as with the :reply
tuple.
Returning {:rjoin, channels, reply, new_state}
will join the YProcess
to the channels
and send a reply
back to the call/3
caller.
Returning {:rjoin, channels, reply, new_state, timeout | :hibernate}
is
similar to {:rjoin, channels, reply, new_state}
except a timeout or
hibernation occurs as with the :reply
tuple.
Returning {:rleave, channels, reply, new_state}
the YProcess
will
leave the channels
and send a reply
back to the call/3
caller.
Returning {:rleave, channels, reply, new_state, timeout | :hibernate}
is
similar to {:rleave, channels, reply, new_state}
except a timeout or
hibernation occurs as with the :reply
tuple.
Returning {:rdelete, channels, reply, new_state}
will delete the channels
and reply
back to call/3
caller.
Returning {:rdelete, channels, reply, new_state, timeout | :hibernate}
is similar to {:rdelete, channels, reply, new_state}
except a timeout or
hibernation occurs as with the :reply
tuple.
Returning {:stop, reason, reply, new_state}
stops the loop and
terminate/2
is called with reason reason
and new_state
. Then the
reply
is sent as response to the call and the process exist with reason
.
Returning {:stop, reason, new_state}
is similar to
{:stop, reason, reply, new_state}
except a reply is not sent.
It also returns the same tuples from handle_cast/2
.
Invoked to handle asynchronous cast/2
messages.
request
is the request message sent by a cast/2
and state
is the current
state of the YProcess
.
Returning {:noreply, new_state}
updates the current state to the
new_state
.
Returning {:noreply, new_state, timeout}
is similar to
{:noreply, new_state}
except handle_info(:timeout, new_state)
will
be called after timeout
milliseconds if no messages are received.
Returning {:noreply, new_state, :hibernate}
is similar to
{:noreply, new_state}
except the process is hibernated and will
continue the loop once a messages is in its message queue. If a message is
already in the message queue this will immediately. Hibernating a YProcess
causes garbage collection and leaves a continuous heap that minimises the
memory used by the process.
Hibernating should not be used aggressively as too much time could be spent garbage collecting. Normally, it should be only be used when a message is not expected soon and minimising the memory of the process is shown to be beneficial.
Returning {:create, channels, new_state}
similar to
{:rcreate, channels, reply, new_state}
in handle_call/3
, but with no
reply to the caller.
Returning `{:create, channels, new_state, timeout | :hibernate}` is similar to |
`{:rcreate, channels, reply, new_state, timeout | :hibernate}` in |
handle_call/3
, but with no reply to the caller.
Returning {:delete, channels, new_state}
similar to
{:rdelete, channels, reply, new_state}
in handle_call/3
, but with no
reply to the caller.
Returning `{:delete, channels, new_state, timeout | :hibernate}` is similar |
to `{:rdelete, channels, reply, new_state, timeout | :hibernate}` in |
handle_call/3
, but with no reply to the caller.
Returning {:join, channels, new_state}
similar to
{:rjoin, channels, reply, new_state}
in handle_call/3
, but with no
reply to the caller.
Returning `{:join, channels, new_state, timeout | :hibernate}` is similar to |
`{:rjoin, channels, reply, new_state, timeout | :hibernate}` in |
handle_call/3
, but with no reply to the caller.
Returning {:leave, channels, new_state}
similar to
{:rleave, channels, reply, new_state}
in handle_call/3
, but with no
reply to the caller.
Returning `{:leave, channels, new_state, timeout | :hibernate}` is similar to |
`{:rleave, channels, reply, new_state, timeout | :hibernate}` in |
handle_call/3
, but with no reply to the caller.
Returning {:emit, channels, message, new_state}
similar to
{:remit, channels, message, reply, new_state}
in handle_call/3
, but
with no reply to the caller.
Returning {:emit, channels, message, new_state, timeout | :hibernate}
is
similar to
{:remit, channels, message, reply, new_state, timeout | :hibernate}
in handle_call/3
, but with no reply to the caller.
Returning {:emit_ack, channels, message, new_state}
similar to
{:remit_ack, channels, message, reply, new_state}
in handle_call/3
,
but with no reply to the caller.
Returning {:emit_ack, channels, message, new_state, timeout | :hibernate}
is
similar to
{:remit_ack, channels, message, reply, new_state, timeout | :hibernate}
in handle_call/3
, but with no reply to the caller.
Returning {:stop, reason, new_state}
stops the YProcess
with reason
.
handle_event(channel :: term, message :: term, state :: term) :: async_response
Invoked to handle messages coming from a registered channel.
channel
is the channel from which the message
is been received. state
is the current state of the YProcess
.
Returns the same output as handle_cast/2
.
handle_info(message :: term | :timeout, state :: term) :: async_response
Invoked to handle all other messages.
Receives the message
and the current YProcess
state
. When a timeout
occurs the message is :timeout
.
Returns the same output as handle_cast/2
.
init(args :: term) :: {:ok, state} | {:ok, state, timeout | :hibernate} | {:create, channels, state} | {:join, channels, state} | :ignore | {:stop, reason :: any} when state: any, channels: list
Invoked when the server is started. start_link/3
(start/3
) will block
until it returns.
args
is the argument term (second argument) passed to start_link/3
.
Returning {:ok, state}
will cause start_link/3
to return {:ok, pid}
and the process to enter a loop.
Returning {:ok, state, timeout}
is similar to {:ok, state}
except
handle_info(:timeout, state)
will be called after timeout
milliseconds if
no messages are received within the timeout.
Returning {:ok, state, :hibernate}
is similar to {:ok, state}
except the
process is hibernated before entering the loop. See handle_call/3
for more
information on hibernation.
Returning {:create, channels, state}
will create the channels listed in
channels
where every channel can be an arbitrary term.
Returning {:join, channels, state}
will subscribe to the channels
listed in channels
where every channel can be an arbitrary term.
Returning :ignore
will cause start_link/3
to return :ignore
and the
process will exit normally without entering the loop or calling
terminate/2
. If used when part of a supervision tree the parent supervisor
will not fail to start not immediately try to restart the YProcess
. The
remainder of the supervision tree will be (re)started and so the YProcess
should not be required by other processes. It can be restarted later with
Supervisor.restart_child/2
as the child specification is saved in the
parent supervisor. The main use cases for this are:
- The
YProcess
is disabled by configuration but might be enabled later. - An error occurred and it will be handled by a different mechanism than the
Supervisor
. Likely this approach involves callingSupervisor.restart_child/2
after a delay to attempt a restart.
Returning {:stop, reason}
will cause start_link/3
to return
{:error, reason}
and the process to exit with reason
without entering the
loop or calling terminate/2
ready(action, channels, state :: term) :: async_response when action: :joined | :created
Invoked when the server joined or created the channels after executing
init/1
.
action
can be either :joined
or :created
depending on which action
executed to join or create the channels
, repectively. state
is the
current state of the YProcess
.
Returns the same output as handle_cast/2
.
terminate(reason, state :: term) :: term when reason: :normal | :shutdown | {:shutdown, term} | term
Invoked when the server is about to exit. It should do any cleanup required.
reason
is the exit reason and state
is the current state of the
YProcess
. The return value is ignored.
terminate/2
is called if a callback except init/1
returns a :stop
tuple, raises an exception, calls Kernel.exit/1
or returns an invalid
value. It may also be called if the YProcess
traps exits using
Process.flag/2
and the parent process sends an exit signal.