Slipstream behaviour (Slipstream v0.6.0)

A websocket client for Phoenix channels

Slipstream is a websocket client for connecting to Phoenix.Channels. It differs from existing websocket implementations in that it:

  • is backed by :gun instead of :websocket_client
  • has an await_* interface for performing actions synchronously
  • ships smart retry strategies for reconnection and rejoining that work out of the box
  • includes a testing framework for clients
  • emits high-level and low-level instrumentation with :telemetry

Basic Usage

The intended use for Slipstream is to write asynchronous, callback-oriented GenServer-like modules that define socket clients. This approach makes it easy to write socket clients that resemble state machines. A minimal example might look like this:

defmodule MyApp.MySocketClient do
  @moduledoc """
  A socket client for connecting to that other Phoenix server

  Periodically sends pings and asks the other server for its metrics.
  """

  use Slipstream,
    restart: :temporary

  require Logger

  @topic "backend-service:money-server"

  def start_link(args) do
    Slipstream.start_link(__MODULE__, args, name: __MODULE__)
  end

  @impl Slipstream
  def init(config) do
    {:ok, connect!(config), {:continue, :start_ping}}
  end

  @impl Slipstream
  def handle_continue(:start_ping, socket) do
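    # :timer.send_interval/3 takes the interval first and returns {:ok, tref};
    # the 1-second interval is an illustrative assumption
    {:ok, timer} = :timer.send_interval(1_000, self(), :request_metrics)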

    {:noreply, assign(socket, :ping_timer, timer)}
  end

  @impl Slipstream
  def handle_connect(socket) do
    {:ok, join(socket, @topic)}
  end

  @impl Slipstream
  def handle_join(@topic, _join_response, socket) do
    # an asynchronous push with no reply:
    push(socket, @topic, "hello", %{})

    {:ok, socket}
  end

  @impl Slipstream
  def handle_info(:request_metrics, socket) do
    # we will asynchronously receive a reply and handle it in the
    # handle_reply/3 implementation below
    {:ok, ref} = push(socket, @topic, "get_metrics", %{format: "json"})

    {:noreply, assign(socket, :metrics_request, ref)}
  end

  @impl Slipstream
  def handle_reply(ref, metrics, socket) do
    if ref == socket.assigns.metrics_request do
      :ok = MyApp.MetricsPublisher.publish(metrics)
    end

    {:ok, socket}
  end

  @impl Slipstream
  def handle_message(@topic, event, message, socket) do
    Logger.error(
      "Was not expecting a push from the server. Heard: " <>
        inspect({@topic, event, message})
    )

    {:ok, socket}
  end

  @impl Slipstream
  def handle_disconnect(_reason, socket) do
    :timer.cancel(socket.assigns.ping_timer)

    {:stop, :normal, socket}
  end
end

Synchronicity

Slipstream is designed to work asynchronously by default. Requests such as connect/2, join/3, and push/4 are asynchronous requests. When the remote server replies, the associated callback will be invoked (handle_connect/1, handle_join/3, and handle_reply/3 in cases of success, respectively). For all of these operations, though, you may await the outcome of the asynchronous request with await_* functions. E.g.

iex> {:ok, ref} = push(socket, "room:lobby", "msg:new", %{user: 1, msg: "foo"})
iex> {:ok, %{"created" => true}} = await_reply(ref)

Note that all await_* functions must be called from the slipstream process that emitted the request, or else they will time out.

While Slipstream provides a rich toolset for synchronicity, the asynchronous, callback-based workflow is recommended.

GenServer operations

Note that Slipstream is in many ways a simple wrapper around a GenServer. As such, all GenServer functionality is possible with Slipstream clients, such as Kernel.send/2 or GenServer.call/3. For example, assume you have a slipstream client written like so:

defmodule MyClient do
  use Slipstream

  require Logger

  def start_link(args) do
    Slipstream.start_link(__MODULE__, args, name: __MODULE__)
  end

  @impl Slipstream
  def init(config), do: connect(config)

  @impl Slipstream
  def handle_cast(:ping, socket) do
    Logger.info("pong")

    {:noreply, socket}
  end

  @impl Slipstream
  def handle_info(:hello, socket) do
    Logger.info("hello")

    {:noreply, socket}
  end

  @impl Slipstream
  def handle_call(:foo, _from, socket) do
    {:reply, {:ok, :bar}, socket}
  end

  ..
end

This MyClient client is a GenServer, so the following are valid ways to interact with MyClient:

iex> GenServer.cast(MyClient, :ping)
[info] pong
:ok
iex> MyClient |> GenServer.whereis() |> send(:hello)
[info] hello
:hello
iex> GenServer.call(MyClient, :foo)
{:ok, :bar}

Retry Mechanisms

Slipstream emulates the official phoenix.js package with its reconnection and re-join features. Slipstream.Configuration allows configuration of the back-off times with the :reconnect_after_msec and :rejoin_after_msec lists, respectively.
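As a rough illustration of how such a back-off list can be read, the sketch below maps the number of consecutive failures to a wait time. BackoffSketch and wait_for/2 are hypothetical names, not part of Slipstream, and the rule that the final entry repeats once the list is exhausted is an assumption made for this sketch; Slipstream.Configuration is the place to confirm the actual behaviour.

```elixir
defmodule BackoffSketch do
  # Hypothetical helper, not part of Slipstream. Picks the wait time (msec)
  # for the nth consecutive failure (1-based). Assumption: the final entry
  # is reused once the list has been exhausted.
  def wait_for(times, attempt)
      when is_list(times) and times != [] and is_integer(attempt) and attempt >= 1 do
    Enum.at(times, attempt - 1, List.last(times))
  end
end

# the same shape of list that :reconnect_after_msec accepts
times = [200, 500, 1_000, 2_000]

waits = Enum.map(1..6, fn attempt -> BackoffSketch.wait_for(times, attempt) end)
IO.inspect(waits, label: "wait per attempt (msec)")
```

A list keeps the configuration declarative: the schedule can live in application config without passing functions around.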

To take advantage of these built-in mechanisms, a client must be written in the asynchronous GenServer-like manner and must use the reconnect/1 and rejoin/3 functions in its Slipstream.handle_disconnect/2 and Slipstream.handle_topic_close/3 callbacks, respectively. Note that the default implementation of these callbacks invokes these functions, so a client which does not explicitly define these callbacks will retry connection and joins.

Take care to handle the :left case of Slipstream.handle_topic_close/3. In the case that a client attempts to leave a topic with leave/2, the callback will be invoked with a reason of :left. The default implementation of Slipstream.handle_topic_close/3 makes this distinction and simply no-ops on channel leaves.

defmodule MyClientWithRetry do
  use Slipstream

  def start_link(config) do
    Slipstream.start_link(__MODULE__, config, name: __MODULE__)
  end

  @impl Slipstream
  def init(config), do: connect(config)

  @impl Slipstream
  def handle_connect(socket) do
    {:ok, join(socket, "rooms:lobby", %{user_id: 1})}
  end

  @impl Slipstream
  def handle_disconnect(_reason, socket) do
    reconnect(socket)
  end

  @impl Slipstream
  def handle_topic_close(_topic, :left, socket) do
    {:ok, socket}
  end

  def handle_topic_close(topic, _reason, socket) do
    rejoin(socket, topic)
  end
end

Summary

Types

Any data structure capable of being serialized as JSON

A reference to a message pushed by the client

A reply from a remote server to a push from the client

Synchronous Functions

Awaits a pending connection request synchronously

Awaits a pending connection request synchronously, raising on failure

Awaits a pending disconnection request synchronously

Awaits a pending disconnection request synchronously, raising on failure

Awaits a pending join request synchronously

Awaits a join request synchronously, raising on failure

Awaits a leave request synchronously

Awaits a leave request synchronously, raising on failure

Awaits the server's message push, raising on failure

Awaits the server's response to a message

Awaits the server's response to a message, exiting on timeout

Functions

Declares that a module is a Slipstream socket client

Requests that the connection process undergo garbage collection

Requests connection to the remote endpoint

Same as connect/2 but raises on configuration validation error

Requests that an open connection be closed

Requests that a topic be joined in the current connection

Requests that the given topic be left

Creates a new socket without immediately connecting to a remote websocket

Requests that a message be pushed on the websocket connection

Pushes, raising if the topic is not joined or if the channel is dead

Request reconnection given the last-used connection configuration

Requests that the specified topic be joined again

Starts a slipstream client process

Callbacks

Invoked when a slipstream process receives a GenServer call

Invoked when a slipstream process receives a GenServer cast

Invoked when a connection has been established to a websocket server

Invoked as a continuation of another GenServer callback

Invoked when a connection has been terminated

Invoked when a slipstream process receives a message

Invoked when the websocket server replies to the request to join

Invoked when a message is received on the websocket connection

Invoked when a message is received on the websocket connection which references a push from this client process.

Invoked when a join has concluded

Invoked when the slipstream process is started

Invoked when a slipstream process is terminated

Types

json_serializable()

(since 0.1.0)

Specs

json_serializable() :: term()

Any data structure capable of being serialized as JSON

Any argument typed as Slipstream.json_serializable/0 must be able to be encoded with the JSON parser passed in configuration. See Slipstream.Configuration.

push_reference()

(since 0.1.0)

Specs

push_reference() :: String.t()

A reference to a message pushed by the client

These references are returned by calls to push/4 and may be matched on in handle_reply/3. They are also used to match messages for await_reply/2.

Synchronicity

One approach treats the websocket connection as an RPC: some other process in the service makes a GenServer.call/3 to the slipstream client process, which sends a push to the remote websocket server, waits synchronously for a reply, and then sends that reply back to the caller. To the caller, the whole exchange appears completely synchronous.

@impl Slipstream
def handle_call({:new_message, params}, _from, socket) do
  {:ok, ref} = push(socket, "rooms:lobby", "msg:new", params)

  {:reply, await_reply(ref), socket}
end

A second approach is written in a more asynchronous fashion. An info message arriving from any other process triggers the slipstream client to push a work message to the remote websocket server. When the remote websocket server replies with the result, the slipstream client hands the result off to be dealt with elsewhere. No process in this scenario blocks, so all of them can receive other messages while the work is being completed.

@impl Slipstream
def handle_info(:do_work, socket) do
  # push/4 returns {:ok, ref} on success, so unwrap the reference
  {:ok, ref} = push(socket, "worker_queue:foo", "do_work", %{})

  {:noreply, assign(socket, :work_ref, ref)}
end

@impl Slipstream
def handle_reply(ref, result, %{assigns: %{work_ref: ref}} = socket) do
  IO.inspect(result, label: "work complete!")

  {:ok, socket}
end

reply()

Specs

reply() ::
  :ok | :error | {:ok, json_serializable()} | {:error, json_serializable()}

A reply from a remote server to a push from the client

Replies may be any of

  • :ok
  • :error
  • {:ok, any()}
  • {:error, any()}

depending on how the remote server's reply is written.

Note that the empty map is removed in ok and error tuples, so a reply written like so on the server-side:

def handle_in(_event, _params, socket) do
  {:reply, {:ok, %{}}, socket}
end

will translate to a reply of :ok (and the same for {:error, %{}}).
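The rule above can be sketched in plain Elixir. ReplySketch is a hypothetical stand-in written for illustration only; the real translation happens inside Slipstream and may differ in detail. The sketch also shows a common pitfall: the pattern %{} matches any map, so an emptiness check needs map_size/1.

```elixir
defmodule ReplySketch do
  # Hypothetical stand-in for the normalization described above.
  # The map_size/1 guard is needed because the pattern %{} matches *any* map.
  def normalize({status, payload})
      when status in [:ok, :error] and is_map(payload) and map_size(payload) == 0,
      do: status

  def normalize({status, payload}) when status in [:ok, :error], do: {status, payload}

  # A client can then branch on the four documented reply shapes.
  def describe(:ok), do: "ok, no payload"
  def describe(:error), do: "error, no payload"
  def describe({:ok, payload}), do: "ok with " <> inspect(payload)
  def describe({:error, payload}), do: "error with " <> inspect(payload)
end

empty_ok = ReplySketch.normalize({:ok, %{}})
with_payload = ReplySketch.normalize({:error, %{"reason" => "nope"}})

IO.puts(ReplySketch.describe(empty_ok))
IO.puts(ReplySketch.describe(with_payload))
```

In a handle_reply/3 implementation this means a clause matching {:ok, %{}} is not a reliable way to detect an empty success reply; match on the bare :ok atom instead.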

Examples

# on the Phoenix.Channel (server) side:
def handle_in(_event, _params, socket) do
  {:reply, {:ok, %{created?: true}}, socket}
end

# on the Slipstream (client) side:
def handle_reply(_ref, {:ok, %{"created?" => true}} = _reply, socket) do
  ..

Synchronous Functions

await_connect(socket, timeout \\ 5000)

(since 0.1.0)

Specs

await_connect(socket :: Slipstream.Socket.t(), timeout()) ::
  {:ok, Slipstream.Socket.t()} | {:error, term()}

Awaits a pending connection request synchronously

await_connect!(socket, timeout \\ 5000)

(since 0.1.0)

Specs

await_connect!(socket :: Slipstream.Socket.t(), timeout()) ::
  Slipstream.Socket.t()

Awaits a pending connection request synchronously, raising on failure

await_disconnect(socket, timeout \\ 5000)

(since 0.1.0)

Specs

await_disconnect(socket :: Slipstream.Socket.t(), timeout()) ::
  {:ok, Slipstream.Socket.t()} | {:error, term()}

Awaits a pending disconnection request synchronously

await_disconnect!(socket, timeout \\ 5000)

(since 0.1.0)

Specs

await_disconnect!(socket :: Slipstream.Socket.t(), timeout()) ::
  Slipstream.Socket.t()

Awaits a pending disconnection request synchronously, raising on failure

await_join(socket, topic, timeout \\ 5000)

(since 0.1.0)

Specs

await_join(socket :: Slipstream.Socket.t(), topic :: String.t(), timeout()) ::
  {:ok, Slipstream.Socket.t()} | {:error, term()}

Awaits a pending join request synchronously

await_join!(socket, timeout \\ 5000)

(since 0.1.0)

Specs

await_join!(socket :: Slipstream.Socket.t(), timeout()) :: Slipstream.Socket.t()

Awaits a join request synchronously, raising on failure

await_leave(socket, topic, timeout \\ 5000)

(since 0.1.0)

Specs

await_leave(socket :: Slipstream.Socket.t(), topic :: String.t(), timeout()) ::
  {:ok, Slipstream.Socket.t()} | {:error, term()}

Awaits a leave request synchronously

await_leave!(socket, timeout \\ 5000)

(since 0.1.0)

Specs

await_leave!(socket :: Slipstream.Socket.t(), timeout()) ::
  Slipstream.Socket.t()

Awaits a leave request synchronously, raising on failure

await_message(topic_expr, event_expr, payload_expr, timeout \\ 5000)

(macro) (since 0.1.0)

Specs

await_message(
  topic_expr :: Macro.t(),
  event_expr :: Macro.t(),
  payload_expr :: Macro.t(),
  timeout()
) ::
  {:ok, topic :: String.t(), event :: String.t(),
   payload :: json_serializable()}
  | {:error, :timeout}

Awaits the server's message push

Note that unlike the other await_* functions, await_message/4 is a macro. This allows an author to match on patterns in the topic, event, and/or payload parts of a message.
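To see why pattern matching calls for a macro, consider the sketch below. AwaitSketch.await/2 is a hypothetical macro written for this illustration and is not Slipstream's implementation: because a macro receives the caller's pattern as AST, it can splice that pattern into the head of a receive clause, which an ordinary function could not do. The message shape (a three-element tuple) is likewise an assumption of the sketch.

```elixir
defmodule AwaitSketch do
  # Hypothetical macro, not Slipstream's implementation. The pattern arrives
  # as AST and is spliced into the head of a receive clause.
  defmacro await(pattern, timeout \\ 5_000) do
    quote do
      receive do
        unquote(pattern) = message -> {:ok, message}
      after
        unquote(timeout) -> {:error, :timeout}
      end
    end
  end
end

defmodule AwaitDemo do
  require AwaitSketch

  def run do
    event = "msg:new"
    send(self(), {"rooms:lobby", "msg:new", %{"user_id" => 5, "body" => "hello"}})

    # matches a literal topic, a pinned event, and part of the payload
    matched = AwaitSketch.await({"rooms:lobby", ^event, %{"user_id" => 5}}, 100)

    # nothing in the mailbox has this topic, so the wait times out
    missed = AwaitSketch.await({"rooms:other", _event, _payload}, 100)

    {matched, missed}
  end
end

{matched, missed} = AwaitDemo.run()
IO.inspect(matched, label: "matched")
IO.inspect(missed, label: "missed")
```

The pin (^event) and the partial map pattern are resolved in the caller's scope, which is what makes this style of matching convenient.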

Examples

iex> event = "msg:new"
iex> await_message("rooms:lobby", ^event, %{"user_id" => 5})
{:ok, "rooms:lobby", "msg:new", %{"user_id" => 5, "body" => "hello"}}

await_message!(topic_expr, event_expr, payload_expr, timeout \\ 5000)

(macro) (since 0.1.0)

Specs

await_message!(
  topic_expr :: Macro.t(),
  event_expr :: Macro.t(),
  payload_expr :: Macro.t(),
  timeout()
) :: {topic :: String.t(), event :: String.t(), payload :: json_serializable()}

Awaits the server's message push, raising on failure

See await_message/4.

await_reply(push_reference, timeout \\ 5000)

(since 0.1.0)

Specs

await_reply(push_reference(), timeout()) :: reply() | {:error, :timeout}

Awaits the server's response to a message

await_reply!(push_reference, timeout \\ 5000)

(since 0.1.0)

Specs

await_reply!(push_reference(), timeout()) :: reply()

Awaits the server's response to a message, exiting on timeout

See await_reply/2 for more information.

Functions

__using__(opts)

(macro) (since 0.1.0)

Specs

__using__(Keyword.t()) :: Macro.t()

Declares that a module is a Slipstream socket client

Slipstream provides a use Slipstream macro that behaves similarly to GenServer's use GenServer. It sets up a few things:

  • a default implementation of child_spec/1, which is used to start the module as a GenServer. This may be overridden.
  • imports for all documented functions in Slipstream and Slipstream.Socket
  • a GenServer.handle_info/2 function clause which matches incoming events from the connection process and dispatches them to the various Slipstream callbacks
  • a GenServer.handle_info/2 function clause which matches Slipstream commands. This is used to implement back-off retry mechanisms for reconnect/1 and rejoin/3.

This provides a familiar and sleek interface for the common case of using Slipstream: an asynchronous callback-based GenServer module.

It's not required to use this macro, though. Slipstream can be used in synchronous mode (via the await_* family of functions).

Options

Any options passed as the opts argument to __using__/1 are passed along to Supervisor.child_spec/2, as is done in the default child_spec/1 implementation for GenServer. Most notably, you may control the restart strategy of the client with the :restart option. See the example below.
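Since the text compares this to GenServer's default child_spec/1, the effect of such options can be observed with a plain GenServer. The sketch below uses only GenServer, not Slipstream; TempWorker is a hypothetical module name, and the assumption is that use Slipstream forwards options in the same way.

```elixir
defmodule TempWorker do
  # Options given to `use GenServer` are merged into the generated child spec.
  use GenServer, restart: :temporary

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg)

  @impl GenServer
  def init(arg), do: {:ok, arg}
end

spec = TempWorker.child_spec(:some_arg)
IO.inspect(spec, label: "child spec")
```

A supervisor that starts TempWorker from this spec will not restart it after it exits, which is the behaviour the :restart option controls.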

Examples

defmodule MyApp.MySocketClient do
  # one crash/shutdown/exit will permanently terminate the server
  use Slipstream, restart: :temporary

  def start_link(args) do
    Slipstream.start_link(__MODULE__, args)
  end

  ..
end

collect_garbage(socket)

(since 0.1.0)

Specs

collect_garbage(socket :: Slipstream.Socket.t()) :: :ok

Requests that the connection process undergo garbage collection

If you're using Slipstream to send large messages, you may wish to flush the process memory of the connection process between large messages. This can be achieved through garbage collection.

Examples

iex> collect_garbage(socket)
:ok

connect(socket \\ new_socket(), opts)

(since 0.1.0)

Specs

connect(socket :: Slipstream.Socket.t(), opts :: Keyword.t()) ::
  {:ok, Slipstream.Socket.t()}
  | {:error,
     %NimbleOptions.ValidationError{
       __exception__: term(),
       key: term(),
       keys_path: term(),
       message: term(),
       value: term()
     }}

Requests connection to the remote endpoint

opts are passed to Slipstream.Configuration.validate/1 before sending.

Note that this request for connection is asynchronous. A return value of {:ok, socket} does not mean that a connection has successfully been established.

Examples

{:ok, socket} = connect(uri: "ws://localhost:4000/socket/websocket")

connect!(socket \\ new_socket(), opts)

(since 0.1.0)

Specs

connect!(socket :: Slipstream.Socket.t(), opts :: Keyword.t()) ::
  Slipstream.Socket.t()

Same as connect/2 but raises on configuration validation error

Note that connect!/2 will not necessarily raise an error on failure to connect. The ! only pertains to the potential for raising when the configuration is invalid.

disconnect(socket)

(since 0.1.0)

Specs

disconnect(socket :: Slipstream.Socket.t()) :: Slipstream.Socket.t()

Requests that an open connection be closed

This function will no-op when the socket is not currently connected to any remote websocket server.

Note that you do not need to use disconnect/1 to clean up a connection. The connection process monitors the slipstream client process and will shut down when it detects that the process has terminated.

Disconnection may be awaited synchronously with await_disconnect/2.

Examples

@impl Slipstream
def handle_info(:chaos_monkey, socket) do
  {:ok, socket} =
    socket
    |> disconnect()
    |> await_disconnect()

  # reconnect/1 returns {:ok, socket} on success, so unwrap it first
  {:ok, socket} = reconnect(socket)

  {:noreply, socket}
end

join(socket, topic, params \\ %{})

(since 0.1.0)

Specs

join(
  socket :: Slipstream.Socket.t(),
  topic :: String.t(),
  params :: json_serializable()
) :: Slipstream.Socket.t()

Requests that a topic be joined in the current connection

Multiple topics may be joined by one Slipstream client, but each topic may only be joined once. join/3 may still be called on the same topic multiple times; the call is idempotent, and the client only sends a join request for a topic it has not yet joined. To begin a new session with a topic, first leave/2 and then join/3 again.

The request to join will not error-out if the client is not connected to a remote server. In that case, the join/3 function will act as a no-op.

A join can be awaited in a blocking fashion with await_join/3.

Examples

@impl Slipstream
def handle_connect(socket) do
  {:ok, join(socket, "rooms:lobby", %{user: 1})}
end

leave(socket, topic)

(since 0.1.0)

Specs

leave(socket :: Slipstream.Socket.t(), topic :: String.t()) ::
  Slipstream.Socket.t()

Requests that the given topic be left

Note that like joining, leaving is an asynchronous request and can be awaited with await_leave/3.

Also similar to join/3, leave/2 is idempotent and will not raise an error if the provided topic is not currently joined.

Examples

iex> {:ok, socket} = leave(socket, "rooms:lobby") |> await_leave("rooms:lobby")
iex> join(socket, "rooms:specific")

new_socket()

(since 0.1.0)

Specs

new_socket() :: Slipstream.Socket.t()

Creates a new socket without immediately connecting to a remote websocket

This can be useful if you do not wish to request connection with connect/2 during the init/1 callback (because the init/1 callback requires that you return a Slipstream.Socket.t/0).

Examples

defmodule MySocketClient do
  use Slipstream

  ..

  @impl Slipstream
  def init(args) do
    {:ok, new_socket() |> assign(:init_args, args)}
  end

  ..
end

iex> new_socket()
#Slipstream.Socket<assigns: %{}, ...>

push(socket, topic, event, params, timeout \\ 5000)

(since 0.1.0)

Specs

push(
  socket :: Slipstream.Socket.t(),
  topic :: String.t(),
  event :: String.t(),
  params :: json_serializable(),
  timeout :: timeout()
) :: {:ok, push_reference()} | {:error, reason :: term()}

Requests that a message be pushed on the websocket connection

A channel has the ability to reply directly to a message, but this reply is asynchronous. Handle replies using the handle_reply/3 callback or by awaiting them synchronously with await_reply/2.

Although this request to the remote server is asynchronous, the call to the transport process to transmit the push is synchronous and will exert back-pressure on calls to push/4, as push/4 blocks until the message has been sent by the transport.

If you are pushing especially large messages, you may need to adjust the timeout argument so that the GenServer call does not exit with :timeout. The default value is 5_000 msec (5 seconds).
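The timeout behaviour described here is that of an ordinary GenServer call, which can be reproduced without Slipstream. In the sketch below, SlowTransport is a hypothetical stand-in for the transport process and the delays are arbitrary; it shows a call exiting with :timeout when the limit is too short and succeeding once the limit is raised.

```elixir
defmodule SlowTransport do
  # Hypothetical stand-in for a transport process that needs a while to send.
  use GenServer

  def start_link(delay_ms), do: GenServer.start_link(__MODULE__, delay_ms)

  @impl GenServer
  def init(delay_ms), do: {:ok, delay_ms}

  @impl GenServer
  def handle_call({:send, _payload}, _from, delay_ms) do
    # simulate the time it takes to write a large frame
    Process.sleep(delay_ms)
    {:reply, :ok, delay_ms}
  end
end

{:ok, pid} = SlowTransport.start_link(200)

# a 50 msec limit is shorter than the 200 msec send, so the caller exits
too_short =
  try do
    GenServer.call(pid, {:send, "big payload"}, 50)
  catch
    :exit, {:timeout, _call} -> :timed_out
  end

# a generous limit lets the same request complete
long_enough = GenServer.call(pid, {:send, "big payload"}, 1_000)

IO.inspect({too_short, long_enough})
```

The caller blocks for as long as the send takes, which is the back-pressure mentioned above; raising the timeout only widens how long the caller is willing to wait.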

To link a push to the reply a Phoenix channel may send for it, store the ref string returned from the call to push/4 and match on it in handle_reply/3.

Examples

@impl Slipstream
def handle_join("rooms:lobby", _response, socket) do
  {:ok, hello_request} = push(socket, "rooms:lobby", "new:msg", %{user: 1, body: "hello"})

  # keep the reference in the socket assigns so handle_reply/3 can match on it
  {:ok, assign(socket, :hello_request, hello_request)}
end

@impl Slipstream
def handle_reply(ref, reply, %{assigns: %{hello_request: ref}} = socket) do
  IO.inspect(reply, label: "nice, a response.")

  {:ok, socket}
end

push!(socket, topic, event, params)

(since 0.1.0)

Specs

push!(
  socket :: Slipstream.Socket.t(),
  topic :: String.t(),
  event :: String.t(),
  params :: json_serializable()
) :: push_reference()

Pushes, raising if the topic is not joined or if the channel is dead

Same as push/4, but raises in cases of failure.

This can be useful for piping into await_reply/2.

Examples

iex> {:ok, result} = push!(socket, "rooms:lobby", "msg:new", params) |> await_reply()
{:ok, %{"created?" => true}}

reconnect(socket)

(since 0.1.0)

Specs

reconnect(socket :: Slipstream.Socket.t()) ::
  {:ok, Slipstream.Socket.t()} | :error

Request reconnection given the last-used connection configuration

Note that when reconnect/1 is used to re-connect instead of connect/2 (or connect!/2), the slipstream process will attempt to reconnect with a retry mechanism with backoff. The process will wait an interval between reconnection attempts following the list of milliseconds provided in the :reconnect_after_msec key of configuration passed to connect/2 (or connect!/2).

The handle_disconnect/2 callback will be invoked for each failure to re-connect, however, so an implementation of that callback which will simply retry with backoff can be achieved like so:

@impl Slipstream
def handle_disconnect(_reason, socket) do
  {:ok, socket} = reconnect(socket)
end

reconnect/1 may return an :error tuple in the case that the socket passed does not contain any connection information (which is added to the socket with connect/2 or connect!/2), or if the socket is currently connected. For a reconnect/1 call without configuration, the return pattern is {:error, :no_config}, and for a socket that is already connected, the pattern is {:error, :connected}.

A reconnect may be awaited with await_connect/2.

rejoin(socket, topic, params \\ nil)

(since 0.1.0)

Specs

rejoin(
  socket :: Slipstream.Socket.t(),
  topic :: String.t(),
  params :: json_serializable()
) :: {:ok, Slipstream.Socket.t()} | {:error, :never_joined}

Requests that the specified topic be joined again

If params is not provided, the previously used value will be sent.

In the case that the specified topic has not been joined before, rejoin/3 will return {:error, :never_joined}.

Note that a rejoin may be awaited with await_join/3.

Dealing with crashes

When attempting to re-join a disconnected topic with rejoin/3, the Slipstream process will attempt to use backoff governed by the :rejoin_after_msec list in configuration passed to connect/2 (or connect!/2).

The handle_topic_close/3 callback will be invoked for each crash, however, so a minimal implementation of that callback which achieves the backoff retry is like so:

Examples

@impl Slipstream
def handle_topic_close(topic, _reason, socket) do
  {:ok, _socket} = rejoin(socket, topic)
end

start_link(module, init_arg, options \\ [])

Specs

start_link(module(), any(), GenServer.options()) :: GenServer.on_start()

Starts a slipstream client process

This function delegates to GenServer.start_link/3, so all options passable to that function are passable here as well. Most notably, you may name your slipstream clients using the same naming rules as GenServers, as in the example below.

Examples

defmodule MySlipstreamClient do
  use Slipstream

  def start_link(args) do
    Slipstream.start_link(__MODULE__, args, name: __MODULE__)
  end

  ..
end

Callbacks

handle_call(request, from, socket)

(optional) (since 0.1.0)

Specs

handle_call(
  request :: term(),
  from :: GenServer.from(),
  socket :: Slipstream.Socket.t()
) ::
  {:reply, reply, new_socket}
  | {:reply, reply, new_socket, timeout() | :hibernate | {:continue, term()}}
  | {:noreply, new_socket}
  | {:noreply, new_socket, timeout() | :hibernate | {:continue, term()}}
  | {:stop, reason, new_socket}
  | {:stop, reason, reply, new_socket}
when new_socket: Slipstream.Socket.t(), reply: term(), reason: term()

Invoked when a slipstream process receives a GenServer call

Behaves the same as GenServer.handle_call/3

handle_cast(msg, socket)

(optional) (since 0.1.0)

Specs

handle_cast(msg :: term(), socket :: Slipstream.Socket.t()) ::
  {:noreply, new_socket}
  | {:noreply, new_socket, timeout() | :hibernate | {:continue, term()}}
  | {:stop, reason :: term(), new_socket}
when new_socket: Slipstream.Socket.t()

Invoked when a slipstream process receives a GenServer cast

Behaves the same as GenServer.handle_cast/2

handle_connect(socket)

(optional) (since 0.1.0)

Specs

handle_connect(socket :: Slipstream.Socket.t()) ::
  {:ok, new_socket} | {:stop, reason :: term(), new_socket}
when new_socket: Slipstream.Socket.t()

Invoked when a connection has been established to a websocket server

This callback provides a good place to join/3.

Examples

@impl Slipstream
def handle_connect(socket) do
  {:ok, socket}
end

handle_continue(continue, state)

(optional) (since 0.1.0)

Specs

handle_continue(continue :: term(), state :: Slipstream.Socket.t()) ::
  {:noreply, new_state}
  | {:noreply, new_state, timeout() | :hibernate | {:continue, term()}}
  | {:stop, reason :: term(), new_state}
when new_state: Slipstream.Socket.t()

Invoked as a continuation of another GenServer callback

GenServer callbacks may end with signatures that declare that the next function invoked should be a continuation. E.g.

def init(state) do
  {:ok, state, {:continue, :my_continue}}
end

# this will be invoked immediately after `init/1`
def handle_continue(:my_continue, state) do
  # do something with state

  {:noreply, state}
end

This provides a way to schedule work to occur immediately after successful initialization or to break work across multiple callbacks, which can be useful for clients which are state-machine-like.

See GenServer.handle_continue/2 for more information.
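The ordering guarantee can be checked with a plain GenServer. The sketch below does not use Slipstream; ContinueSketch is a hypothetical module that records each step so the order of init/1 and handle_continue/2 relative to the first incoming call becomes visible.

```elixir
defmodule ContinueSketch do
  # Hypothetical GenServer that records the order in which callbacks run.
  use GenServer

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg)

  @impl GenServer
  def init(arg) do
    {:ok, %{arg: arg, steps: [:init]}, {:continue, :warm_up}}
  end

  @impl GenServer
  def handle_continue(:warm_up, state) do
    # runs before any message in the mailbox is processed
    {:noreply, %{state | steps: state.steps ++ [:warm_up]}}
  end

  @impl GenServer
  def handle_call(:steps, _from, state) do
    {:reply, state.steps, state}
  end
end

{:ok, pid} = ContinueSketch.start_link(:demo)
steps = GenServer.call(pid, :steps)
IO.inspect(steps, label: "callback order")
```

Because the continuation runs before the process reads its mailbox, the call always sees both steps recorded, even when it is sent immediately after start_link/1 returns.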

handle_disconnect(reason, socket)

(optional) (since 0.1.0)

Specs

handle_disconnect(reason :: term(), socket :: Slipstream.Socket.t()) ::
  {:ok, new_socket} | {:stop, stop_reason :: term(), new_socket}
when new_socket: Slipstream.Socket.t()

Invoked when a connection has been terminated

The default implementation of this callback requests reconnection.

Examples

@impl Slipstream
def handle_disconnect(_reason, socket) do
  # reconnect/1 already returns {:ok, socket} on success
  reconnect(socket)
end

handle_info(msg, socket)

(optional) (since 0.1.0)

Specs

handle_info(msg :: term(), socket :: Slipstream.Socket.t()) ::
  {:noreply, new_socket}
  | {:noreply, new_socket, timeout() | :hibernate | {:continue, term()}}
  | {:stop, reason :: term(), new_socket}
when new_socket: Slipstream.Socket.t()

Invoked when a slipstream process receives a message

Behaves the same as GenServer.handle_info/2

handle_join(topic, response, socket)

(optional) (since 0.1.0)

Specs

handle_join(
  topic :: String.t(),
  response :: json_serializable(),
  socket :: Slipstream.Socket.t()
) :: {:ok, new_socket} | {:stop, reason :: term(), new_socket}
when new_socket: Slipstream.Socket.t()

Invoked when the websocket server replies to the request to join

Examples

@impl Slipstream
def handle_join("rooms:echo", %{}, socket) do
  push(socket, "rooms:echo", "echo", %{"ping" => 1})

  {:ok, socket}
end

handle_message(topic, event, message, socket)

(optional) (since 0.1.0)

Specs

handle_message(
  topic :: String.t(),
  event :: String.t(),
  message :: any(),
  socket :: Slipstream.Socket.t()
) :: {:ok, new_socket} | {:stop, reason :: term(), new_socket}
when new_socket: Slipstream.Socket.t()

Invoked when a message is received on the websocket connection

This callback will not be invoked for a message which is a reply. Those messages will be handled in handle_reply/3.

Note that while replying is supported on the server-side of the Phoenix Channel protocol, it is not supported by a client. Messages sent from the server cannot be directly replied to.

Examples

@impl Slipstream
def handle_message("room:lobby", "new:msg", params, socket) do
  MyApp.Msg.create(params)

  {:ok, socket}
end

handle_reply(ref, message, socket)

(optional) (since 0.1.0)

Specs

handle_reply(
  ref :: push_reference(),
  message :: reply(),
  socket :: Slipstream.Socket.t()
) :: {:ok, new_socket} | {:stop, reason :: term(), new_socket}
when new_socket: Slipstream.Socket.t()

Invoked when a message is received on the websocket connection which references a push from this client process.

ref is the string reference returned from the push/4 call which resulted in this reply.

Examples

@impl Slipstream
def handle_join(topic, _params, socket) do
  # push/4 returns {:ok, ref} on success, so unwrap the reference
  {:ok, my_req} = push(socket, topic, "msg:new", %{"foo" => "bar"})

  {:ok, assign(socket, :request, my_req)}
end

@impl Slipstream
def handle_reply(ref, reply, %{assigns: %{request: ref}} = socket) do
  IO.inspect(reply, label: "reply to my request")

  {:ok, socket}
end

handle_topic_close(topic, reason, socket)

(optional) (since 0.1.0)

Specs

handle_topic_close(
  topic :: String.t(),
  reason :: term(),
  socket :: Slipstream.Socket.t()
) :: {:ok, new_socket} | {:stop, stop_reason :: term(), new_socket}
when new_socket: Slipstream.Socket.t()

Invoked when a join has concluded

This callback will be invoked in a few cases:

  • the remote Phoenix.Channel crashes, e.g. by a raised error
  • the client successfully leaves the topic with leave/2

In the case that the client has left the topic, reason will simply be :left. If the remote channel crashes, the reason will be an error tuple {:error, params :: json_serializable()} where params is the message sent from the remote channel on crash.

The default implementation of this callback attempts to re-join the last-joined topic whenever reason != :left. If the reason is :left, the default implementation will no-op by returning {:ok, socket}.

Examples

@impl Slipstream
def handle_topic_close(topic, _reason, socket) do
  {:ok, socket} = rejoin(socket, topic)
end

init(init_arg)

(optional) (since 0.1.0)

Specs

init(init_arg :: any()) ::
  {:ok, state}
  | {:ok, state, timeout() | :hibernate | {:continue, term()}}
  | :ignore
  | {:stop, reason :: any()}
when state: Slipstream.Socket.t()

Invoked when the slipstream process is started

Behaves the same as GenServer.init/1, but the return state must be a new Slipstream.Socket.t/0. Values from init/1 that you'd like to keep in state can be stored with Slipstream.Socket.assign/3.

This callback is a good place to request connection with connect/2. Note that connect/2 is an asynchronous request for connection. Awaiting connection with await_connect/2 is unwise in many scenarios, however, because failure to connect may result in an exit from the process, crashing the supervision tree that started the process. If you wish to connect synchronously upon init, a better approach could be:

@impl Slipstream
def init(_args) do
  config = Application.fetch_env!(:my_app, __MODULE__)
  socket = new_socket() |> assign(:connect_config, config)

  {:ok, socket, {:continue, :connect}}
end

@impl Slipstream
def handle_continue(:connect, socket) do
  {:ok, socket} = connect(socket, socket.assigns.connect_config)

  {:ok, socket} = await_connect(socket)

  {:noreply, socket}
end

But a more minimalistic approach that still provides safety in cases of configuration validation failures would be:

defmodule MySocketClient do
  use Slipstream

  # Logger.error/1 is a macro, so Logger must be required
  require Logger

  def start_link(args) do
    Slipstream.start_link(__MODULE__, args, name: __MODULE__)
  end

  @impl Slipstream
  def init(_args) do
    config = Application.fetch_env!(:my_app, __MODULE__)

    case connect(config) do
      {:ok, socket} ->
        {:ok, socket}

      {:error, reason} ->
        Logger.error("Could not start #{__MODULE__} because of " <>
          "validation failure: #{inspect(reason)}")

        :ignore
    end
  end

  ..
end

The configuration could be stored in application config:

# config/<env>.exs
config :my_app, MySocketClient,
  uri: "ws://example.org/socket/websocket",
  reconnect_after_msec: [200, 500, 1_000, 2_000]

And in cases where the configuration validation fails, the MySocketClient process will not crash the application's supervision tree.

Examples

@impl Slipstream
def init(config) do
  connect(config)
end

terminate(reason, socket)

(optional) (since 0.1.0)

Specs

terminate(reason :: term(), socket :: Slipstream.Socket.t()) :: term()

Invoked when a slipstream process is terminated

Note that this callback is not always invoked as the process shuts down. See GenServer.terminate/2 for more information.
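The difference can be demonstrated with a plain GenServer. In the sketch below, TerminateSketch is a hypothetical module that reports to the starting process whenever terminate/2 runs: a graceful GenServer.stop/2 triggers the callback, while an untrappable :kill exit does not.

```elixir
defmodule TerminateSketch do
  # Hypothetical GenServer that tells `report_to` when terminate/2 runs.
  use GenServer

  # started without a link so that killing it does not affect the caller
  def start(report_to), do: GenServer.start(__MODULE__, report_to)

  @impl GenServer
  def init(report_to), do: {:ok, report_to}

  @impl GenServer
  def terminate(reason, report_to) do
    send(report_to, {:terminated, reason})
  end
end

# graceful stop: terminate/2 is invoked with the stop reason
{:ok, a} = TerminateSketch.start(self())
:ok = GenServer.stop(a, :normal)

graceful =
  receive do
    {:terminated, reason} -> reason
  after
    500 -> :not_called
  end

# brutal kill: the process dies without running terminate/2
{:ok, b} = TerminateSketch.start(self())
ref = Process.monitor(b)
Process.exit(b, :kill)

receive do
  {:DOWN, ^ref, :process, ^b, :killed} -> :ok
after
  500 -> :ok
end

killed =
  receive do
    {:terminated, reason} -> reason
  after
    100 -> :not_called
  end

IO.inspect({graceful, killed})
```

Cleanup that must always happen is therefore better placed in a separate process that monitors the client than in terminate/2 alone.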

It is wise to disconnect/1 in this callback, which gracefully ends the websocket connection. This is the behavior of the default implementation of terminate/2.

Examples

@impl Slipstream
def terminate(reason, socket) do
  Logger.debug("shutting down: " <> inspect(reason))

  disconnect(socket)
end