ZenWebsocket Usage Rules

Copy Markdown View Source

Core Principles

  1. Start Simple: Use direct connection for development, add supervision for production
  2. 5 Essential Functions: The core API is 5 functions (connect, send_message, subscribe, get_state, close), plus monitoring functions (get_latency_stats, get_heartbeat_health, get_state_metrics, reconnect)
  3. Real API Testing: Always test against real endpoints, never mock WebSocket behavior

Quick Start Pattern

# Simplest possible usage - connect and send
{:ok, client} = ZenWebsocket.Client.connect("wss://test.deribit.com/ws/api/v2")
ZenWebsocket.Client.send_message(client, Jason.encode!(%{method: "public/test"}))

The 5 Essential Functions

# 1. Connect to WebSocket
{:ok, client} = ZenWebsocket.Client.connect(url, opts)

# 2. Send messages (must be binary — use Jason.encode!/1 for maps)
:ok = ZenWebsocket.Client.send_message(client, Jason.encode!(%{method: "public/test"}))

# 3. Subscribe to channels
:ok = ZenWebsocket.Client.subscribe(client, channels)

# 4. Check connection state
state = ZenWebsocket.Client.get_state(client)  # :connected, :connecting, :disconnected

# 5. Close connection
:ok = ZenWebsocket.Client.close(client)

Additional Monitoring Functions

# Latency percentiles (p50/p99/last/count — all integers in ms)
stats = ZenWebsocket.Client.get_latency_stats(client)

# Heartbeat health (failure_count, last_heartbeat_at, config, timer_active)
health = ZenWebsocket.Client.get_heartbeat_health(client)

# Connection metrics (subscriptions_size, pending_requests_size, state_memory, ...)
metrics = ZenWebsocket.Client.get_state_metrics(client)

# Explicit reconnection using the stored connection contract
{:ok, new_client} = ZenWebsocket.Client.reconnect(client)

Common Patterns

Pattern 1: Development/Testing (No Supervision)

# Direct connection - crashes won't restart
{:ok, client} = ZenWebsocket.Client.connect(url)
# Use the client...
ZenWebsocket.Client.close(client)

Pattern 2: Production with Dynamic Connections

# Add to your supervision tree
children = [
  ZenWebsocket.ClientSupervisor,
  # ... other children
]

# Start connections dynamically
{:ok, client} = ZenWebsocket.ClientSupervisor.start_client(url, opts)

Pattern 3: Production with Fixed Connections

# Add specific clients to supervision tree
children = [
  {ZenWebsocket.Client, [
    url: "wss://api.example.com/ws",
    id: :main_websocket,
    heartbeat_config: %{type: :ping_pong, interval: 30_000}
  ]}
]

Configuration Options

opts = [
  # Connection
  timeout: 5000,              # Connection timeout in ms
  headers: [],                # Additional headers
  debug: false,               # Enable verbose debug logging

  # Reconnection
  retry_count: 3,             # Max reconnection attempts
  retry_delay: 1000,          # Initial retry delay (exponential backoff)
  reconnect_on_error: true,   # Auto-reconnect on errors
  restore_subscriptions: true, # Restore subscriptions after reconnect

  # Heartbeat
  heartbeat_config: %{
    type: :ping_pong,         # :ping_pong, :deribit, :binance
    interval: 30_000,         # Heartbeat interval in ms
    message: nil              # Reserved for future custom heartbeat support
  },

  # Session Recording
  record_to: "/tmp/session.jsonl",  # Enable message recording (nil to disable)

  # Latency Monitoring
  latency_buffer_size: 100    # Samples for p50/p99 calculations
]

Custom Client Discovery (Distributed Applications)

For distributed applications using :pg, Horde, or other registries, ZenWebsocket provides hooks to integrate with your registry of choice.

Lifecycle Callbacks

Register clients with your registry using on_connect/on_disconnect:

{:ok, client} = ZenWebsocket.ClientSupervisor.start_client(
  "wss://api.example.com/ws",
  on_connect: fn pid -> :pg.join(:ws_pool, pid) end,
  on_disconnect: fn pid -> :pg.leave(:ws_pool, pid) end
)

