connection v1.0.4 Connection behaviour

A behaviour module for implementing connection processes.

The Connection behaviour is a superset of the GenServer behaviour. The additional return values and callbacks are designed to aid building a connection process that can handle a peer being (temporarily) unavailable.

An example Connection process:

defmodule TCPConnection do

  use Connection

  def start_link(host, port, opts, timeout \\ 5000) do
    Connection.start_link(__MODULE__, {host, port, opts, timeout})
  end

  def send(conn, data), do: Connection.call(conn, {:send, data})

  def recv(conn, bytes, timeout \\ 3000) do
    Connection.call(conn, {:recv, bytes, timeout})
  end

  def close(conn), do: Connection.call(conn, :close)

  def init({host, port, opts, timeout}) do
    s = %{host: host, port: port, opts: opts, timeout: timeout, sock: nil}
    {:connect, :init, s}
  end

  def connect(_, %{sock: nil, host: host, port: port, opts: opts,
  timeout: timeout} = s) do
    case :gen_tcp.connect(host, port, [active: false] ++ opts, timeout) do
      {:ok, sock} ->
        {:ok, %{s | sock: sock}}
      {:error, _} ->
        {:backoff, 1000, s}
    end
  end

  def disconnect(info, %{sock: sock} = s) do
    :ok = :gen_tcp.close(sock)
    case info do
      {:close, from} ->
        Connection.reply(from, :ok)
      {:error, :closed} ->
        :error_logger.format("Connection closed~n", [])
      {:error, reason} ->
        reason = :inet.format_error(reason)
        :error_logger.format("Connection error: ~s~n", [reason])
    end
    {:connect, :reconnect, %{s | sock: nil}}
  end

  def handle_call(_, _, %{sock: nil} = s) do
    {:reply, {:error, :closed}, s}
  end

  def handle_call({:send, data}, _, %{sock: sock} = s) do
    case :gen_tcp.send(sock, data) do
      :ok ->
       {:reply, :ok, s}
      {:error, _} = error ->
        {:disconnect, error, error, s}
    end
  end
  def handle_call({:recv, bytes, timeout}, _, %{sock: sock} = s) do
    case :gen_tcp.recv(sock, bytes, timeout) do
      {:ok, _} = ok ->
        {:reply, ok, s}
      {:error, :timeout} = timeout ->
        {:reply, timeout, s}
      {:error, _} = error ->
        {:disconnect, error, error, s}
    end
  end
  def handle_call(:close, from, s) do
    {:disconnect, {:close, from}, s}
  end
end

The example above follows a common pattern. Try to connect immediately, if that fails backoff and retry after a delay. If a retry fails make another attempt after another delay. If the process disconnects a reconnection attempt is made immediately, if that fails backoff begins.

Importantly when backing off requests will still be received by the process, which will need to be handled. In the above example the process replies with {:error, :closed} when it is disconnected.

Summary

Functions

Sends a synchronous call to the Connection process and waits for a reply

Sends a synchronous request to the Connection process and waits for a reply

Sends a asynchronous request to the Connection process

Sends a reply to a request sent by call/3

Starts a Connection process without links (outside of a supervision tree)

Starts a Connection process linked to the current process

Callbacks

This callback is the same as the GenServer equivalent and is used to change the state when loading a different version of the callback module

Called when the process should try to connect. The first argument will either be the info term from {:connect, info, state} or {:connect, info, reply, state}, or :backoff if the connection attempt is triggered by backing off

Called when the process should disconnect. The first argument will either be the info term from {:disconnect, info, state} or {:disconnect, info, reply, state}. This callback should do any cleaning up required to disconnect the connection and update the state of the process

Called when the process receives a call message sent by call/3. This callback has the same arguments as the GenServer equivalent and the :reply, :noreply and :stop return tuples behave the same. However there are a few additional return values

Called when the process receives a cast message sent by cast/3. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same. However there are two additional return values

Called when the process receives a message that is not a call or cast. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same. However there are two additional return values

Called when the connection process is first started. start_link/3 will block until it returns

