ZenWebsocket.Client (ZenWebsocket v0.4.2)

WebSocket client GenServer using Gun as the transport layer.

Overview

The Client module is implemented as a GenServer to handle asynchronous Gun messages. Gun sends all WebSocket messages to the process that opens the connection, so the Client GenServer owns the Gun connection to receive these messages directly.

Public API

Although the Client is a GenServer internally, the public API returns struct-based responses for backward compatibility:

alias ZenWebsocket.Client

{:ok, client} = Client.connect("wss://example.com")
# client is a struct with gun_pid, stream_ref, and server_pid fields

:ok = Client.send_message(client, "hello")
Client.close(client)

Connection Ownership and Reconnection

Initial Connection

When you call connect/2, a new Client GenServer is started which:

  1. Opens a Gun connection from within the GenServer
  2. Receives all Gun messages (gun_ws, gun_up, gun_down, etc.)
  3. Returns a client struct containing the GenServer PID

Automatic Reconnection

On connection failure, the Client GenServer:

  1. Detects the failure via process monitoring
  2. Cleans up the old Gun connection
  3. Opens a new Gun connection from the same GenServer process
  4. Maintains Gun message ownership continuity
  5. Preserves the same Client GenServer PID throughout

Because the Client GenServer handles all reconnection logic internally, Gun message ownership stays with the same process for the whole connection lifecycle. As a result, the integrated heartbeat functionality continues to work across reconnections without needing to track connection changes.
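
The reconnection steps above can be illustrated with a small, self-contained sketch. It is not ZenWebsocket's implementation: OwnerSketch is a hypothetical module, and a plain spawned process stands in for the Gun connection. It only shows the general pattern of one long-lived GenServer that monitors a connection process and replaces it on failure while keeping its own PID.

```elixir
defmodule OwnerSketch do
  use GenServer

  # Hypothetical names throughout; a spawned process stands in for Gun.

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Returns {current_connection_pid, number_of_reconnects}.
  def connection(server), do: GenServer.call(server, :connection)

  @impl true
  def init(:ok) do
    {conn_pid, monitor_ref} = open_connection()
    {:ok, %{conn_pid: conn_pid, monitor_ref: monitor_ref, reconnects: 0}}
  end

  @impl true
  def handle_call(:connection, _from, state) do
    {:reply, {state.conn_pid, state.reconnects}, state}
  end

  # The monitored connection died: open a new one from this same process.
  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{monitor_ref: ref} = state) do
    {conn_pid, monitor_ref} = open_connection()

    {:noreply,
     %{state | conn_pid: conn_pid, monitor_ref: monitor_ref, reconnects: state.reconnects + 1}}
  end

  def handle_info(_other, state), do: {:noreply, state}

  # Stand-in for opening a connection: an idle process plus a monitor on it.
  defp open_connection do
    pid =
      spawn(fn ->
        receive do
          :stop -> :ok
        end
      end)

    {pid, Process.monitor(pid)}
  end
end
```

Calling OwnerSketch.connection/1 before and after killing the stand-in process returns a different connection PID and an incremented counter, while the server PID held by the caller stays the same.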

Core Functions

  • connect/2 - Establish connection
  • send_message/2 - Send messages
  • close/1 - Close connection
  • subscribe/2 - Subscribe to channels
  • get_state/1 - Get connection state

Configuration Options

The connect/2 function accepts all options from ZenWebsocket.Config:

# Customize reconnection behavior
{:ok, client} = Client.connect("wss://example.com",
  retry_count: 5,              # Try reconnecting 5 times
  retry_delay: 2000,           # Start with 2 second delay
  max_backoff: 60_000,         # Cap backoff at 1 minute
  reconnect_on_error: true     # Auto-reconnect on errors
)

# Disable auto-reconnection for critical operations
{:ok, client} = Client.connect("wss://example.com",
  reconnect_on_error: false
)

See ZenWebsocket.Config for all available options.
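
The retry_delay and max_backoff options suggest a delay that grows between attempts up to a ceiling. This page does not state the exact formula, so the sketch below assumes the delay doubles on each attempt; BackoffSketch and its functions are hypothetical and the library's real schedule may differ.

```elixir
defmodule BackoffSketch do
  # ASSUMPTION: the delay doubles per attempt and is capped at max_backoff.
  # ZenWebsocket's actual formula is not documented on this page.

  # attempt is 0-based: attempt 0 waits retry_delay milliseconds.
  def delay(attempt, retry_delay, max_backoff)
      when is_integer(attempt) and attempt >= 0 do
    min(retry_delay * Integer.pow(2, attempt), max_backoff)
  end

  # Delays for every attempt allowed by retry_count.
  def schedule(retry_count, retry_delay, max_backoff) do
    for attempt <- 0..(retry_count - 1)//1 do
      delay(attempt, retry_delay, max_backoff)
    end
  end
end
```

Under that assumption, the options from the first example (retry_count: 5, retry_delay: 2000, max_backoff: 60_000) would give delays of 2000, 4000, 8000, 16000, and 32000 ms, and the cap would only apply from a sixth attempt onward.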

API Functions

| Function | Arity | Description | Param Kinds |
| --- | --- | --- | --- |
| reconnect | 1 | Force reconnection by closing and re-establishing the connection. | client: value |
| get_latency_stats | 1 | Get latency statistics for request/response round-trip times. | client: value |
| get_state_metrics | 1 | Get detailed metrics about the client's internal state. | client: value |
| get_heartbeat_health | 1 | Get heartbeat health status. | client: value |
| get_state | 1 | Get the current connection state. | client: value |
| subscribe | 2 | Subscribe to WebSocket channels. | client: value, channels: value |
| close | 1 | Close the WebSocket connection. | client: value |
| send_message | 2 | Send a message through the WebSocket connection. | client: value, message: value |
| connect | 2 | Establish a WebSocket connection. | url_or_config: value, opts: value |

Summary

Types

  • handler() - Function invoked for each inbound message. Return value is ignored.
  • handler_message() - Tuple shapes delivered to user-provided message handlers.
  • state() - Internal GenServer state for the WebSocket client
  • t()

Functions

  • child_spec/1 - Returns a child specification for starting a Client under a supervisor.
  • close/1 - Close the WebSocket connection.
  • connect/2 - Establish a WebSocket connection.
  • get_heartbeat_health/1 - Get heartbeat health status.
  • get_latency_stats/1 - Gets latency statistics for request/response round-trip times.
  • get_state/1 - Returns the current connection state.
  • get_state_metrics/1 - Gets detailed metrics about the client's internal state.
  • reconnect/1 - Force reconnection by closing and re-establishing the connection.
  • send_message/2 - Sends a message through the WebSocket connection.
  • start_link/2 - Starts a Client GenServer under a supervisor.
  • subscribe/2 - Subscribe to WebSocket channels.

Types

handler()

@type handler() :: (handler_message() -> any())

Function invoked for each inbound message. Return value is ignored.

handler_message()

@type handler_message() ::
  {:message, map() | binary()}
  | {:binary, binary()}
  | {:unmatched_response, map()}
  | {:protocol_error, term()}

Tuple shapes delivered to user-provided message handlers.

See USAGE_RULES.md "Handler Message Reference" for semantics and when each shape is emitted.
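
Since a handler is an ordinary one-argument function, each tuple shape in handler_message() can get its own clause. The following is a minimal sketch: HandlerSketch and the tags it returns are hypothetical, and the return values exist only so the clauses can be checked (the library ignores them). How a handler is passed to connect/2 is not shown, because the option name is not stated on this page.

```elixir
defmodule HandlerSketch do
  # Hypothetical handler: one clause per handler_message() shape.

  # {:message, _} carries either a map or a binary, per the type above.
  def handle({:message, payload}) when is_map(payload), do: {:map_message, payload}
  def handle({:message, payload}) when is_binary(payload), do: {:binary_message, payload}

  # {:binary, _} always carries a binary; report only its size here.
  def handle({:binary, data}) when is_binary(data), do: {:binary, byte_size(data)}

  def handle({:unmatched_response, response}) when is_map(response),
    do: {:unmatched, response}

  def handle({:protocol_error, reason}), do: {:protocol_error, reason}
end
```

A capture such as &HandlerSketch.handle/1 has the shape required by the handler() type.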

state()

@type state() :: %{
  optional(:retry_count) => non_neg_integer(),
  optional(:awaiting_connection) => GenServer.from(),
  gun_pid: pid() | nil,
  stream_ref: reference() | nil,
  state: :connecting | :connected | :disconnected,
  url: String.t(),
  monitor_ref: reference() | nil,
  config: ZenWebsocket.Config.t(),
  handler: handler(),
  subscriptions: MapSet.t(String.t()),
  pending_requests: %{
    optional(term()) => {GenServer.from(), reference(), integer()}
  },
  heartbeat_config: :disabled | map(),
  active_heartbeats: MapSet.t(term()),
  last_heartbeat_at: DateTime.t() | nil,
  heartbeat_failures: non_neg_integer(),
  heartbeat_timer: reference() | nil,
  connect_start_time: integer() | nil,
  latency_stats: ZenWebsocket.LatencyStats.t(),
  recorder_pid: pid() | nil,
  on_connect: (pid() -> any()) | nil,
  on_disconnect: (pid() -> any()) | nil,
  reconnector: function() | nil
}

Internal GenServer state for the WebSocket client

t()

@type t() :: %ZenWebsocket.Client{
  config: ZenWebsocket.Config.t() | nil,
  gun_pid: pid() | nil,
  monitor_ref: reference() | nil,
  reconnect_opts: keyword(),
  server_pid: pid() | nil,
  state: :connecting | :connected | :disconnected,
  stream_ref: reference() | nil,
  url: String.t() | nil
}

Functions

child_spec(init_arg)

@spec child_spec(keyword()) :: Supervisor.child_spec()

Returns a child specification for starting a Client under a supervisor.

Examples

# In your application's supervision tree
children = [
  {ZenWebsocket.Client, url: "wss://example.com", id: :my_client},
  # Or with full configuration
  {ZenWebsocket.Client, [
    url: "wss://example.com",
    heartbeat_config: %{type: :deribit, interval: 30_000},
    retry_count: 10
  ]}
]

Supervisor.start_link(children, strategy: :one_for_one)

close(client)

@spec close(t()) :: :ok

Close the WebSocket connection.

Parameters

  • client - Client struct from connect/2 (value)

Returns

Always succeeds (:ok)

# descripex:contract
%{
  params: %{
    client: %{description: "Client struct from connect/2", kind: :value}
  },
  returns: %{type: ":ok", description: "Always succeeds"}
}

connect(url_or_config, opts \\ [])

@spec connect(
  String.t() | ZenWebsocket.Config.t(),
  keyword()
) :: {:ok, t()} | {:error, term()}

Establish a WebSocket connection.

Parameters

  • url_or_config - WebSocket URL string or Config struct (value)
  • opts - Connection options keyword list (default: [], value)

Returns

Client struct or error ({:ok, t()} | {:error, term()})

Errors

  • :timeout
  • :invalid_url
  • :connection_refused

# descripex:contract
%{
  params: %{
    opts: %{
      default: [],
      description: "Connection options keyword list",
      kind: :value
    },
    url_or_config: %{
      description: "WebSocket URL string or Config struct",
      kind: :value
    }
  },
  errors: [:timeout, :invalid_url, :connection_refused],
  returns: %{
    type: "{:ok, t()} | {:error, term()}",
    description: "Client struct or error"
  }
}
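
Callers usually branch on the result of connect/2. The sketch below shows one way to do that as a pure function over the documented return shapes. ConnectResultSketch and the returned tags are hypothetical, and it assumes the listed error reasons arrive as bare atoms inside {:error, reason}, which this page implies but does not spell out.

```elixir
defmodule ConnectResultSketch do
  # ASSUMPTION: documented reasons appear as bare atoms in {:error, reason}.

  # Success: the client struct is ready to use.
  def next_step({:ok, _client}), do: :proceed

  # Possibly transient failures: trying again later may help.
  def next_step({:error, :timeout}), do: :retry_later
  def next_step({:error, :connection_refused}), do: :retry_later

  # A malformed URL will not fix itself on retry.
  def next_step({:error, :invalid_url}), do: :fix_configuration

  # The spec allows any term as the reason, so keep a catch-all clause.
  def next_step({:error, other}), do: {:unexpected, other}
end
```

The catch-all clause matters because the spec types the reason as term(), so reasons beyond the three listed ones are possible.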

get_heartbeat_health(client)

@spec get_heartbeat_health(t()) :: map() | nil

Get heartbeat health status.

Parameters

  • client - Client struct from connect/2 (value)

Returns

Heartbeat health map or nil if unavailable (map() | nil)

# descripex:contract
%{
  params: %{
    client: %{description: "Client struct from connect/2", kind: :value}
  },
  returns: %{
    type: "map() | nil",
    description: "Heartbeat health map or nil if unavailable"
  }
}

get_latency_stats(client)

@spec get_latency_stats(t()) ::
  %{
    p50: non_neg_integer(),
    p99: non_neg_integer(),
    last: non_neg_integer(),
    count: non_neg_integer()
  }
  | nil

Gets latency statistics for request/response round-trip times.

Returns a map with p50, p99, last sample, and count, or nil if no samples have been recorded yet. Also returns nil if the server process is no longer alive (see "Process-down safety" in send_message/2).
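
To make the shape of the result concrete, the sketch below builds a map with the same keys from a list of round-trip samples. It is an illustration only: LatencySketch is a hypothetical module, and the nearest-rank percentile method is an assumption, since this page does not say how ZenWebsocket.LatencyStats computes p50 and p99.

```elixir
defmodule LatencySketch do
  # ASSUMPTION: nearest-rank percentiles; the library's method may differ.

  # No samples yet: mirror the documented nil result.
  def summarize([]), do: nil

  # samples are ordered oldest to newest, so the last element is the latest.
  def summarize(samples) when is_list(samples) do
    sorted = Enum.sort(samples)
    count = length(sorted)

    %{
      p50: percentile(sorted, count, 50),
      p99: percentile(sorted, count, 99),
      last: List.last(samples),
      count: count
    }
  end

  # Nearest rank: the smallest value with at least p% of samples at or below it.
  defp percentile(sorted, count, p) do
    rank = max(ceil(p * count / 100), 1)
    Enum.at(sorted, rank - 1)
  end
end
```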

get_state(client)

@spec get_state(t()) :: :connecting | :connected | :disconnected

Returns the current connection state.

Returns :disconnected if the server process is no longer alive (see "Process-down safety" in send_message/2).

get_state_metrics(client)

@spec get_state_metrics(t()) :: map() | nil

Gets detailed metrics about the client's internal state.

Returns a map containing:

  • Data structure sizes (heartbeats, subscriptions, pending requests)
  • Memory usage information
  • Process statistics

Returns nil if the server process is no longer alive (see "Process-down safety" in send_message/2).

reconnect(client)

@spec reconnect(t()) :: {:ok, t()} | {:error, term()}

Force reconnection by closing and re-establishing the connection.

Parameters

  • client - Client struct from connect/2 (value)

Returns

New client struct or error ({:ok, t()} | {:error, term()})

Errors

  • :timeout
  • :connection_refused

# descripex:contract
%{
  params: %{
    client: %{description: "Client struct from connect/2", kind: :value}
  },
  errors: [:timeout, :connection_refused],
  returns: %{
    type: "{:ok, t()} | {:error, term()}",
    description: "New client struct or error"
  }
}

send_message(client, message)

@spec send_message(t(), binary()) :: :ok | {:ok, map()} | {:error, term()}

Sends a message through the WebSocket connection.

Returns :ok, {:ok, response}, or {:error, reason}.

Process-down safety

Client structs hold the GenServer PID by value. If the server process has exited, this function returns {:error, {:not_connected, :process_down}} instead of crashing the caller, including races where the process dies during the GenServer.call/3. For pool-level failover across multiple candidates, use ClientSupervisor.send_balanced/2 with the :client_discovery option.
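
The general technique behind this kind of safety is to catch the exit that GenServer.call/3 raises when the target process is gone. The sketch below shows that pattern in isolation. It is not the library's code: SafeCallSketch is a hypothetical module, and treating every non-timeout exit as a dead process, as well as the {:error, :timeout} result, are simplifications made for this example.

```elixir
defmodule SafeCallSketch do
  # ASSUMPTION: illustrative pattern only, not ZenWebsocket's implementation.

  def safe_call(server_pid, request, timeout \\ 5_000) when is_pid(server_pid) do
    GenServer.call(server_pid, request, timeout)
  catch
    # The server is alive but did not reply in time.
    :exit, {:timeout, _call} ->
      {:error, :timeout}

    # The process was already dead (:noproc) or died while the call was in flight.
    :exit, _reason ->
      {:error, {:not_connected, :process_down}}
  end
end
```

With this pattern a stale PID produces an error tuple the caller can match on, instead of an exit that would take the calling process down.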

start_link(url_or_config, opts \\ [])

@spec start_link(
  String.t() | ZenWebsocket.Config.t(),
  keyword()
) :: {:ok, pid()} | {:error, term()}

Starts a Client GenServer under a supervisor.

This function is designed to be called by a supervisor. For direct usage, prefer connect/2 which provides better error handling and connection establishment feedback.

subscribe(client, channels)

@spec subscribe(t(), list()) :: :ok | {:error, term()}

Subscribe to WebSocket channels.

Parameters

  • client - Client struct from connect/2 (value)
  • channels - List of channel names to subscribe to (value)

Returns

Success or error (:ok | {:error, term()})

Errors

  • :not_connected

# descripex:contract
%{
  params: %{
    client: %{description: "Client struct from connect/2", kind: :value},
    channels: %{
      description: "List of channel names to subscribe to",
      kind: :value
    }
  },
  errors: [:not_connected],
  returns: %{type: ":ok | {:error, term()}", description: "Success or error"}
}