Important: on_disconnect is called during terminate/2, which requires a graceful shutdown. It will NOT be called if the process is killed with :kill signal.

Custom Discovery for Load Balancing

Use client_discovery to route messages across nodes:

ZenWebsocket.ClientSupervisor.send_balanced(
  message,
  client_discovery: fn -> :pg.get_members(:ws_pool) end
)

Without client_discovery, send_balanced/2 defaults to list_clients/0 (local connections only).

Example: Multi-Node Setup with pg

# Node A and Node B both run:
:pg.start_link()

# Define callbacks once
defmodule MyApp.WSCallbacks do
  def on_connect(pid), do: :pg.join(:ws_pool, pid)
  def on_disconnect(pid), do: :pg.leave(:ws_pool, pid)
end

# Start clients with pg callbacks
{:ok, _} = ZenWebsocket.ClientSupervisor.start_client(url,
  on_connect: &MyApp.WSCallbacks.on_connect/1,
  on_disconnect: &MyApp.WSCallbacks.on_disconnect/1
)

# Route to any healthy client across all nodes
ZenWebsocket.ClientSupervisor.send_balanced(msg,
  client_discovery: fn -> :pg.get_members(:ws_pool) end
)

Example: Horde Registry

# With Horde for distributed process registry
{:ok, _} = ZenWebsocket.ClientSupervisor.start_client(url,
  on_connect: fn pid ->
    Horde.Registry.register(MyApp.WSRegistry, {:ws_client, pid}, pid)
  end,
  on_disconnect: fn pid ->
    Horde.Registry.unregister(MyApp.WSRegistry, {:ws_client, pid})
  end
)

# Custom discovery using Horde
ZenWebsocket.ClientSupervisor.send_balanced(msg,
  client_discovery: fn ->
    Horde.Registry.select(MyApp.WSRegistry, [{{:ws_client, :_}, :_, :"$1"}, [], [:"$1"]}])
  end
)

Callback Error Handling

Callback errors are caught and logged - they won't crash the client or prevent connection/termination:

# This won't crash the client
on_connect: fn _pid -> raise "intentional error" end
# Warning logged: "Lifecycle callback error: %RuntimeError{message: \"intentional error\"}"

Session Recording

Record WebSocket sessions for debugging, testing, and replay:

# Enable recording when connecting
{:ok, client} = ZenWebsocket.Client.connect(url, record_to: "/tmp/debug.jsonl")

# Use the connection normally - all messages are recorded
ZenWebsocket.Client.send_message(client, Jason.encode!(%{action: "subscribe", channel: "trades"}))

# Close to flush remaining buffer
ZenWebsocket.Client.close(client)

# Get session metadata (count, duration, timestamps)
{:ok, meta} = ZenWebsocket.Recorder.metadata("/tmp/debug.jsonl")
# => %{count: 42, inbound: 30, outbound: 12, duration_ms: 5000, ...}

# Replay the recorded session
ZenWebsocket.Recorder.replay("/tmp/debug.jsonl", fn entry ->
  IO.inspect(entry, label: "#{entry.dir} at #{entry.ts}")
end)

Recording format: JSONL (one JSON object per line) for streaming writes. Binary frames are base64-encoded.

Platform-Specific Rules

Deribit Integration

# Use the Deribit adapter for complete integration
{:ok, adapter} = ZenWebsocket.Examples.DeribitAdapter.start_link([
  url: "wss://test.deribit.com/ws/api/v2",
  client_id: System.get_env("DERIBIT_CLIENT_ID"),
  client_secret: System.get_env("DERIBIT_CLIENT_SECRET")
])

# The adapter handles:
# - Authentication flow
# - Heartbeat/test_request
# - Subscription management
# - Cancel-on-disconnect

Reconnection Behavior

ZenWebsocket supports two reconnect paths with different preservation semantics.

Automatic Reconnect (reconnect_on_error: true)

When a connection drops and reconnect_on_error: true (the default), the same Client GenServer reconnects with exponential backoff.

