Connection behaviour (connection v1.1.0) View Source

A behaviour module for implementing connection processes.

The Connection behaviour is a superset of the GenServer behaviour. The additional return values and callbacks are designed to aid building a connection process that can handle a peer being (temporarily) unavailable.

An example Connection process:

defmodule TCPConnection do

  use Connection

  def start_link(host, port, opts, timeout \\ 5000) do
    Connection.start_link(__MODULE__, {host, port, opts, timeout})
  end

  def send(conn, data), do: Connection.call(conn, {:send, data})

  def recv(conn, bytes, timeout \\ 3000) do
    Connection.call(conn, {:recv, bytes, timeout})
  end

  def close(conn), do: Connection.call(conn, :close)

  def init({host, port, opts, timeout}) do
    s = %{host: host, port: port, opts: opts, timeout: timeout, sock: nil}
    {:connect, :init, s}
  end

  def connect(_, %{sock: nil, host: host, port: port, opts: opts,
  timeout: timeout} = s) do
    case :gen_tcp.connect(host, port, [active: false] ++ opts, timeout) do
      {:ok, sock} ->
        {:ok, %{s | sock: sock}}
      {:error, _} ->
        {:backoff, 1000, s}
    end
  end

  def disconnect(info, %{sock: sock} = s) do
    :ok = :gen_tcp.close(sock)
    case info do
      {:close, from} ->
        Connection.reply(from, :ok)
      {:error, :closed} ->
        :error_logger.format("Connection closed~n", [])
      {:error, reason} ->
        reason = :inet.format_error(reason)
        :error_logger.format("Connection error: ~s~n", [reason])
    end
    {:connect, :reconnect, %{s | sock: nil}}
  end

  def handle_call(_, _, %{sock: nil} = s) do
    {:reply, {:error, :closed}, s}
  end

  def handle_call({:send, data}, _, %{sock: sock} = s) do
    case :gen_tcp.send(sock, data) do
      :ok ->
       {:reply, :ok, s}
      {:error, _} = error ->
        {:disconnect, error, error, s}
    end
  end
  def handle_call({:recv, bytes, timeout}, _, %{sock: sock} = s) do
    case :gen_tcp.recv(sock, bytes, timeout) do
      {:ok, _} = ok ->
        {:reply, ok, s}
      {:error, :timeout} = timeout ->
        {:reply, timeout, s}
      {:error, _} = error ->
        {:disconnect, error, error, s}
    end
  end
  def handle_call(:close, from, s) do
    {:disconnect, {:close, from}, s}
  end
end

The example above follows a common pattern. Try to connect immediately, if that fails backoff and retry after a delay. If a retry fails make another attempt after another delay. If the process disconnects a reconnection attempt is made immediately, if that fails backoff begins.

Importantly when backing off requests will still be received by the process, which will need to be handled. In the above example the process replies with {:error, :closed} when it is disconnected.

Link to this section Summary

Functions

Sends a synchronous call to the Connection process and waits for a reply.

Sends a synchronous request to the Connection process and waits for a reply.

Sends a asynchronous request to the Connection process.

Sends a reply to a request sent by call/3.

Starts a Connection process without links (outside of a supervision tree).

Starts a Connection process linked to the current process.

Callbacks

This callback is the same as the GenServer equivalent and is used to change the state when loading a different version of the callback module.

Called when the process should try to connect. The first argument will either be the info term from {:connect, info, state} or {:connect, info, reply, state}, or :backoff if the connection attempt is triggered by backing off.

Called when the process should disconnect. The first argument will either be the info term from {:disconnect, info, state} or {:disconnect, info, reply, state}. This callback should do any cleaning up required to disconnect the connection and update the state of the process.

Called when the process receives a call message sent by call/3. This callback has the same arguments as the GenServer equivalent and the :reply, :noreply and :stop return tuples behave the same. However there are a few additional return values

Called when the process receives a cast message sent by cast/3. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same. However there are two additional return values

Called when the process receives a message that is not a call or cast. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same. However there are two additional return values

Called when the connection process is first started. start_link/3 will block until it returns.

