Elixir bindings for Media over QUIC (MOQ) via Rustler NIFs on top of moqtail-rs.

MOQX is a thin, low-level, functional async API with explicit correlation and typed async message families.

This module is intentionally the low-level core contract:

  • it does not hide protocol state behind blocking calls
  • it does not add automatic retries or buffering policy
  • it does not manage caller mailboxes for you
  • it prefers explicit correlation refs/handles and typed async events

Convenience helpers such as publish_catalog/2, update_catalog/2, fetch_catalog/2, await_catalog/2, and await_track_active/2 live in MOQX.Helpers on top of this contract. A future managed/stateful ergonomics layer can also be built on top, but that is deliberately separate from the MOQX core API itself.

Core design points:

  • split roles only: publisher sessions publish and subscriber sessions subscribe
  • explicit draft-14 protocol target via moqtail-rs / moqtail
  • both subgroup-stream and object-datagram delivery are exposed explicitly
  • async operations are correlated (connect_ref, publish_ref, flush_ref, fetch_ref, subscription handle)
  • async outcomes are explicit and typed:
    • lifecycle/success events (e.g. %MOQX.ConnectOk{}, %MOQX.SubscribeOk{})
    • request-level failures ({:moqx_request_error, %MOQX.RequestError{}})
    • transport/runtime failures ({:moqx_transport_error, %MOQX.TransportError{}})

Example

{:ok, connect_ref} = MOQX.connect_subscriber("https://relay.example.com")

subscriber =
  receive do
    {:moqx_connect_ok, %MOQX.ConnectOk{ref: ^connect_ref, session: session}} -> session
    {:moqx_request_error, %MOQX.RequestError{op: :connect, ref: ^connect_ref} = err} ->
      raise "connect rejected: #{inspect(err)}"

    {:moqx_transport_error, %MOQX.TransportError{op: :connect, ref: ^connect_ref} = err} ->
      raise "connect runtime failure: #{inspect(err)}"
  end

{:ok, handle} = MOQX.subscribe(subscriber, "anon/demo", "video")

receive do
  {:moqx_subscribe_ok, %MOQX.SubscribeOk{handle: ^handle}} -> :ok
end

receive do
  {:moqx_object, %MOQX.ObjectReceived{handle: ^handle, object: %MOQX.Object{} = obj}} -> obj
end

:ok = MOQX.unsubscribe(handle)

Summary

Types

Common asynchronous error message families.

Opaque broadcast resource returned by publish/2.

Raw CMSF catalog payload bytes (UTF-8 JSON).

Connection messages delivered to the caller process.

Opaque connect correlation reference returned by connect/2.

Extension header on send or receive. Even types carry varints; odd types carry binaries.

Requested group ordering for fetch delivery.

Fetch start or end location as {group_id, object_id}.

Fetch lifecycle messages delivered to the caller process.

Fetch options accepted by fetch/4.

Opaque fetch correlation reference returned by fetch/4.

Opaque flush correlation reference returned by flush_subgroup/1.

Object status for write_object/4 and received objects.

Publish namespace readiness messages delivered to the caller process.

Opaque publish correlation reference returned by publish/2.

Publisher or subscriber session role.

Opaque session resource returned via %MOQX.ConnectOk{}.

Opaque subgroup handle returned by open_subgroup/3.

Subgroup id convention, mirroring moqtail-ts

Publish-side subgroup messages delivered to the caller process.

Subscription messages delivered to the caller process.

Subscribe options accepted by subscribe/4 and subscribe_track/4.

Opaque subscription handle returned by subscribe/3,4.

TLS connect options.

TLS verification mode for relay connections.

Opaque track resource returned by create_track/2.

Publish-side track lifecycle messages delivered to the caller process.

MOQ protocol version string, e.g. "moq-transport-14".

Functions

Closes a session.

Closes a subgroup and finishes its underlying uni-stream.

Connects to a relay with an explicit role.

Connects a publisher session.

Connects a subscriber session.

Creates a named track inside a broadcast.

Submits a raw fetch request on a subscriber session.

Finishes a track.

Requests a flush of the subgroup's underlying QUIC stream.

Opens a subgroup on a publishing track.

Submits publish-namespace announcement for the given path on a publisher session.

Subscribes a subscriber session to one track in a broadcast.

Same as subscribe/3, with explicit subscription options.

Subscribes using a %MOQX.Catalog.Track{}.

Cancels an active track subscription.

