Connection behaviour

A behaviour module for implementing connecton processes.

The Connection behaviour is a superset of the GenServer behaviour. The additional return values and callbacks are designed to aid building a connection process that can handle a peer being (temporarily) unavailable.

An example Connection process:

defmodule TCPConnection do

  use Connection

  def start_link(host, port, opts, timeout \\ 5000) do
    Connection.start_link(__MODULE__, {host, port, opts, timeout})
  end

  def send(conn, data), do: Connection.call(conn, {:send, data})

  def recv(conn, bytes, timeout \\ 3000) do
    Connection.call(conn, {:recv, bytes, timeout})
  end

  def close(conn), do: Connection.call(conn, :close)

  def init({host, port, opts, timeout}) do
    s = %{host: host, port: port, opts: opts, timeout: timeout, sock: nil}
    {:connect, :init, s}
  end

  def connect(_, %{sock: nil, host: host, port: port, opts: opts,
  timeout: timeout} = s) do
    case :gen_tcp.connect(host, port, [active: false] ++ opts, timeout) do
      {:ok, sock} ->
        {:ok, %{s | sock: sock}}
      {:error, _} ->
        {:backoff, 1000, s}
    end
  end

  def disconnect(info, %{sock: sock} = s) do
    :ok = :gen_tcp.close(sock)
    case info do
      {:close, from} ->
        Connection.reply(from, :ok)
      {:error, :closed} ->
        :error_logger.format("Connection closed~n", [])
      {:error, reason} ->
        reason = :inet.format_error(reason)
        :error_logger.format("Connection error: ~s~n", [reason])
    end
    {:connect, :reconnect, %{s | sock: nil}}
  end

  def handle_call(_, _, %{sock: nil} = s) do
    {:reply, {:error, :closed}, s}
  end

  def handle_call({:send, data}, _, %{sock: sock} = s) do
    case :gen_tcp.send(sock, data) do
      :ok ->
       {:reply, :ok, s}
      {:error, _} = error ->
        {:disconnect, error, error, s}
    end
  end
  def handle_call({:recv, bytes, timeout}, _, %{sock: sock} = s) do
    case :gen_tcp.recv(sock, bytes, timeout) do
      {:ok, _} = ok ->
        {:reply, ok, s}
      {:error, :timeout} = timeout ->
        {:reply, timeout, s}
      {:error, _} = error ->
        {:disconnect, error, error, s}
    end
  end
  def handle_call(:close, from, s) do
    {:disconnect, {:close, from}, s}
  end
end

The example above follows a common pattern. Try to connect immediately, if that fails backoff and retry after a delay. If a retry fails make another attempt after another delay. If the process disconnects a reconnection attempt is made immediately, if that fails backoff begins.

Importantly when backing off requests will still be received by the process, which will need to be handled. In the above example the process replies with {:error, :closed} when it is disconnected.

Source

Summary

call(conn, req)

Sends a synchronous call to the Connection process and waits for a reply

call(conn, req, timeout)

Sends a synchronous request to the Connection process and waits for a reply

cast(conn, req)

Sends a asynchronous request to the Connection process

reply(from, response)

Sends a reply to a request sent by call/3

start(mod, args, opts \\ [])

Starts a Connection process without links (outside of a supervision tree)

start_link(mod, args, opts \\ [])

Starts a Connection process linked to the current process

Functions

call(conn, req)

Sends a synchronous call to the Connection process and waits for a reply.

See GenServer.call/2 for more information.

Source
call(conn, req, timeout)

Sends a synchronous request to the Connection process and waits for a reply.

See GenServer.call/3 for more information.

Source
cast(conn, req)

Sends a asynchronous request to the Connection process.

See GenServer.cast/2 for more information.

Source
reply(from, response)

Sends a reply to a request sent by call/3.

See GenServer.reply/2 for more information.

Source
start(mod, args, opts \\ [])

Specs:

Starts a Connection process without links (outside of a supervision tree).

See start_link/3 for more information.

Source
start_link(mod, args, opts \\ [])

Specs:

Starts a Connection process linked to the current process.

This function is used to start a Connection process in a supervision tree. The process will be started by calling init/1 in the callback module with the given argument.

This function will return after init/1 has returned in the spawned process. The return values are controlled by the init/1 callback.

See GenServer.start_link/3 for more information.

Source

Callbacks

code_change/3

Specs:

  • code_change(any, any, any) :: {:ok, any}

This callback is the same as the GenServer equivalent and is used to change the state when loading a different version of the callback module.

Source
connect/2

Specs:

  • connect(any, any) :: {:ok, any} | {:ok, any, timeout | :hibernate} | {:backoff, timeout, any} | {:backoff, timeout, any, timeout | :hibernate} | {:stop, any, any}

Called when the process should try to connect. The first argument will either be the info term from {:connect, info, state} or {:connect, info, reply, state}, or :backoff if the connection attempt is triggered by backing off.

It might be beneficial to do handshaking in this callback if connecting is successful.

Returning {:ok, state} or {:ok, state, timeout | :hibernate} will cause the process to continue its loop. This should be returned when the connection attempt was successful. In the later case handle_info(:timeout, state) is called after timeout if no message has been received, if the third element is a timeout. Otherwise if the third element is :hibernate the process hibernates.

Returning {:backoff, timeout, state} will cause the process to continue its loop but connect(:backoff, state) will be called after timeout if connect/2 or disconnect/2 is not called before that point.

This return value is used when a connection attempt fails but another attempt should be made after a delay. It might be beneficial to increase the delay up to a maximum if successive attempts fail to prevent unnecessary work. If several connection processes are connecting to the same peer it may also be beneficial to add some jitter (randomness) to the delays. This spreads out the connection attempts and helps prevent many attempts occuring at the same time.