This callback is the same as the GenServer equivalent and is called when the process terminates. The first argument is the reason the process is about to exit with.

Link to this section Functions

Sends a synchronous call to the Connection process and waits for a reply.

See GenServer.call/2 for more information.

Link to this function

call(conn, req, timeout)

View Source

Sends a synchronous request to the Connection process and waits for a reply.

See GenServer.call/3 for more information.

Sends a asynchronous request to the Connection process.

See GenServer.cast/2 for more information.

Sends a reply to a request sent by call/3.

See GenServer.reply/2 for more information.

Link to this function

start(mod, args, opts \\ [])

View Source

Specs

Starts a Connection process without links (outside of a supervision tree).

See start_link/3 for more information.

Link to this function

start_link(mod, args, opts \\ [])

View Source

Specs

start_link(module(), any(), GenServer.options()) :: GenServer.on_start()

Starts a Connection process linked to the current process.

This function is used to start a Connection process in a supervision tree. The process will be started by calling init/1 in the callback module with the given argument.

This function will return after init/1 has returned in the spawned process. The return values are controlled by the init/1 callback.

See GenServer.start_link/3 for more information.

Link to this section Callbacks

Link to this callback

code_change(any, any, any)

View Source

Specs

code_change(any(), any(), any()) :: {:ok, any()}

This callback is the same as the GenServer equivalent and is used to change the state when loading a different version of the callback module.

Specs

connect(any(), any()) ::
  {:ok, any()}
  | {:ok, any(), timeout() | :hibernate}
  | {:backoff, timeout(), any()}
  | {:backoff, timeout(), any(), timeout() | :hibernate}
  | {:stop, any(), any()}

Called when the process should try to connect. The first argument will either be the info term from {:connect, info, state} or {:connect, info, reply, state}, or :backoff if the connection attempt is triggered by backing off.

It might be beneficial to do handshaking in this callback if connecting is successful.

Returning {:ok, state} or {:ok, state, timeout | :hibernate} will cause the process to continue its loop. This should be returned when the connection attempt was successful. In the later case handle_info(:timeout, state) is called after timeout if no message has been received, if the third element is a timeout. Otherwise if the third element is :hibernate the process hibernates.

Returning {:backoff, timeout, state} will cause the process to continue its loop but connect(:backoff, state) will be called after timeout if connect/2 or disconnect/2 is not called before that point.

This return value is used when a connection attempt fails but another attempt should be made after a delay. It might be beneficial to increase the delay up to a maximum if successive attempts fail to prevent unnecessary work. If several connection processes are connecting to the same peer it may also be beneficial to add some jitter (randomness) to the delays. This spreads out the connection attempts and helps prevent many attempts occurring at the same time.

Returning {:backoff, timeout, state, timeout2 | :hibernate} is similar to {:backoff, timeout, state} except handle_info(:timeout, state) is called after timeout2 if no message has been received, or if :hibernate, the process hibernates.

Returning {:stop, reason, state} will terminate the loop and call terminate(reason, state) before the process exits with reason reason.

Specs

disconnect(any(), any()) ::
  {:connect, any(), any()}
  | {:backoff, timeout(), any()}
  | {:backoff, timeout(), any(), timeout() | :hibernate}
  | {:noconnect, any()}
  | {:noconnect, any(), timeout() | :hibernate}
  | {:stop, any(), any()}

Called when the process should disconnect. The first argument will either be the info term from {:disconnect, info, state} or {:disconnect, info, reply, state}. This callback should do any cleaning up required to disconnect the connection and update the state of the process.

Returning {:connect, info, state} will call connect(info, state) immediately - even if there are messages in the message queue.

Returning {:backoff, timeout, state} or {:backoff, timeout, state, timeout2 | :hibernate} starts a backoff timer and behaves the same as when returned from connect/2. See the connect/2 callback for more information.

Returning {:noconnect, state} or {:noconnect, state, timeout | :hibernate} will cause the process to continue is loop (and NOT call connect/2 to try to reconnect). In the later case a timeout is started or the process hibernates.

Returning {:stop, reason, state} will terminate the loop and call terminate(reason, state) before the process exits with reason reason.

Link to this callback

handle_call(any, {}, any)