Writes one object as a draft-14 MOQT object datagram.

Writes one frame to a track.

Writes one object to an open subgroup.

Types

async_error_message()

@type async_error_message() ::
  {:moqx_request_error, MOQX.RequestError.t()}
  | {:moqx_transport_error, MOQX.TransportError.t()}

Common asynchronous error message families.

broadcast()

@opaque broadcast()

Opaque broadcast resource returned by publish/2.

catalog_payload()

@type catalog_payload() :: binary()

Raw CMSF catalog payload bytes (UTF-8 JSON).

close_subgroup_opt()

@type close_subgroup_opt() :: {:end_of_group, boolean()}

Options for close_subgroup/2.

connect_message()

@type connect_message() ::
  {:moqx_connect_ok, MOQX.ConnectOk.t()} | async_error_message()

Connection messages delivered to the caller process.

connect_opt()

@type connect_opt() :: {:role, role()} | {:tls, [tls_opt()]}

connect_ref()

@opaque connect_ref()

Opaque connect correlation reference returned by connect/2.

extension()

@type extension() :: {non_neg_integer(), non_neg_integer() | binary()}

Extension header on send or receive. Even types carry varints; odd types carry binaries.

fetch_group_order()

@type fetch_group_order() :: :original | :ascending | :descending

Requested group ordering for fetch delivery.

fetch_location()

@type fetch_location() :: {non_neg_integer(), non_neg_integer()}

Fetch start or end location as {group_id, object_id}.

fetch_message()

@type fetch_message() ::
  {:moqx_fetch_ok, MOQX.FetchOk.t()}
  | {:moqx_fetch_object, MOQX.FetchObject.t()}
  | {:moqx_fetch_done, MOQX.FetchDone.t()}
  | async_error_message()

Fetch lifecycle messages delivered to the caller process.

fetch_opt()

@type fetch_opt() ::
  {:priority, 0..255}
  | {:group_order, fetch_group_order()}
  | {:start, fetch_location()}
  | {:end, fetch_location()}

Fetch options accepted by fetch/4.

fetch_ref()

@type fetch_ref() :: reference()

Opaque fetch correlation reference returned by fetch/4.

flush_ref()

@opaque flush_ref()

Opaque flush correlation reference returned by flush_subgroup/1.

object_status()

@type object_status() :: :normal | :does_not_exist | :end_of_group | :end_of_track

Object status for write_object/4 and received objects.

open_subgroup_opt()

@type open_subgroup_opt() ::
  {:subgroup_id, subgroup_id()}
  | {:priority, 0..255}
  | {:end_of_group, boolean()}
  | {:extensions_present, boolean()}

Options for open_subgroup/3.

publish_message()

@type publish_message() ::
  {:moqx_publish_ok, MOQX.PublishOk.t()} | async_error_message()

Publish namespace readiness messages delivered to the caller process.

publish_ref()

@opaque publish_ref()

Opaque publish correlation reference returned by publish/2.

role()

@type role() :: :publisher | :subscriber

Publisher or subscriber session role.

session()

@opaque session()

Opaque session resource returned via %MOQX.ConnectOk{}.

subgroup_handle()

@opaque subgroup_handle()

Opaque subgroup handle returned by open_subgroup/3.

subgroup_id()

@type subgroup_id() :: nil | non_neg_integer()

