y_process v0.2.2 YProcess behaviour

A behaviour module for implementing the server of a client-server relation with publisher-subscriber capabilities. It is a generalized publisher-subscriber implementation built on a GenServer and the :pg2 library.

This project is heavily inspired by Connection.

Solution using PG2

defmodule Consumer do
  def subscribe(channel, callback) do
    :pg2.join(channel, self())
    loop(channel, callback)
  end

  defp loop(channel, callback) do
    receive do
      :stop -> :stopped
      message ->
        try do
          callback.(message)
        catch
          _, _ -> :stopped
        else
          :ok -> loop(channel, callback)
          :stop -> :stopped
        end
    end
  end
end

defmodule Publisher do
  def create(channel) do
    :pg2.create(channel)
  end

  def publish(channel, message) do
    channel
    |> :pg2.get_members()
    |> Enum.each(&send(&1, message))
    :ok
  end
end

When testing these modules in iex:

iex(1)> Publisher.create("channel")
:ok
iex(2)> callback = fn message -> IO.inspect({self(), message}); :ok end
#Function<6.54118792/1 in :erl_eval.expr/5>
iex(3)> spawn fn -> Consumer.subscribe("channel", callback) end
#PID<0.168.0>
iex(4)> spawn fn -> Consumer.subscribe("channel", callback) end
#PID<0.170.0>
iex(5)> Publisher.publish("channel", "hello")
{#PID<0.168.0>, "hello"}
{#PID<0.170.0>, "hello"}
:ok
iex(6)>

If we wrap this around a GenServer, we get a better way to handle messages in the consumers. Depending on the approach, the messages could be received in handle_call/3 (GenServer.call/3), handle_cast/2 (GenServer.cast/2) or handle_info/2 (send/2). YProcess wraps the GenServer behaviour and adds a new callback, handle_event/3, to handle the messages coming from the channels the current YProcess is subscribed to.

Callbacks

There are eight callbacks required to be implemented in a YProcess. Six of them are the same as in GenServer, but with additional return values to join, leave, create and delete channels and to emit messages (acknowledged or not) to other processes. The remaining two are the following:

  • handle_event/3: Handles the messages coming from other processes, provided the current process is subscribed to the channel where they broadcast those messages.

  • ready/3: Called after the :join or :create actions requested in the init/1 callback have been executed.
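As a minimal sketch of how these two callbacks fit together, the module below joins a channel in init/1, logs from ready/3 once the join has been executed, and prints every event it receives. The module name ReadyLogger is hypothetical, and the return tuples follow the type specifications listed on this page.

```elixir
defmodule ReadyLogger do
  use YProcess

  def start_link(channel) do
    YProcess.start_link(__MODULE__, channel)
  end

  # Ask YProcess to join `channel`; the state is just the channel.
  def init(channel) do
    {:join, [channel], channel}
  end

  # Called once the :join requested in init/1 has been executed.
  def ready(:joined, channels, state) do
    IO.puts("joined #{inspect(channels)}")
    {:noreply, state}
  end

  # Called for every message emitted in a joined channel.
  def handle_event(channel, message, state) do
    IO.puts("#{inspect(channel)}: #{inspect(message)}")
    {:noreply, state}
  end
end
```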

Example

Let’s define a Producer that every second generates a number between 0 and 9 and sends it to a Consumer that prints the number.

So the Producer is as follows:

defmodule Producer do
  use YProcess

  def start_link(channel) do
    YProcess.start_link(__MODULE__, channel)
  end

  def stop(producer) do
    YProcess.stop(producer)
  end

  defp get_number do
    # Random integer between 0 and 9 (:rand seeds itself per process).
    :rand.uniform(10) - 1
  end

  def init(channel) do
    # Create the channel; the state is the list of channels to emit to.
    {:create, [channel], [channel], 1000}
  end

  def handle_info(:timeout, channels) do
    number = get_number()
    {:emit, channels, number, channels, 1000}
  end
end

And the Consumer is as follows:

defmodule Consumer do
  use YProcess

  def start_link(channel) do
    YProcess.start_link(__MODULE__, channel)
  end

  def stop(consumer) do
    YProcess.stop(consumer)
  end

  def init(channel) do
    {:join, [channel], channel}
  end

  def handle_event(channel, number, channel) when is_integer(number) do
    IO.puts inspect({self(), number})
    {:noreply, channel}
  end
end

So when we start a Producer and several Consumers we would have the following output:

iex(1)> {:ok, producer} = Producer.start_link(:channel)
{:ok, #PID<0.189.0>}
iex(2)> {:ok, consumer0} = Consumer.start_link(:channel)
{:ok, #PID<0.192.0>}
{#PID<0.192.0>, 7}
{#PID<0.192.0>, 8}
iex(3)> {:ok, consumer1} = Consumer.start_link(:channel)
{:ok, #PID<0.194.0>}
{#PID<0.192.0>, 0}
{#PID<0.194.0>, 0}
{#PID<0.192.0>, 2}
{#PID<0.194.0>, 2}
{#PID<0.192.0>, 7}
{#PID<0.194.0>, 7}
iex(4)> Consumer.stop(consumer0)
{#PID<0.194.0>, 2}
{#PID<0.194.0>, 6}
{#PID<0.194.0>, 1}
iex(5)>

Backends

The backend behaviour defines how the processes create, delete, join and leave channels and how they emit messages. By default, the processes use the YProcess.Backend.PG2 backend, which uses :pg2, but it is also possible to use YProcess.Backend.PhoenixPubSub, which uses Phoenix PubSub and can work with any of its adapters.

The backend behaviour needs to implement the following callbacks:

# Callback to create a `channel`.
@callback create(channel) :: :ok | {:error, reason}
  when channel: YProcess.channel, reason: term

# Callback to delete a `channel`.
@callback delete(channel) :: :ok | {:error, reason}
  when channel: YProcess.channel, reason: term

# Callback used to make a process with `pid` join a `channel`.
@callback join(channel, pid) :: :ok | {:error, reason}
  when channel: YProcess.channel, reason: term

# Callback used to make a process with `pid` leave a `channel`.
@callback leave(channel, pid) :: :ok | {:error, reason}
  when channel: YProcess.channel, reason: term

# Callback used to send a `message` to a `channel`.
@callback emit(channel, message) :: :ok | {:error, reason}
  when channel: YProcess.channel, message: term, reason: term
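To illustrate the shape of a backend, here is a hedged sketch of one that logs every operation and delegates the real work to YProcess.Backend.PG2. The module name LoggingBackend and the helper logged/3 are hypothetical, and the sketch assumes that the behaviour module is named YProcess.Backend, which this page does not state explicitly.

```elixir
defmodule LoggingBackend do
  # Assumption: the behaviour module is named YProcess.Backend.
  @behaviour YProcess.Backend

  require Logger

  alias YProcess.Backend.PG2

  def create(channel), do: logged(:create, channel, fn -> PG2.create(channel) end)

  def delete(channel), do: logged(:delete, channel, fn -> PG2.delete(channel) end)

  def join(channel, pid), do: logged(:join, channel, fn -> PG2.join(channel, pid) end)

  def leave(channel, pid), do: logged(:leave, channel, fn -> PG2.leave(channel, pid) end)

  def emit(channel, message), do: logged(:emit, channel, fn -> PG2.emit(channel, message) end)

  # Runs the delegated operation, logs its result and returns it unchanged.
  defp logged(action, channel, fun) do
    result = fun.()
    Logger.debug("#{action} #{inspect(channel)} -> #{inspect(result)}")
    result
  end
end
```

Delegating keeps the delivery semantics of the default backend untouched, so the sketch only adds observability.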

To use a backend, either modify the configuration of the application (explained in the next section) or pass the backend in the module definition, e.g.:

defmodule Test do
  use YProcess, backend: YProcess.Backend.PhoenixPubSub
  (...)
end

For the backends provided, there are two aliases for the modules, one per backend; for instance, :phoenix_pub_sub stands for YProcess.Backend.PhoenixPubSub.

The shorter version of the module Test defined above would be:

defmodule Test do
  use YProcess, backend: :phoenix_pub_sub
  (...)
end

Backend Configuration

To configure the backend globally for all the YProcesses just set the following:

  • For YProcess.Backend.PG2

    config :y_process,
      backend: YProcess.Backend.PG2
  • For YProcess.Backend.PhoenixPubSub

    use Mix.Config
    
    config :y_process,
      backend: YProcess.Backend.PhoenixPubSub,
      name: MyApp.PubSub,
      adapter: Phoenix.PubSub.PG2,
      options: [pool_size: 1]

    and then add the Phoenix.PubSub supervisor to your supervision tree:

    def start(_type, _args) do
      import Supervisor.Spec, warn: false
    
      children = [
        supervisor(YProcess.PhoenixPubSub, []),
        (...)
      ] 
    
      opts = (...)
      Supervisor.start_link(children, opts)
    end

Installation

Add YProcess as a dependency in your mix.exs file.

def deps do
    [{:y_process, "~> 0.2.1"}]
end

After you’re done, run this in your shell to fetch the new dependency:

$ mix deps.get

Types

async_response()
async_response ::
  {:noreply, new_state} |
  {:noreply, new_state, timeout | :hibernate} |
  {:create, channels, new_state} |
  {:create, channels, new_state, timeout | :hibernate} |
  {:emit, channels, new_message, new_state} |
  {:emit, channels, new_message, new_state, timeout | :hibernate} |
  {:emit_ack, channels, new_message, new_state} |
  {:emit_ack, channels, new_message, new_state, timeout | :hibernate} |
  {:join, channels, new_state} |
  {:join, channels, new_state, timeout | :hibernate} |
  {:leave, channels, new_state} |
  {:leave, channels, new_state, timeout | :hibernate} |
  {:delete, channels, new_state} |
  {:delete, channels, new_state, timeout | :hibernate} |
  {:stop, reason :: term, new_state}

Asynchronous callback response.

channel()
channel :: term

Channel. Shouldn’t be a list.

channels()
channels :: [channel]

List of channels.

new_message()
new_message :: term

New message to be emitted by the YProcess.

new_state()
new_state :: term

New module state.

reply()
reply :: term

Reply to the caller.

sync_response()
sync_response ::
  {:reply, reply, new_state} |
  {:reply, reply, new_state, timeout | :hibernate} |
  {:rcreate, channels, reply, new_state} |
  {:rcreate, channels, reply, new_state, timeout | :hibernate} |
  {:remit, channels, new_message, reply, new_state} |
  {:remit, channels, new_message, reply, new_state, timeout | :hibernate} |
  {:remit_ack, channels, new_message, reply, new_state} |
  {:remit_ack, channels, new_message, reply, new_state, timeout | :hibernate} |
  {:rjoin, channels, reply, new_state} |
  {:rjoin, channels, reply, new_state, timeout | :hibernate} |
  {:rleave, channels, reply, new_state} |
  {:rleave, channels, reply, new_state, timeout | :hibernate} |
  {:rdelete, channels, reply, new_state} |
  {:rdelete, channels, reply, new_state, timeout | :hibernate} |
  {:stop, reason :: term, reply, new_state} |
  async_response

Synchronous callback response.

Functions

call(conn, request)

Sends a synchronous request to the YProcess process identified by conn and waits for a reply. Times out after 5000 milliseconds if no response is received.

See GenServer.call/2 for more information.

call(conn, request, timeout)

Sends a synchronous request to the YProcess process identified by conn and waits for a reply unless the call times out after timeout milliseconds. If timeout is :infinity, the call will block indefinitely.

See GenServer.call/3 for more information.

cast(conn, request)

Sends an asynchronous request to the YProcess process identified by conn.

See GenServer.cast/2 for more information.

is_ready?(conn, timeout \\ 5000)

Whether the YProcess process identified by conn is ready or not. A YProcess process is ready when it has joined or created the channels requested in the init/1 callback. This call is blocking, and it is possible to provide a timeout for it. The default timeout is 5000.

reply(from, response)

Sends a reply to a request sent by call/3.

It receives a from tuple and the response to a request.

See GenServer.reply/2 for more information.

start(module, args, options \\ [])
start(module, any, GenServer.options) :: GenServer.on_start

Starts a YProcess process without links (outside of a supervision tree).

It receives the module where the callbacks are implemented, the argument args that is passed to the module.init/1 callback and the options for the YProcess. These options are the same as those expected by a GenServer.

See start_link/3 for more information.

start_link(module, args, options \\ [])
start_link(module, any, GenServer.options) :: GenServer.on_start

Starts a YProcess process linked to the current process.

This function is used to start a YProcess process in a supervision tree. The process will be started by calling init/1 in the callback module with the given argument.

This function will return after init/1 has returned in the spawned process. The return values are controlled by the init/1 callback.

It receives the module where the callbacks are implemented, the argument args that is passed to the module.init/1 callback and the options for the YProcess. These options are the same as those expected by a GenServer.

See GenServer.start_link/3 for more information.

stop(pid, reason \\ :normal, timeout \\ :infinity)
stop(name :: pid, reason :: term, timeout) :: :ok

Stops the YProcess.

It receives the pid or name of the server and, optionally, the reason for termination (:normal by default) and the timeout to wait for the termination.

See GenServer.stop/3 for more information.

wait_ready(conn, timeout \\ 5000)

Waits until the YProcess process identified by conn is ready. This call is non-blocking, and it is possible to provide a timeout for it. The default timeout is 5000.

Callbacks

code_change(old_vsn, state, extra)
code_change(old_vsn, state :: term, extra :: term) ::
  {:ok, new_state :: term} |
  {:error, reason :: term} when old_vsn: term | {:down, term}

Invoked to change the state of the YProcess when a different version of a module is loaded (hot code swapping) and the state’s term structure should be changed.

old_vsn is the previous version of the module (defined by the @vsn attribute) when upgrading. When downgrading the previous version is wrapped in a 2-tuple with first element :down. state is the current state of the YProcess and extra is any extra data required to change the state.

Returning {:ok, new_state} changes the state to new_state and the code change succeeds.

Returning {:error, reason} fails the code change with reason and the state remains as the previous state.

If code_change/3 raises an exception the code change fails and the loop will continue with its previous state. Therefore this callback does not usually contain side effects.
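As an illustration, the following sketch migrates a state that used to be a bare integer into a map, and back again on a downgrade. The module name Counter, the version strings and the state layout are all hypothetical; only the callback signature and return values come from the specification above.

```elixir
defmodule Counter do
  use YProcess

  @vsn "2"

  def init(_args), do: {:ok, %{count: 0}}

  # Upgrade from version "1", where the state was a bare integer.
  def code_change("1", count, _extra) when is_integer(count) do
    {:ok, %{count: count}}
  end

  # Downgrade: the version arrives wrapped as {:down, vsn}; unwrap the map.
  def code_change({:down, "1"}, %{count: count}, _extra) do
    {:ok, count}
  end

  # Any other version: refuse the change so the previous state is kept.
  def code_change(_old_vsn, _state, _extra) do
    {:error, :unknown_version}
  end
end
```

The function stays free of side effects, so a failed code change leaves nothing to undo.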

handle_call(request, from, state)
handle_call(request :: term, from :: {pid, term}, state :: term) :: sync_response

Invoked to handle synchronous call/3 messages. call/3 will block until a reply is received (unless the call times out or nodes are disconnected).

request is the request message sent by a call/3, from is a 2-tuple containing the caller’s pid and a term that uniquely identifies the call, and state is the current state of the YProcess.

Returning {:reply, reply, new_state} sends the response reply to the caller and continues the loop with the new state new_state.

Returning {:reply, reply, new_state, timeout} is similar to {:reply, reply, new_state} except handle_info(:timeout, new_state) will be called after timeout milliseconds if no messages are received.

Returning {:reply, reply, new_state, :hibernate} is similar to {:reply, reply, new_state} except the process is hibernated and will continue the loop once a message is in its message queue. If a message is already in the message queue, this happens immediately. Hibernating a YProcess causes garbage collection and leaves a continuous heap that minimises the memory used by the process.

Hibernating should not be used aggressively as too much time could be spent garbage collecting. Normally, it should only be used when a message is not expected soon and minimising the memory of the process is shown to be beneficial.

Returning {:noreply, new_state} does not send a response to the caller and continues the loop with the new state new_state. The response must be sent with reply/2.

There are three main use cases for not replying using the return value:

  • To reply before returning from the callback because the response is known before calling a slow function.
  • To reply after returning from the callback because the response is not yet available.
  • To reply from another process, such as a Task.
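The third use case can be sketched as follows: the callback returns {:noreply, state} immediately and a Task sends the response later with reply/2. The module name SlowServer and the {:square, n} request are hypothetical, and the sketch assumes that YProcess.reply/2 behaves like GenServer.reply/2, as this page suggests.

```elixir
defmodule SlowServer do
  use YProcess

  def start_link do
    YProcess.start_link(__MODULE__, nil)
  end

  def init(state), do: {:ok, state}

  def handle_call({:square, n}, from, state) do
    # Do the work in another process and reply from there, so the
    # server loop stays free to handle other messages meanwhile.
    Task.start(fn ->
      YProcess.reply(from, n * n)
    end)

    {:noreply, state}
  end
end
```

The caller still uses YProcess.call/2 as usual and blocks until the Task replies or the call times out.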

Returning {:noreply, new_state, timeout | :hibernate} is similar to {:noreply, new_state} except a timeout or hibernation occurs as with a :reply tuple.

Returning {:rcreate, channels, reply, new_state} will create the channels and reply back to call/3 caller.

Returning {:rcreate, channels, reply, new_state, timeout | :hibernate} is similar to {:rcreate, channels, reply, new_state} except a timeout or hibernation occurs as with the :reply tuple.

Returning {:remit, channels, message, reply, new_state} will emit a message in several channels while replying back to the call/3 caller. The YProcess does not receive confirmations for the messages sent.

Returning {:remit, channels, message, reply, new_state, timeout | :hibernate} is similar to {:remit, channels, message, reply, new_state} except a timeout or hibernation occurs as with the :reply tuple.

Returning {:remit_ack, channels, message, reply, new_state} will emit a message in several channels while sending a reply back to the call/3 caller. The YProcess receives confirmations from every subscriber in handle_info/2. The confirmation message is {:DELIVER, pid, channel, message} where pid is the PID of the subscriber.
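A minimal sketch of an acknowledged publish might look like the module below. The module name AckPublisher and its publish/2 function are hypothetical; the {:remit_ack, ...} tuple and the {:DELIVER, pid, channel, message} confirmation are the ones described above.

```elixir
defmodule AckPublisher do
  use YProcess

  def start_link(channel) do
    YProcess.start_link(__MODULE__, channel)
  end

  # Public API: publish `message` and wait for the :ok reply.
  def publish(publisher, message) do
    YProcess.call(publisher, {:publish, message})
  end

  # Create the channel; the state is the channel itself.
  def init(channel) do
    {:create, [channel], channel}
  end

  # Emit with acknowledgement and reply :ok to the caller.
  def handle_call({:publish, message}, _from, channel) do
    {:remit_ack, [channel], message, :ok, channel}
  end

  # One confirmation arrives for each subscriber that got the message.
  def handle_info({:DELIVER, pid, channel, message}, state) do
    IO.puts("#{inspect(pid)} got #{inspect(message)} on #{inspect(channel)}")
    {:noreply, state}
  end

  # Ignore any other message.
  def handle_info(_other, state), do: {:noreply, state}
end
```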

Returning {:remit_ack, channels, message, reply, new_state, timeout | :hibernate} is similar to {:remit_ack, channels, message, reply, new_state} except a timeout or hibernation occurs as with the :reply tuple.

Returning {:rjoin, channels, reply, new_state} will join the YProcess to the channels and send a reply back to the call/3 caller.

Returning {:rjoin, channels, reply, new_state, timeout | :hibernate} is similar to {:rjoin, channels, reply, new_state} except a timeout or hibernation occurs as with the :reply tuple.

Returning {:rleave, channels, reply, new_state} makes the YProcess leave the channels and send a reply back to the call/3 caller.

Returning {:rleave, channels, reply, new_state, timeout | :hibernate} is similar to {:rleave, channels, reply, new_state} except a timeout or hibernation occurs as with the :reply tuple.

Returning {:rdelete, channels, reply, new_state} will delete the channels and reply back to call/3 caller.

Returning {:rdelete, channels, reply, new_state, timeout | :hibernate} is similar to {:rdelete, channels, reply, new_state} except a timeout or hibernation occurs as with the :reply tuple.

Returning {:stop, reason, reply, new_state} stops the loop and terminate/2 is called with reason reason and new_state. Then the reply is sent as the response to the call and the process exits with reason.

Returning {:stop, reason, new_state} is similar to {:stop, reason, reply, new_state} except a reply is not sent.

It can also return the same tuples as handle_cast/2.

handle_cast(request, state)
handle_cast(request :: term, state :: term) :: async_response

Invoked to handle asynchronous cast/2 messages.

request is the request message sent by a cast/2 and state is the current state of the YProcess.

Returning {:noreply, new_state} updates the current state to the new_state.

Returning {:noreply, new_state, timeout} is similar to {:noreply, new_state} except handle_info(:timeout, new_state) will be called after timeout milliseconds if no messages are received.

Returning {:noreply, new_state, :hibernate} is similar to {:noreply, new_state} except the process is hibernated and will continue the loop once a message is in its message queue. If a message is already in the message queue, this happens immediately. Hibernating a YProcess causes garbage collection and leaves a continuous heap that minimises the memory used by the process.

Hibernating should not be used aggressively as too much time could be spent garbage collecting. Normally, it should only be used when a message is not expected soon and minimising the memory of the process is shown to be beneficial.

Returning {:create, channels, new_state} is similar to {:rcreate, channels, reply, new_state} in handle_call/3, but with no reply to the caller.

Returning {:create, channels, new_state, timeout | :hibernate} is similar to {:rcreate, channels, reply, new_state, timeout | :hibernate} in handle_call/3, but with no reply to the caller.

Returning {:delete, channels, new_state} is similar to {:rdelete, channels, reply, new_state} in handle_call/3, but with no reply to the caller.

Returning {:delete, channels, new_state, timeout | :hibernate} is similar to {:rdelete, channels, reply, new_state, timeout | :hibernate} in handle_call/3, but with no reply to the caller.

Returning {:join, channels, new_state} is similar to {:rjoin, channels, reply, new_state} in handle_call/3, but with no reply to the caller.

Returning {:join, channels, new_state, timeout | :hibernate} is similar to {:rjoin, channels, reply, new_state, timeout | :hibernate} in handle_call/3, but with no reply to the caller.

Returning {:leave, channels, new_state} is similar to {:rleave, channels, reply, new_state} in handle_call/3, but with no reply to the caller.

Returning {:leave, channels, new_state, timeout | :hibernate} is similar to {:rleave, channels, reply, new_state, timeout | :hibernate} in handle_call/3, but with no reply to the caller.

Returning {:emit, channels, message, new_state} is similar to {:remit, channels, message, reply, new_state} in handle_call/3, but with no reply to the caller.

Returning {:emit, channels, message, new_state, timeout | :hibernate} is similar to {:remit, channels, message, reply, new_state, timeout | :hibernate} in handle_call/3, but with no reply to the caller.

Returning {:emit_ack, channels, message, new_state} is similar to {:remit_ack, channels, message, reply, new_state} in handle_call/3, but with no reply to the caller.

Returning {:emit_ack, channels, message, new_state, timeout | :hibernate} is similar to {:remit_ack, channels, message, reply, new_state, timeout | :hibernate} in handle_call/3, but with no reply to the caller.

Returning {:stop, reason, new_state} stops the YProcess with reason.
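For example, the :join and :leave tuples make it possible to change subscriptions at runtime. The sketch below keeps the list of joined channels as its state; the module name Subscriber and the {:subscribe, channel} and {:unsubscribe, channel} requests are hypothetical.

```elixir
defmodule Subscriber do
  use YProcess

  def start_link do
    YProcess.start_link(__MODULE__, [])
  end

  def subscribe(pid, channel), do: YProcess.cast(pid, {:subscribe, channel})

  def unsubscribe(pid, channel), do: YProcess.cast(pid, {:unsubscribe, channel})

  # The state is the list of channels currently joined.
  def init(channels), do: {:ok, channels}

  # Join the channel and remember it in the state.
  def handle_cast({:subscribe, channel}, channels) do
    {:join, [channel], [channel | channels]}
  end

  # Leave the channel and forget it.
  def handle_cast({:unsubscribe, channel}, channels) do
    {:leave, [channel], List.delete(channels, channel)}
  end

  # Print every message received from a joined channel.
  def handle_event(channel, message, channels) do
    IO.inspect({channel, message})
    {:noreply, channels}
  end
end
```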

handle_event(channel, message, state)
handle_event(channel :: term, message :: term, state :: term) :: async_response

Invoked to handle messages coming from a registered channel.

channel is the channel from which the message was received, message is the message itself and state is the current state of the YProcess.

Returns the same output as handle_cast/2.

handle_info(message, state)
handle_info(message :: term | :timeout, state :: term) :: async_response

Invoked to handle all other messages.

Receives the message and the current YProcess state. When a timeout occurs the message is :timeout.

Returns the same output as handle_cast/2.

init(args)
init(args :: term) ::
  {:ok, state} |
  {:ok, state, timeout | :hibernate} |
  {:create, channels, state} |
  {:join, channels, state} |
  :ignore |
  {:stop, reason :: any} when state: any, channels: list

Invoked when the server is started. start_link/3 (start/3) will block until it returns.

args is the argument term (second argument) passed to start_link/3.

Returning {:ok, state} will cause start_link/3 to return {:ok, pid} and the process to enter a loop.

Returning {:ok, state, timeout} is similar to {:ok, state} except handle_info(:timeout, state) will be called if no messages are received within timeout milliseconds.

Returning {:ok, state, :hibernate} is similar to {:ok, state} except the process is hibernated before entering the loop. See handle_call/3 for more information on hibernation.

Returning {:create, channels, state} will create the channels listed in channels where every channel can be an arbitrary term.

Returning {:join, channels, state} will subscribe to the channels listed in channels where every channel can be an arbitrary term.

Returning :ignore will cause start_link/3 to return :ignore and the process will exit normally without entering the loop or calling terminate/2. If used when part of a supervision tree, the parent supervisor will not fail to start nor immediately try to restart the YProcess. The remainder of the supervision tree will be (re)started and so the YProcess should not be required by other processes. It can be restarted later with Supervisor.restart_child/2 as the child specification is saved in the parent supervisor. The main use cases for this are:

  • The YProcess is disabled by configuration but might be enabled later.
  • An error occurred and it will be handled by a different mechanism than the Supervisor. Likely this approach involves calling Supervisor.restart_child/2 after a delay to attempt a restart.
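The first use case could be sketched like this. The application name :my_app and the configuration key :worker_enabled are hypothetical, as is the module name OptionalWorker.

```elixir
defmodule OptionalWorker do
  use YProcess

  def start_link(channel) do
    YProcess.start_link(__MODULE__, channel)
  end

  def init(channel) do
    # :my_app and :worker_enabled are hypothetical configuration names.
    if Application.get_env(:my_app, :worker_enabled, false) do
      {:join, [channel], channel}
    else
      # Disabled by configuration: do not start, but do not fail either.
      :ignore
    end
  end
end
```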

Returning {:stop, reason} will cause start_link/3 to return {:error, reason} and the process to exit with reason without entering the loop or calling terminate/2.

ready(action, channels, state)
ready(action, channels, state :: term) :: async_response when action: :joined | :created

Invoked when the server joined or created the channels after executing init/1.

action can be either :joined or :created depending on which action was executed to join or create the channels, respectively. state is the current state of the YProcess.

Returns the same output as handle_cast/2.

terminate(reason, state)
terminate(reason, state :: term) :: term when reason: :normal | :shutdown | {:shutdown, term} | term

Invoked when the server is about to exit. It should do any cleanup required.

reason is the exit reason and state is the current state of the YProcess. The return value is ignored.

terminate/2 is called if a callback except init/1 returns a :stop tuple, raises an exception, calls Kernel.exit/1 or returns an invalid value. It may also be called if the YProcess traps exits using Process.flag/2 and the parent process sends an exit signal.
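A small sketch of cleanup in terminate/2, assuming a process that owns a file handle: it traps exits in init/1 so that the cleanup also has a chance to run when the parent sends an exit signal. The module name FileWriter and the {:write, line} request are hypothetical.

```elixir
defmodule FileWriter do
  use YProcess

  def start_link(path) do
    YProcess.start_link(__MODULE__, path)
  end

  def init(path) do
    # Trap exits so terminate/2 may also run on a parent exit signal.
    Process.flag(:trap_exit, true)
    {:ok, File.open!(path, [:write])}
  end

  # Append one line to the open file.
  def handle_cast({:write, line}, file) do
    IO.puts(file, line)
    {:noreply, file}
  end

  # Release the file handle; the return value is ignored.
  def terminate(_reason, file) do
    File.close(file)
  end
end
```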