Parley behaviour (Parley v0.3.0)


A WebSocket client built on gen_statem and Mint WebSocket.

Parley provides a callback-based API similar to GenServer. You define a module with use Parley, implement the callbacks you need, and interact with the connection through the functions in this module.

Usage

defmodule MyClient do
  use Parley

  @impl true
  def handle_connect(state) do
    IO.puts("Connected!")
    {:ok, state}
  end

  @impl true
  def handle_frame({:text, msg}, state) do
    IO.puts("Received: #{msg}")
    {:ok, state}
  end

  @impl true
  def handle_disconnect(_reason, state) do
    IO.puts("Disconnected")
    {:ok, state}
  end
end

Starting with a pid

{:ok, pid} = MyClient.start_link(%{}, url: "wss://example.com/ws")
Parley.send_frame(pid, {:text, "hello"})
Parley.disconnect(pid)

Starting with a registered name

{:ok, _pid} = MyClient.start_link(%{}, url: "wss://example.com/ws", name: MyClient)
Parley.send_frame(MyClient, {:text, "hello"})
Parley.disconnect(MyClient)

Starting under a supervisor

children = [
  {MyClient, {%{}, url: "wss://example.com/ws", name: MyClient}}
]

Supervisor.start_link(children, strategy: :one_for_one)

Options

  • :url (required) — the WebSocket URL to connect to (e.g. "wss://example.com/ws")
  • :name — used for name registration, see the "Name registration" section below
  • :headers — custom headers sent with the WebSocket upgrade request (e.g. [{"authorization", "Bearer token"}]). Default: []
  • :connect_timeout — timeout in milliseconds for the WebSocket upgrade handshake (default: 10_000)
  • :transport_opts — options passed to the transport layer (:gen_tcp for ws://, :ssl for wss://). Use this for TLS configuration such as certificate pinning, custom CAs, or TCP-level timeouts (e.g. [timeout: 5_000, cacertfile: "path/to/ca.pem"])
  • :protocols — Mint HTTP protocols to use for the connection (default: [:http1])
  • :reconnect — controls automatic reconnection with exponential backoff. Accepts false (default, no reconnection), true (enable with defaults: base_delay: 1_000, max_delay: 30_000, max_retries: :infinity), or a keyword list with custom values for :base_delay, :max_delay, and :max_retries
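
The :reconnect defaults above describe exponential backoff bounded by :max_delay, but this page does not spell out the formula. The sketch below assumes the common scheme in which the delay doubles on each attempt and is capped at :max_delay, with no jitter. BackoffSketch and delay/2 are hypothetical names and not part of Parley, whose actual calculation may differ.

```elixir
defmodule BackoffSketch do
  # Hypothetical helper, not part of Parley. Assumes the delay doubles per
  # attempt and is capped at :max_delay; the library's real formula may differ.

  @defaults [base_delay: 1_000, max_delay: 30_000]

  # `attempt` is zero-based: 0 is the first retry after a disconnect.
  def delay(attempt, opts \\ []) when is_integer(attempt) and attempt >= 0 do
    opts = Keyword.merge(@defaults, opts)

    # Integer.pow/2 keeps the arithmetic in whole milliseconds.
    min(opts[:base_delay] * Integer.pow(2, attempt), opts[:max_delay])
  end
end

Enum.map(0..5, &BackoffSketch.delay/1)
# => [1000, 2000, 4000, 8000, 16000, 30000]
```

Under this assumption the default settings reach the 30-second ceiling on the sixth attempt and stay there.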

Name registration

The :name option supports the same values as GenServer:

  • an atom — registered locally with {:local, atom}
  • {:global, term} — registered with :global
  • {:via, module, term} — registered with a custom registry
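
As an illustration of the {:via, module, term} form, the sketch below registers clients in an Elixir Registry so that several connections can be addressed by key. MyApp.Sockets, MyApp.SocketRegistry, and the keys are hypothetical names; MyClient is the module from the Usage section.

```elixir
defmodule MyApp.Sockets do
  # Hypothetical helper module; the registry name and keys are illustrative.
  @registry MyApp.SocketRegistry

  # Builds the :via tuple used both at start-up and in later calls.
  def via(key), do: {:via, Registry, {@registry, key}}

  # Starts one client per key, registered under that key.
  def start_client(key, url) do
    MyClient.start_link(%{}, url: url, name: via(key))
  end

  # The via tuple is accepted wherever a pid or registered name is.
  def send_text(key, text), do: Parley.send_frame(via(key), {:text, text})
end

# The registry must be running before any client registers in it.
{:ok, _registry} = Registry.start_link(keys: :unique, name: MyApp.SocketRegistry)
```

With the registry running, MyApp.Sockets.start_client("prices", "wss://example.com/ws") followed by MyApp.Sockets.send_text("prices", "hello") would address the connection by key rather than by pid.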

Connection lifecycle

The connection is managed as a state machine with three states:

stateDiagram-v2
    [*] --> init: start_link/3
    init --> disconnected: {:ok, state}
    init --> [*]: {:stop, reason}

    disconnected --> connecting: TCP connect + WS upgrade
    disconnected --> [*]: connect / upgrade failure (reconnect: false)

    connecting --> connected: upgrade success
    connecting --> disconnected: error / timeout

    connected --> disconnected: error / close / disconnect/1 / {:disconnect, ...}
    connected --> [*]: callback {:stop, ...}

    state disconnected {
        [*] --> handle_disconnect
        handle_disconnect --> reconnecting: {:ok, state} / {:reconnect, state}
        handle_disconnect --> stay_disconnected: {:disconnect, state}
    }

    state connected {
        [*] --> handle_connect
        handle_connect --> waiting
        waiting --> handle_frame: frame received
        handle_frame --> waiting
        waiting --> handle_ping: ping received
        handle_ping --> waiting
        waiting --> handle_info: process message
        handle_info --> waiting
    }

    note right of connecting
        send_frame/2 calls are queued
        and replayed on connect
    end note

    note right of connected
        Pings auto-ponged before
        handle_ping/2 is called
    end note

    note left of disconnected
        handle_info/2 runs in all states.
        {:disconnect, ...} transitions to
        disconnected from any callback.
        {:stop, ...} terminates the process
        from any state.
    end note

  • disconnected — initial state. On process start, immediately attempts to connect. Calls handle_disconnect/2 when entering from another state. If reconnection is enabled, schedules a reconnect attempt with exponential backoff.
  • connecting — TCP connection established, waiting for the WebSocket upgrade handshake to complete. Frames sent via send_frame/2 during this state are automatically queued and delivered once connected.
  • connected — WebSocket upgrade complete. Calls handle_connect/1 on entry, then handle_frame/2 for each frame received from the server. Resets the reconnect attempt counter to 0.

Callbacks

All callbacks are optional and have default implementations. Override only the ones you need.

  • init/1 — called when the process starts, before connecting. Transforms the init_arg into user state (default: passes it through)
  • handle_connect/1 — called when the WebSocket handshake completes
  • handle_frame/2 — called when a frame is received from the server
  • handle_ping/2 — called when a ping frame is received (pong is sent automatically)
  • handle_info/2 — called when the process receives a non-WebSocket message
  • handle_disconnect/2 — called when the connection is lost or closed

handle_connect/1, handle_frame/2, handle_ping/2, and handle_info/2 also support {:push, frame, state} to send a frame from within the callback, {:disconnect, reason, state} to gracefully close the connection while keeping the process alive, and {:stop, reason, state} to stop the process. See the callback docs for details.

Summary

Connection

disconnect(server)

Gracefully disconnects from the WebSocket server.

send_frame(server, frame)

Sends a WebSocket frame to the server.

start(module, init_arg, opts \\ [])

Starts a Parley process without a link (outside of a supervision tree).

start_link(module, init_arg, opts \\ [])

Starts a Parley process linked to the current process.

Callbacks

handle_connect(state)

Called when the WebSocket handshake completes.

handle_disconnect(reason, state)

Called when the connection is lost or closed.

handle_frame(frame, state)

Called when a frame is received from the server.

handle_info(message, state)

Called when the process receives a message that is not a WebSocket frame.

handle_ping(payload, state)

Called when a ping frame is received.

init(init_arg)

Called when the process starts, before connecting to the server.

Types

frame()

A WebSocket frame.

state()

The user-managed state passed through all callbacks.

Connection

disconnect(server)

@spec disconnect(:gen_statem.server_ref()) :: :ok

Gracefully disconnects from the WebSocket server.

Sends a WebSocket close frame and transitions the process to the :disconnected state. The process remains alive after disconnecting. If a reconnect timer is pending, it is cancelled.

Examples

:ok = Parley.disconnect(pid)

send_frame(server, frame)

@spec send_frame(:gen_statem.server_ref(), frame()) :: :ok | {:error, term()}

Sends a WebSocket frame to the server.

Returns :ok if the frame was sent successfully, or {:error, reason} if the send failed (e.g. the process is in the :disconnected state).

Examples

:ok = Parley.send_frame(pid, {:text, "hello"})
:ok = Parley.send_frame(pid, {:binary, <<1, 2, 3>>})
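
Because send_frame/2 can return {:error, reason}, callers that cannot crash on a failed match may prefer to handle both results. The wrapper below is a minimal sketch; SendHelper and send_text/2 are hypothetical names, and the shape of reason is not specified on this page, so it is only logged.

```elixir
defmodule SendHelper do
  require Logger

  # Hypothetical wrapper around Parley.send_frame/2. It logs a failed send
  # and hands the error back to the caller instead of raising.
  def send_text(server, text) when is_binary(text) do
    case Parley.send_frame(server, {:text, text}) do
      :ok ->
        :ok

      {:error, reason} ->
        Logger.warning("frame not sent: #{inspect(reason)}")
        {:error, reason}
    end
  end
end
```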

start(module, init_arg, opts \\ [])

@spec start(module(), state(), keyword()) :: :gen_statem.start_ret()

Starts a Parley process without a link (outside of a supervision tree).

Accepts the same arguments and options as start_link/3. Useful for interactive or scripted use where you don't want the calling process to be linked.

start_link(module, init_arg, opts \\ [])

@spec start_link(module(), state(), keyword()) :: :gen_statem.start_ret()

Starts a Parley process linked to the current process.

This is often used to start the process as part of a supervision tree.

module is the module that implements the Parley callbacks. init_arg is passed to init/1, and the state that init/1 returns becomes the user state available in every callback. With the default init/1, init_arg is used unchanged.

Options

  • :url (required) — the WebSocket URL to connect to (e.g. "wss://example.com/ws")
  • :name — used for name registration, see the "Name registration" section in the module documentation
  • :headers — custom headers sent with the WebSocket upgrade request (e.g. [{"authorization", "Bearer token"}]). Default: []
  • :connect_timeout — timeout in milliseconds for the WebSocket upgrade handshake (default: 10_000)
  • :transport_opts — options passed to the transport layer (:gen_tcp for ws://, :ssl for wss://). Use this for TLS configuration such as certificate pinning, custom CAs, or TCP-level timeouts (e.g. [timeout: 5_000, cacertfile: "path/to/ca.pem"])
  • :protocols — Mint HTTP protocols to use for the connection (default: [:http1])
  • :reconnect — controls automatic reconnection with exponential backoff. Accepts false (default), true (enable with defaults), or a keyword list with :base_delay, :max_delay, and :max_retries

Return values

See :gen_statem.start_link/3 for return values.
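
To show how the options combine, the sketch below starts a client through Parley.start_link/3 and handles the usual :gen_statem start results. MyApp.PriceFeed, the URL, the token, and the option values are placeholders chosen for illustration; MyClient is the module from the Usage section.

```elixir
defmodule MyApp.PriceFeed do
  # Hypothetical wrapper module; names, URL, and values are illustrative only.

  def connect(token) when is_binary(token) do
    opts = [
      url: "wss://example.com/ws",
      name: __MODULE__,
      headers: [{"authorization", "Bearer " <> token}],
      connect_timeout: 5_000,
      reconnect: [base_delay: 500, max_delay: 10_000, max_retries: 5]
    ]

    case Parley.start_link(MyClient, %{}, opts) do
      {:ok, pid} -> {:ok, pid}
      # Returned by :gen_statem when the name is already registered.
      {:error, {:already_started, pid}} -> {:ok, pid}
      {:error, reason} -> {:error, reason}
      :ignore -> {:error, :ignored}
    end
  end
end
```

Treating {:already_started, pid} as success makes connect/1 idempotent, which is a design choice of this sketch and not something the library requires.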

Callbacks

handle_connect(state)

@callback handle_connect(state()) ::
  {:ok, state()}
  | {:push, frame(), state()}
  | {:disconnect, reason :: term(), state()}
  | {:stop, reason :: term(), state()}

Called when the WebSocket handshake completes.

Return values

  • {:ok, state} — update state, remain connected
  • {:push, frame, state} — send a frame immediately after connecting (useful for auth or subscribe messages)
  • {:disconnect, reason, state} — close the connection gracefully but keep the process alive. The reason is passed to handle_disconnect/2
  • {:stop, reason, state} — reject the connection, stop the process
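
A minimal sketch of these return values, assuming the user state is a map that may carry a :channel key. SubscribingClient and the "subscribe:<channel>" text format are invented for the example.

```elixir
defmodule SubscribingClient do
  use Parley

  # Made-up protocol: subscribe by sending "subscribe:<channel>" as text.
  @impl true
  def handle_connect(%{channel: channel} = state) when is_binary(channel) do
    {:push, {:text, "subscribe:" <> channel}, state}
  end

  # Without a channel there is nothing to subscribe to, so stop the process.
  def handle_connect(state) do
    {:stop, :no_channel, state}
  end
end
```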

handle_disconnect(reason, state)

@callback handle_disconnect(reason :: term(), state()) ::
  {:ok, state()} | {:reconnect, state()} | {:disconnect, state()}

Called when the connection is lost or closed.

The reason indicates why the connection ended:

  • :closed — graceful disconnect via disconnect/1
  • {:remote_close, code, reason} — server-initiated close frame
  • {:error, reason} — stream or decode error
  • :connect_timeout — WebSocket upgrade handshake timed out
  • any user-provided term — from a {:disconnect, reason, state} callback return

Return values

  • {:ok, state} — defer to the configured :reconnect option. Reconnects if the option is set, stays disconnected otherwise
  • {:reconnect, state} — force reconnect regardless of the :reconnect option. Uses default backoff values if no option was configured
  • {:disconnect, state} — force stay disconnected, overriding the :reconnect option
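
The reasons and return values above can be combined into a per-reason reconnect policy. The sketch below is one possible policy, not a recommendation from the library. It assumes the code in {:remote_close, code, reason} is the integer WebSocket close code, and PickyReconnectClient is a hypothetical module name.

```elixir
defmodule PickyReconnectClient do
  use Parley

  # An explicit disconnect/1 call should not trigger a reconnect.
  @impl true
  def handle_disconnect(:closed, state), do: {:disconnect, state}

  # 1008 is "policy violation" in RFC 6455; retrying is unlikely to help.
  def handle_disconnect({:remote_close, 1008, _reason}, state), do: {:disconnect, state}

  # Any other server-initiated close: retry even if :reconnect was not set.
  def handle_disconnect({:remote_close, _code, _reason}, state), do: {:reconnect, state}

  # Errors, timeouts, and custom reasons defer to the :reconnect option.
  def handle_disconnect(_reason, state), do: {:ok, state}
end
```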

handle_frame(frame, state)

@callback handle_frame(frame(), state()) ::
  {:ok, state()}
  | {:push, frame(), state()}
  | {:disconnect, reason :: term(), state()}
  | {:stop, reason :: term(), state()}

Called when a frame is received from the server.

Return values

  • {:ok, state} — update state
  • {:push, frame, state} — send a frame back to the server
  • {:disconnect, reason, state} — close the connection gracefully but keep the process alive. The reason is passed to handle_disconnect/2
  • {:stop, reason, state} — close the connection and stop the process
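
A short sketch that exercises three of these return values, assuming the user state is a map. EchoCounterClient and the "ping" / "pong" / "bye" text protocol are invented for the example.

```elixir
defmodule EchoCounterClient do
  use Parley

  # Made-up protocol: answer a "ping" text message with "pong".
  @impl true
  def handle_frame({:text, "ping"}, state), do: {:push, {:text, "pong"}, state}

  # The server says "bye": close the connection and stop the process.
  def handle_frame({:text, "bye"}, state), do: {:stop, :normal, state}

  # Count every other text or binary frame in the :received key.
  def handle_frame({type, _payload}, state) when type in [:text, :binary] do
    {:ok, Map.update(state, :received, 1, &(&1 + 1))}
  end

  # Leave the state untouched for any other frame.
  def handle_frame(_frame, state), do: {:ok, state}
end
```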

handle_info(message, state)

@callback handle_info(message :: term(), state()) ::
  {:ok, state()}
  | {:push, frame(), state()}
  | {:disconnect, reason :: term(), state()}
  | {:stop, reason :: term(), state()}

Called when the process receives a message that is not a WebSocket frame.

This is the equivalent of GenServer's handle_info/2. Use it to handle timer messages (Process.send_after/3), inter-process messages, and any other messages sent directly to the Parley process.

This callback is invoked in all states (connected, connecting, and disconnected). However, {:push, frame, state} is only effective while connected — in other states the push is ignored and a warning is logged.

Return values

  • {:ok, state} — update state
  • {:push, frame, state} — send a frame to the server (connected only)
  • {:disconnect, reason, state} — close the connection gracefully but keep the process alive. The reason is passed to handle_disconnect/2. While already disconnected, the state is updated but no transition occurs
  • {:stop, reason, state} — stop the process
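
One common use is an application-level heartbeat driven by Process.send_after/3. The sketch below assumes the user state is a map. HeartbeatClient, the 15-second interval, and the "heartbeat" text payload are all invented for the example. The timer is cancelled in handle_disconnect/2 so that no push is attempted while the connection is down.

```elixir
defmodule HeartbeatClient do
  use Parley

  @interval 15_000

  # Start the heartbeat timer each time the connection comes up.
  @impl true
  def handle_connect(state) do
    {:ok, Map.put(state, :timer, schedule())}
  end

  # Only act on a heartbeat while a timer is tracked, i.e. while connected.
  @impl true
  def handle_info(:heartbeat, %{timer: _ref} = state) do
    {:push, {:text, "heartbeat"}, %{state | timer: schedule()}}
  end

  # Stale heartbeats (timer already cancelled) and unrelated messages.
  def handle_info(_message, state), do: {:ok, state}

  # Cancel the timer, then defer to the :reconnect option.
  @impl true
  def handle_disconnect(_reason, state) do
    if ref = state[:timer], do: Process.cancel_timer(ref)
    {:ok, Map.delete(state, :timer)}
  end

  # Callbacks run inside the Parley process, so self() is that process.
  defp schedule, do: Process.send_after(self(), :heartbeat, @interval)
end
```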

handle_ping(payload, state)

@callback handle_ping(payload :: binary(), state()) ::
  {:ok, state()}
  | {:push, frame(), state()}
  | {:disconnect, reason :: term(), state()}
  | {:stop, reason :: term(), state()}

Called when a ping frame is received.

The pong response is always sent automatically before this callback is invoked, so the WebSocket protocol is never violated. Use this callback to observe pings for heartbeat monitoring, latency tracking, or logging.

Return values

  • {:ok, state} — continue with updated state
  • {:push, frame, state} — send a frame and continue
  • {:disconnect, reason, state} — close the connection gracefully but keep the process alive. The reason is passed to handle_disconnect/2
  • {:stop, reason, state} — close the connection gracefully and stop the process
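
Since the pong is already sent, the callback only needs to record what it observed. A minimal sketch for heartbeat monitoring, assuming the user state is a map; PingWatcherClient and the :last_ping_at and :last_ping_payload keys are hypothetical.

```elixir
defmodule PingWatcherClient do
  use Parley

  @impl true
  def handle_ping(payload, state) do
    # Monotonic time is safe for measuring intervals between pings.
    now = System.monotonic_time(:millisecond)
    {:ok, Map.merge(state, %{last_ping_at: now, last_ping_payload: payload})}
  end
end
```

A separate timer in handle_info/2 could compare :last_ping_at with the current monotonic time and return {:disconnect, :ping_timeout, state} when the server has gone quiet; :ping_timeout is an arbitrary user-chosen reason.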

init(init_arg)

@callback init(init_arg :: term()) :: {:ok, state()} | {:stop, reason :: term()}

Called when the process starts, before connecting to the server.

Receives the init_arg passed to start_link/3 and returns the initial user state. Use this to validate arguments, build structs, create ETS tables, or start linked processes.

Return values

  • {:ok, state} — proceed with the transformed state
  • {:stop, reason} — stop the process before connecting
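
A sketch of argument validation in init/1, assuming the client is started with a keyword list as init_arg. ValidatingClient, its struct fields, and the error tuples are invented for the example.

```elixir
defmodule ValidatingClient do
  use Parley

  defstruct [:channel, received: 0]

  # Turn a keyword list into a struct, refusing to start without a channel.
  @impl true
  def init(opts) when is_list(opts) do
    case Keyword.fetch(opts, :channel) do
      {:ok, channel} when is_binary(channel) ->
        {:ok, %__MODULE__{channel: channel}}

      _missing_or_invalid ->
        {:stop, {:invalid_channel, opts[:channel]}}
    end
  end

  # Anything other than a list is rejected before a connection is attempted.
  def init(other), do: {:stop, {:bad_init_arg, other}}
end
```

Such a client would be started with ValidatingClient.start_link([channel: "news"], url: "wss://example.com/ws"), and every later callback would receive the struct as its state.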

Types

frame()

@type frame() ::
  {:text, String.t()}
  | {:binary, binary()}
  | {:ping, binary()}
  | {:pong, binary()}

A WebSocket frame.

state()

@type state() :: term()

The user-managed state passed through all callbacks.