# `MOQX`
[🔗](https://github.com/dmorn/moqx/blob/main/lib/moqx.ex#L1)

Elixir bindings for Media over QUIC (MOQ) via Rustler NIFs on top of
`moqtail-rs`.

`MOQX` is a thin, low-level, functional async API with explicit correlation
and typed async message families.

This module is intentionally the low-level core contract:

- it does not hide protocol state behind blocking calls
- it does not add automatic retries or buffering policy
- it does not manage caller mailboxes for you
- it prefers explicit correlation refs/handles and typed async events

Convenience helpers such as `publish_catalog/2`, `update_catalog/2`,
`fetch_catalog/2`, `await_catalog/2`, and `await_track_active/2` live in
`MOQX.Helpers` on top of this contract. A future managed/stateful ergonomics
layer can also be built on top, but that is deliberately separate from the
`MOQX` core API itself.

Core design points:

- split roles only: publisher sessions publish and subscriber sessions subscribe
- explicit draft-14 protocol target via moqtail-rs / moqtail
- both subgroup-stream and object-datagram delivery are exposed explicitly
- async operations are correlated (`connect_ref`, `publish_ref`, `flush_ref`, `fetch_ref`, subscription handle)
- async outcomes are explicit and typed:
  - lifecycle/success events (e.g. `%MOQX.ConnectOk{}`, `%MOQX.SubscribeOk{}`)
  - request-level failures (`{:moqx_request_error, %MOQX.RequestError{}}`)
  - transport/runtime failures (`{:moqx_transport_error, %MOQX.TransportError{}}`)

## Example

    {:ok, connect_ref} = MOQX.connect_subscriber("https://relay.example.com")

    subscriber =
      receive do
        {:moqx_connect_ok, %MOQX.ConnectOk{ref: ^connect_ref, session: session}} -> session

        {:moqx_request_error, %MOQX.RequestError{op: :connect, ref: ^connect_ref} = err} ->
          raise "connect rejected: #{inspect(err)}"

        {:moqx_transport_error, %MOQX.TransportError{op: :connect, ref: ^connect_ref} = err} ->
          raise "connect runtime failure: #{inspect(err)}"
      end

    {:ok, handle} = MOQX.subscribe(subscriber, "anon/demo", "video")

    receive do
      {:moqx_subscribe_ok, %MOQX.SubscribeOk{handle: ^handle}} -> :ok
    end

    receive do
      {:moqx_object, %MOQX.ObjectReceived{handle: ^handle, object: %MOQX.Object{} = obj}} -> obj
    end

    :ok = MOQX.unsubscribe(handle)
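
The `receive` blocks above wait indefinitely. A minimal sketch of a bounded wait for the connect outcome follows. `ConnectAwait` and `await/2` are hypothetical names, not part of MOQX, and the 5-second default is an arbitrary choice. The patterns match the documented `ref`, `session`, and `op` fields as plain map keys so the snippet compiles without the library loaded.

```elixir
defmodule ConnectAwait do
  @moduledoc false

  # Waits for the connect outcome correlated by `ref`, giving up after
  # `timeout` milliseconds. Structs are maps, so these bare-map patterns
  # also match the real messages; with MOQX available, `%MOQX.ConnectOk{}`
  # and friends are the stricter patterns to use.
  def await(ref, timeout \\ 5_000) do
    receive do
      {:moqx_connect_ok, %{ref: ^ref, session: session}} ->
        {:ok, session}

      {:moqx_request_error, %{op: :connect, ref: ^ref} = err} ->
        {:error, {:request, err}}

      {:moqx_transport_error, %{op: :connect, ref: ^ref} = err} ->
        {:error, {:transport, err}}
    after
      timeout -> {:error, :timeout}
    end
  end
end
```

Returning tagged tuples instead of raising leaves the retry or shutdown decision with the caller, in keeping with the module's stance of not adding retry policy.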

# `async_error_message`

```elixir
@type async_error_message() ::
  {:moqx_request_error, MOQX.RequestError.t()}
  | {:moqx_transport_error, MOQX.TransportError.t()}
```

Common asynchronous error message families.

# `broadcast`

```elixir
@opaque broadcast()
```

Opaque broadcast resource delivered in `%MOQX.PublishOk{}` once a `publish/2` request is acknowledged.

# `catalog_payload`

```elixir
@type catalog_payload() :: binary()
```

Raw CMSF catalog payload bytes (UTF-8 JSON).

# `close_subgroup_opt`

```elixir
@type close_subgroup_opt() :: {:end_of_group, boolean()}
```

Options for `close_subgroup/2`.

# `connect_message`

```elixir
@type connect_message() ::
  {:moqx_connect_ok, MOQX.ConnectOk.t()} | async_error_message()
```

Connection messages delivered to the caller process.

# `connect_opt`

```elixir
@type connect_opt() :: {:role, role()} | {:tls, [tls_opt()]}
```

Connect options accepted by `connect/2`.

# `connect_ref`

```elixir
@opaque connect_ref()
```

Opaque connect correlation reference returned by `connect/2`.

# `extension`

```elixir
@type extension() :: {non_neg_integer(), non_neg_integer() | binary()}
```

Extension header on send or receive. Even types carry varints; odd types carry binaries.
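
The even/odd rule can be checked before handing extensions to a write call. The following is a sketch only: `ExtensionCheck` is a hypothetical helper, and whether MOQX performs the same validation internally is not stated here. It does not enforce the varint upper bound.

```elixir
defmodule ExtensionCheck do
  @moduledoc false

  # Even header types carry a varint, modelled here as a non-negative integer.
  def valid?({type, value})
      when is_integer(type) and type >= 0 and rem(type, 2) == 0,
      do: is_integer(value) and value >= 0

  # Remaining non-negative types are odd and carry a binary.
  def valid?({type, value})
      when is_integer(type) and type >= 0,
      do: is_binary(value)

  # Anything else (negative type, wrong shape) is rejected.
  def valid?(_other), do: false
end
```

For instance, `ExtensionCheck.valid?({2, 42})` and `ExtensionCheck.valid?({3, <<1, 2>>})` both hold, while `{2, "bin"}` and `{3, 7}` are rejected.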

# `fetch_group_order`

```elixir
@type fetch_group_order() :: :original | :ascending | :descending
```

Requested group ordering for fetch delivery.

# `fetch_location`

```elixir
@type fetch_location() :: {non_neg_integer(), non_neg_integer()}
```

Fetch start or end location as `{group_id, object_id}`.

# `fetch_message`

```elixir
@type fetch_message() ::
  {:moqx_fetch_ok, MOQX.FetchOk.t()}
  | {:moqx_fetch_object, MOQX.FetchObject.t()}
  | {:moqx_fetch_done, MOQX.FetchDone.t()}
  | async_error_message()
```

Fetch lifecycle messages delivered to the caller process.

# `fetch_opt`

```elixir
@type fetch_opt() ::
  {:priority, 0..255}
  | {:group_order, fetch_group_order()}
  | {:start, fetch_location()}
  | {:end, fetch_location()}
```

Fetch options accepted by `fetch/4`.

# `fetch_ref`

```elixir
@type fetch_ref() :: reference()
```

Opaque fetch correlation reference returned by `fetch/4`.

# `flush_ref`

```elixir
@opaque flush_ref()
```

Opaque flush correlation reference returned by `flush_subgroup/1`.

# `object_status`

```elixir
@type object_status() :: :normal | :does_not_exist | :end_of_group | :end_of_track
```

Object status for `write_object/4` and received objects.

# `open_subgroup_opt`

```elixir
@type open_subgroup_opt() ::
  {:subgroup_id, subgroup_id()}
  | {:priority, 0..255}
  | {:end_of_group, boolean()}
  | {:extensions_present, boolean()}
```

Options for `open_subgroup/3`.

# `publish_message`

```elixir
@type publish_message() ::
  {:moqx_publish_ok, MOQX.PublishOk.t()} | async_error_message()
```

Publish namespace readiness messages delivered to the caller process.

# `publish_ref`

```elixir
@opaque publish_ref()
```

Opaque publish correlation reference returned by `publish/2`.

# `role`

```elixir
@type role() :: :publisher | :subscriber
```

Publisher or subscriber session role.

# `session`

```elixir
@opaque session()
```

Opaque session resource returned via `%MOQX.ConnectOk{}`.

# `subgroup_handle`

```elixir
@opaque subgroup_handle()
```

Opaque subgroup handle returned by `open_subgroup/3`.

# `subgroup_id`

```elixir
@type subgroup_id() :: nil | non_neg_integer()
```

Subgroup id convention, mirroring moqtail-ts:

  * `nil` — first-object-id mode (wire format omits the subgroup id; receivers
    infer it from the first object's id)
  * `0` — fixed-zero mode (wire format has no subgroup id field either; receivers
    default to 0)
  * any positive integer — explicit subgroup id carried on the wire
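
The three cases map directly onto function clauses. In this small sketch, the module name and the returned atoms are illustrative labels, not values used by MOQX.

```elixir
defmodule SubgroupIdMode do
  @moduledoc false

  # nil: subgroup id omitted on the wire, inferred from the first object's id.
  def mode(nil), do: :first_object_id

  # 0: no subgroup id field on the wire, receivers default to 0.
  def mode(0), do: :fixed_zero

  # Positive integer: carried explicitly on the wire.
  def mode(id) when is_integer(id) and id > 0, do: :explicit
end
```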

# `subgroup_message`

```elixir
@type subgroup_message() ::
  {:moqx_flush_ok, MOQX.FlushDone.t()} | async_error_message()
```

Publish-side subgroup messages delivered to the caller process.

# `subscribe_message`

```elixir
@type subscribe_message() ::
  {:moqx_subscribe_ok, MOQX.SubscribeOk.t()}
  | {:moqx_track_init, MOQX.TrackInit.t()}
  | {:moqx_object, MOQX.ObjectReceived.t()}
  | {:moqx_end_of_group, MOQX.EndOfGroup.t()}
  | {:moqx_publish_done, MOQX.PublishDone.t()}
  | async_error_message()
```

Subscription messages delivered to the caller process.

# `subscribe_opt`

```elixir
@type subscribe_opt() ::
  {:delivery_timeout_ms, non_neg_integer()}
  | {:init_data, binary()}
  | {:track_meta, map()}
  | {:track, MOQX.Catalog.Track.t()}
```

Subscribe options accepted by `subscribe/4` and `subscribe_track/4`.

# `subscription_handle`

```elixir
@type subscription_handle() :: reference()
```

Opaque subscription handle returned by `subscribe/3` and `subscribe/4`.

Holds the internal state needed to cancel the subscription via
`unsubscribe/1`. When the last reference to the handle is garbage
collected, the subscription is automatically canceled.

# `tls_opt`

```elixir
@type tls_opt() :: {:verify, tls_verify()} | {:cacertfile, String.t()}
```

TLS connect options.

# `tls_verify`

```elixir
@type tls_verify() :: :verify_peer | :insecure
```

TLS verification mode for relay connections.

# `track`

```elixir
@opaque track()
```

Opaque track resource returned by `create_track/2`.

# `track_message`

```elixir
@type track_message() ::
  {:moqx_track_active, MOQX.TrackActive.t()}
  | {:moqx_track_closed, MOQX.TrackClosed.t()}
  | async_error_message()
```

Publish-side track lifecycle messages delivered to the caller process.

# `version`

```elixir
@type version() :: String.t()
```

MOQ protocol version string, e.g. `"moq-transport-14"`.

# `write_datagram_opt`

```elixir
@type write_datagram_opt() ::
  {:group_id, non_neg_integer()}
  | {:object_id, non_neg_integer()}
  | {:priority, 0..255}
  | {:extensions, [extension()]}
  | {:end_of_group, boolean()}
```

Options for `write_datagram/3`.

# `write_object_opt`

```elixir
@type write_object_opt() :: {:status, object_status()} | {:extensions, [extension()]}
```

Options for `write_object/4`.

# `close`

```elixir
@spec close(session()) :: :ok
```

Closes a session.

# `close_subgroup`

```elixir
@spec close_subgroup(subgroup_handle(), [close_subgroup_opt()]) ::
  :ok | {:error, MOQX.RequestError.t()}
```

Closes a subgroup and finishes its underlying uni-stream.

## Options

  * `:end_of_group` — when `true`, emits an end-of-group marker object before
    finishing the stream. Requires the subgroup to have been opened with
    `end_of_group: true` (otherwise the on-wire header variant would be
    inconsistent).

Dropping the handle (garbage collection) triggers a plain close without the
end-of-group marker, matching the semantics of `close_subgroup(handle,
end_of_group: false)`.

# `connect`

```elixir
@spec connect(String.t(), [connect_opt()]) ::
  {:ok, connect_ref()} | {:error, MOQX.RequestError.t()}
```

Connects to a relay with an explicit role.

Prefer `connect_publisher/2` and `connect_subscriber/2` unless you need to
select the role dynamically.

Supported options:

- `:role` - required, `:publisher` or `:subscriber`
- `:tls` - optional TLS controls:
  - `verify: :verify_peer | :insecure` (defaults to `:verify_peer`)
  - `cacertfile: "/path/to/rootCA.pem"` to trust a custom root CA PEM

`connect/2` is the dynamic-role entrypoint only. There is no supported merged
publisher/subscriber session mode, and listener/server APIs remain out of scope.

Returns `{:ok, connect_ref}` immediately. The caller later receives a
`t:connect_message/0` correlated by that ref.
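
As an illustration of the option shape, this sketch assembles a `connect/2` keyword list with an optional custom root CA. `ConnectOptsSketch.build/2` is a hypothetical helper; only the option keys and values come from the list above.

```elixir
defmodule ConnectOptsSketch do
  @moduledoc false

  # Builds options for MOQX.connect/2. Peer verification stays on; a custom
  # root CA PEM path is added only when one is given.
  def build(role, cacertfile \\ nil) when role in [:publisher, :subscriber] do
    tls =
      case cacertfile do
        nil -> [verify: :verify_peer]
        path when is_binary(path) -> [verify: :verify_peer, cacertfile: path]
      end

    [role: role, tls: tls]
  end
end
```

With the library loaded, the result would be passed straight through, e.g. `MOQX.connect("https://relay.example.com", ConnectOptsSketch.build(:publisher, "/path/to/rootCA.pem"))`.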

# `connect_publisher`

```elixir
@spec connect_publisher(String.t(), Keyword.t()) ::
  {:ok, connect_ref()} | {:error, MOQX.RequestError.t()}
```

Connects a publisher session.

Accepts the same options as `connect/2`, except `:role` is fixed to `:publisher`.
Returns `{:ok, connect_ref}` immediately.

# `connect_subscriber`

```elixir
@spec connect_subscriber(String.t(), Keyword.t()) ::
  {:ok, connect_ref()} | {:error, MOQX.RequestError.t()}
```

Connects a subscriber session.

Accepts the same options as `connect/2`, except `:role` is fixed to `:subscriber`.
Returns `{:ok, connect_ref}` immediately.

# `create_track`

```elixir
@spec create_track(broadcast(), String.t()) ::
  {:ok, track()} | {:error, MOQX.RequestError.t()}
```

Creates a named track inside a broadcast.

Track activation is asynchronous. The caller receives
`{:moqx_track_active, %MOQX.TrackActive{track: track, ...}}` once relay-side
subscribe activation is observed for this track.

# `fetch`

```elixir
@spec fetch(session(), String.t(), String.t(), [fetch_opt()]) ::
  {:ok, fetch_ref()} | {:error, MOQX.RequestError.t()}
```

Submits a raw fetch request on a subscriber session.

Returns `{:ok, ref}` immediately after the request is accepted for submission.
The caller later receives a `t:fetch_message/0` stream correlated by `ref`.

Misuse errors, such as calling this with a publisher session, are returned as
`{:error, %MOQX.RequestError{...}}` immediately.
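
A fetch produces a message stream rather than a single reply, so callers typically loop until `:moqx_fetch_done`. The sketch below assumes that each fetch struct and each error struct exposes the correlation reference under a `ref` field. That holds for the connect and publish messages shown elsewhere on this page but is not spelled out for fetch. `FetchCollect` is a hypothetical name.

```elixir
defmodule FetchCollect do
  @moduledoc false

  # Gathers fetched objects for `ref` in arrival order until the fetch
  # completes, fails, or no message arrives within `timeout` milliseconds.
  def collect(ref, timeout \\ 5_000), do: loop(ref, timeout, [])

  defp loop(ref, timeout, acc) do
    receive do
      # Assumption: the `ref` field name is the same on all fetch structs.
      {:moqx_fetch_ok, %{ref: ^ref}} ->
        loop(ref, timeout, acc)

      {:moqx_fetch_object, %{ref: ^ref} = object} ->
        loop(ref, timeout, [object | acc])

      {:moqx_fetch_done, %{ref: ^ref}} ->
        {:ok, Enum.reverse(acc)}

      {:moqx_request_error, %{ref: ^ref} = err} ->
        {:error, err}

      {:moqx_transport_error, %{ref: ^ref} = err} ->
        {:error, err}
    after
      timeout -> {:error, :timeout}
    end
  end
end
```

A caller would obtain `ref` from `MOQX.fetch/4` on a subscriber session and then call `FetchCollect.collect(ref)`. Collecting everything into a list suits small ranges only; larger fetches are better handled object by object.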

# `finish_track`

```elixir
@spec finish_track(track()) :: :ok | {:error, MOQX.RequestError.t()}
```

Finishes a track.

Subscribers eventually receive the terminal lifecycle event as
`{:moqx_publish_done, %MOQX.PublishDone{...}}`.

The track owner process receives
`{:moqx_track_closed, %MOQX.TrackClosed{track: track, ...}}` exactly once.

After `finish_track/1`, further writes on the same track handle fail
synchronously with a typed request error (`code: :track_closed`).

# `flush_subgroup`

```elixir
@spec flush_subgroup(subgroup_handle()) ::
  {:ok, flush_ref()} | {:error, MOQX.RequestError.t()}
```

Requests a flush of the subgroup's underlying QUIC stream.

Returns `{:ok, flush_ref}` immediately. The caller later receives
`{:moqx_flush_ok, %MOQX.FlushDone{...}}` when the flush completes, or
`{:moqx_transport_error, %MOQX.TransportError{...}}` on failure.

# `open_subgroup`

```elixir
@spec open_subgroup(track(), non_neg_integer(), [open_subgroup_opt()]) ::
  {:ok, subgroup_handle()} | {:error, MOQX.RequestError.t()}
```

Opens a subgroup on a publishing track.

Returns `{:ok, handle}` synchronously (the QUIC uni-stream is opened
asynchronously). Any async failure during stream open, write, flush, or close
arrives later as `{:moqx_transport_error, %MOQX.TransportError{...}}`.

Lifecycle gating is applied synchronously, before the stream is opened. A track
that is not writable yields a typed request error with `code: :track_not_active`
or `code: :track_closed`.

`group_id` is an explicit non-negative integer chosen by the caller. Multiple
calls with the same `group_id` but different `:subgroup_id` open parallel
subgroup streams within the same group.

## Options

  * `:subgroup_id` — `nil | 0 | pos_integer` (default `0`). `nil` selects the
    first-object-id mode (wire omits the subgroup id; receivers infer it from
    the first object). `0` selects the fixed-zero mode. Any positive integer
    is carried explicitly on the wire.

  * `:priority` — `0..255` publisher priority (default `0`).

  * `:end_of_group` — when `true`, the chosen subgroup header variant signals
    that an end-of-group marker will be emitted. You must call
    `close_subgroup(handle, end_of_group: true)` to actually write the marker.
    (default `false`)

  * `:extensions_present` — when `true`, the subgroup header declares that
    every object on this stream carries an extensions block (possibly empty).
    Required if any `write_object/4` on this subgroup will pass
    `:extensions`. (default `false`)
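
To show how the open, write, and close steps fit together, here is a sketch that writes a list of payloads as one subgroup and ends the group with a marker. `SubgroupWriteSketch` is a hypothetical wrapper; the MOQX calls use the signatures documented on this page, and the track is assumed to be active already.

```elixir
defmodule SubgroupWriteSketch do
  @moduledoc false

  # Writes `payloads` as objects 0, 1, 2, ... of a single subgroup in
  # `group_id`, then closes the subgroup with an end-of-group marker.
  def write_group(track, group_id, payloads) when is_list(payloads) do
    # Opening with end_of_group: true is what permits the marker at close.
    with {:ok, handle} <- MOQX.open_subgroup(track, group_id, end_of_group: true),
         :ok <- write_all(handle, payloads) do
      MOQX.close_subgroup(handle, end_of_group: true)
    end
  end

  # Object ids must increase strictly, so the list index is used as the id.
  # Stops at the first synchronous error and returns it.
  defp write_all(handle, payloads) do
    payloads
    |> Enum.with_index()
    |> Enum.reduce_while(:ok, fn {payload, object_id}, :ok ->
      case MOQX.write_object(handle, object_id, payload, []) do
        :ok -> {:cont, :ok}
        {:error, _reason} = err -> {:halt, err}
      end
    end)
  end
end
```

If a write fails, the handle is simply dropped, which falls back to a plain close without the marker once it is garbage collected. Asynchronous failures still arrive separately as `:moqx_transport_error` messages.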

# `publish`

```elixir
@spec publish(session(), String.t()) ::
  {:ok, publish_ref()} | {:error, MOQX.RequestError.t()}
```

Submits a publish-namespace announcement for the given path on a publisher session.

Returns `{:ok, publish_ref}` immediately. The broadcast becomes usable only
after the caller receives:

- `{:moqx_publish_ok, %MOQX.PublishOk{ref: publish_ref, broadcast: broadcast, ...}}`

Failures are delivered as typed async errors correlated by `publish_ref`:

- `{:moqx_request_error, %MOQX.RequestError{op: :publish, ref: publish_ref, ...}}`
- `{:moqx_transport_error, %MOQX.TransportError{op: :publish, ref: publish_ref, ...}}`

This explicit lifecycle prevents downstream code from creating tracks before
the relay acknowledges namespace readiness.
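
A sketch of waiting for namespace readiness before touching tracks follows. `PublishAwait` is a hypothetical helper, and the patterns use plain map keys for the documented `ref`, `broadcast`, and `op` fields so the snippet compiles on its own.

```elixir
defmodule PublishAwait do
  @moduledoc false

  # Blocks until the relay acknowledges the namespace for `publish_ref`,
  # returning the broadcast that tracks can then be created on.
  def await(publish_ref, timeout \\ 5_000) do
    receive do
      {:moqx_publish_ok, %{ref: ^publish_ref, broadcast: broadcast}} ->
        {:ok, broadcast}

      {:moqx_request_error, %{op: :publish, ref: ^publish_ref} = err} ->
        {:error, err}

      {:moqx_transport_error, %{op: :publish, ref: ^publish_ref} = err} ->
        {:error, err}
    after
      timeout -> {:error, :timeout}
    end
  end
end
```

Only after `{:ok, broadcast}` comes back would the caller go on to `MOQX.create_track(broadcast, "video")`, where the track name is just an example.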

# `subscribe`

```elixir
@spec subscribe(session(), String.t(), String.t()) ::
  {:ok, subscription_handle()} | {:error, MOQX.RequestError.t()}
```

Subscribes a subscriber session to one track in a broadcast.

Returns `{:ok, handle}` immediately. The caller later receives a
`t:subscribe_message/0` stream correlated by that `handle`:

- `{:moqx_subscribe_ok, %MOQX.SubscribeOk{...}}` when active
- `{:moqx_track_init, %MOQX.TrackInit{...}}` once per subscription
- `{:moqx_object, %MOQX.ObjectReceived{...}}` for each delivered object
- `{:moqx_end_of_group, %MOQX.EndOfGroup{...}}` when a subgroup or datagram signals end-of-group
- `{:moqx_publish_done, %MOQX.PublishDone{...}}` on terminal lifecycle
- `{:moqx_request_error, %MOQX.RequestError{...}}` for request rejection
- `{:moqx_transport_error, %MOQX.TransportError{...}}` for runtime failures

Supported options (passed via `subscribe/4`):

- `:delivery_timeout_ms` -- draft-14 MOQT DELIVERY_TIMEOUT in milliseconds
  (encoded as parameter `0x02` on `SUBSCRIBE`)
- `:init_data` -- binary init segment/configuration to surface in `:moqx_track_init`
- `:track_meta` -- map surfaced in `:moqx_track_init`
- `:track` -- `%MOQX.Catalog.Track{}` convenience; fills `:init_data` and `:track_meta`

Misuse errors, such as calling this with a publisher session, are returned as
`{:error, %MOQX.RequestError{...}}` immediately.
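
The message families listed above can be drained with a single receive loop. In this sketch, `SubscriptionLoop` is a hypothetical name. It assumes `TrackInit`, `EndOfGroup`, and `PublishDone` carry a `handle` field like `SubscribeOk` and `ObjectReceived` do. The error clauses match any error tuple because this page does not show which field correlates errors to a subscription, so real code should narrow them.

```elixir
defmodule SubscriptionLoop do
  @moduledoc false

  # Collects the objects delivered for `handle` until the track ends, an
  # error arrives, or nothing is received for `idle_timeout` milliseconds.
  def run(handle, idle_timeout \\ 5_000), do: loop(handle, idle_timeout, [])

  defp loop(handle, idle_timeout, acc) do
    receive do
      {:moqx_subscribe_ok, %{handle: ^handle}} ->
        loop(handle, idle_timeout, acc)

      {:moqx_track_init, %{handle: ^handle}} ->
        loop(handle, idle_timeout, acc)

      {:moqx_object, %{handle: ^handle, object: object}} ->
        loop(handle, idle_timeout, [object | acc])

      {:moqx_end_of_group, %{handle: ^handle}} ->
        loop(handle, idle_timeout, acc)

      {:moqx_publish_done, %{handle: ^handle}} ->
        {:done, Enum.reverse(acc)}

      # Assumption: not correlated here, see the note above the block.
      {:moqx_request_error, err} ->
        {:error, err, Enum.reverse(acc)}

      {:moqx_transport_error, err} ->
        {:error, err, Enum.reverse(acc)}
    after
      idle_timeout -> {:idle, Enum.reverse(acc)}
    end
  end
end
```

A long-lived consumer would more likely forward each object as it arrives than accumulate a list; the accumulator keeps the sketch short.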

# `subscribe`

```elixir
@spec subscribe(session(), String.t(), String.t(), [subscribe_opt()]) ::
  {:ok, subscription_handle()} | {:error, MOQX.RequestError.t()}
```

Same as `subscribe/3`, with explicit subscription options.

# `subscribe_track`

```elixir
@spec subscribe_track(session(), String.t(), MOQX.Catalog.Track.t()) ::
  {:ok, subscription_handle()} | {:error, MOQX.RequestError.t()}
```

Subscribes using a `%MOQX.Catalog.Track{}`.

This convenience helper derives `track_name`, `init_data`, and `track_meta`
from the provided track and forwards to `subscribe/4`.

# `subscribe_track`

```elixir
@spec subscribe_track(session(), String.t(), MOQX.Catalog.Track.t(), [subscribe_opt()]) ::
  {:ok, subscription_handle()} | {:error, MOQX.RequestError.t()}
```

Same as `subscribe_track/3`, with explicit options.

`:track` is not accepted here (it is implied by the `track` argument).

# `unsubscribe`

```elixir
@spec unsubscribe(subscription_handle()) :: :ok
```

Cancels an active track subscription.

Sends MOQ `Unsubscribe` to the relay and removes local subscription state.
The caller may still receive `{:moqx_publish_done, %MOQX.PublishDone{...}}`
once the relay acknowledges with `PublishDone`.

Idempotent: repeated calls (and calls after the subscription has already
ended) return `:ok` without sending further control traffic.

Dropping the handle (garbage collection) triggers the same cleanup, so
short-lived subscribing processes do not need to call this explicitly.

# `write_datagram`

```elixir
@spec write_datagram(track(), binary() | MOQX.NativeBinary.t(), [write_datagram_opt()]) ::
  :ok | {:error, MOQX.RequestError.t()}
```

Writes one object as a draft-14 MOQT object datagram.

This is the explicit low-latency, loss-tolerant publish path for small
objects. It does not fragment payloads and does not auto-select datagrams
based on payload size.

Required options:

  * `:group_id` — non-negative integer group id
  * `:object_id` — non-negative integer object id

Optional options:

  * `:priority` — `0..255` publisher priority (default `0`)
  * `:extensions` — per-object extension headers (default `[]`)
  * `:end_of_group` — whether this datagram marks the last object in the
    group (default `false`)

Subscribers receive the existing `{:moqx_object, %MOQX.ObjectReceived{...}}`
family, with `%MOQX.Object{transport: :datagram}`.
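
Because payloads are neither fragmented nor routed automatically, the size decision rests with the caller. The sketch below guards the datagram path with a size check. `DatagramSketch` and its 1,000-byte budget are hypothetical; the real limit depends on the path MTU and header overhead and is not given on this page.

```elixir
defmodule DatagramSketch do
  @moduledoc false

  # Hypothetical payload budget, chosen only for illustration.
  @max_datagram_payload 1_000

  def fits?(payload) when is_binary(payload),
    do: byte_size(payload) <= @max_datagram_payload

  # Sends `payload` as a datagram when it fits the budget; otherwise returns
  # an error so the caller can pick the subgroup path instead.
  def send_small(track, group_id, object_id, payload) when is_binary(payload) do
    if fits?(payload) do
      MOQX.write_datagram(track, payload, group_id: group_id, object_id: object_id)
    else
      {:error, :too_large_for_datagram}
    end
  end
end
```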

# `write_frame`

```elixir
@spec write_frame(track(), binary()) :: :ok | {:error, MOQX.RequestError.t()}
```

Writes one frame to a track.

Convenience wrapper over the subgroup primitives: opens a subgroup with
subgroup id `0`, writes one object, closes the stream. Each call creates the
next group in that track.

The call is synchronously gated by track lifecycle and returns a typed request
error with `code: :track_not_active` or `code: :track_closed` when the track is
not writable.

Subscribers receive `{:moqx_object, %MOQX.ObjectReceived{handle: handle,
object: %MOQX.Object{group_id: group_seq, subgroup_id: 0, object_id: 0,
status: :normal, transport: :subgroup, payload: data}}}`.

For fine-grained control over subgroup streams use `open_subgroup/3` +
`write_object/4` + `close_subgroup/2`.

For datagram delivery semantics use `write_datagram/3` explicitly; this
helper does not auto-route small payloads to datagrams.

# `write_object`

```elixir
@spec write_object(
  subgroup_handle(),
  non_neg_integer(),
  binary() | MOQX.NativeBinary.t(),
  [write_object_opt()]
) :: :ok | {:error, MOQX.RequestError.t()}
```

Writes one object to an open subgroup.

Returns `:ok` after the bytes are queued to the underlying Tokio runtime.
Any async failure arrives as `{:moqx_transport_error, %MOQX.TransportError{...}}`.

`object_id` must be strictly greater than the previous object's id on the
same subgroup. Pick `0` for the first object and increment monotonically.

## Options

  * `:status` — one of `:normal | :does_not_exist | :end_of_group |
    :end_of_track` (default `:normal`). For `:normal`, the `payload` is sent
    as-is. For marker statuses, the payload is ignored on the wire (the
    object is a zero-length status marker).

  * `:extensions` — per-object extension headers (default `[]`).

---

*Consult [api-reference.md](api-reference.md) for complete listing*
