Connection behaviour
A behaviour module for implementing connecton processes.
The Connection
behaviour is a superset of the GenServer
behaviour. The
additional return values and callbacks are designed to aid building a
connection process that can handle a peer being (temporarily) unavailable.
An example Connection
process:
defmodule TCPConnection do
use Connection
def start_link(host, port, opts, timeout \\ 5000) do
Connection.start_link(__MODULE__, {host, port, opts, timeout})
end
def send(conn, data), do: Connection.call(conn, {:send, data})
def recv(conn, bytes, timeout \\ 3000) do
Connection.call(conn, {:recv, bytes, timeout})
end
def close(conn), do: Connection.call(conn, :close)
def init({host, port, opts, timeout}) do
s = %{host: host, port: port, opts: opts, timeout: timeout, sock: nil}
{:connect, :init, s}
end
def connect(_, %{sock: nil, host: host, port: port, opts: opts,
timeout: timeout} = s) do
case :gen_tcp.connect(host, port, [active: false] ++ opts, timeout) do
{:ok, sock} ->
{:ok, %{s | sock: sock}}
{:error, _} ->
{:backoff, 1000, s}
end
end
def disconnect(info, %{sock: sock} = s) do
:ok = :gen_tcp.close(sock)
case info do
{:close, from} ->
Connection.reply(from, :ok)
{:error, :closed} ->
:error_logger.format("Connection closed~n", [])
{:error, reason} ->
reason = :inet.format_error(reason)
:error_logger.format("Connection error: ~s~n", [reason])
end
{:connect, :reconnect, %{s | sock: nil}}
end
def handle_call(_, _, %{sock: nil} = s) do
{:reply, {:error, :closed}, s}
end
def handle_call({:send, data}, _, %{sock: sock} = s) do
case :gen_tcp.send(sock, data) do
:ok ->
{:reply, :ok, s}
{:error, _} = error ->
{:disconnect, error, error, s}
end
end
def handle_call({:recv, bytes, timeout}, _, %{sock: sock} = s) do
case :gen_tcp.recv(sock, bytes, timeout) do
{:ok, _} = ok ->
{:reply, ok, s}
{:error, :timeout} = timeout ->
{:reply, timeout, s}
{:error, _} = error ->
{:disconnect, error, error, s}
end
end
def handle_call(:close, from, s) do
{:disconnect, {:close, from}, s}
end
end
The example above follows a common pattern. Try to connect immediately, if that fails backoff and retry after a delay. If a retry fails make another attempt after another delay. If the process disconnects a reconnection attempt is made immediately, if that fails backoff begins.
Importantly when backing off requests will still be received by the process,
which will need to be handled. In the above example the process replies with
{:error, :closed}
when it is disconnected.
Summary↑
call(conn, req) | Sends a synchronous call to the |
call(conn, req, timeout) | Sends a synchronous request to the |
cast(conn, req) | Sends a asynchronous request to the |
reply(from, response) | Sends a reply to a request sent by |
start(mod, args, opts \\ []) | Starts a |
start_link(mod, args, opts \\ []) | Starts a |
Functions
Sends a synchronous call to the Connection
process and waits for a reply.
See GenServer.call/2
for more information.
Sends a synchronous request to the Connection
process and waits for a reply.
See GenServer.call/3
for more information.
Sends a asynchronous request to the Connection
process.
See GenServer.cast/2
for more information.
Sends a reply to a request sent by call/3
.
See GenServer.reply/2
for more information.
Specs:
- start(module, any, GenServer.options) :: GenServer.on_start
Starts a Connection
process without links (outside of a supervision tree).
See start_link/3
for more information.
Specs:
- start_link(module, any, GenServer.options) :: GenServer.on_start
Starts a Connection
process linked to the current process.
This function is used to start a Connection
process in a supervision tree.
The process will be started by calling init/1
in the callback module with
the given argument.
This function will return after init/1
has returned in the spawned process.
The return values are controlled by the init/1
callback.
See GenServer.start_link/3
for more information.
Callbacks
Specs:
- code_change(any, any, any) :: {:ok, any}
This callback is the same as the GenServer
equivalent and is used to change
the state when loading a different version of the callback module.
Specs:
- connect(any, any) :: {:ok, any} | {:ok, any, timeout | :hibernate} | {:backoff, timeout, any} | {:backoff, timeout, any, timeout | :hibernate} | {:stop, any, any}
Called when the process should try to connect. The first argument will either
be the info
term from {:connect, info, state}
or
{:connect, info, reply, state}
, or :backoff
if the connection attempt is
triggered by backing off.
It might be beneficial to do handshaking in this callback if connecting is successful.
Returning {:ok, state}
or {:ok, state, timeout | :hibernate}
will cause
the process to continue its loop. This should be returned when the connection
attempt was successful. In the later case handle_info(:timeout, state)
is
called after timeout
if no message has been received, if the third element
is a timeout. Otherwise if the third element is :hibernate
the process
hibernates.
Returning {:backoff, timeout, state}
will cause the process to continue
its loop but connect(:backoff, state)
will be called after timeout
if
connect/2
or disconnect/2
is not called before that point.
This return value is used when a connection attempt fails but another attempt should be made after a delay. It might be beneficial to increase the delay up to a maximum if successive attempts fail to prevent unnecessary work. If several connection processes are connecting to the same peer it may also be beneficial to add some jitter (randomness) to the delays. This spreads out the connection attempts and helps prevent many attempts occuring at the same time.
Returning {:backoff, timeout, state, timeout2 | :hibernate}
is similar to
{:backoff, timeout, state}
except handle_info(:timeout, state)
is called
after timeout2
if no message has been received, or if :hibernate
, the
process hibernates.
Returning {:stop, reason, state}
will terminate the loop and call
terminate(reason, state)
before the process exits with reason reason
.
Specs:
- disconnect(any, any) :: {:connect, any, any} | {:backoff, timeout, any} | {:backoff, timeout, any, timeout | :hibernate} | {:noconnect, any} | {:noconnect, any, timeout | :hibernate} | {:stop, any, any}
Called when the process should disconnect. The first argument will either
be the info
term from {:disconnect, info, state}
or
{:disconnect, info, reply, state}
. This callback should do any cleaning up
required to disconnect the connection and update the state of the process.
Returning {:connect, info, state}
will call connect(info, state)
immediately - even if there are messages in the message queue.
Returning {:backoff, timeout, state}
or
{:backoff, timeout, state, timeout2 | :hibernate}
starts a backoff timer and
behaves the same as when returned from connect/2
. See the connect/2
callback for more information.
Returning {:noconnect, state}
or {:noconnect, state, timeout | :hibernate}
will cause the process to continue is loop (and NOT call connect/2
to
try to reconnect). In the later case a timeout is started or the process
hibernates.
Returning {:stop, reason, state}
will terminate the loop and call
terminate(reason, state)
before the process exits with reason reason
.
Specs:
- handle_call(any, {pid, any}, any) :: {:reply, any, any} | {:reply, any, any, timeout | :hibernate} | {:noreply, any} | {:noreply, any, timeout | :hibernate} | {:disconnect | :connect, any, any, any} | {:disconnect | :connect, any, any} | {:stop, any, any} | {:stop, any, any, any}
Called when the process receives a call message sent by call/3
. This
callback has the same arguments as the GenServer
equivalent and the
:reply
, :noreply
and :stop
return tuples behave the same. However
there are a few additional return values:
Returning {:connect, info, reply, state}
will reply to the call with reply
and immediately call connect(info, state)
. Similarly for
{:disconnect, info, reply, state}
, except disconnect/2
is called.
Returning {:connect, info, state}
or {:disconnect, info, state}
will
call the relevant callback immediately without replying to the call. This
might be useful when the call should block until the process has connected,
failed to connect or disconnected. The second argument passed to this callback
can be included in the info
or state
terms and a reply sent in the next
or a later callback using reply/2
.
Specs:
- handle_cast(any, any) :: {:noreply, any} | {:noreply, any, timeout | :hibernate} | {:disconnect | :connect, any, any} | {:stop, any, any}
Called when the process receives a cast message sent by cast/3
. This
callback has the same arguments as the GenServer
equivalent and the
:noreply
and :stop
return tuples behave the same. However
there are two additional return values:
Returning {:connect, info, state}
will immediately call
connect(info, state)
. Similarly for {:disconnect, info, reply, state}
,
except disconnect/2
is called.
Specs:
- handle_info(any, any) :: {:noreply, any} | {:noreply, any, timeout | :hibernate} | {:disconnect | :connect, any, any} | {:stop, any, any}
Called when the process receives a message that is not a call or cast. This
callback has the same arguments as the GenServer
equivalent and the :noreply
and :stop
return tuples behave the same. However there are two additional
return values:
Returning {:connect, info, state}
will immediately call
connect(info, state)
. Similarly for {:disconnect, info, reply, state}
,
except disconnect/2
is called.
Specs:
- init(any) :: {:ok, any} | {:ok, any, timeout | :hibernate} | {:connect, any, any} | {:backoff, timeout, any} | {:backoff, timeout, any, timeout | :hibernate} | :ignore | {:stop, any}
Called when the connection process is first started. start_link/3
will block
until it returns.
Returning {:ok, state}
will cause start_link/3
to return
{:ok, pid}
and the process to enter its loop with state state
without
calling connect/2
.
This return value is useful when the process connects inside init/1
so that
start_link/3
blocks until a connection is established.
Returning {:ok, state, timeout}
is similar to {:ok, state}
except handle_info(:timeout, state)
will be called after timeout
if no
message arrives.
Returning {:ok, state, :hibernate}
is similar to
{:ok, state}
except the process is hibernated awaiting a message.
Returning {:connect, info, state}
will cause start_link/3
to return
{:ok, pid}
and connect(info, state)
will be called immediately - even if
messages are in the processes message queue. state
contains the state of the
process and info
should contain any information not contained in the state
that is needed to connect.
This return value is very useful because connecting in connect/2
will not
block the parent process and a connection attempt is guaranteed to occur
before any messages are handled, which is not possible when using a
GenServer
.
Returning {:backoff, timeout, state}
will cause start_link/3
to return
{:ok, pid}
and the process to enter its normal loop with state state
.
connect(:backoff, state)
is called after timeout
if connect/2
or
disconnect/2
is not called within the timeout.
This return value can be used to delay or stagger the initial connection attempt.
Returning {:backoff, timeout, state, timeout2}
is similar to
{:backoff, timeout, state}
except handle_info(:timeout, state)
will be
called after timeout2
if no message arrives.
Returning {:backoff, timeout, state, :hibernate}
is similar to
{:backoff, timeout, state}
except the process hibernates.
Returning :ignore
will cause start_link/3
to return :ignore
and the
process will exit normally without entering the loop or calling
terminate/2
.
Returning {:stop, reason}
will cause start_link/3
to return
{:error, reason}
and the process to exit with reason reason
without
entering the loop or calling terminate/2
.
Specs:
- terminate(any, any) :: any
This callback is the same as the GenServer
equivalent and is called when the
process terminates. The first argument is the reason the process is about
to exit with.