macula_stream_v1 (macula v3.13.0)

View Source

Macula streaming RPC — single-stream state machine.

Owns one streaming RPC's state. Each call_stream, open_stream, or server-side handler invocation gets its own macula_stream gen_server. Phase 1 only supports the LOCAL dispatch path: the client-side and server-side macula_stream processes live in the same BEAM and are paired with pair/2. Phase 2 swaps the local peer for a QUIC stream owned by macula_mesh_client; the public API stays unchanged.

See PLAN_MACULA_STREAMING.md (macula-architecture/plans).

Summary

Functions

Abort the stream with a STREAM_ERROR frame. Both sides close; any pending recv/await_reply waiters receive {error, {Code, Message}}.

Attach a remote (QUIC) peer to this stream. Used by macula_mesh_client to wire a stream to its QUIC-carrier: all deliveries out of this stream will be encoded as STREAM_* frames and sent through Client's relay connection, tagged with StreamId. Deliveries INTO this stream still arrive via the deliver_chunk/end/error/reply casts below — mesh_client decodes the incoming frames and forwards them to the local stream pid it tracks for this StreamId.

Wait for the terminal reply (client-stream / bidi).

Close both sides. Idempotent.

Half-close the write side. Recv side stays open.

Deliver a chunk frame from the peer.

Deliver a STREAM_END frame from the peer.

Deliver a STREAM_ERROR frame from the peer.

Deliver a STREAM_REPLY frame from the peer.

Inspect stream state (debugging).

Pair two stream processes as peers (Phase 1 local dispatch).

Receive the next chunk (blocks indefinitely).

Send a binary chunk on the stream.

Server-side: emit a terminal error as the reply value.

Server-side: emit the terminal reply.

Start a stream gen_server.

Types

chunk/0

-type chunk() :: binary() | {raw, binary()} | {term, term()}.

encoding/0

-type encoding() :: raw | msgpack.

mode/0

-type mode() :: server_stream | client_stream | bidi.

peer/0

-type peer() :: undefined | {local, pid()} | {remote, pid(), stream_id()}.

result/0

-type result() :: {ok, term()} | {error, term()}.

role/0

-type role() :: client | server.

stream_id/0

-type stream_id() :: binary().

Functions

abort(Pid, Code, Message)

-spec abort(pid(), binary(), binary()) -> ok.

Abort the stream with a STREAM_ERROR frame. Both sides close; any pending recv/await_reply waiters receive {error, {Code, Message}}.

attach_remote(StreamPid, ClientPid, StreamId)

-spec attach_remote(pid(), pid(), stream_id()) -> ok.

Attach a remote (QUIC) peer to this stream. Used by macula_mesh_client to wire a stream to its QUIC-carrier: all deliveries out of this stream will be encoded as STREAM_* frames and sent through Client's relay connection, tagged with StreamId. Deliveries INTO this stream still arrive via the deliver_chunk/end/error/reply casts below — mesh_client decodes the incoming frames and forwards them to the local stream pid it tracks for this StreamId.

await_reply(Pid)

-spec await_reply(pid()) -> result().

Wait for the terminal reply (client-stream / bidi).

await_reply(Pid, Timeout)

-spec await_reply(pid(), timeout()) -> result() | {error, timeout}.

close(Pid)

-spec close(pid()) -> ok.

Close both sides. Idempotent.

close_send(Pid)

-spec close_send(pid()) -> ok.

Half-close the write side. Recv side stays open.

deliver_chunk(Pid, Encoding, Body)

-spec deliver_chunk(pid(), encoding(), term()) -> ok.

Deliver a chunk frame from the peer.

deliver_end(Pid, Role)

-spec deliver_end(pid(), send | both) -> ok.

Deliver a STREAM_END frame from the peer.

deliver_error(Pid, Code, Message)

-spec deliver_error(pid(), binary(), binary()) -> ok.

Deliver a STREAM_ERROR frame from the peer.

deliver_reply(Pid, Result)

-spec deliver_reply(pid(), result()) -> ok.

Deliver a STREAM_REPLY frame from the peer.

handle_call(Msg, From, State)

handle_cast(Msg, State)

handle_info(Msg, State)

info(Pid)

-spec info(pid()) -> map().

Inspect stream state (debugging).

init(Opts)

pair(A, B)

-spec pair(pid(), pid()) -> ok.

Pair two stream processes as peers (Phase 1 local dispatch).

recv(Pid)

-spec recv(pid()) -> {chunk, binary()} | {data, term()} | eof | {error, term()}.

Receive the next chunk (blocks indefinitely).

recv(Pid, Timeout)

-spec recv(pid(), timeout()) -> {chunk, binary()} | {data, term()} | eof | {error, term()}.

send(Pid, Bin)

-spec send(pid(), binary()) -> ok | {error, term()}.

Send a binary chunk on the stream.

send(Pid, Body, _)

-spec send(pid(), binary() | term(), encoding()) -> ok | {error, term()}.

set_error(Pid, Reason)

-spec set_error(pid(), term()) -> ok.

Server-side: emit a terminal error as the reply value.

set_reply(Pid, Result)

-spec set_reply(pid(), term()) -> ok.

Server-side: emit the terminal reply.

start_link(Opts)

-spec start_link(map()) -> {ok, pid()} | {error, term()}.

Start a stream gen_server.

Required opts: id, role, mode, owner.

terminate(Reason, State)