ZenWebsocket.RequestCorrelator (ZenWebsocket v0.4.2)

Manages request/response correlation for JSON-RPC WebSocket connections.

This is a pure functional module: state ownership stays with the Client GenServer. The module tracks pending requests with timeouts and matches responses to them by ID.

Telemetry Events

The following telemetry events are emitted:

  • [:zen_websocket, :request_correlator, :track] - Emitted when a request is tracked.

    • Measurements: %{count: 1}
    • Metadata: %{id: id, timeout_ms: timeout}
  • [:zen_websocket, :request_correlator, :resolve] - Emitted when a response is matched.

    • Measurements: %{count: 1, round_trip_ms: milliseconds}
    • Metadata: %{id: id}
  • [:zen_websocket, :request_correlator, :timeout] - Emitted when a request times out.

    • Measurements: %{count: 1}
    • Metadata: %{id: id}
  • [:zen_websocket, :request_correlator, :fail_all] - Emitted for each pending request failed via fail_all/2 (e.g., on connection loss).

    • Measurements: %{count: 1}
    • Metadata: %{id: id, reason: reason}
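As a rough illustration of consuming the events listed above, the sketch below defines a handler with the four-argument shape that :telemetry handlers use. The module name CorrelatorTelemetrySketch, the handler id, and the log wording are all hypothetical; only the event names and the measurement and metadata keys come from the list above. The attach/0 helper assumes the :telemetry package is available and otherwise returns an error tuple.

```elixir
defmodule CorrelatorTelemetrySketch do
  # Hypothetical module: only the event names and map keys come from the docs.
  @events [
    [:zen_websocket, :request_correlator, :track],
    [:zen_websocket, :request_correlator, :resolve],
    [:zen_websocket, :request_correlator, :timeout],
    [:zen_websocket, :request_correlator, :fail_all]
  ]

  def events, do: @events

  # Handler with the 4-arity shape (event, measurements, metadata, config).
  def handle_event(event, measurements, metadata, _config) do
    IO.puts(format(event, measurements, metadata))
  end

  # Pure formatting step, kept separate so it can be checked without :telemetry.
  def format([:zen_websocket, :request_correlator, kind], measurements, metadata) do
    case {kind, measurements, metadata} do
      {:track, _, %{id: id, timeout_ms: ms}} ->
        "tracked #{inspect(id)} (timeout #{ms} ms)"

      {:resolve, %{round_trip_ms: rtt}, %{id: id}} ->
        "resolved #{inspect(id)} in #{rtt} ms"

      {:timeout, _, %{id: id}} ->
        "timed out #{inspect(id)}"

      {:fail_all, _, %{id: id, reason: reason}} ->
        "failed #{inspect(id)}: #{inspect(reason)}"

      _ ->
        "unrecognized correlator event"
    end
  end

  # apply/3 keeps this file compilable when the :telemetry package is absent.
  def attach do
    if Code.ensure_loaded?(:telemetry) do
      apply(:telemetry, :attach_many, [
        "correlator-sketch",
        @events,
        &__MODULE__.handle_event/4,
        nil
      ])
    else
      {:error, :telemetry_not_loaded}
    end
  end
end
```

Calling CorrelatorTelemetrySketch.attach() once at application start would be one way to wire this in; the fallback clause in format/3 keeps the handler from raising on an unexpected payload.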

API Functions

| Function | Arity | Description | Param Kinds |
|---|---|---|---|
| pending_count | 1 | Return the count of pending requests. | state: value |
| fail_all | 2 | Fail every pending request with the given reason. | state: value, reason: value |
| timeout | 2 | Handle a timeout for a pending request. | state: value, id: value |
| resolve | 2 | Resolve a pending request by ID, returning the caller info. | state: value, id: value |
| track | 4 | Track a pending request with a timeout timer. | state: value, id: value, from: value, timeout_ms: value |
| extract_id | 1 | Extract the request ID from a JSON message. | message: value |

Summary

Types

  • state() - Client state map containing pending_requests field (subset of Client.state)

Functions

  • extract_id(message) - Extracts the request ID from a JSON message.
  • fail_all(state, reason) - Fails every pending request with {:error, reason} and cancels their timeout timers.
  • pending_count(state) - Returns the count of pending requests.
  • resolve(state, id) - Resolves a pending request by ID, returning the caller info.
  • timeout(state, id) - Handles a timeout for a pending request.
  • track(state, id, from, timeout_ms) - Tracks a pending request with a timeout timer.

Types

state()

@type state() :: %{
  :pending_requests => %{
    optional(term()) => {GenServer.from(), reference(), integer()}
  },
  optional(atom()) => term()
}

Client state map containing pending_requests field (subset of Client.state)
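To make the type concrete, here is a small sketch of a map that fits state(). Every value is a stand-in: the request id 42, the url key, and the millisecond unit for the start time are assumptions chosen for illustration, not details taken from the library.

```elixir
# Stand-ins for values a real Client would already hold (all hypothetical).
from = {self(), make_ref()}                        # has the shape of GenServer.from()
timer_ref = make_ref()                             # placeholder for a timer reference
start_time = System.monotonic_time(:millisecond)   # assumed unit for the integer field

state = %{
  # The required key: request id => {from, timer reference, start time}.
  pending_requests: %{42 => {from, timer_ref, start_time}},
  # The type allows any other atom keys; this one is made up.
  url: "wss://example.invalid/ws"
}

{_from, _ref, started} = Map.fetch!(state.pending_requests, 42)
IO.inspect(map_size(state.pending_requests), label: "pending")
IO.inspect(is_integer(started), label: "start time is an integer")
```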

Functions

extract_id(message)

@spec extract_id(binary()) :: {:ok, term()} | :no_id

Extracts the request ID from a JSON message.

Returns {:ok, id} if the message contains a non-nil ID field, or :no_id if no ID is present, the message is not valid JSON, or the ID is nil.
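The return contract described above can be modeled in a few lines. This is a stand-alone sketch, not the library's implementation: it assumes Elixir 1.18 or later for the built-in JSON module, and it assumes the ID lives under a top-level "id" key, which the text above does not state explicitly. The module name ExtractIdSketch is hypothetical.

```elixir
defmodule ExtractIdSketch do
  # Assumptions: Elixir 1.18+ (built-in JSON module) and a top-level "id" key.
  def extract_id(message) when is_binary(message) do
    case JSON.decode(message) do
      # A decoded object carrying a non-nil id.
      {:ok, %{"id" => id}} when not is_nil(id) -> {:ok, id}
      # Invalid JSON, a missing id, a null id, or a non-object payload.
      _ -> :no_id
    end
  end
end

IO.inspect(ExtractIdSketch.extract_id(~s({"jsonrpc":"2.0","id":7,"result":"ok"})))
IO.inspect(ExtractIdSketch.extract_id(~s({"id":null})))
IO.inspect(ExtractIdSketch.extract_id("not json"))
```

Collapsing all three failure cases into :no_id mirrors the documented behavior, where the caller only needs to know whether a message can be correlated.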

fail_all(state, reason)

@spec fail_all(state(), term()) :: state()

Fails every pending request with {:error, reason} and cancels their timeout timers.

Used when the connection drops and in-flight correlated responses can no longer arrive. Callers blocked on GenServer.call receive the reply immediately instead of waiting for their per-call timeout.
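The behavior described above could look roughly like the following stand-alone model. It uses only the standard library and is not the library's source; the module name FailAllSketch, the :disconnected reason, and the use of :erlang.start_timer/3 for the timer are assumptions. The current process plays the role of the blocked caller so the reply can be observed.

```elixir
defmodule FailAllSketch do
  # Stdlib-only model of the documented contract, not the library's code.
  def fail_all(%{pending_requests: pending} = state, reason) do
    Enum.each(pending, fn {_id, {from, timer_ref, _start_time}} ->
      # Stop the per-request timer so no timeout message arrives later.
      :erlang.cancel_timer(timer_ref)
      # Unblock the caller with an error instead of letting it wait.
      GenServer.reply(from, {:error, reason})
    end)

    %{state | pending_requests: %{}}
  end
end

# Simulate one in-flight request whose caller is this process.
tag = make_ref()
timer_ref = :erlang.start_timer(5_000, self(), {:correlation_timeout, 1})
started = System.monotonic_time(:millisecond)
state = %{pending_requests: %{1 => {{self(), tag}, timer_ref, started}}}

state = FailAllSketch.fail_all(state, :disconnected)

reply =
  receive do
    {^tag, value} -> value
  after
    100 -> :no_reply
  end

IO.inspect(reply, label: "caller received")
IO.inspect(map_size(state.pending_requests), label: "still pending")
```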

pending_count(state)

@spec pending_count(state()) :: non_neg_integer()

Returns the count of pending requests.

resolve(state, id)

@spec resolve(state(), term()) ::
  {{GenServer.from(), reference(), integer()} | nil, state()}

Resolves a pending request by ID, returning the caller info.

Cancels the timeout timer and removes the request from pending. Returns {entry, new_state} where entry is {from, timeout_ref, start_time} or nil.
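A minimal model of this contract, with a usage pattern around it, might look like the sketch below. It is a stdlib-only illustration rather than the library's code. The module name ResolveSketch, the "req-1" id, the reply payload, and the millisecond start time are assumptions; the sketch also assumes the client replies to the caller itself once it has the entry.

```elixir
defmodule ResolveSketch do
  # Stdlib-only model: pop the entry, cancel its timer, return both pieces.
  def resolve(%{pending_requests: pending} = state, id) do
    case Map.pop(pending, id) do
      {nil, _unchanged} ->
        {nil, state}

      {{_from, timer_ref, _start_time} = entry, rest} ->
        :erlang.cancel_timer(timer_ref)
        {entry, %{state | pending_requests: rest}}
    end
  end
end

# One pending request whose caller is this process.
tag = make_ref()
timer_ref = :erlang.start_timer(5_000, self(), {:correlation_timeout, "req-1"})
started = System.monotonic_time(:millisecond)
state = %{pending_requests: %{"req-1" => {{self(), tag}, timer_ref, started}}}

# A response for "req-1" arrives: resolve it and reply to the caller.
{entry, state} = ResolveSketch.resolve(state, "req-1")
{from, _timer_ref, start_time} = entry
round_trip_ms = System.monotonic_time(:millisecond) - start_time
GenServer.reply(from, {:ok, %{"result" => "pong"}})

# A second response with the same id finds nothing to resolve.
{missing, _state} = ResolveSketch.resolve(state, "req-1")

IO.inspect(round_trip_ms, label: "round trip (ms)")
IO.inspect(missing, label: "second resolve")
```

Returning the entry instead of replying inside resolve keeps the decision about what to send back with the caller of the function.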

timeout(state, id)

@spec timeout(state(), term()) ::
  {{GenServer.from(), reference(), integer()} | nil, state()}

Handles a timeout for a pending request.

Removes the request from pending and returns the caller info. Returns {entry, new_state} where entry is {from, timeout_ref, start_time} or nil.
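One way a client process might react to the timer message is sketched below. This is a stdlib-only model, not the library's source: the module name TimeoutSketch and the {:error, :timeout} reply value are assumptions, and the timer is left alone on the assumption that it has already fired by the time this runs. The message shape follows the one documented for track/4.

```elixir
defmodule TimeoutSketch do
  # Models the documented contract: drop the entry and hand back the caller info.
  def timeout(%{pending_requests: pending} = state, id) do
    {entry, rest} = Map.pop(pending, id)
    {entry, %{state | pending_requests: rest}}
  end

  # Shape of a handle_info/2 clause a client GenServer might define.
  def handle_info({:timeout, _timer_ref, {:correlation_timeout, id}}, state) do
    case timeout(state, id) do
      {{from, _ref, _start_time}, new_state} ->
        # Assumed reply value; the documentation does not specify it.
        GenServer.reply(from, {:error, :timeout})
        {:noreply, new_state}

      {nil, new_state} ->
        # The request was already resolved or failed; nothing to do.
        {:noreply, new_state}
    end
  end
end

# One pending request with a short timer; this process is the caller.
tag = make_ref()
timer_ref = :erlang.start_timer(10, self(), {:correlation_timeout, 5})
started = System.monotonic_time(:millisecond)
state = %{pending_requests: %{5 => {{self(), tag}, timer_ref, started}}}

timer_msg =
  receive do
    {:timeout, ^timer_ref, {:correlation_timeout, 5}} = msg -> msg
  after
    1_000 -> raise "timer did not fire"
  end

{:noreply, state} = TimeoutSketch.handle_info(timer_msg, state)

reply =
  receive do
    {^tag, value} -> value
  after
    100 -> :no_reply
  end

IO.inspect(reply, label: "caller received")
```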

track(state, id, from, timeout_ms)

@spec track(state(), term(), GenServer.from(), pos_integer()) ::
  {:ok, state()} | {:error, :duplicate_id, state()}

Tracks a pending request with a timeout timer.

Creates a timer that will send {:timeout, timeout_ref, {:correlation_timeout, id}} to self() after the specified timeout. Must be called from within a GenServer context.

Returns {:ok, new_state} on success. If id already has a pending entry, returns {:error, :duplicate_id, state} with state unchanged: the existing caller's from and timeout timer are preserved. No timer is started and no :track telemetry is emitted on the duplicate path.
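The timer message and the duplicate path can be seen in a small stand-alone model. This sketch is not the library's implementation. It assumes the timer is created with :erlang.start_timer/3, which delivers messages of exactly the {:timeout, timer_ref, message} shape described above, and it assumes a monotonic millisecond start time. The module name TrackSketch is hypothetical, and telemetry is left out.

```elixir
defmodule TrackSketch do
  # Stdlib-only model of the documented contract; telemetry is omitted.
  def track(%{pending_requests: pending} = state, id, from, timeout_ms) do
    if Map.has_key?(pending, id) do
      # Duplicate path: no timer is started and the state is returned as-is.
      {:error, :duplicate_id, state}
    else
      # :erlang.start_timer/3 sends {:timeout, timer_ref, message} to self().
      timer_ref = :erlang.start_timer(timeout_ms, self(), {:correlation_timeout, id})
      entry = {from, timer_ref, System.monotonic_time(:millisecond)}
      {:ok, %{state | pending_requests: Map.put(pending, id, entry)}}
    end
  end
end

from = {self(), make_ref()}
{:ok, state} = TrackSketch.track(%{pending_requests: %{}}, 1, from, 20)

# Tracking the same id again leaves the first entry untouched.
duplicate = TrackSketch.track(state, 1, from, 20)

{_from, timer_ref, _start_time} = state.pending_requests[1]

# With no response arriving, the timer message reaches this process.
fired =
  receive do
    {:timeout, ^timer_ref, {:correlation_timeout, id}} -> id
  after
    1_000 -> :nothing
  end

IO.inspect(duplicate, label: "duplicate track")
IO.inspect(fired, label: "timed out id")
```

Because the timer targets self(), the function has to run inside the process that will later handle the timeout message, which matches the requirement that track/4 be called from within a GenServer context.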