Elixir bindings for Media over QUIC (MOQ) via Rustler NIFs on top of
moqtail-rs.
MOQX is a thin, low-level, functional async API with explicit correlation
and typed async message families.
This module is intentionally the low-level core contract:
- it does not hide protocol state behind blocking calls
- it does not add automatic retries or buffering policy
- it does not manage caller mailboxes for you
- it prefers explicit correlation refs/handles and typed async events
Convenience helpers such as publish_catalog/2, update_catalog/2,
fetch_catalog/2, await_catalog/2, and await_track_active/2 live in
MOQX.Helpers on top of this contract. A future managed/stateful ergonomics
layer can also be built on top, but that is deliberately separate from the
MOQX core API itself.
Core design points:
- split roles only: publisher sessions publish and subscriber sessions subscribe
- explicit draft-14 protocol target via moqtail-rs / moqtail
- both subgroup-stream and object-datagram delivery are exposed explicitly
- async operations are correlated (connect_ref, publish_ref, flush_ref, fetch_ref, subscription handle)
- async outcomes are explicit and typed:
  - lifecycle/success events (e.g. %MOQX.ConnectOk{}, %MOQX.SubscribeOk{})
  - request-level failures ({:moqx_request_error, %MOQX.RequestError{}})
  - transport/runtime failures ({:moqx_transport_error, %MOQX.TransportError{}})
Example
    {:ok, connect_ref} = MOQX.connect_subscriber("https://relay.example.com")

    subscriber =
      receive do
        {:moqx_connect_ok, %MOQX.ConnectOk{ref: ^connect_ref, session: session}} ->
          session

        {:moqx_request_error, %MOQX.RequestError{op: :connect, ref: ^connect_ref} = err} ->
          raise "connect rejected: #{inspect(err)}"

        {:moqx_transport_error, %MOQX.TransportError{op: :connect, ref: ^connect_ref} = err} ->
          raise "connect runtime failure: #{inspect(err)}"
      end

    {:ok, handle} = MOQX.subscribe(subscriber, "anon/demo", "video")

    receive do
      {:moqx_subscribe_ok, %MOQX.SubscribeOk{handle: ^handle}} -> :ok
    end

    receive do
      {:moqx_object, %MOQX.ObjectReceived{handle: ^handle, object: %MOQX.Object{} = obj}} -> obj
    end

    :ok = MOQX.unsubscribe(handle)
Types
@type async_error_message() :: {:moqx_request_error, MOQX.RequestError.t()} | {:moqx_transport_error, MOQX.TransportError.t()}
Common asynchronous error message families.
@opaque broadcast()
Opaque broadcast resource returned by publish/2.
@type catalog_payload() :: binary()
Raw CMSF catalog payload bytes (UTF-8 JSON).
@type close_subgroup_opt() :: {:end_of_group, boolean()}
Options for close_subgroup/2.
@type connect_message() :: {:moqx_connect_ok, MOQX.ConnectOk.t()} | async_error_message()
Connection messages delivered to the caller process.
@opaque connect_ref()
Opaque connect correlation reference returned by connect/2.
@type extension() :: {non_neg_integer(), non_neg_integer() | binary()}
Extension header on send or receive. Even types carry varints; odd types carry binaries.
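The parity rule above can be checked before an extension is handed to a write call. The following is a minimal sketch; ExtensionCheck is a hypothetical helper for illustration and is not part of MOQX.

```elixir
defmodule ExtensionCheck do
  @moduledoc "Hypothetical helper (not part of MOQX) for the extension parity rule."

  # Even type ids must carry a non-negative integer (encoded as a varint).
  def valid?({type, value}) when is_integer(type) and type >= 0 and rem(type, 2) == 0 do
    is_integer(value) and value >= 0
  end

  # Odd type ids must carry a binary.
  def valid?({type, value}) when is_integer(type) and type >= 0 and rem(type, 2) == 1 do
    is_binary(value)
  end

  # Anything else (negative type id, wrong shape) is rejected.
  def valid?(_other), do: false
end
```

A list of extensions could then be screened with Enum.all?(extensions, &ExtensionCheck.valid?/1) before it is passed as :extensions.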
@type fetch_group_order() :: :original | :ascending | :descending
Requested group ordering for fetch delivery.
@type fetch_location() :: {non_neg_integer(), non_neg_integer()}
Fetch start or end location as {group_id, object_id}.
@type fetch_message() :: {:moqx_fetch_ok, MOQX.FetchOk.t()} | {:moqx_fetch_object, MOQX.FetchObject.t()} | {:moqx_fetch_done, MOQX.FetchDone.t()} | async_error_message()
Fetch lifecycle messages delivered to the caller process.
@type fetch_opt() :: {:priority, 0..255} | {:group_order, fetch_group_order()} | {:start, fetch_location()} | {:end, fetch_location()}
Fetch options accepted by fetch/4.
@type fetch_ref() :: reference()
Opaque fetch correlation reference returned by fetch/4.
@opaque flush_ref()
Opaque flush correlation reference returned by flush_subgroup/1.
@type object_status() :: :normal | :does_not_exist | :end_of_group | :end_of_track
Object status for write_object/4 and received objects.
@type open_subgroup_opt() :: {:subgroup_id, subgroup_id()} | {:priority, 0..255} | {:end_of_group, boolean()} | {:extensions_present, boolean()}
Options for open_subgroup/3.
@type publish_message() :: {:moqx_publish_ok, MOQX.PublishOk.t()} | async_error_message()
Publish namespace readiness messages delivered to the caller process.
@opaque publish_ref()
Opaque publish correlation reference returned by publish/2.
@type role() :: :publisher | :subscriber
Publisher or subscriber session role.
@opaque session()
Opaque session resource returned via %MOQX.ConnectOk{}.
@opaque subgroup_handle()
Opaque subgroup handle returned by open_subgroup/3.
@type subgroup_id() :: nil | non_neg_integer()
Subgroup id convention, mirroring moqtail-ts:
- nil - first-object-id mode (wire format omits the subgroup id; receivers infer it from the first object's id)
- 0 - fixed-zero mode (wire format has no subgroup id field either; receivers default to 0)
- any positive integer - explicit subgroup id carried on the wire
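As an illustration of the three cases, the sketch below maps a subgroup_id() value to a label for the mode it selects. SubgroupMode and its return atoms are hypothetical names chosen for this example; MOQX itself does not expose them.

```elixir
defmodule SubgroupMode do
  @moduledoc "Hypothetical helper (not part of MOQX) naming the subgroup id modes."

  # nil: the wire omits the id and receivers infer it from the first object.
  def mode(nil), do: :first_object_id

  # 0: the wire has no id field and receivers default to 0.
  def mode(0), do: :fixed_zero

  # Any positive integer is carried explicitly on the wire.
  def mode(id) when is_integer(id) and id > 0, do: {:explicit, id}
end
```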
@type subgroup_message() :: {:moqx_flush_ok, MOQX.FlushDone.t()} | async_error_message()
Publish-side subgroup messages delivered to the caller process.
@type subscribe_message() :: {:moqx_subscribe_ok, MOQX.SubscribeOk.t()} | {:moqx_track_init, MOQX.TrackInit.t()} | {:moqx_object, MOQX.ObjectReceived.t()} | {:moqx_end_of_group, MOQX.EndOfGroup.t()} | {:moqx_publish_done, MOQX.PublishDone.t()} | async_error_message()
Subscription messages delivered to the caller process.
@type subscribe_opt() :: {:delivery_timeout_ms, non_neg_integer()} | {:init_data, binary()} | {:track_meta, map()} | {:track, MOQX.Catalog.Track.t()}
Subscribe options accepted by subscribe/4 and subscribe_track/4.
@type subscription_handle() :: reference()
Opaque subscription handle returned by subscribe/3,4.
Holds the internal state needed to cancel the subscription via
unsubscribe/1. When the last reference to the handle is garbage
collected, the subscription is automatically canceled.
@type tls_opt() :: {:verify, tls_verify()} | {:cacertfile, String.t()}
TLS connect options.
@type tls_verify() :: :verify_peer | :insecure
TLS verification mode for relay connections.
@opaque track()
Opaque track resource returned by create_track/2.
@type track_message() :: {:moqx_track_active, MOQX.TrackActive.t()} | {:moqx_track_closed, MOQX.TrackClosed.t()} | async_error_message()
Publish-side track lifecycle messages delivered to the caller process.
@type version() :: String.t()
MOQ protocol version string, e.g. "moq-transport-14".
@type write_datagram_opt() :: {:group_id, non_neg_integer()} | {:object_id, non_neg_integer()} | {:priority, 0..255} | {:extensions, [extension()]} | {:end_of_group, boolean()}
Options for write_datagram/3.
@type write_object_opt() :: {:status, object_status()} | {:extensions, [extension()]}
Options for write_object/4.
Functions
@spec close(session()) :: :ok
Closes a session.
@spec close_subgroup(subgroup_handle(), [close_subgroup_opt()]) :: :ok | {:error, MOQX.RequestError.t()}
Closes a subgroup and finishes its underlying uni-stream.
Options
- :end_of_group - when true, emits an end-of-group marker object before finishing the stream. Requires the subgroup to have been opened with end_of_group: true (otherwise the on-wire header variant would be inconsistent).
Dropping the handle (garbage collection) triggers a plain close without the
end-of-group marker, matching the semantics of close_subgroup(handle, end_of_group: false).
@spec connect(String.t(), [connect_opt()]) :: {:ok, connect_ref()} | {:error, MOQX.RequestError.t()}
Connects to a relay with an explicit role.
Prefer connect_publisher/2 and connect_subscriber/2 unless you need to
select the role dynamically.
Supported options:
- :role - required, :publisher or :subscriber
- :tls - optional TLS controls:
  - verify: :verify_peer | :insecure (defaults to :verify_peer)
  - cacertfile: "/path/to/rootCA.pem" to trust a custom root CA PEM
connect/2 is the dynamic-role entrypoint only. There is no supported merged
publisher/subscriber session mode, and listener/server APIs remain out of scope.
Returns {:ok, connect_ref} immediately. The caller later receives a
connect_message/0 correlated by that ref.
@spec connect_publisher(String.t(), Keyword.t()) :: {:ok, connect_ref()} | {:error, MOQX.RequestError.t()}
Connects a publisher session.
Accepts the same options as connect/2, except :role is fixed to :publisher.
Returns {:ok, connect_ref} immediately.
@spec connect_subscriber(String.t(), Keyword.t()) :: {:ok, connect_ref()} | {:error, MOQX.RequestError.t()}
Connects a subscriber session.
Accepts the same options as connect/2, except :role is fixed to :subscriber.
Returns {:ok, connect_ref} immediately.
@spec create_track(broadcast(), String.t()) :: {:ok, track()} | {:error, MOQX.RequestError.t()}
Creates a named track inside a broadcast.
Track activation is asynchronous. The caller receives
{:moqx_track_active, %MOQX.TrackActive{track: track, ...}} once relay-side
subscribe activation is observed for this track.
@spec fetch(session(), String.t(), String.t(), [fetch_opt()]) :: {:ok, fetch_ref()} | {:error, MOQX.RequestError.t()}
Submits a raw fetch request on a subscriber session.
Returns {:ok, ref} immediately after the request is accepted for submission.
The caller later receives a fetch_message/0 stream correlated by ref.
Misuse errors, such as calling this with a publisher session, are returned as
{:error, %MOQX.RequestError{...}} immediately.
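A caller typically drains the fetch_message/0 stream in a receive loop. The sketch below is one way to do that. It assumes that every fetch message carries the correlation reference under a :ref field, which this page confirms only for the connect and publish families, so treat that field name as an assumption. Messages are matched as plain maps so that the snippet compiles without the library; FetchLoop is a hypothetical module name.

```elixir
defmodule FetchLoop do
  @moduledoc "Hypothetical helper (not part of MOQX) that drains one fetch stream."

  # Collects fetch object messages for `ref` until :moqx_fetch_done arrives.
  # ASSUMPTION: each message exposes the fetch ref as `:ref`.
  def collect(ref, acc \\ [], timeout \\ 5_000) do
    receive do
      {:moqx_fetch_ok, %{ref: ^ref}} ->
        collect(ref, acc, timeout)

      {:moqx_fetch_object, %{ref: ^ref} = msg} ->
        # Keep the whole message; its payload fields are not assumed here.
        collect(ref, [msg | acc], timeout)

      {:moqx_fetch_done, %{ref: ^ref}} ->
        {:ok, Enum.reverse(acc)}

      {:moqx_request_error, %{ref: ^ref} = err} ->
        {:error, err}

      {:moqx_transport_error, %{ref: ^ref} = err} ->
        {:error, err}
    after
      timeout -> {:error, :timeout}
    end
  end
end
```

The timeout applies to each wait between messages, not to the fetch as a whole. Messages for other refs stay in the mailbox.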
@spec finish_track(track()) :: :ok | {:error, MOQX.RequestError.t()}
Finishes a track.
Subscribers eventually receive the terminal lifecycle event
{:moqx_publish_done, %MOQX.PublishDone{...}}.
The track owner process receives
{:moqx_track_closed, %MOQX.TrackClosed{track: track, ...}} exactly once.
After finish_track/1, further writes on the same track handle fail
synchronously with a typed request error (code: :track_closed).
@spec flush_subgroup(subgroup_handle()) :: {:ok, flush_ref()} | {:error, MOQX.RequestError.t()}
Requests a flush of the subgroup's underlying QUIC stream.
Returns {:ok, flush_ref} immediately. The caller later receives
{:moqx_flush_ok, %MOQX.FlushDone{...}} when the flush completes, or
{:moqx_transport_error, %MOQX.TransportError{...}} on failure.
@spec open_subgroup(track(), non_neg_integer(), [open_subgroup_opt()]) :: {:ok, subgroup_handle()} | {:error, MOQX.RequestError.t()}
Opens a subgroup on a publishing track.
Returns {:ok, handle} synchronously (the QUIC uni-stream is opened
asynchronously). Any async failure during stream open, write, flush, or close
arrives later as {:moqx_transport_error, %MOQX.TransportError{...}}.
Lifecycle gating is applied synchronously, before the stream is opened. If the
track is not writable, the call returns a typed request error with
code: :track_not_active | :track_closed.
group_id is an explicit non-negative integer chosen by the caller. Multiple
calls with the same group_id but different :subgroup_id open parallel
subgroup streams within the same group.
Options
- :subgroup_id - nil | 0 | pos_integer (default 0). nil selects the first-object-id mode (wire omits the subgroup id; receivers infer it from the first object). 0 selects the fixed-zero mode. Any positive integer is carried explicitly on the wire.
- :priority - 0..255 publisher priority (default 0).
- :end_of_group - when true, the chosen subgroup header variant signals that an end-of-group marker will be emitted. You must call close_subgroup(handle, end_of_group: true) to actually write the marker. (default false)
- :extensions_present - when true, the subgroup header declares that every object on this stream carries an extensions block (possibly empty). Required if any write_object/4 on this subgroup will pass :extensions. (default false)
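The open, write, and close steps can be combined into one helper that publishes a whole group. This is a minimal sketch rather than a recommended pattern: GroupWriter is a hypothetical module, and the api argument (defaulting to MOQX) exists only so that a stand-in module can be substituted when no relay is available.

```elixir
defmodule GroupWriter do
  @moduledoc "Hypothetical helper (not part of MOQX) that writes one group."

  # Opens a subgroup with end_of_group: true, writes each payload with object
  # ids 0, 1, 2, ..., then closes the subgroup with the end-of-group marker.
  # `api` must provide open_subgroup/3, write_object/4 and close_subgroup/2.
  def write_group(track, group_id, payloads, api \\ MOQX) do
    with {:ok, sg} <- api.open_subgroup(track, group_id, subgroup_id: 0, end_of_group: true),
         :ok <- write_all(api, sg, payloads) do
      api.close_subgroup(sg, end_of_group: true)
    end
  end

  # Stops at the first synchronous write error and returns it unchanged.
  defp write_all(api, sg, payloads) do
    payloads
    |> Enum.with_index()
    |> Enum.reduce_while(:ok, fn {payload, object_id}, :ok ->
      case api.write_object(sg, object_id, payload, []) do
        :ok -> {:cont, :ok}
        {:error, _reason} = err -> {:halt, err}
      end
    end)
  end
end
```

If a write fails, the helper returns the error without closing the subgroup explicitly. The handle is then dropped, which according to the close_subgroup/2 notes results in a plain close without the end-of-group marker. Asynchronous failures still arrive separately as {:moqx_transport_error, ...} messages.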
@spec publish(session(), String.t()) :: {:ok, publish_ref()} | {:error, MOQX.RequestError.t()}
Submits a publish-namespace announcement for the given path on a publisher session.
Returns {:ok, publish_ref} immediately. The broadcast becomes usable only
after the caller receives:
{:moqx_publish_ok, %MOQX.PublishOk{ref: publish_ref, broadcast: broadcast, ...}}
Failures are delivered as typed async errors correlated by publish_ref:
- {:moqx_request_error, %MOQX.RequestError{op: :publish, ref: publish_ref, ...}}
- {:moqx_transport_error, %MOQX.TransportError{op: :publish, ref: publish_ref, ...}}
This explicit lifecycle prevents downstream code from creating tracks before the relay acknowledges namespace readiness.
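Waiting for namespace readiness can be wrapped in a small helper. The sketch below relies only on the message shapes listed above (ref, broadcast, op). It matches them as plain maps so that it compiles without the library, whereas real code would more likely match on the MOQX structs. PublishAwait and the timeout value are illustrative choices, not part of MOQX.

```elixir
defmodule PublishAwait do
  @moduledoc "Hypothetical helper (not part of MOQX) that awaits publish readiness."

  # Blocks the calling process until the outcome correlated by `publish_ref`
  # arrives, or until `timeout` milliseconds pass.
  def await(publish_ref, timeout \\ 5_000) do
    receive do
      {:moqx_publish_ok, %{ref: ^publish_ref, broadcast: broadcast}} ->
        {:ok, broadcast}

      {:moqx_request_error, %{op: :publish, ref: ^publish_ref} = err} ->
        {:error, {:request, err}}

      {:moqx_transport_error, %{op: :publish, ref: ^publish_ref} = err} ->
        {:error, {:transport, err}}
    after
      timeout -> {:error, :timeout}
    end
  end
end
```

A caller would pass the ref returned by publish/2 and call create_track/2 only on the {:ok, broadcast} branch.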
@spec subscribe(session(), String.t(), String.t()) :: {:ok, subscription_handle()} | {:error, MOQX.RequestError.t()}
Subscribes a subscriber session to one track in a broadcast.
Returns {:ok, handle} immediately. The caller later receives a
subscribe_message/0 stream correlated by that handle:
- {:moqx_subscribe_ok, %MOQX.SubscribeOk{...}} when active
- {:moqx_track_init, %MOQX.TrackInit{...}} once per subscription
- {:moqx_object, %MOQX.ObjectReceived{...}} for each delivered object
- {:moqx_end_of_group, %MOQX.EndOfGroup{...}} when a subgroup or datagram signals end-of-group
- {:moqx_publish_done, %MOQX.PublishDone{...}} on terminal lifecycle
- {:moqx_request_error, %MOQX.RequestError{...}} for request rejection
- {:moqx_transport_error, %MOQX.TransportError{...}} for runtime failures
Supported options:
- :delivery_timeout_ms - draft-14 MOQT DELIVERY_TIMEOUT in milliseconds (encoded as parameter 0x02 on SUBSCRIBE)
- :init_data - binary init segment/configuration to surface in :moqx_track_init
- :track_meta - map surfaced in :moqx_track_init
- :track - %MOQX.Catalog.Track{} convenience; fills :init_data and :track_meta
Misuse errors, such as calling this with a publisher session, are returned as
{:error, %MOQX.RequestError{...}} immediately.
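The subscription stream can be consumed with a selective receive loop keyed on the handle. The sketch below handles only three message kinds. The :handle field is shown on this page for SubscribeOk and ObjectReceived; its presence on PublishDone is an assumption made here. SubscriptionLoop is a hypothetical module, and messages are matched as plain maps so that the snippet compiles without the library.

```elixir
defmodule SubscriptionLoop do
  @moduledoc "Hypothetical helper (not part of MOQX) that collects received objects."

  # Collects objects for `handle` until the terminal message or a timeout.
  def collect(handle, acc \\ [], timeout \\ 5_000) do
    receive do
      {:moqx_subscribe_ok, %{handle: ^handle}} ->
        collect(handle, acc, timeout)

      {:moqx_object, %{handle: ^handle, object: object}} ->
        collect(handle, [object | acc], timeout)

      # ASSUMPTION: PublishDone also carries the subscription handle as :handle.
      {:moqx_publish_done, %{handle: ^handle}} ->
        {:done, Enum.reverse(acc)}
    after
      timeout -> {:timeout, Enum.reverse(acc)}
    end
  end
end
```

Track-init, end-of-group, and error messages are not matched and remain in the mailbox, so a fuller loop would add clauses for them.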
@spec subscribe(session(), String.t(), String.t(), [subscribe_opt()]) :: {:ok, subscription_handle()} | {:error, MOQX.RequestError.t()}
Same as subscribe/3, with explicit subscription options.
@spec subscribe_track(session(), String.t(), MOQX.Catalog.Track.t()) :: {:ok, subscription_handle()} | {:error, MOQX.RequestError.t()}
Subscribes using a %MOQX.Catalog.Track{}.
This convenience helper derives track_name, init_data, and track_meta
from the provided track and forwards to subscribe/4.
@spec subscribe_track(session(), String.t(), MOQX.Catalog.Track.t(), [subscribe_opt()]) :: {:ok, subscription_handle()} | {:error, MOQX.RequestError.t()}
Same as subscribe_track/3, with explicit options.
:track is not accepted here (it is implied by the track argument).
@spec unsubscribe(subscription_handle()) :: :ok
Cancels an active track subscription.
Sends MOQ Unsubscribe to the relay and removes local subscription state.
The caller may still receive {:moqx_publish_done, %MOQX.PublishDone{...}}
once the relay acknowledges with PublishDone.
Idempotent: repeated calls (and calls after the subscription has already
ended) return :ok without sending further control traffic.
Dropping the handle (garbage collection) triggers the same cleanup, so short-lived subscribing processes do not need to call this explicitly.
@spec write_datagram(track(), binary() | MOQX.NativeBinary.t(), [write_datagram_opt()]) :: :ok | {:error, MOQX.RequestError.t()}
Writes one object as a draft-14 MOQT object datagram.
This is the explicit low-latency, loss-tolerant publish path for small objects. It does not fragment payloads and does not auto-select datagrams based on payload size.
Required options:
- :group_id - non-negative integer group id
- :object_id - non-negative integer object id
Optional options:
- :priority - 0..255 publisher priority (default 0)
- :extensions - per-object extension headers (default [])
- :end_of_group - whether this datagram marks the last object in the group (default false)
Subscribers receive the existing {:moqx_object, %MOQX.ObjectReceived{...}}
family, with %MOQX.Object{transport: :datagram}.
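To make the required and optional keys concrete, the sketch below checks the two required ids and fills in the documented defaults before the options are handed to write_datagram/3. DatagramOpts and its error tuples are hypothetical; MOQX performs its own validation and reports problems as %MOQX.RequestError{}.

```elixir
defmodule DatagramOpts do
  @moduledoc "Hypothetical helper (not part of MOQX) for write_datagram/3 options."

  # Defaults as documented for the optional keys.
  @defaults [priority: 0, extensions: [], end_of_group: false]

  # Returns {:ok, opts_with_defaults} or {:error, {:missing | :invalid, key}}.
  def build(opts) do
    with {:ok, _group_id} <- fetch_id(opts, :group_id),
         {:ok, _object_id} <- fetch_id(opts, :object_id) do
      {:ok, Keyword.merge(@defaults, opts)}
    end
  end

  # Required ids must be present and be non-negative integers.
  defp fetch_id(opts, key) do
    case Keyword.fetch(opts, key) do
      {:ok, value} when is_integer(value) and value >= 0 -> {:ok, value}
      {:ok, _other} -> {:error, {:invalid, key}}
      :error -> {:error, {:missing, key}}
    end
  end
end
```

With {:ok, opts} = DatagramOpts.build(group_id: 1, object_id: 0), the publish call would be MOQX.write_datagram(track, payload, opts).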
@spec write_frame(track(), binary()) :: :ok | {:error, MOQX.RequestError.t()}
Writes one frame to a track.
Convenience wrapper over the subgroup primitives: opens a subgroup with
subgroup id 0, writes one object, closes the stream. Each call creates the
next group in that track.
The call is synchronously gated by track lifecycle and returns typed request
errors with code: :track_not_active | :track_closed when not writable.
Subscribers receive {:moqx_object, %MOQX.ObjectReceived{handle: handle, object: %MOQX.Object{group_id: group_seq, subgroup_id: 0, object_id: 0, status: :normal, transport: :subgroup, payload: data}}}.
For fine-grained control over subgroup streams use open_subgroup/3 +
write_object/4 + close_subgroup/2.
For datagram delivery semantics use write_datagram/3 explicitly; this
helper does not auto-route small payloads to datagrams.
@spec write_object(subgroup_handle(), non_neg_integer(), binary() | MOQX.NativeBinary.t(), [write_object_opt()]) :: :ok | {:error, MOQX.RequestError.t()}
Writes one object to an open subgroup.
Returns :ok after the bytes are queued to the underlying Tokio runtime.
Any async failure arrives as {:moqx_transport_error, %MOQX.TransportError{...}}.
object_id must be strictly greater than the previous object's id on the
same subgroup. Pick 0 for the first object and increment monotonically.
Options
- :status - one of :normal | :does_not_exist | :end_of_group | :end_of_track (default :normal). For :normal, the payload is sent as-is. For marker statuses, the payload is ignored on the wire (the object is a zero-length status marker).
- :extensions - per-object extension headers (default []).
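The ordering rule for object_id can be checked on a planned sequence before any write is issued. The sketch below is a hypothetical helper, not something MOQX provides. Gaps between ids are accepted because the rule only requires each id to be strictly greater than the previous one.

```elixir
defmodule ObjectIds do
  @moduledoc "Hypothetical helper (not part of MOQX) for the object id ordering rule."

  # True when every id is strictly greater than the one before it.
  # Empty and single-element lists trivially satisfy the rule.
  def strictly_increasing?(ids) do
    ids
    |> Enum.chunk_every(2, 1, :discard)
    |> Enum.all?(fn [previous, next] -> next > previous end)
  end
end
```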