Core Principles
- Start Simple: Use direct connection for development, add supervision for production
- 5 Essential Functions: The core API is 5 functions (connect, send_message, subscribe, get_state, close), plus monitoring functions (get_latency_stats, get_heartbeat_health, get_state_metrics) and reconnect for explicit reconnection
- Real API Testing: Test against real endpoints or the Testing module's mock server; never mock WebSocket behavior
Quick Start Pattern
# Simplest possible usage - connect and send
{:ok, client} = ZenWebsocket.Client.connect("wss://test.deribit.com/ws/api/v2")
ZenWebsocket.Client.send_message(client, Jason.encode!(%{method: "public/test"}))
The 5 Essential Functions
# 1. Connect to WebSocket
{:ok, client} = ZenWebsocket.Client.connect(url, opts)
# 2. Send messages (must be binary — use Jason.encode!/1 for maps)
:ok = ZenWebsocket.Client.send_message(client, Jason.encode!(%{method: "public/test"}))
# 3. Subscribe to channels
:ok = ZenWebsocket.Client.subscribe(client, channels)
# 4. Check connection state
state = ZenWebsocket.Client.get_state(client) # :connected, :connecting, :disconnected
# 5. Close connection
:ok = ZenWebsocket.Client.close(client)
Additional Monitoring Functions
# Latency percentiles (p50/p99/last/count — all integers in ms)
stats = ZenWebsocket.Client.get_latency_stats(client)
# Heartbeat health (failure_count, last_heartbeat_at, config, timer_active)
health = ZenWebsocket.Client.get_heartbeat_health(client)
# Connection metrics (subscriptions_size, pending_requests_size, state_memory, ...)
metrics = ZenWebsocket.Client.get_state_metrics(client)
# Explicit reconnection using the stored connection contract
{:ok, new_client} = ZenWebsocket.Client.reconnect(client)
Common Patterns
Pattern 1: Development/Testing (No Supervision)
# Direct connection - crashes won't restart
{:ok, client} = ZenWebsocket.Client.connect(url)
# Use the client...
ZenWebsocket.Client.close(client)
Pattern 2: Production with Dynamic Connections
# Add to your supervision tree
children = [
ZenWebsocket.ClientSupervisor,
# ... other children
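]
# e.g. in your Application.start/2 callback
Supervisor.start_link(children, strategy: :one_for_one)
# Start connections dynamically
{:ok, client} = ZenWebsocket.ClientSupervisor.start_client(url, opts)
Pattern 3: Production with Fixed Connections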
# Add specific clients to supervision tree
children = [
{ZenWebsocket.Client, [
url: "wss://api.example.com/ws",
id: :main_websocket,
heartbeat_config: %{type: :ping_pong, interval: 30_000}
]}
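]
# e.g. in your Application.start/2 callback
Supervisor.start_link(children, strategy: :one_for_one)
Configuration Options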
opts = [
# Connection
timeout: 5000, # Connection timeout in ms
headers: [], # Additional headers
debug: false, # Enable verbose debug logging
# Reconnection
retry_count: 3, # Max reconnection attempts
retry_delay: 1000, # Initial retry delay in ms (exponential backoff)
reconnect_on_error: true, # Auto-reconnect on errors
restore_subscriptions: true, # Restore subscriptions after reconnect
# Heartbeat
heartbeat_config: %{
type: :ping_pong, # :ping_pong, :deribit, :binance
interval: 30_000, # Heartbeat interval in ms
message: nil # Reserved for future custom heartbeat support
},
# Session Recording
record_to: "/tmp/session.jsonl", # Enable message recording (nil to disable)
# Latency Monitoring
latency_buffer_size: 100 # Samples for p50/p99 calculations
]
Custom Client Discovery (Distributed Applications)
For distributed applications using :pg, Horde, or other registries, ZenWebsocket provides hooks to integrate with your registry of choice.
Lifecycle Callbacks
Register clients with your registry using on_connect/on_disconnect:
{:ok, client} = ZenWebsocket.ClientSupervisor.start_client(
"wss://api.example.com/ws",
on_connect: fn pid -> :pg.join(:ws_pool, pid) end,
on_disconnect: fn pid -> :pg.leave(:ws_pool, pid) end
)
Important: on_disconnect is called from terminate/2, so it only runs on a graceful shutdown. It is NOT called if the process is killed with the :kill signal (for example Process.exit(pid, :kill) or a :brutal_kill shutdown).
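Since on_disconnect is skipped on a :kill, it helps when the registry also notices dead processes on its own. :pg does this: its scope server monitors every process that joins a group and removes it when the process exits. The sketch below demonstrates that with plain OTP calls only (no ZenWebsocket API is involved; PgCleanupSketch and wait_until_gone/3 are hypothetical names, and the spawned process merely stands in for a client):

```elixir
defmodule PgCleanupSketch do
  # Hypothetical helper: poll :pg until pid has left group, giving up after
  # `attempts` checks. Removal happens when the :pg scope handles the monitor's
  # DOWN message, so it is asynchronous rather than instant.
  def wait_until_gone(group, pid, attempts \\ 50) do
    cond do
      pid not in :pg.get_members(group) ->
        :ok

      attempts == 0 ->
        {:error, :still_member}

      true ->
        Process.sleep(10)
        wait_until_gone(group, pid, attempts - 1)
    end
  end
end

# Start the default :pg scope (in an application, put it in the supervision tree).
{:ok, _scope} = :pg.start_link()

# Stand-in for a client process that joined the pool in on_connect.
client = spawn(fn -> Process.sleep(:infinity) end)
:ok = :pg.join(:ws_pool, client)

# Simulate a :kill: terminate/2 never runs, so on_disconnect (and :pg.leave) is skipped.
Process.exit(client, :kill)

# :pg still drops the dead process from the group.
:ok = PgCleanupSketch.wait_until_gone(:ws_pool, client)
```

With :pg as the registry, a skipped on_disconnect therefore leaves no stale PID behind for long; registries without process monitoring need their own cleanup.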
Custom Discovery for Load Balancing
Use client_discovery to route messages across nodes:
ZenWebsocket.ClientSupervisor.send_balanced(
message,
client_discovery: fn -> :pg.get_members(:ws_pool) end
)
Without client_discovery, send_balanced/2 defaults to list_clients/0 (local connections only).
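Judging from the examples, client_discovery takes a zero-arity function that returns a list of client PIDs, so the discovery policy is entirely yours. As a sketch, assuming the default :pg scope and a :ws_pool group like the one used in this section (PgDiscoverySketch is a hypothetical name), the function below prefers clients on the local node and only falls back to cluster-wide members when none are local:

```elixir
defmodule PgDiscoverySketch do
  # Hypothetical discovery helper for the default :pg scope.
  # Prefers clients on this node (no cross-node hop); falls back to members
  # on any node only when no local client is registered.
  @spec members(term()) :: [pid()]
  def members(group) do
    case :pg.get_local_members(group) do
      [] -> :pg.get_members(group)
      local -> local
    end
  end
end

# Usage (assumes :pg is started and clients joined :ws_pool via on_connect):
# ZenWebsocket.ClientSupervisor.send_balanced(message,
#   client_discovery: fn -> PgDiscoverySketch.members(:ws_pool) end
# )
```

Preferring local members trades even load distribution for fewer network hops; return :pg.get_members/1 directly if every node's clients should be considered equally.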
Example: Multi-Node Setup with pg
# Node A and Node B both run:
:pg.start_link()
# Define callbacks once
defmodule MyApp.WSCallbacks do
def on_connect(pid), do: :pg.join(:ws_pool, pid)
def on_disconnect(pid), do: :pg.leave(:ws_pool, pid)
end
# Start clients with pg callbacks
{:ok, _} = ZenWebsocket.ClientSupervisor.start_client(url,
on_connect: &MyApp.WSCallbacks.on_connect/1,
on_disconnect: &MyApp.WSCallbacks.on_disconnect/1
)
# Route to any healthy client across all nodes
ZenWebsocket.ClientSupervisor.send_balanced(msg,
client_discovery: fn -> :pg.get_members(:ws_pool) end
)
Example: Horde Registry
# With Horde for distributed process registry
{:ok, _} = ZenWebsocket.ClientSupervisor.start_client(url,
on_connect: fn pid ->
Horde.Registry.register(MyApp.WSRegistry, {:ws_client, pid}, pid)
end,
on_disconnect: fn pid ->
Horde.Registry.unregister(MyApp.WSRegistry, {:ws_client, pid})
end
)
# Custom discovery using Horde
ZenWebsocket.ClientSupervisor.send_balanced(msg,
client_discovery: fn ->
Horde.Registry.select(MyApp.WSRegistry, [{{:ws_client, :_}, :_, :"$1"}, [], [:"$1"]}])
end
)
Callback Error Handling
Callback errors are caught and logged - they won't crash the client or prevent connection/termination:
# This won't crash the client
on_connect: fn _pid -> raise "intentional error" end
# Warning logged: "Lifecycle callback error: %RuntimeError{message: \"intentional error\"}"
Session Recording
Record WebSocket sessions for debugging, testing, and replay:
# Enable recording when connecting
{:ok, client} = ZenWebsocket.Client.connect(url, record_to: "/tmp/debug.jsonl")
# Use the connection normally - all messages are recorded
ZenWebsocket.Client.send_message(client, Jason.encode!(%{action: "subscribe", channel: "trades"}))
# Close to flush remaining buffer
ZenWebsocket.Client.close(client)
# Get session metadata (count, duration, timestamps)
{:ok, meta} = ZenWebsocket.Recorder.metadata("/tmp/debug.jsonl")
# => %{count: 42, inbound: 30, outbound: 12, duration_ms: 5000, ...}
# Replay the recorded session
ZenWebsocket.Recorder.replay("/tmp/debug.jsonl", fn entry ->
IO.inspect(entry, label: "#{entry.dir} at #{entry.ts}")
end)
Recording format: JSONL (one JSON object per line) for streaming writes. Binary frames are base64-encoded.
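Base64 is needed because JSON strings must be valid Unicode text, while a binary frame can carry arbitrary bytes. The sketch below shows the round trip with Elixir's Base module; the module name and the map shape (type/data keys) are hypothetical and do not describe ZenWebsocket's actual recording schema.

```elixir
defmodule RecordingPayloadSketch do
  # Hypothetical entry shape (type/data keys are illustrative only).
  # WebSocket text frames are UTF-8 and can be stored as JSON strings as-is;
  # binary frames may contain any bytes, so they are base64-encoded first.
  def encode_frame({:text, text}), do: %{type: "text", data: text}
  def encode_frame({:binary, bytes}), do: %{type: "binary", data: Base.encode64(bytes)}

  # Replay side: undo the encoding to recover the original frame.
  def decode_frame(%{type: "text", data: text}), do: {:text, text}
  def decode_frame(%{type: "binary", data: b64}), do: {:binary, Base.decode64!(b64)}
end

raw = <<0xFF, 0x00, 0x10>>
# Not valid UTF-8, so it cannot be written into a JSON string directly.
false = String.valid?(raw)
entry = RecordingPayloadSketch.encode_frame({:binary, raw})
# Plain ASCII, safe inside JSON.
"/wAQ" = entry.data
{:binary, ^raw} = RecordingPayloadSketch.decode_frame(entry)
```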
Platform-Specific Rules
Deribit Integration
# Use the Deribit adapter for complete integration
{:ok, adapter} = ZenWebsocket.Examples.DeribitAdapter.start_link([
url: "wss://test.deribit.com/ws/api/v2",
client_id: System.get_env("DERIBIT_CLIENT_ID"),
client_secret: System.get_env("DERIBIT_CLIENT_SECRET")
])
# The adapter handles:
# - Authentication flow
# - Heartbeat/test_request
# - Subscription management
# - Cancel-on-disconnect
Reconnection Behavior
ZenWebsocket supports two reconnect paths with different preservation semantics.
Automatic Reconnect (reconnect_on_error: true)
When a connection drops and reconnect_on_error: true (the default), the same
Client GenServer reconnects with exponential backoff.
Preserved Across Automatic Reconnect
| State | Details |
|---|---|
| Config struct | Full validated ZenWebsocket.Config struct |
| Handler callback | Same function reference — no need to re-register |
| Heartbeat config | Timer restarted with original interval after reconnect |
| Subscriptions | Restored automatically if restore_subscriptions: true (default) |
| Latency stats | Historical measurements accumulate across reconnects |
| Session recorder | Continues recording to the same file |
| on_disconnect callback | Same function reference |
Reset on Automatic Reconnect
| State | Details |
|---|---|
| Runtime retry counter | state.retry_count resets to 0 after successful reconnect; config retry_count stays unchanged |
| Heartbeat failures | Counter reset to 0 |
| Heartbeat timer | Cancelled on disconnect, restarted on reconnect |
| Gun PID / stream ref | New connection process and stream |
Pending Requests on Automatic Reconnect
Pending RPC requests (pending_requests) remain in state across reconnects.
Responses on the new connection may not match original request IDs — callers
should handle request timeouts gracefully.
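One way for a caller to handle request timeouts gracefully is to bound how long it waits for any reply. The sketch below uses only Task from the Elixir standard library; RequestTimeoutSketch is a hypothetical name, and request_fun stands in for whatever function your application uses to send a request and wait for its response.

```elixir
defmodule RequestTimeoutSketch do
  # Hypothetical caller-side guard: run request_fun in a Task and stop waiting
  # after timeout_ms, so a reply lost across a reconnect cannot block the caller.
  @spec call_with_timeout((-> term()), non_neg_integer()) ::
          {:ok, term()} | {:error, :timeout | {:exit, term()}}
  def call_with_timeout(request_fun, timeout_ms) do
    task = Task.async(request_fun)

    # Task.yield/2 returns nil on timeout; Task.shutdown/2 then kills the task
    # (it still returns {:ok, reply} if the reply raced in just before the kill).
    case Task.yield(task, timeout_ms) || Task.shutdown(task, :brutal_kill) do
      {:ok, reply} -> {:ok, reply}
      # Only seen if the task exits while the caller traps exits.
      {:exit, reason} -> {:error, {:exit, reason}}
      nil -> {:error, :timeout}
    end
  end
end
```

A timed-out caller can then retry or give up, and a late response for that request may still surface later as {:unmatched_response, _} in the handler.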
Explicit Reconnect (Client.reconnect/1)
Client.reconnect/1 starts a fresh Client process using the stored connection
contract from the original client struct returned by connect/2 or
ClientSupervisor.start_client/2.
If the original client was started under ClientSupervisor.start_client/2,
explicit reconnect goes back through ClientSupervisor.start_client/2 so the
replacement client stays supervised and reruns :on_connect.
Preserved Across Explicit Reconnect
| State | Details |
|---|---|
| Config struct | Full validated ZenWebsocket.Config struct, including headers/timeouts/retry settings |
| Handler callback | Same function reference |
| Heartbeat config | Same heartbeat configuration for the new client |
| Supervision mode | Supervised clients reconnect as supervised clients; direct clients reconnect directly |
| on_connect callback | Rerun for supervised reconnect so registries can re-register the new PID |
| on_disconnect callback | Same function reference |
Reset on Explicit Reconnect
| State | Details |
|---|---|
| Subscriptions | Fresh client state — resubscribe after reconnect if needed |
| Pending requests | Fresh client state — in-flight requests from the old client do not carry over |
| Latency stats / heartbeat state | Fresh client state with new counters and timers |
| Server / Gun PIDs | New Client GenServer, Gun process, and stream ref |
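Because explicit reconnect starts with no subscriptions, a caller that still needs its channels has to replay them. A minimal sketch, assuming the reconnect/1 and subscribe/2 shapes shown under The 5 Essential Functions; ReconnectSketch is a hypothetical name, and both functions are injected as arguments so the sketch runs without a live server.

```elixir
defmodule ReconnectSketch do
  # Hypothetical helper. In an application you would pass
  # &ZenWebsocket.Client.reconnect/1 and &ZenWebsocket.Client.subscribe/2.
  # Any non-matching result (e.g. {:error, reason}) is returned unchanged; if the
  # resubscribe step fails, the new client is still running, so close it or retry.
  def reconnect_and_resubscribe(client, channels, reconnect_fun, subscribe_fun) do
    with {:ok, new_client} <- reconnect_fun.(client),
         :ok <- resubscribe(new_client, channels, subscribe_fun) do
      {:ok, new_client}
    end
  end

  # Nothing to replay when the old client had no channels.
  defp resubscribe(_client, [], _subscribe_fun), do: :ok
  defp resubscribe(client, channels, subscribe_fun), do: subscribe_fun.(client, channels)
end
```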
Error Handling
# Fallible calls such as connect/2 return tagged tuples ({:ok, _} | {:error, reason})
require Logger

case ZenWebsocket.Client.connect(url) do
  {:ok, client} ->
    # Success path
    client

  {:error, reason} ->
    # Get human-readable explanation with fix suggestion
    explanation = ZenWebsocket.ErrorHandler.explain(reason)
    Logger.error("#{explanation.message}. #{explanation.suggestion}")
    # Errors are passed raw from Gun/WebSocket
    # Common errors: :timeout, :connection_refused, :protocol_error
end
Handler Message Reference
Your handler function (passed to connect/2 as the :handler option) receives one of the tuple shapes below. This section is the complete contract and mirrors the ZenWebsocket.Client.handler_message/0 type.
Input Shapes
The tuples delivered to your handler:
| Shape | When emitted | Payload |
|---|---|---|
| {:message, map} | Decoded JSON frame (including subscription updates) | Decoded map |
| {:message, binary} | Text frame that did not decode as JSON | Raw text binary |
| {:binary, binary} | WebSocket binary frame | Raw bytes |
| {:unmatched_response, map} | JSON-RPC response whose "id" did not match any pending request | Decoded response map |
| {:protocol_error, reason} | Fatal, unrecoverable frame error (client will stop) | Unwrapped reason |
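Ping/pong/close control frames are handled transparently by the client and never reach your handler. Any WebSocket frame decode error is fatal, and {:protocol_error, _} is the only error shape you receive. A text frame that merely fails JSON decoding is not an error; it arrives as {:message, binary}.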
Custom Handler Example
A pattern-matching handler that distinguishes every shape:
require Logger

handler = fn
{:message, %{} = json} ->
# Decoded JSON frame (subscription update or general message)
MyApp.Router.route(json)
{:message, text} when is_binary(text) ->
# Text frame that was not valid JSON
MyApp.TextStream.receive(text)
{:binary, bin} ->
MyApp.BinaryStream.receive(bin)
{:unmatched_response, response} ->
# Late reply after RequestCorrelator already timed it out, or an ID collision
Logger.warning("unmatched response: #{inspect(response)}")
{:protocol_error, reason} ->
Logger.error("fatal protocol error: #{inspect(reason)}")
end
{:ok, client} = ZenWebsocket.Client.connect(url, handler: handler)
Handler return values are ignored.
Default Handler Translation
If you do not pass :handler, a default handler forwards messages to the parent process as {:websocket_*, _} tuples. The translation:
| Input shape | Message sent to parent |
|---|---|
| {:message, data} | {:websocket_message, data} |
| {:binary, data} | {:websocket_message, data} (same tag) |
| {:unmatched_response, response} | {:websocket_unmatched_response, response} |
| {:protocol_error, reason} | {:websocket_protocol_error, reason} |
Note that {:message, _} and {:binary, _} both collapse to :websocket_message, so the default handler cannot distinguish text from binary frames. If you need to tell them apart, supply a custom handler.
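The translation table can be restated as a pure function, and a custom handler built on it can keep binary frames apart. This is a sketch, not the library's source: DefaultTranslationSketch and the {:websocket_binary, _} tag are hypothetical. The parent PID is captured when the handler is built because the handler is presumably invoked from the client process, where self() would not be your process.

```elixir
defmodule DefaultTranslationSketch do
  # Hypothetical restatement of the default handler table.
  def translate({:message, data}), do: {:websocket_message, data}
  def translate({:binary, data}), do: {:websocket_message, data}
  def translate({:unmatched_response, resp}), do: {:websocket_unmatched_response, resp}
  def translate({:protocol_error, reason}), do: {:websocket_protocol_error, reason}

  # Custom handler that forwards like the default but gives binary frames
  # their own (hypothetical) :websocket_binary tag.
  def forwarding_handler(parent) when is_pid(parent) do
    fn
      {:binary, data} -> send(parent, {:websocket_binary, data})
      other -> send(parent, translate(other))
    end
  end
end

# Usage:
# {:ok, client} =
#   ZenWebsocket.Client.connect(url, handler: DefaultTranslationSketch.forwarding_handler(self()))
```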
Testing Rules
# Use the Testing module for controlled tests
alias ZenWebsocket.Testing
# Start a mock server
{:ok, server} = Testing.start_mock_server()
{:ok, client} = ZenWebsocket.Client.connect(server.url)
# Inject messages from server to client
Testing.inject_message(server, ~s({"type": "hello"}))
# Assert client sent expected message (supports string, regex, map, or function matchers)
assert Testing.assert_message_sent(server, %{"type" => "ping"}, 1000)
# Simulate disconnects for error handling tests
Testing.simulate_disconnect(server, :going_away)
# Cleanup
Testing.stop_server(server)
ExUnit Integration Pattern
defmodule MyTest do
use ExUnit.Case
alias ZenWebsocket.Testing
setup do
{:ok, server} = Testing.start_mock_server()
on_exit(fn -> Testing.stop_server(server) end)
{:ok, server: server}
end
test "client handles server message", %{server: server} do
{:ok, client} = ZenWebsocket.Client.connect(server.url)
Testing.inject_message(server, ~s({"type": "pong"}))
assert_receive {:websocket_message, _}, 1000
ZenWebsocket.Client.close(client)
end
end
Real API Testing
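# For integration tests against real endpoints
# (exclude them by default with ExUnit.configure(exclude: [:integration]) in
# test_helper.exs and run them with mix test --include integration)
@tag :integration
test "real WebSocket behavior" do
  {:ok, client} = ZenWebsocket.Client.connect("wss://test.deribit.com/ws/api/v2")
  # Test against real API...
  ZenWebsocket.Client.close(client)
end
DO NOT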
- Don't create wrapper modules - Use the Client functions directly
- Don't mock WebSocket behavior - Test against real endpoints or use Testing module
- Don't add custom reconnection - Use built-in retry options
- Don't transform errors - Handle raw Gun/WebSocket errors
- Don't avoid GenServers - Client uses GenServer appropriately for state
Architecture Notes
- Gun Transport: Built on Gun for HTTP/2 and WebSocket
- GenServer State: Client maintains connection state in GenServer
- ETS Registry: Fast connection lookups via ETS
- Exponential Backoff: Smart reconnection with backoff
- Real API Testing: All tests use real APIs or Testing module (no mocks)
Monitoring and Observability
Latency Statistics
# Get latency metrics (p50/p99/last/count — all integers in ms)
stats = ZenWebsocket.Client.get_latency_stats(client)
# => %{p50: 12, p99: 45, last: 10, count: 100}
Heartbeat Health
# Check heartbeat status
health = ZenWebsocket.Client.get_heartbeat_health(client)
# => %{failure_count: 0, last_heartbeat_at: -576460748, config: :disabled, timer_active: false}
# Note: last_heartbeat_at is a monotonic timestamp (System.monotonic_time(:millisecond)), not a wall-clock DateTime
State Metrics
# Get connection state metrics
metrics = ZenWebsocket.Client.get_state_metrics(client)
# => %{subscriptions_size: 12, pending_requests_size: 5, state_memory: 1024, ...}
Rate Limiter Status
# Check rate limiter pressure
status = ZenWebsocket.RateLimiter.status(limiter)
# => %{tokens: 85, queue_size: 5, pressure_level: :low, suggested_delay_ms: 0}
# pressure_level: :none (<25%), :low (25-50%), :medium (50-75%), :high (>75%)
Key Telemetry Events
| Event | Measurements | When |
|---|---|---|
| [:zen_websocket, :connection, :upgrade] | connect_time_ms | WebSocket upgrade complete |
| [:zen_websocket, :heartbeat, :pong] | rtt_ms | Heartbeat response received |
| [:zen_websocket, :rate_limiter, :consume] | tokens_remaining, cost | Token consumed |
| [:zen_websocket, :rate_limiter, :refill] | tokens_before, tokens_after, refill_rate | Bucket refilled |
| [:zen_websocket, :rate_limiter, :queue] | queue_size, cost | Request queued |
| [:zen_websocket, :rate_limiter, :queue_full] | queue_size | Queue at capacity |
| [:zen_websocket, :rate_limiter, :pressure] | queue_size, ratio | Pressure threshold crossed |
| [:zen_websocket, :request_correlator, :track] | count | Request tracked |
| [:zen_websocket, :request_correlator, :resolve] | count, round_trip_ms | Response correlated |
| [:zen_websocket, :request_correlator, :timeout] | count | Request timed out |
| [:zen_websocket, :request_correlator, :fail_all] | count | Pending request failed on disconnect (metadata: id, reason) |
| [:zen_websocket, :subscription_manager, :add] | count | Subscription added |
| [:zen_websocket, :subscription_manager, :remove] | count | Subscription removed |
| [:zen_websocket, :subscription_manager, :restore] | channel_count | Subscriptions restored |
| [:zen_websocket, :pool, :route] | health, pool_size | Connection selected |
| [:zen_websocket, :pool, :health] | pool_size, avg_health | Pool health snapshot |
| [:zen_websocket, :pool, :failover] | attempt | Failover triggered |
See Performance Tuning Guide for complete telemetry reference.
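The pressure_level bands listed under Rate Limiter Status can be expressed as a small classification function, for example inside a handler for the [:zen_websocket, :rate_limiter, :pressure] event. This is a sketch, not ZenWebsocket's implementation: it assumes the level is derived from a fill ratio between 0.0 and 1.0, and the boundary handling (which band exactly 0.25, 0.5 and 0.75 fall into) is a guess the source does not specify. PressureSketch is a hypothetical name.

```elixir
defmodule PressureSketch do
  # Hypothetical mapping of a 0.0..1.0 ratio to the documented bands:
  # :none (<25%), :low (25-50%), :medium (50-75%), :high (>75%).
  # Assumed boundaries: 0.25 -> :low, 0.5 -> :medium, 0.75 -> :medium.
  @spec level(number()) :: :none | :low | :medium | :high
  def level(ratio) when ratio < 0.25, do: :none
  def level(ratio) when ratio < 0.5, do: :low
  def level(ratio) when ratio <= 0.75, do: :medium
  def level(_ratio), do: :high
end

Enum.map([0.1, 0.25, 0.6, 0.9], &PressureSketch.level/1)
# => [:none, :low, :medium, :high]
```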
# Attach to telemetry events
require Logger

:telemetry.attach(
  "websocket-logger",
  [:zen_websocket, :connection, :upgrade],
  fn _event, measurements, _metadata, _config ->
    Logger.info("WebSocket connected in #{measurements.connect_time_ms}ms")
  end,
  nil
)
Module Limits
Each module follows strict simplicity rules:
- Maximum 5 public functions per new module (existing core modules may exceed this)
- Maximum 15 lines per function
- Maximum 2 levels of function calls
- Real API testing only (no mocks)
Getting Help
- Examples: See the lib/zen_websocket/examples/ directory
- Tests: Review test/ for usage patterns
- Deribit: See DeribitAdapter for complete platform integration
- Guides: See docs/guides/ for performance tuning and adapter building
- Verification workflow: See CLAUDE.md for the current JSON-oriented commands (mix test.json --quiet --summary-only, mix dialyzer.json --quiet, mix credo --strict --format json, mix security, mix docs)
Common Mistakes to Avoid
- Creating abstractions too early - Start with direct usage
- Mocking in tests - Always use real WebSocket endpoints or Testing module
- Custom error types - Handle raw Gun/WebSocket errors
- Complex supervision - Use provided patterns (1, 2, or 3)
- Ignoring heartbeats - Configure heartbeat for production
Migration from Other Libraries
From Websockex
# Old (WebSockex with callbacks)
defmodule MyClient do
  use WebSockex
  def handle_frame({:text, _msg}, state), do: {:ok, state}
end
# New (ZenWebsocket - simpler)
{:ok, client} = ZenWebsocket.Client.connect(url)
# Messages go to the :handler function, or by default to the calling process as {:websocket_message, data}
From Gun directly
# You're already using the right approach!
# ZenWebsocket is a thin, focused layer over Gun
Performance Characteristics
- Connection Time: < 100ms typical (network-dependent)
- Message Latency: < 1ms client-side processing (excluding network time)
- Memory: ~50KB per connection
- Reconnection: Exponential backoff (1s, 2s, 4s...)
- Concurrency: Thousands of simultaneous connections
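The 1s, 2s, 4s progression corresponds to doubling retry_delay on each attempt. A minimal sketch of that schedule, assuming pure doubling with no jitter or upper cap (the real client may differ; BackoffSketch is a hypothetical name):

```elixir
defmodule BackoffSketch do
  # Delay in ms before retry number `attempt` (0-based): base_ms * 2^attempt.
  @spec delay(pos_integer(), non_neg_integer()) :: pos_integer()
  def delay(base_ms, attempt), do: base_ms * Integer.pow(2, attempt)

  # Full schedule for retry_count attempts.
  @spec schedule(pos_integer(), non_neg_integer()) :: [pos_integer()]
  def schedule(_base_ms, 0), do: []

  def schedule(base_ms, retry_count) when retry_count > 0,
    do: Enum.map(0..(retry_count - 1), &delay(base_ms, &1))
end

# With the defaults from Configuration Options (retry_delay: 1000, retry_count: 3):
BackoffSketch.schedule(1000, 3)
# => [1000, 2000, 4000]
Enum.sum(BackoffSketch.schedule(1000, 3))
# => 7000 (ms spent waiting before the client gives up)
```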
Required Environment Variables
For platform integrations:
# Deribit
export DERIBIT_CLIENT_ID="your_client_id"
export DERIBIT_CLIENT_SECRET="your_client_secret"
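The Deribit example reads these with System.get_env/1, which returns nil when a variable is unset, so a missing credential only surfaces later as an authentication failure. A sketch of a fail-fast check using only the standard library (DeribitEnvSketch and credentials/0 are hypothetical names):

```elixir
defmodule DeribitEnvSketch do
  @required ["DERIBIT_CLIENT_ID", "DERIBIT_CLIENT_SECRET"]

  # Hypothetical check: report every unset or empty variable at once
  # instead of handing nil credentials to the adapter.
  @spec credentials() :: {:ok, map()} | {:error, {:missing_env, [String.t()]}}
  def credentials do
    values = Map.new(@required, fn name -> {name, System.get_env(name)} end)
    missing = for {name, value} <- values, value in [nil, ""], do: name

    case missing do
      [] ->
        {:ok,
         %{
           client_id: values["DERIBIT_CLIENT_ID"],
           client_secret: values["DERIBIT_CLIENT_SECRET"]
         }}

      names ->
        {:error, {:missing_env, Enum.sort(names)}}
    end
  end
end

# Usage:
# {:ok, creds} = DeribitEnvSketch.credentials()
# ZenWebsocket.Examples.DeribitAdapter.start_link(
#   url: "wss://test.deribit.com/ws/api/v2",
#   client_id: creds.client_id,
#   client_secret: creds.client_secret
# )
```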
Best Practices Summary
- Start with Pattern 1 (direct) for development
- Move to Pattern 2 or 3 for production
- Configure heartbeats for long-lived connections
- Test against real endpoints or use Testing module
- Handle raw errors with pattern matching
- Use telemetry for monitoring
- Enable record_to for debugging production issues
- Keep it simple: 5 core functions, plus monitoring functions when needed