Subgroup id convention, mirroring moqtail-ts:

  • nil — first-object-id mode (wire format omits the subgroup id; receivers infer it from the first object's id)
  • 0 — fixed-zero mode (wire format has no subgroup id field either; receivers default to 0)
  • any positive integer — explicit subgroup id carried on the wire

subgroup_message()

@type subgroup_message() ::
  {:moqx_flush_ok, MOQX.FlushDone.t()} | async_error_message()

Publish-side subgroup messages delivered to the caller process.

subscribe_message()

@type subscribe_message() ::
  {:moqx_subscribe_ok, MOQX.SubscribeOk.t()}
  | {:moqx_track_init, MOQX.TrackInit.t()}
  | {:moqx_object, MOQX.ObjectReceived.t()}
  | {:moqx_end_of_group, MOQX.EndOfGroup.t()}
  | {:moqx_publish_done, MOQX.PublishDone.t()}
  | async_error_message()

Subscription messages delivered to the caller process.

subscribe_opt()

@type subscribe_opt() ::
  {:delivery_timeout_ms, non_neg_integer()}
  | {:init_data, binary()}
  | {:track_meta, map()}
  | {:track, MOQX.Catalog.Track.t()}

Subscribe options accepted by subscribe/4 and subscribe_track/4.

subscription_handle()

@type subscription_handle() :: reference()

Opaque subscription handle returned by subscribe/3,4.

Holds the internal state needed to cancel the subscription via unsubscribe/1. When the last reference to the handle is garbage collected, the subscription is automatically canceled.

tls_opt()

@type tls_opt() :: {:verify, tls_verify()} | {:cacertfile, String.t()}

TLS connect options.

tls_verify()

@type tls_verify() :: :verify_peer | :insecure

TLS verification mode for relay connections.

track()

@opaque track()

Opaque track resource returned by create_track/2.

track_message()

@type track_message() ::
  {:moqx_track_active, MOQX.TrackActive.t()}
  | {:moqx_track_closed, MOQX.TrackClosed.t()}
  | async_error_message()

Publish-side track lifecycle messages delivered to the caller process.

version()

@type version() :: String.t()

MOQ protocol version string, e.g. "moq-transport-14".

write_datagram_opt()

@type write_datagram_opt() ::
  {:group_id, non_neg_integer()}
  | {:object_id, non_neg_integer()}
  | {:priority, 0..255}
  | {:extensions, [extension()]}
  | {:end_of_group, boolean()}

Options for write_datagram/3.

write_object_opt()

@type write_object_opt() :: {:status, object_status()} | {:extensions, [extension()]}

Options for write_object/4.

Functions

close(session)

@spec close(session()) :: :ok

Closes a session.

close_subgroup(subgroup, opts \\ [])

@spec close_subgroup(subgroup_handle(), [close_subgroup_opt()]) ::
  :ok | {:error, MOQX.RequestError.t()}

Closes a subgroup and finishes its underlying uni-stream.

Options

  • :end_of_group — when true, emits an end-of-group marker object before finishing the stream. Requires the subgroup to have been opened with end_of_group: true (otherwise the on-wire header variant would be inconsistent).

Dropping the handle (garbage collection) triggers a plain close without the end-of-group marker, matching the semantics of close_subgroup(handle, end_of_group: false).

connect(url, opts)

@spec connect(String.t(), [connect_opt()]) ::
  {:ok, connect_ref()} | {:error, MOQX.RequestError.t()}

Connects to a relay with an explicit role.

Prefer connect_publisher/2 and connect_subscriber/2 unless you need to select the role dynamically.

Supported options:

  • :role - required, :publisher or :subscriber
  • :tls - optional TLS controls:
    • verify: :verify_peer | :insecure (defaults to :verify_peer)

    • cacertfile: "/path/to/rootCA.pem" to trust a custom root CA PEM

connect/2 is the dynamic-role entrypoint only. There is no supported merged publisher/subscriber session mode, and listener/server APIs remain out of scope.

Returns {:ok, connect_ref} immediately. The caller later receives a connect_message/0 correlated by that ref.

connect_publisher(url, opts \\ [])

@spec connect_publisher(String.t(), Keyword.t()) ::
  {:ok, connect_ref()} | {:error, MOQX.RequestError.t()}

Connects a publisher session.

Accepts the same options as connect/2, except :role is fixed to :publisher. Returns {:ok, connect_ref} immediately.

connect_subscriber(url, opts \\ [])

@spec connect_subscriber(String.t(), Keyword.t()) ::
  {:ok, connect_ref()} | {:error, MOQX.RequestError.t()}

Connects a subscriber session.

Accepts the same options as connect/2, except :role is fixed to :subscriber. Returns {:ok, connect_ref} immediately.

create_track(broadcast, track_name)

@spec create_track(broadcast(), String.t()) ::
  {:ok, track()} | {:error, MOQX.RequestError.t()}

Creates a named track inside a broadcast.

Track activation is asynchronous. The caller receives {:moqx_track_active, %MOQX.TrackActive{track: track, ...}} once relay-side subscribe activation is observed for this track.

fetch(session, namespace, track_name, opts \\ [])

@spec fetch(session(), String.t(), String.t(), [fetch_opt()]) ::
  {:ok, fetch_ref()} | {:error, MOQX.RequestError.t()}

Submits a raw fetch request on a subscriber session.

Returns {:ok, ref} immediately after the request is accepted for submission. The caller later receives a fetch_message/0 stream correlated by ref.

Misuse errors, such as calling this with a publisher session, are returned as {:error, %MOQX.RequestError{...}} immediately.

finish_track(track)

@spec finish_track(track()) :: :ok | {:error, MOQX.RequestError.t()}

Finishes a track.

Subscribers eventually receive terminal lifecycle via {:moqx_publish_done, %MOQX.PublishDone{...}}.

The track owner process receives {:moqx_track_closed, %MOQX.TrackClosed{track: track, ...}} exactly once.

After finish_track/1, further writes on the same track handle fail synchronously with typed request error code: :track_closed.

flush_subgroup(subgroup)

@spec flush_subgroup(subgroup_handle()) ::
  {:ok, flush_ref()} | {:error, MOQX.RequestError.t()}

Requests a flush of the subgroup's underlying QUIC stream.

Returns {:ok, flush_ref} immediately. The caller later receives {:moqx_flush_ok, %MOQX.FlushDone{...}} when the flush completes, or {:moqx_transport_error, %MOQX.TransportError{...}} on failure.

open_subgroup(track, group_id, opts \\ [])

@spec open_subgroup(track(), non_neg_integer(), [open_subgroup_opt()]) ::
  {:ok, subgroup_handle()} | {:error, MOQX.RequestError.t()}

Opens a subgroup on a publishing track.

Returns {:ok, handle} synchronously (the QUIC uni-stream is opened asynchronously). Any async failure during stream open, write, flush, or close arrives later as {:moqx_transport_error, %MOQX.TransportError{...}}.

Synchronous lifecycle gating applies before stream open and failures return typed request errors with code: :track_not_active | :track_closed.

group_id is an explicit non-negative integer chosen by the caller. Multiple calls with the same group_id but different :subgroup_id open parallel subgroup streams within the same group.

Options

  • :subgroup_idnil | 0 | pos_integer (default 0). nil selects the first-object-id mode (wire omits the subgroup id; receivers infer it from the first object). 0 selects the fixed-zero mode. Any positive integer is carried explicitly on the wire.

  • :priority0..255 publisher priority (default 0).

  • :end_of_group — when true, the chosen subgroup header variant signals that an end-of-group marker will be emitted. You must call close_subgroup(handle, end_of_group: true) to actually write the marker. (default false)

  • :extensions_present — when true, the subgroup header declares that every object on this stream carries an extensions block (possibly empty). Required if any write_object/4 on this subgroup will pass :extensions. (default false)

publish(session, broadcast_path)

@spec publish(session(), String.t()) ::
  {:ok, publish_ref()} | {:error, MOQX.RequestError.t()}

Submits publish-namespace announcement for the given path on a publisher session.

Returns {:ok, publish_ref} immediately. The broadcast becomes usable only after the caller receives:

  • {:moqx_publish_ok, %MOQX.PublishOk{ref: publish_ref, broadcast: broadcast, ...}}

Failures are delivered as typed async errors correlated by publish_ref:

  • {:moqx_request_error, %MOQX.RequestError{op: :publish, ref: publish_ref, ...}}
  • {:moqx_transport_error, %MOQX.TransportError{op: :publish, ref: publish_ref, ...}}

This explicit lifecycle prevents downstream code from creating tracks before the relay acknowledges namespace readiness.

subscribe(session, broadcast_path, track_name)

@spec subscribe(session(), String.t(), String.t()) ::
  {:ok, subscription_handle()} | {:error, MOQX.RequestError.t()}

Subscribes a subscriber session to one track in a broadcast.

Returns {:ok, handle} immediately. The caller later receives a subscribe_message/0 stream correlated by that handle:

  • {:moqx_subscribe_ok, %MOQX.SubscribeOk{...}} when active
  • {:moqx_track_init, %MOQX.TrackInit{...}} once per subscription
  • {:moqx_object, %MOQX.ObjectReceived{...}} for each delivered object
  • {:moqx_end_of_group, %MOQX.EndOfGroup{...}} when a subgroup or datagram signals end-of-group
  • {:moqx_publish_done, %MOQX.PublishDone{...}} on terminal lifecycle
  • {:moqx_request_error, %MOQX.RequestError{...}} for request rejection
  • {:moqx_transport_error, %MOQX.TransportError{...}} for runtime failures

Supported options:

  • :delivery_timeout_ms -- draft-14 MOQT DELIVERY_TIMEOUT in milliseconds (encoded as parameter 0x02 on SUBSCRIBE)
  • :init_data -- binary init segment/configuration to surface in :moqx_track_init
  • :track_meta -- map surfaced in :moqx_track_init
  • :track -- %MOQX.Catalog.Track{} convenience; fills :init_data and :track_meta

Misuse errors, such as calling this with a publisher session, are returned as {:error, %MOQX.RequestError{...}} immediately.

subscribe(session, broadcast_path, track_name, opts)

@spec subscribe(session(), String.t(), String.t(), [subscribe_opt()]) ::
  {:ok, subscription_handle()} | {:error, MOQX.RequestError.t()}

Same as subscribe/3, with explicit subscription options.

subscribe_track(session, broadcast_path, track)

@spec subscribe_track(session(), String.t(), MOQX.Catalog.Track.t()) ::
  {:ok, subscription_handle()} | {:error, MOQX.RequestError.t()}

Subscribes using a %MOQX.Catalog.Track{}.

This convenience helper derives track_name, init_data, and track_meta from the provided track and forwards to subscribe/4.

subscribe_track(session, broadcast_path, track, opts)

@spec subscribe_track(session(), String.t(), MOQX.Catalog.Track.t(), [subscribe_opt()]) ::
  {:ok, subscription_handle()} | {:error, MOQX.RequestError.t()}

Same as subscribe_track/3, with explicit options.

:track is not accepted here (it is implied by the track argument).

unsubscribe(handle)

@spec unsubscribe(subscription_handle()) :: :ok

Cancels an active track subscription.

Sends MOQ Unsubscribe to the relay and removes local subscription state. The caller may still receive {:moqx_publish_done, %MOQX.PublishDone{...}} once the relay acknowledges with PublishDone.

Idempotent: repeated calls (and calls after the subscription has already ended) return :ok without sending further control traffic.

Dropping the handle (garbage collection) triggers the same cleanup, so short-lived subscribing processes do not need to call this explicitly.

write_datagram(track, payload, opts \\ [])

@spec write_datagram(track(), binary() | MOQX.NativeBinary.t(), [write_datagram_opt()]) ::
  :ok | {:error, MOQX.RequestError.t()}

Writes one object as a draft-14 MOQT object datagram.

This is the explicit low-latency, loss-tolerant publish path for small objects. It does not fragment payloads and does not auto-select datagrams based on payload size.

Required options:

  • :group_id — non-negative integer group id
  • :object_id — non-negative integer object id

Optional options:

  • :priority0..255 publisher priority (default 0)
  • :extensions — per-object extension headers (default [])
  • :end_of_group — whether this datagram marks the last object in the group (default false)

Subscribers receive the existing {:moqx_object, %MOQX.ObjectReceived{...}} family, with %MOQX.Object{transport: :datagram}.

write_frame(track, data)

@spec write_frame(track(), binary()) :: :ok | {:error, MOQX.RequestError.t()}

Writes one frame to a track.

Convenience wrapper over the subgroup primitives: opens a subgroup with subgroup id 0, writes one object, closes the stream. Each call creates the next group in that track.

The call is synchronously gated by track lifecycle and returns typed request errors with code: :track_not_active | :track_closed when not writable.

Subscribers receive {:moqx_object, handle, %MOQX.Object{group_id: group_seq, subgroup_id: 0, object_id: 0, status: :normal, transport: :subgroup, payload: data}}.

For fine-grained control over subgroup streams use open_subgroup/3 + write_object/4 + close_subgroup/2.

For datagram delivery semantics use write_datagram/3 explicitly; this helper does not auto-route small payloads to datagrams.

write_object(subgroup, object_id, payload, opts \\ [])

@spec write_object(
  subgroup_handle(),
  non_neg_integer(),
  binary() | MOQX.NativeBinary.t(),
  [write_object_opt()]
) :: :ok | {:error, MOQX.RequestError.t()}

Writes one object to an open subgroup.

Returns :ok after the bytes are queued to the underlying Tokio runtime. Any async failure arrives as {:moqx_transport_error, %MOQX.TransportError{...}}.

object_id must be strictly greater than the previous object's id on the same subgroup. Pick 0 for the first object and increment monotonically.

Options

  • :status — one of :normal | :does_not_exist | :end_of_group | :end_of_track (default :normal). For :normal, the payload is sent as-is. For marker statuses, the payload is ignored on the wire (the object is a zero-length status marker).

  • :extensions — per-object extension headers (default []).