Manages request/response correlation for JSON-RPC WebSocket connections.
A purely functional module: state ownership stays with the Client GenServer. It tracks pending requests with timeouts and matches responses by ID.
Telemetry Events
The following telemetry events are emitted:
- [:zen_websocket, :request_correlator, :track] - Emitted when a request is tracked.
  - Measurements: %{count: 1}
  - Metadata: %{id: id, timeout_ms: timeout}
- [:zen_websocket, :request_correlator, :resolve] - Emitted when a response is matched.
  - Measurements: %{count: 1, round_trip_ms: milliseconds}
  - Metadata: %{id: id}
- [:zen_websocket, :request_correlator, :timeout] - Emitted when a request times out.
  - Measurements: %{count: 1}
  - Metadata: %{id: id}
- [:zen_websocket, :request_correlator, :fail_all] - Emitted for each pending request failed via fail_all/2 (e.g., on connection loss).
  - Measurements: %{count: 1}
  - Metadata: %{id: id, reason: reason}
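One way to consume these events is a small handler module attached with :telemetry.attach_many/4. The sketch below is illustrative only: the module name CorrelatorTelemetryLogger and the handler ID string are hypothetical, and it assumes the :telemetry package is available as a dependency. Only the event names and payload shapes are taken from the list above.

```elixir
defmodule CorrelatorTelemetryLogger do
  # Hypothetical module; not part of the library.
  require Logger

  # Event names as listed in the documentation above.
  @events [
    [:zen_websocket, :request_correlator, :track],
    [:zen_websocket, :request_correlator, :resolve],
    [:zen_websocket, :request_correlator, :timeout],
    [:zen_websocket, :request_correlator, :fail_all]
  ]

  # Returns :ok, or {:error, :already_exists} if the handler ID is taken.
  def attach do
    :telemetry.attach_many(
      "correlator-telemetry-logger",
      @events,
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # :resolve is the only event documented as carrying round_trip_ms.
  def handle_event(
        [:zen_websocket, :request_correlator, :resolve],
        %{round_trip_ms: ms},
        %{id: id},
        _config
      ) do
    Logger.debug("request #{inspect(id)} resolved in #{ms} ms")
  end

  # Catch-all for :track, :timeout and :fail_all.
  def handle_event([:zen_websocket, :request_correlator, kind], _measurements, metadata, _config) do
    Logger.debug("request_correlator #{kind}: #{inspect(metadata)}")
  end
end
```

Calling CorrelatorTelemetryLogger.attach() once at application start would be enough. The handler is passed as a remote capture (&Module.fun/4) rather than an anonymous function, which is the form the telemetry documentation recommends.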
API Functions
| Function | Arity | Description | Param Kinds |
|---|---|---|---|
| pending_count | 1 | Return the count of pending requests. | state: value |
| fail_all | 2 | Fail every pending request with the given reason. | state: value, reason: value |
| timeout | 2 | Handle a timeout for a pending request. | state: value, id: value |
| resolve | 2 | Resolve a pending request by ID, returning the caller info. | state: value, id: value |
| track | 4 | Track a pending request with a timeout timer. | state: value, id: value, from: value, timeout_ms: value |
| extract_id | 1 | Extract the request ID from a JSON message. | message: value |
Functions
extract_id(message)
Extracts the request ID from a JSON message.
Returns {:ok, id} if the message contains a non-nil ID field,
or :no_id if no ID is present, the message is not valid JSON,
or the ID is nil.
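The return contract described here can be illustrated with a stand-in function. This is a sketch, not the library's implementation: the module name ExtractIdSketch is hypothetical, it assumes the ID lives under a top-level "id" key (as in JSON-RPC), and it relies on the built-in JSON module, which requires Elixir 1.18 or later.

```elixir
defmodule ExtractIdSketch do
  # Hypothetical stand-in for illustration only.
  # Assumption: the ID is stored under the top-level "id" key.

  def extract_id(message) when is_binary(message) do
    case JSON.decode(message) do
      # A present, non-nil ID is returned as-is (integer or string).
      {:ok, %{"id" => id}} when not is_nil(id) -> {:ok, id}
      # Missing ID, nil ID, non-object JSON, or a decode error.
      _ -> :no_id
    end
  end

  # Anything that is not a binary cannot be decoded as JSON text.
  def extract_id(_message), do: :no_id
end
```

A caller would typically branch on the result: {:ok, id} leads to a lookup of the pending request, while :no_id means the frame is a notification or some other uncorrelated message.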
fail_all(state, reason)
Fails every pending request with {:error, reason} and cancels their timeout timers.
Used when the connection drops and in-flight correlated responses can no longer
arrive. Callers blocked on GenServer.call receive the reply immediately
instead of waiting for their per-call timeout.
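The behaviour described above can be sketched with standard-library calls only. The module name FailAllSketch, the assumed state shape (a map of id to {from, timer_ref, start_time}), and the choice to return an empty map are all assumptions made for illustration. The documentation does not state what fail_all/2 returns.

```elixir
defmodule FailAllSketch do
  # Hypothetical stand-in, not the library's code.
  # Assumed state shape: %{id => {from, timer_ref, start_time}}.

  def fail_all(pending, reason) do
    Enum.each(pending, fn {_id, {from, timer_ref, _start_time}} ->
      # Stop the per-request timer so no timeout message arrives later.
      :erlang.cancel_timer(timer_ref)
      # Unblock the caller waiting in GenServer.call/3 right away.
      GenServer.reply(from, {:error, reason})
    end)

    # Assumption: nothing is pending afterwards, so return an empty map.
    %{}
  end
end
```

Because GenServer.reply/2 is sent for each entry, every blocked caller sees {:error, reason} as the result of its GenServer.call instead of exiting on its own call timeout.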
pending_count(state)
@spec pending_count(state()) :: non_neg_integer()
Returns the count of pending requests.
resolve(state, id)
Resolves a pending request by ID, returning the caller info.
Cancels the timeout timer and removes the request from pending.
Returns {entry, new_state} where entry is {from, timeout_ref, start_time} or nil.
timeout(state, id)
Handles a timeout for a pending request.
Removes the request from pending and returns the caller info.
Returns {entry, new_state} where entry is {from, timeout_ref, start_time} or nil.
track(state, id, from, timeout_ms)
@spec track(state(), term(), GenServer.from(), pos_integer()) :: {:ok, state()} | {:error, :duplicate_id, state()}
Tracks a pending request with a timeout timer.
Creates a timer that will send {:timeout, timeout_ref, {:correlation_timeout, id}}
to self() after the specified timeout. Must be called from within a
GenServer context.
Returns {:ok, new_state} on success. If id already has a pending entry,
returns {:error, :duplicate_id, state} with state unchanged — the
existing caller's from and timeout timer are preserved. No timer is
started and no :track telemetry is emitted on the duplicate path.
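The tracking lifecycle described for track, resolve and timeout can be sketched with standard-library calls. This is a stand-in under stated assumptions, not the library's implementation: the module name TrackSketch and the state shape (a map of id to {from, timer_ref, start_time}) are hypothetical, and telemetry is left out. The timer message shape matches what :erlang.start_timer/3 delivers, which suggests that call, but the source does not confirm it.

```elixir
defmodule TrackSketch do
  # Hypothetical stand-in, not the library's code.
  # Assumed state shape: %{id => {from, timer_ref, start_time_ms}}.

  def track(pending, id, from, timeout_ms) do
    if Map.has_key?(pending, id) do
      # Duplicate path: no timer is started and the state is unchanged.
      {:error, :duplicate_id, pending}
    else
      # Delivers {:timeout, timer_ref, {:correlation_timeout, id}} to self().
      timer_ref = :erlang.start_timer(timeout_ms, self(), {:correlation_timeout, id})
      start_time = System.monotonic_time(:millisecond)
      {:ok, Map.put(pending, id, {from, timer_ref, start_time})}
    end
  end

  def resolve(pending, id) do
    case Map.pop(pending, id) do
      {nil, pending} ->
        {nil, pending}

      {{_from, timer_ref, _start_time} = entry, rest} ->
        # The response arrived in time, so the timer is no longer needed.
        :erlang.cancel_timer(timer_ref)
        {entry, rest}
    end
  end

  # The timer has already fired, so there is nothing to cancel.
  def timeout(pending, id), do: Map.pop(pending, id)
end
```

In the owning GenServer, the timer message would be matched in handle_info/2 as {:timeout, timer_ref, {:correlation_timeout, id}}, followed by a call to timeout/2 and a reply to the returned from. Because the timer targets self(), track must run inside the process that will handle that message.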