Ethereumex.WebsocketServer (ethereumex v0.12.1)
WebSocket client implementation for the Ethereum JSON-RPC API.
This module manages a persistent WebSocket connection to an Ethereum node and handles the complete request-response cycle for JSON-RPC calls, including subscriptions. It tracks in-flight requests and active subscriptions, and matches each response to its original caller.
Features
- Standard JSON-RPC requests via WebSocket
- Real-time event subscriptions (newHeads, logs, newPendingTransactions)
- Automatic reconnection with exponential backoff
- Batch request support
- Concurrent request handling
Request Types
Standard Requests
Standard JSON-RPC requests are handled through the post/1 function:
{:ok, result} = WebsocketServer.post(encoded_request)
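Since post/1 takes a binary, the request has to be JSON-encoded before it is sent. The following is a minimal sketch of building such a request. The RequestSketch module and its build/3 function are hypothetical and not part of Ethereumex, and the commented-out lines assume that the Jason library is available and that a WebsocketServer process is already running.

```elixir
defmodule RequestSketch do
  # Hypothetical helper, not part of Ethereumex:
  # builds a JSON-RPC 2.0 request map from its three variable parts.
  def build(method, params, id) when is_binary(method) and is_list(params) do
    %{jsonrpc: "2.0", method: method, params: params, id: id}
  end
end

request = RequestSketch.build("eth_blockNumber", [], 1)

# Assuming Jason is available and the server process is running:
#   encoded_request = Jason.encode!(request)
#   {:ok, result} = Ethereumex.WebsocketServer.post(encoded_request)
```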
Subscriptions
The module supports Ethereum's pub/sub functionality for real-time events:
# Subscribe to new block headers
{:ok, subscription_id} = WebsocketServer.subscribe(%{
jsonrpc: "2.0",
method: "eth_subscribe",
params: ["newHeads"],
id: 1
})
# Subscribe to logs with filter
{:ok, subscription_id} = WebsocketServer.subscribe(%{
jsonrpc: "2.0",
method: "eth_subscribe",
params: ["logs", %{address: "0x123..."}],
id: 2
})
# Unsubscribe (single or multiple subscriptions)
{:ok, true} = WebsocketServer.unsubscribe(%{
jsonrpc: "2.0",
method: "eth_unsubscribe",
params: [subscription_id],
id: 3
})
State Management
The module maintains several state maps:
%State{
requests: %{request_id => caller_pid}, # Standard requests
subscription_requests: %{request_id => caller_pid}, # Pending subscriptions
unsubscription_requests: %{request_id => caller_pid}, # Pending unsubscriptions
subscriptions: %{subscription_id => subscriber_pid} # Active subscriptions
}
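To illustrate how these maps could be used to route an incoming message, here is a minimal sketch. It is not the module's actual implementation: RoutingSketch and route/2 are hypothetical names, and the state is a plain map with the four keys listed above. A notification is looked up by subscription ID, while a response is looked up by request ID in each of the pending-request maps.

```elixir
defmodule RoutingSketch do
  # Hypothetical illustration only, not Ethereumex's code.

  # Subscription notification: find the subscriber by subscription ID.
  def route(%{"method" => "eth_subscription", "params" => %{"subscription" => sub_id}}, state) do
    case Map.fetch(state.subscriptions, sub_id) do
      {:ok, pid} -> {:notify, pid}
      :error -> :ignore
    end
  end

  # Response to a pending request: search the three pending maps by request ID
  # and report which map matched together with the waiting caller.
  def route(%{"id" => id}, state) do
    [:requests, :subscription_requests, :unsubscription_requests]
    |> Enum.find_value(:ignore, fn table ->
      case Map.fetch(Map.fetch!(state, table), id) do
        {:ok, pid} -> {table, pid}
        :error -> nil
      end
    end)
  end

  # Anything else has no waiting caller and is ignored.
  def route(_other, _state), do: :ignore
end
```

Returning the matched map's name alongside the pid lets the caller of route/2 decide what to do next, for example moving a confirmed subscription from subscription_requests into subscriptions.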
Subscription Notifications
When a subscribed event occurs, the notification is automatically forwarded to the subscriber process. Notifications are received as messages in the format:
# New headers notification
%{
"jsonrpc" => "2.0",
"method" => "eth_subscription",
"params" => %{
"subscription" => "0x9cef478923ff08bf67fde6c64013158d",
"result" => %{
"number" => "0x1b4",
"hash" => "0x8216c5785ac562ff41e2dcfdf5785ac562ff41e2dcfdf829c5a142f1fccd7d",
"parentHash" => "0x9646252be9520f6e71339a8df9c55e4d7619deeb018d2a3f2d21fc165dde5eb5"
}
}
}
# Logs notification
%{
"jsonrpc" => "2.0",
"method" => "eth_subscription",
"params" => %{
"subscription" => "0x4a8a4c0517381924f9838102c5a4dcb7",
"result" => %{
"address" => "0x8320fe7702b96808f7bbc0d4a888ed1468216cfd",
"topics" => ["0xd78a0cb8bb633d06981248b816e7bd33c2a35a6089241d099fa519e361cab902"],
"data" => "0x000000000000000000000000000000000000000000000000000000000000000a"
}
}
}
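On the subscriber side, these messages can be told apart by pattern matching on the "result" payload. The sketch below is one possible approach, assuming the message shapes shown above. NotificationSketch and classify/1 are hypothetical names, not part of Ethereumex.

```elixir
defmodule NotificationSketch do
  # Hypothetical subscriber-side helper: classifies a forwarded notification
  # by the shape of its "result" payload.
  def classify(%{
        "method" => "eth_subscription",
        "params" => %{"subscription" => sub_id, "result" => result}
      }) do
    case result do
      # New block header: convert the hex block number to an integer.
      %{"number" => number} -> {:new_head, sub_id, hex_to_integer(number)}
      # Log entry: keep the emitting address and the topics.
      %{"address" => address, "topics" => topics} -> {:log, sub_id, address, topics}
      other -> {:unknown, sub_id, other}
    end
  end

  def classify(_other), do: :not_a_notification

  defp hex_to_integer("0x" <> hex), do: String.to_integer(hex, 16)
end

# Possible use inside the subscriber process:
#   receive do
#     message -> NotificationSketch.classify(message)
#   end
```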
Error Handling
- Connection failures are automatically retried with exponential backoff:
  - Up to @max_reconnect_attempts attempts
  - Starting with a delay of @backoff_initial_delay ms, doubling on each attempt
  - Reconnection attempts are logged
- Requests time out after @request_timeout ms
- Invalid JSON responses are handled gracefully
- Unmatched responses (no waiting caller) are safely ignored
- Subscription errors are propagated to the subscriber
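The backoff schedule described above can be sketched as a small pure function. The attribute values used here (5 attempts, 1000 ms initial delay) are assumptions chosen for illustration, since the actual values are not given on this page. BackoffSketch and delay/1 are hypothetical names.

```elixir
defmodule BackoffSketch do
  # Assumed values for illustration only; the real attributes may differ.
  @max_reconnect_attempts 5
  @backoff_initial_delay 1_000

  # Returns the delay in ms before the given 1-based attempt, doubling each
  # time, or :give_up once the attempt limit has been exceeded.
  def delay(attempt) when is_integer(attempt) and attempt >= 1 do
    if attempt > @max_reconnect_attempts do
      :give_up
    else
      {:retry_in, @backoff_initial_delay * Integer.pow(2, attempt - 1)}
    end
  end
end
```

With these assumed values the delays are 1000, 2000, 4000, 8000 and 16000 ms, after which the sketch gives up.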
Summary
Functions
Sends a JSON-RPC request and waits for response.
Starts the WebSocket connection.
Subscribes to Ethereum events via WebSocket.
Unsubscribes from an existing Ethereum event subscription.
Types
@type event_type() :: :newHeads | :logs | :newPendingTransactions
@type request_id() :: pos_integer() | String.t()
@type subscription_id() :: String.t()
Functions
@spec post(binary()) :: {:ok, term()} | {:error, :invalid_request_format | :timeout | :decoded_error}
Sends a JSON-RPC request and waits for response.
Returns {:ok, result} on success or {:error, reason} on failure.
Times out after 5000ms.
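A caller will usually want to branch on the return values listed in the spec. The sketch below shows one way to do that. PostResultSketch and next_step/1 are hypothetical names, and treating a timeout as retryable is a choice made for this sketch, not documented library behavior.

```elixir
defmodule PostResultSketch do
  # Hypothetical helper: decides what a caller does with a post/1 result.
  def next_step({:ok, result}), do: {:done, result}

  # Assumption of this sketch: a timeout may be transient, so retry.
  def next_step({:error, :timeout}), do: :retry

  # Every other error reason is passed back to the caller unchanged.
  def next_step({:error, reason}), do: {:failed, reason}
end

# Possible use, assuming a running server and an already encoded request:
#   encoded_request
#   |> Ethereumex.WebsocketServer.post()
#   |> PostResultSketch.next_step()
```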
Starts the WebSocket connection.
Options
- :url - WebSocket endpoint URL (defaults to Config.websocket_url())
- :name - Process name (defaults to __MODULE__)
@spec subscribe(map()) :: {:ok, subscription_id()} | {:error, :invalid_request_format | :timeout | :decoded_error}
Subscribes to Ethereum events via WebSocket.
The request should be a map containing:
- id: A unique request identifier
- method: "eth_subscribe"
- params: Parameters for the subscription, including the event type
Returns {:ok, subscription_id} on success or {:error, reason} on failure.
Times out after 5000ms.
@spec unsubscribe(map()) :: {:ok, true} | {:error, :invalid_request_format | :timeout | :decoded_error}
Unsubscribes from an existing Ethereum event subscription.
The request should be a map containing:
- id: A unique request identifier
- method: "eth_unsubscribe"
- params: A list containing the subscription IDs to unsubscribe from
Returns {:ok, true} on success or {:error, reason} on failure.
Times out after 5000ms.
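Both subscribe and unsubscribe expect a map with a unique id. One way to avoid hand-numbering requests is to generate the id, as in the sketch below. SubscriptionRequestSketch and its functions are hypothetical helpers, not part of Ethereumex. Only the shape of the maps is taken from the descriptions above.

```elixir
defmodule SubscriptionRequestSketch do
  # Hypothetical helpers that build the request maps with generated ids.

  # event is the subscription type, e.g. "newHeads" or "logs";
  # extra_params carries optional arguments such as a logs filter map.
  def subscribe_request(event, extra_params \\ []) when is_binary(event) do
    %{jsonrpc: "2.0", method: "eth_subscribe", params: [event | extra_params], id: unique_id()}
  end

  # subscription_ids is the list of IDs returned by earlier subscribe calls.
  def unsubscribe_request(subscription_ids) when is_list(subscription_ids) do
    %{jsonrpc: "2.0", method: "eth_unsubscribe", params: subscription_ids, id: unique_id()}
  end

  # Positive integer that is unique within the running VM instance.
  defp unique_id, do: System.unique_integer([:positive])
end

# Possible use, assuming a running server:
#   {:ok, subscription_id} =
#     Ethereumex.WebsocketServer.subscribe(SubscriptionRequestSketch.subscribe_request("newHeads"))
```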