Preserved Across Automatic Reconnect

StateDetails
Config structFull validated ZenWebsocket.Config struct
Handler callbackSame function reference — no need to re-register
Heartbeat configTimer restarted with original interval after reconnect
SubscriptionsRestored automatically if restore_subscriptions: true (default)
Latency statsHistorical measurements accumulate across reconnects
Session recorderContinues recording to the same file
on_disconnect callbackSame function reference

Reset on Automatic Reconnect

StateDetails
Runtime retry counterstate.retry_count resets to 0 after successful reconnect; config retry_count stays unchanged
Heartbeat failuresCounter reset to 0
Heartbeat timerCancelled on disconnect, restarted on reconnect
Gun PID / stream refNew connection process and stream

Pending Requests on Automatic Reconnect

Pending RPC requests (pending_requests) remain in state across reconnects. Responses on the new connection may not match original request IDs — callers should handle request timeouts gracefully.

Explicit Reconnect (Client.reconnect/1)

Client.reconnect/1 starts a fresh Client process using the stored connection contract from the original client struct returned by connect/2 or ClientSupervisor.start_client/2.

If the original client was started under ClientSupervisor.start_client/2, explicit reconnect goes back through ClientSupervisor.start_client/2 so the replacement client stays supervised and reruns :on_connect.

Preserved Across Explicit Reconnect

StateDetails
Config structFull validated ZenWebsocket.Config struct, including headers/timeouts/retry settings
Handler callbackSame function reference
Heartbeat configSame heartbeat configuration for the new client
Supervision modeSupervised clients reconnect as supervised clients; direct clients reconnect directly
on_connect callbackRerun for supervised reconnect so registries can re-register the new PID
on_disconnect callbackSame function reference

Reset on Explicit Reconnect

StateDetails
SubscriptionsFresh client state — resubscribe after reconnect if needed
Pending requestsFresh client state — in-flight requests from the old client do not carry over
Latency stats / heartbeat stateFresh client state with new counters and timers
Server / Gun PIDsNew Client GenServer, Gun process, and stream ref

Error Handling

# All functions return tagged tuples
case ZenWebsocket.Client.connect(url) do
  {:ok, client} ->
    # Success path
    client

  {:error, reason} ->
    # Get human-readable explanation with fix suggestion
    explanation = ZenWebsocket.ErrorHandler.explain(reason)
    Logger.error("#{explanation.message}. #{explanation.suggestion}")
    # Errors are passed raw from Gun/WebSocket
    # Common errors: :timeout, :connection_refused, :protocol_error
end

Handler Message Reference

Your handler function (passed via connect/2 as :handler) receives one of the tuple shapes below. This section is the complete contract — matched against ZenWebsocket.Client.handler_message/0.

Input Shapes

The tuples delivered to your handler:

ShapeWhen emittedPayload
{:message, map}Decoded JSON frame (including subscription updates)Decoded map
{:message, binary}Text frame that did not decode as JSONRaw text binary
{:binary, binary}WebSocket binary frameRaw bytes
{:unmatched_response, map}JSON-RPC response whose "id" did not match any pending requestDecoded response map
{:protocol_error, reason}Fatal, unrecoverable frame error (client will stop)Unwrapped reason

Ping/pong/close control frames are handled transparently by the client and never reach your handler. Any frame decode error is fatal — {:protocol_error, _} is the only error shape you receive.

Custom Handler Example

A pattern-matching handler that distinguishes every shape:

handler = fn
  {:message, %{} = json} ->
    # Decoded JSON frame (subscription update or general message)
    MyApp.Router.route(json)

  {:message, text} when is_binary(text) ->
    # Text frame that was not valid JSON
    MyApp.TextStream.receive(text)

  {:binary, bin} ->
    MyApp.BinaryStream.receive(bin)

  {:unmatched_response, response} ->
    # Late reply after RequestCorrelator already timed it out, or an ID collision
    Logger.warning("unmatched response: #{inspect(response)}")

  {:protocol_error, reason} ->
    Logger.error("fatal protocol error: #{inspect(reason)}")
end