Returning {:backoff, timeout, state, timeout2 | :hibernate} is similar to {:backoff, timeout, state} except handle_info(:timeout, state) is called after timeout2 if no message has been received, or if :hibernate, the process hibernates.

Returning {:stop, reason, state} will terminate the loop and call terminate(reason, state) before the process exits with reason reason.

Source
disconnect/2

Specs:

  • disconnect(any, any) :: {:connect, any, any} | {:backoff, timeout, any} | {:backoff, timeout, any, timeout | :hibernate} | {:noconnect, any} | {:noconnect, any, timeout | :hibernate} | {:stop, any, any}

Called when the process should disconnect. The first argument will either be the info term from {:disconnect, info, state} or {:disconnect, info, reply, state}. This callback should do any cleaning up required to disconnect the connection and update the state of the process.

Returning {:connect, info, state} will call connect(info, state) immediately - even if there are messages in the message queue.

Returning {:backoff, timeout, state} or {:backoff, timeout, state, timeout2 | :hibernate} starts a backoff timer and behaves the same as when returned from connect/2. See the connect/2 callback for more information.

Returning {:noconnect, state} or {:noconnect, state, timeout | :hibernate} will cause the process to continue is loop (and NOT call connect/2 to try to reconnect). In the later case a timeout is started or the process hibernates.

Returning {:stop, reason, state} will terminate the loop and call terminate(reason, state) before the process exits with reason reason.

Source
handle_call/3

Specs:

  • handle_call(any, {pid, any}, any) :: {:reply, any, any} | {:reply, any, any, timeout | :hibernate} | {:noreply, any} | {:noreply, any, timeout | :hibernate} | {:disconnect | :connect, any, any, any} | {:disconnect | :connect, any, any} | {:stop, any, any} | {:stop, any, any, any}

Called when the process receives a call message sent by call/3. This callback has the same arguments as the GenServer equivalent and the :reply, :noreply and :stop return tuples behave the same. However there are a few additional return values:

Returning {:connect, info, reply, state} will reply to the call with reply and immediately call connect(info, state). Similarly for {:disconnect, info, reply, state}, except disconnect/2 is called.

Returning {:connect, info, state} or {:disconnect, info, state} will call the relevant callback immediately without replying to the call. This might be useful when the call should block until the process has connected, failed to connect or disconnected. The second argument passed to this callback can be included in the info or state terms and a reply sent in the next or a later callback using reply/2.

Source
handle_cast/2

Specs:

  • handle_cast(any, any) :: {:noreply, any} | {:noreply, any, timeout | :hibernate} | {:disconnect | :connect, any, any} | {:stop, any, any}

Called when the process receives a cast message sent by cast/3. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same. However there are two additional return values:

Returning {:connect, info, state} will immediately call connect(info, state). Similarly for {:disconnect, info, reply, state}, except disconnect/2 is called.

Source
handle_info/2

Specs:

  • handle_info(any, any) :: {:noreply, any} | {:noreply, any, timeout | :hibernate} | {:disconnect | :connect, any, any} | {:stop, any, any}

Called when the process receives a message that is not a call or cast. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same. However there are two additional return values:

Returning {:connect, info, state} will immediately call connect(info, state). Similarly for {:disconnect, info, reply, state}, except disconnect/2 is called.

Source
init/1

Specs:

  • init(any) :: {:ok, any} | {:ok, any, timeout | :hibernate} | {:connect, any, any} | {:backoff, timeout, any} | {:backoff, timeout, any, timeout | :hibernate} | :ignore | {:stop, any}

Called when the connection process is first started. start_link/3 will block until it returns.

Returning {:ok, state} will cause start_link/3 to return {:ok, pid} and the process to enter its loop with state state without calling connect/2.

This return value is useful when the process connects inside init/1 so that start_link/3 blocks until a connection is established.

Returning {:ok, state, timeout} is similar to {:ok, state} except handle_info(:timeout, state) will be called after timeout if no message arrives.

Returning {:ok, state, :hibernate} is similar to {:ok, state} except the process is hibernated awaiting a message.

Returning {:connect, info, state} will cause start_link/3 to return {:ok, pid} and connect(info, state) will be called immediately - even if messages are in the processes message queue. state contains the state of the process and info should contain any information not contained in the state that is needed to connect.

This return value is very useful because connecting in connect/2 will not block the parent process and a connection attempt is guaranteed to occur before any messages are handled, which is not possible when using a GenServer.

Returning {:backoff, timeout, state} will cause start_link/3 to return {:ok, pid} and the process to enter its normal loop with state state. connect(:backoff, state) is called after timeout if connect/2 or disconnect/2 is not called within the timeout.

This return value can be used to delay or stagger the initial connection attempt.

Returning {:backoff, timeout, state, timeout2} is similar to {:backoff, timeout, state} except handle_info(:timeout, state) will be called after timeout2 if no message arrives.

Returning {:backoff, timeout, state, :hibernate} is similar to {:backoff, timeout, state} except the process hibernates.

Returning :ignore will cause start_link/3 to return :ignore and the process will exit normally without entering the loop or calling terminate/2.

Returning {:stop, reason} will cause start_link/3 to return {:error, reason} and the process to exit with reason reason without entering the loop or calling terminate/2.

Source
terminate/2

Specs:

  • terminate(any, any) :: any

This callback is the same as the GenServer equivalent and is called when the process terminates. The first argument is the reason the process is about to exit with.

Source