This callback is the same as the GenServer equivalent and is called when the process terminates. The first argument is the reason the process is about to exit with

Functions

call(conn, req)

Sends a synchronous call to the Connection process and waits for a reply.

See GenServer.call/2 for more information.

call(conn, req, timeout)

Sends a synchronous request to the Connection process and waits for a reply.

See GenServer.call/3 for more information.

cast(conn, req)

Sends a asynchronous request to the Connection process.

See GenServer.cast/2 for more information.

reply(from, response)

Sends a reply to a request sent by call/3.

See GenServer.reply/2 for more information.

start(mod, args, opts \\ [])

Specs

start(module, any, GenServer.options) :: GenServer.on_start

Starts a Connection process without links (outside of a supervision tree).

See start_link/3 for more information.

start_link(mod, args, opts \\ [])

Specs

start_link(module, any, GenServer.options) :: GenServer.on_start

Starts a Connection process linked to the current process.

This function is used to start a Connection process in a supervision tree. The process will be started by calling init/1 in the callback module with the given argument.

This function will return after init/1 has returned in the spawned process. The return values are controlled by the init/1 callback.

See GenServer.start_link/3 for more information.

Callbacks

code_change(any, any, any)

Specs

code_change(any, any, any) :: {:ok, any}

This callback is the same as the GenServer equivalent and is used to change the state when loading a different version of the callback module.

connect(any, any)

Specs

connect(any, any) ::
  {:ok, any} |
  {:ok, any, timeout | :hibernate} |
  {:backoff, timeout, any} |
  {:backoff, timeout, any, timeout | :hibernate} |
  {:stop, any, any}

Called when the process should try to connect. The first argument will either be the info term from {:connect, info, state} or {:connect, info, reply, state}, or :backoff if the connection attempt is triggered by backing off.

It might be beneficial to do handshaking in this callback if connecting is successful.

Returning {:ok, state} or {:ok, state, timeout | :hibernate} will cause the process to continue its loop. This should be returned when the connection attempt was successful. In the later case handle_info(:timeout, state) is called after timeout if no message has been received, if the third element is a timeout. Otherwise if the third element is :hibernate the process hibernates.

Returning {:backoff, timeout, state} will cause the process to continue its loop but connect(:backoff, state) will be called after timeout if connect/2 or disconnect/2 is not called before that point.

This return value is used when a connection attempt fails but another attempt should be made after a delay. It might be beneficial to increase the delay up to a maximum if successive attempts fail to prevent unnecessary work. If several connection processes are connecting to the same peer it may also be beneficial to add some jitter (randomness) to the delays. This spreads out the connection attempts and helps prevent many attempts occuring at the same time.

Returning {:backoff, timeout, state, timeout2 | :hibernate} is similar to {:backoff, timeout, state} except handle_info(:timeout, state) is called after timeout2 if no message has been received, or if :hibernate, the process hibernates.

Returning {:stop, reason, state} will terminate the loop and call terminate(reason, state) before the process exits with reason reason.

disconnect(any, any)

Specs

disconnect(any, any) ::
  {:connect, any, any} |
  {:backoff, timeout, any} |
  {:backoff, timeout, any, timeout | :hibernate} |
  {:noconnect, any} |
  {:noconnect, any, timeout | :hibernate} |
  {:stop, any, any}

Called when the process should disconnect. The first argument will either be the info term from {:disconnect, info, state} or {:disconnect, info, reply, state}. This callback should do any cleaning up required to disconnect the connection and update the state of the process.

Returning {:connect, info, state} will call connect(info, state) immediately - even if there are messages in the message queue.

Returning {:backoff, timeout, state} or {:backoff, timeout, state, timeout2 | :hibernate} starts a backoff timer and behaves the same as when returned from connect/2. See the connect/2 callback for more information.

Returning {:noconnect, state} or {:noconnect, state, timeout | :hibernate} will cause the process to continue is loop (and NOT call connect/2 to try to reconnect). In the later case a timeout is started or the process hibernates.

Returning {:stop, reason, state} will terminate the loop and call terminate(reason, state) before the process exits with reason reason.

handle_call(any, {}, any)

