# `Onchain.Subscription`
[🔗](https://github.com/ZenHive/onchain/blob/v0.5.4/lib/onchain/subscription.ex#L1)

Real-time Ethereum subscriptions via `eth_subscribe` over WebSocket.

Wraps [zen_websocket](https://hex.pm/packages/zen_websocket) with Ethereum-specific
subscription management. Supports three subscription types: new block headers,
pending transactions, and filtered event logs.

## Does

- Connect to a WebSocket-capable Ethereum endpoint (`connect/2`)
- Subscribe to `newHeads`, `newPendingTransactions`, and `logs` (`subscribe/3`)
- Parse subscription notifications into normalized Elixir maps
- Deliver parsed events via handler function or process messages
- Unsubscribe and clean up resources (`unsubscribe/2`, `close/1`)

## Does Not

- Persist or index events (see rexex for durable indexing)
- Convert HTTP RPC URLs to WebSocket URLs (consumer provides `wss://` directly)
- Manage subscriptions across reconnects (reconnection itself is delegated to zen_websocket)

## Event Delivery

Events are delivered via a handler function passed to `connect/2`. The default
handler sends `{:subscription, event}` messages to the calling process.

Event shapes:
- `{:new_heads, subscription_id, head_map}`
- `{:pending_transactions, subscription_id, tx_hash}`
- `{:logs, subscription_id, log_map}`
- `{:parse_error, subscription_id, reason}` — malformed notification; `reason` is a
  tagged tuple from the internal parser
  (`{:invalid_head, _}` | `{:invalid_tx_hash, _}` | `{:invalid_log, _}`)
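As a minimal sketch of consuming these shapes, the hypothetical `EventSummary` module below has one function clause per event tuple. The `send/2` call only simulates the `{:subscription, event}` message that the default handler is described as sending; the ids and hashes are made-up sample values.

```elixir
defmodule EventSummary do
  @moduledoc "Hypothetical helper: turns a subscription event into a one-line summary."

  # One clause per event shape listed above.
  def describe({:new_heads, sub_id, %{number: number, hash: hash}}),
    do: "[#{sub_id}] block #{number} #{hash}"

  def describe({:pending_transactions, sub_id, tx_hash}),
    do: "[#{sub_id}] pending tx #{tx_hash}"

  def describe({:logs, sub_id, %{address: address, block_number: block}}),
    do: "[#{sub_id}] log from #{address} in block #{block}"

  def describe({:parse_error, sub_id, reason}),
    do: "[#{sub_id}] parse error: #{inspect(reason)}"
end

# Simulate the message the default handler would deliver to this process.
send(self(), {:subscription, {:pending_transactions, "0x1", "0xabc"}})

summary =
  receive do
    {:subscription, event} -> EventSummary.describe(event)
  after
    1_000 -> :timeout
  end

IO.puts(summary)
```

Matching `:parse_error` explicitly keeps a malformed notification from crashing the consumer with a `FunctionClauseError`.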

## Subscribe Race Buffering

`eth_subscribe`'s RPC reply returns the `subscription_id`, but notifications for
that id can arrive before the Agent has registered it. Buffering closes this race:
notifications that arrive for an unregistered `subscription_id` are queued per id
(capped at `100` entries; on overflow the oldest is dropped with a `Logger.warning`).
On registration, buffered notifications are flushed in FIFO order through the same
handler path before `subscribe/3` returns.
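The buffering idea can be sketched in plain Elixir. This is an illustration of a per-id bounded FIFO with drop-oldest overflow, not the library's implementation (which keeps its state in an Agent). The `PendingBuffer` module, its function names, and the use of `IO.puts/2` in place of `Logger.warning` are all assumptions made for the example.

```elixir
defmodule PendingBuffer do
  @moduledoc "Illustrative only: per-id bounded FIFO buffers with drop-oldest overflow."

  # State is a map of subscription_id => list of notifications, oldest first.
  def new, do: %{}

  # Queue a notification for an id that has not been registered yet.
  def push(buffers, sub_id, notification, cap \\ 100) do
    queue = Map.get(buffers, sub_id, []) ++ [notification]

    queue =
      if length(queue) > cap do
        # Stand-in for the warning log described in the text.
        IO.puts(:stderr, "buffer full for #{sub_id}; dropping oldest notification")
        tl(queue)
      else
        queue
      end

    Map.put(buffers, sub_id, queue)
  end

  # Remove and return everything buffered for an id, oldest first.
  def drain(buffers, sub_id), do: Map.pop(buffers, sub_id, [])
end

# With a cap of 2, the third push evicts the oldest entry (:a).
buffers =
  PendingBuffer.new()
  |> PendingBuffer.push("0x1", :a, 2)
  |> PendingBuffer.push("0x1", :b, 2)
  |> PendingBuffer.push("0x1", :c, 2)

{flushed, buffers} = PendingBuffer.drain(buffers, "0x1")
IO.inspect(flushed)
```

Draining removes the id's queue in one step, which mirrors the register-and-drain idea: once an id is registered, nothing more should accumulate for it.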

Ordering across buffers and after registration is best-effort: a notification that
arrives between the atomic register-and-drain step and the synchronous flush is
dispatched immediately and may interleave with buffered events. This is acceptable
because each notification (head, hash, log) is self-contained and independent.

Handler exceptions during flush propagate (consistent with fire-and-forget
`dispatch_event/4` semantics). Remaining buffered events for that flush are lost;
Agent state remains consistent.
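One way to avoid losing events to a raising handler is to wrap it before passing it as the `:handler` option. The sketch below is a hypothetical `SafeHandler` module, not part of the library; it rescues exceptions only (not throws or exits), and the sample events are made up.

```elixir
defmodule SafeHandler do
  @moduledoc "Hypothetical wrapper: keeps a raising handler from aborting a flush."

  # Returns a new handler that rescues exceptions raised by `handler`
  # and reports them through `on_error` instead of letting them propagate.
  def wrap(handler, on_error \\ fn _event, _exception -> :ok end)
      when is_function(handler, 1) and is_function(on_error, 2) do
    fn event ->
      try do
        handler.(event)
      rescue
        exception -> on_error.(event, exception)
      end
    end
  end
end

parent = self()

# A handler that raises on one specific (made-up) event.
fragile = fn
  {:pending_transactions, _id, "bad"} -> raise "boom"
  event -> send(parent, {:handled, event})
end

safe = SafeHandler.wrap(fragile, fn event, _exception -> send(parent, {:failed, event}) end)

events = [
  {:pending_transactions, "0x1", "a"},
  {:pending_transactions, "0x1", "bad"},
  {:pending_transactions, "0x1", "c"}
]

# All three events are attempted; the raising one is reported, not fatal.
Enum.each(events, safe)
```

The trade-off is that failures become silent unless `on_error` records them, so a real wrapper would likely log or count them.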

## Error Format

- Connection failures: `{:error, {:connection_error, reason}}`
- RPC errors: `{:error, {:rpc_error, %{code: integer, message: string}}}`
- Invalid subscription type: `{:error, {:invalid_subscription_type, type}}`
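These tuples can be matched directly. The hypothetical `SubscriptionError` helper below is a sketch of turning them into readable messages; the catch-all clause is there because the function specs only promise `{:error, term()}`.

```elixir
defmodule SubscriptionError do
  @moduledoc "Hypothetical helper: formats the error tuples listed above."

  def format({:error, {:connection_error, reason}}),
    do: "connection failed: #{inspect(reason)}"

  def format({:error, {:rpc_error, %{code: code, message: message}}}),
    do: "RPC error #{code}: #{message}"

  def format({:error, {:invalid_subscription_type, type}}),
    do: "unsupported subscription type: #{inspect(type)}"

  # Fallback for any error term not covered by the documented shapes.
  def format({:error, other}), do: "unexpected error: #{inspect(other)}"
end

# Sample value: -32601 is the JSON-RPC "Method not found" code.
sample = {:error, {:rpc_error, %{code: -32601, message: "Method not found"}}}
IO.puts(SubscriptionError.format(sample))
```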

## Functions

| Function | Purpose |
|----------|---------|
| `connect/2` | Open WebSocket connection to Ethereum node |
| `connect!/2` | Same, raises on error |
| `subscribe/3` | Subscribe to a notification type |
| `subscribe!/3` | Same, raises on error |
| `unsubscribe/2` | Cancel a subscription by ID |
| `unsubscribe!/2` | Same, raises on error |
| `close/1` | Close connection and free resources |

## API Functions

| Function | Arity | Description | Param Kinds |
| --- | --- | --- | --- |
| `close` | 1 | Close the WebSocket connection and free resources. | `sub: value` |
| `unsubscribe!` | 2 | Cancel a subscription by ID. Raises on error. | `sub: value`, `subscription_id: value` |
| `unsubscribe` | 2 | Cancel a subscription by ID. | `sub: value`, `subscription_id: value` |
| `subscribe!` | 3 | Subscribe to an Ethereum notification type. Raises on error. | `sub: value`, `type: value`, `opts: value` |
| `subscribe` | 3 | Subscribe to an Ethereum notification type. | `sub: value`, `type: value`, `opts: value` |
| `connect!` | 2 | Open a WebSocket connection. Raises on error. | `ws_url: value`, `opts: value` |
| `connect` | 2 | Open a WebSocket connection to an Ethereum node. | `ws_url: value`, `opts: value` |

# `event`

```elixir
@type event() ::
  {:new_heads, String.t(), head()}
  | {:pending_transactions, String.t(), String.t()}
  | {:logs, String.t(), log()}
  | {:parse_error, String.t(), term()}
```

# `handler`

```elixir
@type handler() :: (event() -> any())
```

# `head`

```elixir
@type head() :: %{
  number: non_neg_integer(),
  hash: String.t(),
  parent_hash: String.t(),
  timestamp: non_neg_integer(),
  miner: String.t(),
  gas_limit: non_neg_integer(),
  gas_used: non_neg_integer(),
  base_fee_per_gas: non_neg_integer() | nil,
  logs_bloom: String.t(),
  transactions_root: String.t(),
  state_root: String.t(),
  receipts_root: String.t()
}
```
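Because the fields are already normalized to integers, derived values need no hex decoding. The following sketch uses a hypothetical `HeadStats` module and a made-up sample head (shortened placeholder hashes) to compute gas utilization and a UTC timestamp.

```elixir
defmodule HeadStats do
  @moduledoc "Hypothetical helper for head() maps."

  # Percentage of the block gas limit that was used, rounded to one decimal.
  def gas_utilization(%{gas_used: used, gas_limit: limit}) when limit > 0,
    do: Float.round(used / limit * 100, 1)

  # Block timestamp (Unix seconds) as a UTC DateTime.
  def mined_at(%{timestamp: timestamp}), do: DateTime.from_unix!(timestamp)
end

# Made-up sample data shaped like head(); hashes are placeholders.
head = %{
  number: 1,
  hash: "0xaa",
  parent_hash: "0xbb",
  timestamp: 1_700_000_000,
  miner: "0xcc",
  gas_limit: 30_000_000,
  gas_used: 15_000_000,
  base_fee_per_gas: nil,
  logs_bloom: "0x00",
  transactions_root: "0xdd",
  state_root: "0xee",
  receipts_root: "0xff"
}

IO.inspect(HeadStats.gas_utilization(head))
IO.puts(DateTime.to_iso8601(HeadStats.mined_at(head)))
```

Code that reads `base_fee_per_gas` should handle `nil`, since the type allows it.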

# `log`

```elixir
@type log() :: %{
  address: String.t(),
  topics: [String.t()],
  data: String.t(),
  block_number: non_neg_integer(),
  transaction_hash: String.t(),
  log_index: non_neg_integer(),
  transaction_index: non_neg_integer(),
  removed: boolean()
}
```
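In Ethereum's JSON-RPC log objects, `removed: true` marks a log that was dropped by a chain reorganization. A consumer that tracks logs may therefore want to undo earlier entries. The sketch below uses a hypothetical `LogTracker` module keyed on `{transaction_hash, log_index}`; the key choice and the sample log are assumptions for illustration.

```elixir
defmodule LogTracker do
  @moduledoc "Hypothetical helper: tracks live logs and undoes ones removed by a reorg."

  # A removed log deletes its key; any other log adds it.
  def apply_log(seen, %{removed: true} = log), do: MapSet.delete(seen, key(log))
  def apply_log(seen, %{removed: false} = log), do: MapSet.put(seen, key(log))

  defp key(%{transaction_hash: tx_hash, log_index: index}), do: {tx_hash, index}
end

# Made-up sample data shaped like log(); hashes are placeholders.
log = %{
  address: "0x0000000000000000000000000000000000000000",
  topics: [],
  data: "0x",
  block_number: 1,
  transaction_hash: "0xabc",
  log_index: 0,
  transaction_index: 0,
  removed: false
}

seen = LogTracker.apply_log(MapSet.new(), log)
after_reorg = LogTracker.apply_log(seen, %{log | removed: true})

IO.inspect({MapSet.size(seen), MapSet.size(after_reorg)})
```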

# `subscription_type`

```elixir
@type subscription_type() :: :new_heads | :pending_transactions | {:logs, map()}
```
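The values below illustrate the three variants of this type. The filter map's keys follow the JSON-RPC `logs` filter object (`address`, `topics`); whether this library expects atom or string keys is an assumption here, and the address is a placeholder. The topic is the widely used ERC-20 `Transfer` event signature hash.

```elixir
# ERC-20 Transfer(address,address,uint256) event signature hash.
transfer_topic = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"

types = [
  :new_heads,
  :pending_transactions,
  # Placeholder address; key names and key type are assumptions.
  {:logs, %{address: "0x0000000000000000000000000000000000000000", topics: [transfer_topic]}}
]

# Mirrors the typespec: two bare atoms, or a {:logs, map} tuple.
valid_type = fn
  :new_heads -> true
  :pending_transactions -> true
  {:logs, filter} when is_map(filter) -> true
  _other -> false
end

Enum.each(types, fn type -> IO.inspect({type, valid_type.(type)}) end)
```

A local check like `valid_type` is optional, since an unsupported type is reported as `{:error, {:invalid_subscription_type, type}}`.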

# `t`

```elixir
@type t() :: %Onchain.Subscription{
  agent: pid(),
  client: ZenWebsocket.Client.t(),
  handler: handler()
}
```

# `close`

```elixir
@spec close(t()) :: :ok
```

Close the WebSocket connection and free resources.

## Parameters

  * `sub` - Subscription handle (value)

## Returns

Always returns :ok (`:ok`)

```elixir
# descripex:contract
%{
  params: %{sub: %{description: "Subscription handle", kind: :value}},
  returns: %{type: ":ok", description: "Always returns :ok"}
}
```

# `connect`

```elixir
@spec connect(
  String.t(),
  keyword()
) :: {:ok, t()} | {:error, term()}
```

Open a WebSocket connection to an Ethereum node.

## Parameters

  * `ws_url` - WebSocket URL (wss:// or ws://) (value)
  * `opts` - Options: :handler (event callback fn), plus zen_websocket options (:retry_count, :retry_delay, :max_backoff) (default: `[]`, value)

## Returns

Subscription handle for subscribe/unsubscribe/close calls (`{:ok, %Onchain.Subscription{}} | {:error, term()}`)

```elixir
# descripex:contract
%{
  params: %{
    opts: %{
      default: [],
      description: "Options: :handler (event callback fn), plus zen_websocket options (:retry_count, :retry_delay, :max_backoff)",
      kind: :value
    },
    ws_url: %{description: "WebSocket URL (wss:// or ws://)", kind: :value}
  },
  returns: %{
    type: "{:ok, %Onchain.Subscription{}} | {:error, term()}",
    description: "Subscription handle for subscribe/unsubscribe/close calls"
  }
}
```
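A minimal sketch of passing a custom `:handler`, assuming the `onchain` package is available as a dependency. The `HeadForwarder` module and the `{:chain_event, event}` message tag are hypothetical; only `connect/2` and the `:handler` option come from the documentation above.

```elixir
defmodule HeadForwarder do
  @moduledoc "Hypothetical example: forwards every subscription event to a given process."

  # Connects and installs a handler that relays events to `pid`.
  # Returns whatever connect/2 returns: {:ok, sub} or {:error, reason}.
  def start(ws_url, pid) when is_binary(ws_url) and is_pid(pid) do
    handler = fn event -> send(pid, {:chain_event, event}) end
    Onchain.Subscription.connect(ws_url, handler: handler)
  end
end
```

A caller would invoke `HeadForwarder.start(ws_url, self())` with its own endpoint URL and then receive `{:chain_event, event}` messages. Forwarding to an explicit pid is useful when the process that connects is not the one that should consume events.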

# `connect!`

```elixir
@spec connect!(
  String.t(),
  keyword()
) :: t()
```

Open a WebSocket connection. Raises on error.

## Parameters

  * `ws_url` - WebSocket URL (wss:// or ws://) (value)
  * `opts` - Same options as connect/2 (default: `[]`, value)

## Returns

Subscription handle (`%Onchain.Subscription{}`)

```elixir
# descripex:contract
%{
  params: %{
    opts: %{default: [], description: "Same options as connect/2", kind: :value},
    ws_url: %{description: "WebSocket URL (wss:// or ws://)", kind: :value}
  },
  returns: %{
    type: "%Onchain.Subscription{}",
    description: "Subscription handle"
  }
}
```

# `subscribe`

```elixir
@spec subscribe(t(), subscription_type(), keyword()) ::
  {:ok, String.t()} | {:error, term()}
```

Subscribe to an Ethereum notification type.

## Parameters

  * `sub` - Subscription handle from connect/2 (value)
  * `type` - Subscription type: :new_heads, :pending_transactions, or {:logs, filter_map} (value)
  * `opts` - Reserved for future options (default: `[]`, value)

## Returns

Subscription ID for unsubscribe (`{:ok, subscription_id} | {:error, term()}`)

```elixir
# descripex:contract
%{
  params: %{
    type: %{
      description: "Subscription type: :new_heads, :pending_transactions, or {:logs, filter_map}",
      kind: :value
    },
    opts: %{
      default: [],
      description: "Reserved for future options",
      kind: :value
    },
    sub: %{description: "Subscription handle from connect/2", kind: :value}
  },
  returns: %{
    type: "{:ok, subscription_id} | {:error, term()}",
    description: "Subscription ID for unsubscribe"
  }
}
```
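The sketch below strings the documented calls into one flow, assuming the `onchain` package is a dependency and relying on the default handler sending `{:subscription, event}` to the process that called `connect/2`. The `HeadWatcher` module, the 30-second timeout, and the default count are hypothetical choices for the example.

```elixir
defmodule HeadWatcher do
  @moduledoc "Hypothetical example: prints a few new block headers, then cleans up."
  alias Onchain.Subscription

  def run(ws_url, count \\ 3) do
    with {:ok, sub} <- Subscription.connect(ws_url, []) do
      result =
        with {:ok, sub_id} <- Subscription.subscribe(sub, :new_heads, []) do
          receive_heads(sub_id, count)
          Subscription.unsubscribe(sub, sub_id)
        end

      # Close the connection whether or not subscribing succeeded.
      :ok = Subscription.close(sub)
      result
    end
  end

  defp receive_heads(_sub_id, 0), do: :ok

  defp receive_heads(sub_id, remaining) do
    receive do
      # Pin the id so events from other subscriptions are left in the mailbox.
      {:subscription, {:new_heads, ^sub_id, head}} ->
        IO.puts("block #{head.number} #{head.hash}")
        receive_heads(sub_id, remaining - 1)
    after
      30_000 -> :timeout
    end
  end
end
```

Calling `HeadWatcher.run/2` with a WebSocket endpoint URL returns the result of `unsubscribe/2` on success, or the first `{:error, reason}` encountered.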

# `subscribe!`

```elixir
@spec subscribe!(t(), subscription_type(), keyword()) :: String.t()
```

Subscribe to an Ethereum notification type. Raises on error.

## Parameters

  * `sub` - Subscription handle from connect/2 (value)
  * `type` - Same types as subscribe/3 (value)
  * `opts` - Reserved for future options (default: `[]`, value)

## Returns

Subscription ID (`String.t()`)

```elixir
# descripex:contract
%{
  params: %{
    type: %{description: "Same types as subscribe/3", kind: :value},
    opts: %{
      default: [],
      description: "Reserved for future options",
      kind: :value
    },
    sub: %{description: "Subscription handle from connect/2", kind: :value}
  },
  returns: %{type: "String.t()", description: "Subscription ID"}
}
```

# `unsubscribe`

```elixir
@spec unsubscribe(t(), String.t()) :: {:ok, boolean()} | {:error, term()}
```

Cancel a subscription by ID.

## Parameters

  * `sub` - Subscription handle (value)
  * `subscription_id` - Subscription ID from subscribe/3 (value)

## Returns

true if unsubscribed successfully (`{:ok, boolean()} | {:error, term()}`)

```elixir
# descripex:contract
%{
  params: %{
    sub: %{description: "Subscription handle", kind: :value},
    subscription_id: %{
      description: "Subscription ID from subscribe/3",
      kind: :value
    }
  },
  returns: %{
    type: "{:ok, boolean()} | {:error, term()}",
    description: "true if unsubscribed successfully"
  }
}
```

# `unsubscribe!`

```elixir
@spec unsubscribe!(t(), String.t()) :: boolean()
```

Cancel a subscription by ID. Raises on error.

## Parameters

  * `sub` - Subscription handle (value)
  * `subscription_id` - Subscription ID (value)

## Returns

true if unsubscribed (`boolean()`)

```elixir
# descripex:contract
%{
  params: %{
    sub: %{description: "Subscription handle", kind: :value},
    subscription_id: %{description: "Subscription ID", kind: :value}
  },
  returns: %{type: "boolean()", description: "true if unsubscribed"}
}
```

---

*Consult [api-reference.md](api-reference.md) for complete listing*