{:ok, client} = ZenWebsocket.Client.connect(url, handler: handler)

Handler return values are ignored.

Default Handler Translation

If you do not pass :handler, a default handler forwards messages to the parent process as {:websocket_*, _} tuples. The translation:

Input shapeMessage sent to parent
{:message, data}{:websocket_message, data}
{:binary, data}{:websocket_message, data} (same tag)
{:unmatched_response, response}{:websocket_unmatched_response, response}
{:protocol_error, reason}{:websocket_protocol_error, reason}

Note that {:message, _} and {:binary, _} both collapse to :websocket_message, so the default handler cannot distinguish text from binary frames. If you need to tell them apart, supply a custom handler.

Testing Rules

# Use the Testing module for controlled tests
alias ZenWebsocket.Testing

# Start a mock server
{:ok, server} = Testing.start_mock_server()
{:ok, client} = ZenWebsocket.Client.connect(server.url)

# Inject messages from server to client
Testing.inject_message(server, ~s({"type": "hello"}))

# Assert client sent expected message (supports string, regex, map, or function matchers)
assert Testing.assert_message_sent(server, %{"type" => "ping"}, 1000)

# Simulate disconnects for error handling tests
Testing.simulate_disconnect(server, :going_away)

# Cleanup
Testing.stop_server(server)

ExUnit Integration Pattern

defmodule MyTest do
  use ExUnit.Case
  alias ZenWebsocket.Testing

  setup do
    {:ok, server} = Testing.start_mock_server()
    on_exit(fn -> Testing.stop_server(server) end)
    {:ok, server: server}
  end

  test "client handles server message", %{server: server} do
    {:ok, client} = ZenWebsocket.Client.connect(server.url)
    Testing.inject_message(server, ~s({"type": "pong"}))
    assert_receive {:websocket_message, _}, 1000
    ZenWebsocket.Client.close(client)
  end
end

Real API Testing

# For integration tests against real endpoints
@tag :integration
test "real WebSocket behavior" do
  {:ok, client} = ZenWebsocket.Client.connect("wss://test.deribit.com/ws/api/v2")
  # Test against real API...
end

DO NOT

  1. Don't create wrapper modules - Use the Client functions directly
  2. Don't mock WebSocket behavior - Test against real endpoints or use Testing module
  3. Don't add custom reconnection - Use built-in retry options
  4. Don't transform errors - Handle raw Gun/WebSocket errors
  5. Don't avoid GenServers - Client uses GenServer appropriately for state

Architecture Notes

  • Gun Transport: Built on Gun for HTTP/2 and WebSocket
  • GenServer State: Client maintains connection state in GenServer
  • ETS Registry: Fast connection lookups via ETS
  • Exponential Backoff: Smart reconnection with backoff
  • Real API Testing: All tests use real APIs or Testing module (no mocks)

Monitoring and Observability

Latency Statistics

# Get latency metrics (p50/p99/last/count — all integers in ms)
stats = ZenWebsocket.Client.get_latency_stats(client)
# => %{p50: 12, p99: 45, last: 10, count: 100}

Heartbeat Health

# Check heartbeat status
health = ZenWebsocket.Client.get_heartbeat_health(client)
# => %{failure_count: 0, last_heartbeat_at: -576460748, config: :disabled, timer_active: false}
# Note: last_heartbeat_at is a monotonic timestamp (System.monotonic_time(:millisecond)), not a wall-clock DateTime

State Metrics

# Get connection state metrics
metrics = ZenWebsocket.Client.get_state_metrics(client)
# => %{subscriptions_size: 12, pending_requests_size: 5, state_memory: 1024, ...}

Rate Limiter Status

# Check rate limiter pressure
status = ZenWebsocket.RateLimiter.status(limiter)
# => %{tokens: 85, queue_size: 5, pressure_level: :low, suggested_delay_ms: 0}
# pressure_level: :none (<25%), :low (25-50%), :medium (50-75%), :high (>75%)

Key Telemetry Events

