Ethereumex.WebsocketServer (ethereumex v0.12.1)

WebSocket client implementation for Ethereum JSON-RPC API.

This module manages a persistent WebSocket connection to an Ethereum node and handles the complete request-response cycle for JSON-RPC calls, including subscriptions. It tracks in-flight requests and active subscriptions, and matches each response to its original caller.

Features

  • Standard JSON-RPC requests via WebSocket
  • Real-time event subscriptions (newHeads, logs, newPendingTransactions)
  • Automatic reconnection with exponential backoff
  • Batch request support
  • Concurrent request handling

Request Types

Standard Requests

Standard JSON-RPC requests are handled through the post/1 function:

alias Ethereumex.WebsocketServer
{:ok, result} = WebsocketServer.post(encoded_request)
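The call above takes an already encoded request. As a minimal sketch of producing one, the module below builds an eth_blockNumber request and decodes the hex result. RequestSketch and its functions are hypothetical helpers, not part of Ethereumex. The sketch assumes Jason is available as the JSON encoder, that the WebsocketServer process is already running under its default name, and that the node returns the block number as a "0x"-prefixed hex string.

```elixir
defmodule RequestSketch do
  @moduledoc false

  # Builds the JSON-RPC request map for eth_blockNumber.
  def block_number_request(id) do
    %{jsonrpc: "2.0", method: "eth_blockNumber", params: [], id: id}
  end

  # Encodes the request, sends it through post/1 and converts the
  # hex-encoded result into an integer. Error tuples are passed through.
  def fetch_block_number(id \\ 1) do
    encoded = Jason.encode!(block_number_request(id))

    case Ethereumex.WebsocketServer.post(encoded) do
      {:ok, "0x" <> hex} -> {:ok, String.to_integer(hex, 16)}
      {:ok, other} -> {:error, {:unexpected_result, other}}
      {:error, reason} -> {:error, reason}
    end
  end
end
```

Keeping the request builder separate from the network call makes the builder easy to check without a live node.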

Subscriptions

The module supports Ethereum's pub/sub functionality for real-time events:

# Subscribe to new block headers
{:ok, subscription_id} = WebsocketServer.subscribe(%{
  jsonrpc: "2.0",
  method: "eth_subscribe",
  params: ["newHeads"],
  id: 1
})

# Subscribe to logs with filter
{:ok, subscription_id} = WebsocketServer.subscribe(%{
  jsonrpc: "2.0",
  method: "eth_subscribe",
  params: ["logs", %{address: "0x123..."}],
  id: 2
})

# Unsubscribe (single or multiple subscriptions)
{:ok, true} = WebsocketServer.unsubscribe(%{
  jsonrpc: "2.0",
  method: "eth_unsubscribe",
  params: [subscription_id],
  id: 3
})

State Management

The module maintains several state maps:

%State{
  requests: %{request_id => caller_pid},                    # Standard requests
  subscription_requests: %{request_id => caller_pid},       # Pending subscriptions
  unsubscription_requests: %{request_id => caller_pid},     # Pending unsubscriptions
  subscriptions: %{subscription_id => subscriber_pid}       # Active subscriptions
}
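To illustrate how maps like these can route an incoming message, here is a simplified sketch. It is not the library's actual implementation: StateSketch, route/2 and the returned tuples are hypothetical, and unsubscription handling is omitted for brevity.

```elixir
defmodule StateSketch do
  @moduledoc false

  defstruct requests: %{},
            subscription_requests: %{},
            unsubscription_requests: %{},
            subscriptions: %{}

  # A response carrying an id: find the waiting caller and drop the entry.
  def route(%__MODULE__{} = state, %{"id" => id, "result" => result}) do
    cond do
      Map.has_key?(state.requests, id) ->
        {caller, rest} = Map.pop(state.requests, id)
        {{:reply, caller, result}, %{state | requests: rest}}

      Map.has_key?(state.subscription_requests, id) ->
        # The result of eth_subscribe is the subscription id, so the
        # caller becomes the subscriber for that id.
        {caller, rest} = Map.pop(state.subscription_requests, id)
        subscriptions = Map.put(state.subscriptions, result, caller)

        {{:reply, caller, result},
         %{state | subscription_requests: rest, subscriptions: subscriptions}}

      true ->
        # No waiting caller: ignore the response.
        {:ignore, state}
    end
  end

  # A notification: forward it to whoever owns the subscription.
  def route(
        %__MODULE__{} = state,
        %{"method" => "eth_subscription", "params" => %{"subscription" => sub_id}} = message
      ) do
    case Map.fetch(state.subscriptions, sub_id) do
      {:ok, subscriber} -> {{:forward, subscriber, message}, state}
      :error -> {:ignore, state}
    end
  end

  def route(%__MODULE__{} = state, _other), do: {:ignore, state}
end
```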

Subscription Notifications

When a subscribed event occurs, the notification is automatically forwarded to the subscriber process. Notifications are received as messages in the format:

# New headers notification
%{
  "jsonrpc" => "2.0",
  "method" => "eth_subscription",
  "params" => %{
    "subscription" => "0x9cef478923ff08bf67fde6c64013158d",
    "result" => %{
      "number" => "0x1b4",
      "hash" => "0x8216c5785ac562ff41e2dcfdf5785ac562ff41e2dcfdf829c5a142f1fccd7d",
      "parentHash" => "0x9646252be9520f6e71339a8df9c55e4d7619deeb018d2a3f2d21fc165dde5eb5"
    }
  }
}

# Logs notification
%{
  "jsonrpc" => "2.0",
  "method" => "eth_subscription",
  "params" => %{
    "subscription" => "0x4a8a4c0517381924f9838102c5a4dcb7",
    "result" => %{
      "address" => "0x8320fe7702b96808f7bbc0d4a888ed1468216cfd",
      "topics" => ["0xd78a0cb8bb633d06981248b816e7bd33c2a35a6089241d099fa519e361cab902"],
      "data" => "0x000000000000000000000000000000000000000000000000000000000000000a"
    }
  }
}
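A subscriber process can pattern match on these maps to tell the notification types apart. The sketch below assumes the notification arrives in the subscriber's mailbox as the bare map shown above. NotificationSketch and the tuples it returns are hypothetical names chosen for illustration.

```elixir
defmodule NotificationSketch do
  @moduledoc false

  # Classifies a notification by the shape of its "result" field.
  def describe(%{
        "method" => "eth_subscription",
        "params" => %{"subscription" => sub_id, "result" => result}
      }) do
    case result do
      # newHeads: the block number is a hex-encoded quantity.
      %{"number" => "0x" <> hex} ->
        {:new_head, sub_id, String.to_integer(hex, 16)}

      # logs: keep the emitting address and the topics.
      %{"address" => address, "topics" => topics} ->
        {:log, sub_id, address, topics}

      # newPendingTransactions: the result is a transaction hash string.
      hash when is_binary(hash) ->
        {:pending_transaction, sub_id, hash}

      other ->
        {:unknown, sub_id, other}
    end
  end

  def describe(_other), do: :ignored

  # Waits for the next map message in the calling process and classifies it.
  def await(timeout \\ 5_000) do
    receive do
      message when is_map(message) -> describe(message)
    after
      timeout -> :timeout
    end
  end
end
```

In a GenServer subscriber, the same describe/1 call would typically sit inside a handle_info/2 clause rather than a bare receive.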

Error Handling

  • Connection failures are automatically retried with exponential backoff:
    • Up to @max_reconnect_attempts attempts
    • Starting with @backoff_initial_delay ms delay, doubling each attempt
    • Reconnection attempts are logged
  • Requests time out after @request_timeout ms
  • Invalid JSON responses are handled gracefully
  • Unmatched responses (no waiting caller) are safely ignored
  • Subscription errors are propagated to the subscriber
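The doubling backoff described in the list above can be sketched as a small calculation. The values of @max_reconnect_attempts and @backoff_initial_delay are not stated here, so the numbers used in the usage note below are placeholders. BackoffSketch is a hypothetical module, not the library's code.

```elixir
defmodule BackoffSketch do
  @moduledoc false

  # Delay before the given attempt (1-based): the initial delay,
  # doubled once for every attempt already made.
  def delay(initial_ms, attempt) when is_integer(attempt) and attempt >= 1 do
    initial_ms * Integer.pow(2, attempt - 1)
  end

  # Full list of delays for attempts 1..max_attempts.
  def schedule(initial_ms, max_attempts) when is_integer(max_attempts) and max_attempts >= 1 do
    Enum.map(1..max_attempts, &delay(initial_ms, &1))
  end
end
```

With a placeholder initial delay of 1000 ms and 5 attempts, BackoffSketch.schedule(1000, 5) yields delays of 1000, 2000, 4000, 8000 and 16000 ms.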

Summary

Functions

post(encoded_request)

Sends a JSON-RPC request and waits for the response.

start_link(opts \\ [])

Starts the WebSocket connection.

subscribe(request)

Subscribes to Ethereum events via WebSocket.

unsubscribe(request)

Unsubscribes from an existing Ethereum event subscription.

Types

event_type()

@type event_type() :: :newHeads | :logs | :newPendingTransactions

request_id()

@type request_id() :: pos_integer() | String.t()

subscription_id()

@type subscription_id() :: String.t()

Functions

post(encoded_request)

@spec post(binary()) ::
  {:ok, term()} | {:error, :invalid_request_format | :timeout | :decoded_error}

Sends a JSON-RPC request, given as an encoded binary, and waits for the response.

Returns {:ok, result} on success or {:error, reason} on failure. Times out after 5000 ms.

start_link(opts \\ [])

@spec start_link(keyword()) :: {:ok, pid()} | {:error, term()}

Starts the WebSocket connection.

Options

  • :url - WebSocket endpoint URL (defaults to Config.websocket_url())
  • :name - Process name (defaults to __MODULE__)

subscribe(request)

@spec subscribe(map()) ::
  {:ok, subscription_id()}
  | {:error, :invalid_request_format | :timeout | :decoded_error}

Subscribes to Ethereum events via WebSocket.

The request should be a map containing:

  • id: A unique request identifier
  • method: "eth_subscribe"
  • params: Parameters for the subscription, including the event type

Returns {:ok, subscription_id} on success or {:error, reason} on failure. Times out after 5000 ms.
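As a sketch of assembling such a request map from one of the event_type() atoms, the hypothetical helper below builds the params list with an optional filter. SubscribeSketch is not part of Ethereumex, and subscribe/3 assumes the WebsocketServer process is already running under its default name.

```elixir
defmodule SubscribeSketch do
  @moduledoc false

  @events [:newHeads, :logs, :newPendingTransactions]

  # Builds an eth_subscribe request map. The filter is only meaningful
  # for :logs and is appended to params when given.
  def build(id, event, filter \\ nil) when event in @events do
    name = Atom.to_string(event)
    params = if filter, do: [name, filter], else: [name]

    %{jsonrpc: "2.0", method: "eth_subscribe", params: params, id: id}
  end

  # Builds the request and hands it to subscribe/1, returning its
  # {:ok, subscription_id} or {:error, reason} result unchanged.
  def subscribe(id, event, filter \\ nil) do
    id
    |> build(event, filter)
    |> Ethereumex.WebsocketServer.subscribe()
  end
end
```

The guard rejects event names outside the three documented types before any request is sent.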

unsubscribe(request)

@spec unsubscribe(map()) ::
  {:ok, true} | {:error, :invalid_request_format | :timeout | :decoded_error}

Unsubscribes from an existing Ethereum event subscription.

The request should be a map containing:

  • id: A unique request identifier
  • method: "eth_unsubscribe"
  • params: A list containing the subscription IDs to unsubscribe from

Returns {:ok, true} on success or {:error, reason} on failure. Times out after 5000 ms.