WebSocket client GenServer using Gun as transport layer.
Overview
The Client module is implemented as a GenServer to handle asynchronous Gun messages. Gun sends all WebSocket messages to the process that opens the connection, so the Client GenServer owns the Gun connection to receive these messages directly.
Public API
Despite being a GenServer internally, the public API returns struct-based responses for backward compatibility:
{:ok, client} = Client.connect("wss://example.com")
# client is a struct with gun_pid, stream_ref, and server_pid fields
:ok = Client.send_message(client, "hello")
Client.close(client)Connection Ownership and Reconnection
Initial Connection
When you call connect/2, a new Client GenServer is started which:
- Opens a Gun connection from within the GenServer
- Receives all Gun messages (gun_ws, gun_up, gun_down, etc.)
- Returns a client struct containing the GenServer PID
Automatic Reconnection
On connection failure, the Client GenServer:
- Detects the failure via process monitoring
- Cleans up the old Gun connection
- Opens a new Gun connection from the same GenServer process
- Maintains Gun message ownership continuity
- Preserves the same Client GenServer PID throughout
This ensures that integrated heartbeat functionality continues to work seamlessly across reconnections without needing to track connection changes.
The Client GenServer handles all reconnection logic internally to maintain Gun message ownership throughout the connection lifecycle.
Core Functions
- connect/2 - Establish connection
- send_message/2 - Send messages
- close/1 - Close connection
- subscribe/2 - Subscribe to channels
- get_state/1 - Get connection state
Configuration Options
The connect/2 function accepts all options from ZenWebsocket.Config:
# Customize reconnection behavior
{:ok, client} = Client.connect("wss://example.com",
retry_count: 5, # Try reconnecting 5 times
retry_delay: 2000, # Start with 2 second delay
max_backoff: 60_000, # Cap backoff at 1 minute
reconnect_on_error: true # Auto-reconnect on errors
)
# Disable auto-reconnection for critical operations
{:ok, client} = Client.connect("wss://example.com",
reconnect_on_error: false
)See ZenWebsocket.Config for all available options.
API Functions
| Function | Arity | Description | Param Kinds |
|---|---|---|---|
reconnect | 1 | Force reconnection by closing and re-establishing the connection. | client: value |
get_latency_stats | 1 | Get latency statistics for request/response round-trip times. | client: value |
get_state_metrics | 1 | Get detailed metrics about the client's internal state. | client: value |
get_heartbeat_health | 1 | Get heartbeat health status. | client: value |
get_state | 1 | Get the current connection state. | client: value |
subscribe | 2 | Subscribe to WebSocket channels. | client: value, channels: value |
close | 1 | Close the WebSocket connection. | client: value |
send_message | 2 | Send a message through the WebSocket connection. | client: value, message: value |
connect | 2 | Establish a WebSocket connection. | url_or_config: value, opts: value |
Summary
Types
Function invoked for each inbound message. Return value is ignored.
Tuple shapes delivered to user-provided message handlers.
Internal GenServer state for the WebSocket client
Functions
Returns a child specification for starting a Client under a supervisor.
Close the WebSocket connection.
Establish a WebSocket connection.
Get heartbeat health status.
Gets latency statistics for request/response round-trip times.
Returns the current connection state.
Gets detailed metrics about the client's internal state.
Force reconnection by closing and re-establishing the connection.
Sends a message through the WebSocket connection.
Starts a Client GenServer under a supervisor.
Subscribe to WebSocket channels.
Types
@type handler() :: (handler_message() -> any())
Function invoked for each inbound message. Return value is ignored.
@type handler_message() :: {:message, map() | binary()} | {:binary, binary()} | {:unmatched_response, map()} | {:protocol_error, term()}
Tuple shapes delivered to user-provided message handlers.
See USAGE_RULES.md "Handler Message Reference" for semantics and when each
shape is emitted.
@type state() :: %{ optional(:retry_count) => non_neg_integer(), optional(:awaiting_connection) => GenServer.from(), gun_pid: pid() | nil, stream_ref: reference() | nil, state: :connecting | :connected | :disconnected, url: String.t(), monitor_ref: reference() | nil, config: ZenWebsocket.Config.t(), handler: handler(), subscriptions: MapSet.t(String.t()), pending_requests: %{ optional(term()) => {GenServer.from(), reference(), integer()} }, heartbeat_config: :disabled | map(), active_heartbeats: MapSet.t(term()), last_heartbeat_at: DateTime.t() | nil, heartbeat_failures: non_neg_integer(), heartbeat_timer: reference() | nil, connect_start_time: integer() | nil, latency_stats: ZenWebsocket.LatencyStats.t(), recorder_pid: pid() | nil, on_connect: (pid() -> any()) | nil, on_disconnect: (pid() -> any()) | nil, reconnector: function() | nil }
Internal GenServer state for the WebSocket client
Functions
@spec child_spec(keyword()) :: Supervisor.child_spec()
Returns a child specification for starting a Client under a supervisor.
Examples
# In your application's supervision tree
children = [
{ZenWebsocket.Client, url: "wss://example.com", id: :my_client},
# Or with full configuration
{ZenWebsocket.Client, [
url: "wss://example.com",
heartbeat_config: %{type: :deribit, interval: 30_000},
retry_count: 10
]}
]
Supervisor.start_link(children, strategy: :one_for_one)
@spec close(t()) :: :ok
Close the WebSocket connection.
Parameters
client- Client struct from connect/2 (value)
Returns
Always succeeds (:ok)
# descripex:contract
%{
params: %{
client: %{description: "Client struct from connect/2", kind: :value}
},
returns: %{type: ":ok", description: "Always succeeds"}
}
@spec connect( String.t() | ZenWebsocket.Config.t(), keyword() ) :: {:ok, t()} | {:error, term()}
Establish a WebSocket connection.
Parameters
url_or_config- WebSocket URL string or Config struct (value)opts- Connection options keyword list (default:[], value)
Returns
Client struct or error ({:ok, t()} | {:error, term()})
Errors
:timeout:invalid_url:connection_refused
# descripex:contract
%{
params: %{
opts: %{
default: [],
description: "Connection options keyword list",
kind: :value
},
url_or_config: %{
description: "WebSocket URL string or Config struct",
kind: :value
}
},
errors: [:timeout, :invalid_url, :connection_refused],
returns: %{
type: "{:ok, t()} | {:error, term()}",
description: "Client struct or error"
}
}
Get heartbeat health status.
Parameters
client- Client struct from connect/2 (value)
Returns
Heartbeat health map or nil if unavailable (map() | nil)
# descripex:contract
%{
params: %{
client: %{description: "Client struct from connect/2", kind: :value}
},
returns: %{
type: "map() | nil",
description: "Heartbeat health map or nil if unavailable"
}
}
@spec get_latency_stats(t()) :: %{ p50: non_neg_integer(), p99: non_neg_integer(), last: non_neg_integer(), count: non_neg_integer() } | nil
Gets latency statistics for request/response round-trip times.
Returns a map with p50, p99, last sample, and count, or nil if no samples yet.
Returns nil if the server process is no longer alive (see
"Process-down safety" in send_message/2).
@spec get_state(t()) :: :connecting | :connected | :disconnected
Returns the current connection state.
Returns :disconnected if the server process is no longer alive (see
"Process-down safety" in send_message/2).
Gets detailed metrics about the client's internal state.
Returns a map containing:
- Data structure sizes (heartbeats, subscriptions, pending requests)
- Memory usage information
- Process statistics
Returns nil if the server process is no longer alive (see
"Process-down safety" in send_message/2).
Force reconnection by closing and re-establishing the connection.
Parameters
client- Client struct from connect/2 (value)
Returns
New client struct or error ({:ok, t()} | {:error, term()})
Errors
:timeout:connection_refused
# descripex:contract
%{
params: %{
client: %{description: "Client struct from connect/2", kind: :value}
},
errors: [:timeout, :connection_refused],
returns: %{
type: "{:ok, t()} | {:error, term()}",
description: "New client struct or error"
}
}
Sends a message through the WebSocket connection.
Returns :ok, {:ok, response}, or {:error, reason}.
Process-down safety
Client structs hold the GenServer PID by value. If the server process has
exited, this function returns {:error, {:not_connected, :process_down}}
instead of crashing the caller, including races where the process dies during
the GenServer.call/3. For pool-level failover across multiple candidates,
use ClientSupervisor.send_balanced/2 with the :client_discovery option.
@spec start_link( String.t() | ZenWebsocket.Config.t(), keyword() ) :: {:ok, pid()} | {:error, term()}
Starts a Client GenServer under a supervisor.
This function is designed to be called by a supervisor. For direct usage,
prefer connect/2 which provides better error handling and connection
establishment feedback.
Subscribe to WebSocket channels.
Parameters
client- Client struct from connect/2 (value)channels- List of channel names to subscribe to (value)
Returns
Success or error (:ok | {:error, term()})
Errors
:not_connected
# descripex:contract
%{
params: %{
client: %{description: "Client struct from connect/2", kind: :value},
channels: %{
description: "List of channel names to subscribe to",
kind: :value
}
},
errors: [:not_connected],
returns: %{type: ":ok | {:error, term()}", description: "Success or error"}
}