EventMeasurementsWhen
[:zen_websocket, :connection, :upgrade]connect_time_msWebSocket upgrade complete
[:zen_websocket, :heartbeat, :pong]rtt_msHeartbeat response received
[:zen_websocket, :rate_limiter, :consume]tokens_remaining, costToken consumed
[:zen_websocket, :rate_limiter, :refill]tokens_before, tokens_after, refill_rateBucket refilled
[:zen_websocket, :rate_limiter, :queue]queue_size, costRequest queued
[:zen_websocket, :rate_limiter, :queue_full]queue_sizeQueue at capacity
[:zen_websocket, :rate_limiter, :pressure]queue_size, ratioPressure threshold crossed
[:zen_websocket, :request_correlator, :track]countRequest tracked
[:zen_websocket, :request_correlator, :resolve]count, round_trip_msResponse correlated
[:zen_websocket, :request_correlator, :timeout]countRequest timed out
[:zen_websocket, :request_correlator, :fail_all]countPending request failed on disconnect (metadata: id, reason)
[:zen_websocket, :subscription_manager, :add]countSubscription added
[:zen_websocket, :subscription_manager, :remove]countSubscription removed
[:zen_websocket, :subscription_manager, :restore]channel_countSubscriptions restored
[:zen_websocket, :pool, :route]health, pool_sizeConnection selected
[:zen_websocket, :pool, :health]pool_size, avg_healthPool health snapshot
[:zen_websocket, :pool, :failover]attemptFailover triggered

See Performance Tuning Guide for complete telemetry reference.

# Attach to telemetry events
:telemetry.attach(
  "websocket-logger",
  [:zen_websocket, :connection, :upgrade],
  fn _event, measurements, _metadata, _config ->
    Logger.info("WebSocket connected in #{measurements.connect_time_ms}ms")
  end,
  nil
)

Module Limits

Each module follows strict simplicity rules:

  • Maximum 5 public functions per new module (existing core modules may exceed this)
  • Maximum 15 lines per function
  • Maximum 2 levels of function calls
  • Real API testing only (no mocks)

Getting Help

  • Examples: See lib/zen_websocket/examples/ directory
  • Tests: Review test/ for usage patterns
  • Deribit: See DeribitAdapter for complete platform integration
  • Guides: See docs/guides/ for performance tuning and adapter building
  • Verification workflow: See CLAUDE.md for the current JSON-oriented commands (mix test.json --quiet --summary-only, mix dialyzer.json --quiet, mix credo --strict --format json, mix security, mix docs)

Common Mistakes to Avoid

  1. Creating abstractions too early - Start with direct usage
  2. Mocking in tests - Always use real WebSocket endpoints or Testing module
  3. Custom error types - Handle raw Gun/WebSocket errors
  4. Complex supervision - Use provided patterns (1, 2, or 3)
  5. Ignoring heartbeats - Configure heartbeat for production

Migration from Other Libraries

From Websockex

# Old (Websockex with callbacks)
defmodule MyClient do
  use WebSockex
  def handle_frame({:text, msg}, state), do: {:ok, state}
end

# New (ZenWebsocket - simpler)
{:ok, client} = ZenWebsocket.Client.connect(url)
# Messages handled via message_handler configuration

From Gun directly

# You're already using the right approach!
# ZenWebsocket is a thin, focused layer over Gun

Performance Characteristics

  • Connection Time: < 100ms typical
  • Message Latency: < 1ms processing
  • Memory: ~50KB per connection
  • Reconnection: Exponential backoff (1s, 2s, 4s...)
  • Concurrency: Thousands of simultaneous connections

Required Environment Variables

For platform integrations:

# Deribit
export DERIBIT_CLIENT_ID="your_client_id"
export DERIBIT_CLIENT_SECRET="your_client_secret"

Best Practices Summary

  1. Start with Pattern 1 (direct) for development
  2. Move to Pattern 2 or 3 for production
  3. Configure heartbeats for long-lived connections
  4. Test against real endpoints or use Testing module
  5. Handle raw errors with pattern matching
  6. Use telemetry for monitoring
  7. Enable record_to for debugging production issues
  8. Keep it simple - 5 core functions, monitoring functions when needed