View Source

Specs

handle_call(any(), {pid(), any()}, any()) ::
  {:reply, any(), any()}
  | {:reply, any(), any(), timeout() | :hibernate}
  | {:noreply, any()}
  | {:noreply, any(), timeout() | :hibernate}
  | {:disconnect | :connect, any(), any(), any()}
  | {:disconnect | :connect, any(), any()}
  | {:stop, any(), any()}
  | {:stop, any(), any(), any()}

Called when the process receives a call message sent by call/3. This callback has the same arguments as the GenServer equivalent and the :reply, :noreply and :stop return tuples behave the same. However there are a few additional return values:

Returning {:connect, info, reply, state} will reply to the call with reply and immediately call connect(info, state). Similarly for {:disconnect, info, reply, state}, except disconnect/2 is called.

Returning {:connect, info, state} or {:disconnect, info, state} will call the relevant callback immediately without replying to the call. This might be useful when the call should block until the process has connected, failed to connect or disconnected. The second argument passed to this callback can be included in the info or state terms and a reply sent in the next or a later callback using reply/2.

Specs

handle_cast(any(), any()) ::
  {:noreply, any()}
  | {:noreply, any(), timeout() | :hibernate}
  | {:disconnect | :connect, any(), any()}
  | {:stop, any(), any()}

Called when the process receives a cast message sent by cast/3. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same. However there are two additional return values:

Returning {:connect, info, state} will immediately call connect(info, state). Similarly for {:disconnect, info, state}, except disconnect/2 is called.

Specs

handle_info(any(), any()) ::
  {:noreply, any()}
  | {:noreply, any(), timeout() | :hibernate}
  | {:disconnect | :connect, any(), any()}
  | {:stop, any(), any()}

Called when the process receives a message that is not a call or cast. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same. However there are two additional return values:

Returning {:connect, info, state} will immediately call connect(info, state). Similarly for {:disconnect, info, state}, except disconnect/2 is called.

Specs

init(any()) ::
  {:ok, any()}
  | {:ok, any(), timeout() | :hibernate}
  | {:connect, any(), any()}
  | {:backoff, timeout(), any()}
  | {:backoff, timeout(), any(), timeout() | :hibernate}
  | :ignore
  | {:stop, any()}

Called when the connection process is first started. start_link/3 will block until it returns.

Returning {:ok, state} will cause start_link/3 to return {:ok, pid} and the process to enter its loop with state state without calling connect/2.

This return value is useful when the process connects inside init/1 so that start_link/3 blocks until a connection is established.

Returning {:ok, state, timeout} is similar to {:ok, state} except handle_info(:timeout, state) will be called after timeout if no message arrives.

Returning {:ok, state, :hibernate} is similar to {:ok, state} except the process is hibernated awaiting a message.

Returning {:connect, info, state} will cause start_link/3 to return {:ok, pid} and connect(info, state) will be called immediately - even if messages are in the processes message queue. state contains the state of the process and info should contain any information not contained in the state that is needed to connect.

This return value is very useful because connecting in connect/2 will not block the parent process and a connection attempt is guaranteed to occur before any messages are handled, which is not possible when using a GenServer.

Returning {:backoff, timeout, state} will cause start_link/3 to return {:ok, pid} and the process to enter its normal loop with state state. connect(:backoff, state) is called after timeout if connect/2 or disconnect/2 is not called within the timeout.

This return value can be used to delay or stagger the initial connection attempt.

Returning {:backoff, timeout, state, timeout2} is similar to {:backoff, timeout, state} except handle_info(:timeout, state) will be called after timeout2 if no message arrives.

Returning {:backoff, timeout, state, :hibernate} is similar to {:backoff, timeout, state} except the process hibernates.

Returning :ignore will cause start_link/3 to return :ignore and the process will exit normally without entering the loop or calling terminate/2.

Returning {:stop, reason} will cause start_link/3 to return {:error, reason} and the process to exit with reason reason without entering the loop or calling terminate/2.

Specs

terminate(any(), any()) :: any()

This callback is the same as the GenServer equivalent and is called when the process terminates. The first argument is the reason the process is about to exit with.