Specs

handle_call(any, {pid, any}, any) ::
  {:reply, any, any} |
  {:reply, any, any, timeout | :hibernate} |
  {:noreply, any} |
  {:noreply, any, timeout | :hibernate} |
  {:disconnect | :connect, any, any, any} |
  {:disconnect | :connect, any, any} |
  {:stop, any, any} |
  {:stop, any, any, any}

Called when the process receives a call message sent by call/3. This callback has the same arguments as the GenServer equivalent and the :reply, :noreply and :stop return tuples behave the same. However there are a few additional return values:

Returning {:connect, info, reply, state} will reply to the call with reply and immediately call connect(info, state). Similarly for {:disconnect, info, reply, state}, except disconnect/2 is called.

Returning {:connect, info, state} or {:disconnect, info, state} will call the relevant callback immediately without replying to the call. This might be useful when the call should block until the process has connected, failed to connect or disconnected. The second argument passed to this callback can be included in the info or state terms and a reply sent in the next or a later callback using reply/2.

handle_cast(any, any)

Specs

handle_cast(any, any) ::
  {:noreply, any} |
  {:noreply, any, timeout | :hibernate} |
  {:disconnect | :connect, any, any} |
  {:stop, any, any}

Called when the process receives a cast message sent by cast/3. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same. However there are two additional return values:

Returning {:connect, info, state} will immediately call connect(info, state). Similarly for {:disconnect, info, state}, except disconnect/2 is called.

handle_info(any, any)

Specs

handle_info(any, any) ::
  {:noreply, any} |
  {:noreply, any, timeout | :hibernate} |
  {:disconnect | :connect, any, any} |
  {:stop, any, any}

Called when the process receives a message that is not a call or cast. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same. However there are two additional return values:

Returning {:connect, info, state} will immediately call connect(info, state). Similarly for {:disconnect, info, state}, except disconnect/2 is called.

init(any)

Specs

init(any) ::
  {:ok, any} |
  {:ok, any, timeout | :hibernate} |
  {:connect, any, any} |
  {:backoff, timeout, any} |
  {:backoff, timeout, any, timeout | :hibernate} |
  :ignore |
  {:stop, any}

Called when the connection process is first started. start_link/3 will block until it returns.

Returning {:ok, state} will cause start_link/3 to return {:ok, pid} and the process to enter its loop with state state without calling connect/2.

This return value is useful when the process connects inside init/1 so that start_link/3 blocks until a connection is established.

Returning {:ok, state, timeout} is similar to {:ok, state} except handle_info(:timeout, state) will be called after timeout if no message arrives.

Returning {:ok, state, :hibernate} is similar to {:ok, state} except the process is hibernated awaiting a message.

Returning {:connect, info, state} will cause start_link/3 to return {:ok, pid} and connect(info, state) will be called immediately - even if messages are in the processes message queue. state contains the state of the process and info should contain any information not contained in the state that is needed to connect.

This return value is very useful because connecting in connect/2 will not block the parent process and a connection attempt is guaranteed to occur before any messages are handled, which is not possible when using a GenServer.

Returning {:backoff, timeout, state} will cause start_link/3 to return {:ok, pid} and the process to enter its normal loop with state state. connect(:backoff, state) is called after timeout if connect/2 or disconnect/2 is not called within the timeout.

This return value can be used to delay or stagger the initial connection attempt.

Returning {:backoff, timeout, state, timeout2} is similar to {:backoff, timeout, state} except handle_info(:timeout, state) will be called after timeout2 if no message arrives.

Returning {:backoff, timeout, state, :hibernate} is similar to {:backoff, timeout, state} except the process hibernates.

Returning :ignore will cause start_link/3 to return :ignore and the process will exit normally without entering the loop or calling terminate/2.

Returning {:stop, reason} will cause start_link/3 to return {:error, reason} and the process to exit with reason reason without entering the loop or calling terminate/2.

terminate(any, any)

Specs

terminate(any, any) :: any

This callback is the same as the GenServer equivalent and is called when the process terminates. The first argument is the reason the process is about to exit with.