macula_client (macula v3.13.0)

macula_client — the canonical pool client.

Holds N peering links to N stations and routes ops with replication, subscription replay, and inbound-event dedup. Apps don't manage individual macula_station_link workers; they call macula_client (or the macula facade, which re-exports the public surface).

Per PLAN_V2_PARITY Q2 §1: pool is the canonical client handle. A single-station link is an internal worker only.

Lifecycle

  {ok, Pool} = macula_client:connect(Seeds, Opts).
  ok          = macula_client:publish(Pool, Realm, Topic, Payload, #{}).
  {ok, Sub}   = macula_client:subscribe(Pool, Realm, Topic, self(), #{}).
  receive {macula_event, Sub, Topic, Payload, Meta} -> ... end.
  ok          = macula_client:unsubscribe(Pool, Sub).
  ok          = macula_client:close(Pool).
  

Replication

publish/5 fans the PUBLISH frame to replication_factor (default 1) currently-spawned links. Partial success counts as success per PLAN_V2_PARITY §5.1.1: the call returns ok as soon as one link accepts the frame; the others are best-effort. When zero links are spawned the call returns {error, {transient, no_healthy_station}}.
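The fan-out rule can be modelled as follows. This is an illustrative sketch, not the module's internals: fanout_sketch is a made-up name, each link is modelled as a fun, and the error shape for the all-rejected case is an assumption.

```erlang
%% Illustrative model of the publish/5 fan-out rule; not the module's
%% real internals. Each link is a fun returning ok or {error, Reason}
%% for one PUBLISH frame.
-module(fanout_sketch).
-export([publish_fanout/2]).

publish_fanout(Links, ReplicationFactor) ->
    case lists:sublist(Links, ReplicationFactor) of
        [] ->
            %% zero spawned links: the documented transient error
            {error, {transient, no_healthy_station}};
        Targets ->
            %% fire at every target; the losers are best-effort
            Results = [Link() || Link <- Targets],
            case lists:member(ok, Results) of
                true  -> ok;                    % partial success = success
                false -> {error, all_rejected}  % error shape is an assumption
            end
    end.
```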

subscribe/5 applies to every spawned link. The pool delivers a deduped event stream to the consumer regardless of which link relayed any given EVENT.

Dedup

Inbound EVENT frames are keyed by (Realm, Publisher, Seq) in an ETS table owned by the pool. The table is swept every dedup_sweep_ms (default 30s) for entries older than dedup_window_ms (default 60s).
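A self-contained sketch of that table and its sweep, assuming each entry stores its insertion time in milliseconds (dedup_sketch and its function names are hypothetical):

```erlang
%% Hypothetical sketch of the (Realm, Publisher, Seq) dedup table.
-module(dedup_sketch).
-export([new/0, seen/2, sweep/2]).

new() ->
    ets:new(dedup, [set, public]).

%% true if the key was already present; records it otherwise.
seen(Tab, Key = {_Realm, _Publisher, _Seq}) ->
    Now = erlang:monotonic_time(millisecond),
    not ets:insert_new(Tab, {Key, Now}).

%% Drop entries older than WindowMs; the pool would call this every
%% dedup_sweep_ms. Returns the number of entries deleted.
sweep(Tab, WindowMs) ->
    Cutoff = erlang:monotonic_time(millisecond) - WindowMs,
    ets:select_delete(Tab, [{{'_', '$1'}, [{'<', '$1', Cutoff}], [true]}]).
```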

Replay

When a link's process dies the pool monitor fires; the pool schedules a respawn after ?LINK_RESPAWN_DELAY_MS (1s). On respawn, the pool re-issues every currently-tracked (Realm, Topic) subscription against the new link via the internal macula_client_replay helper.
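The respawn-and-replay step can be sketched like this. The state shape is assumed, and the Spawn/Resub funs stand in for the internal link-spawning call and the macula_client_replay helper, which this page doesn't document:

```erlang
%% Illustrative sketch of the monitor -> respawn -> replay flow; state
%% fields and helper funs are assumptions, not the real internals.
-module(replay_sketch).
-export([on_link_down/2, on_respawn/2]).

-define(LINK_RESPAWN_DELAY_MS, 1000).

%% From handle_info({'DOWN', ...}): forget the dead pid and schedule a
%% respawn of its seed after the fixed delay.
on_link_down(Pid, S = #{links := Links}) ->
    Seed = maps:get(Pid, Links),
    erlang:send_after(?LINK_RESPAWN_DELAY_MS, self(), {respawn, Seed}),
    S#{links := maps:remove(Pid, Links)}.

%% From handle_info({respawn, Seed}): spawn a fresh link, then re-issue
%% every tracked (Realm, Topic) subscription against it.
on_respawn(Seed, S = #{links := Links, subs := Subs,
                       spawn := Spawn, resub := Resub}) ->
    {ok, NewPid} = Spawn(Seed),
    [Resub(NewPid, Realm, Topic) || {Realm, Topic} <- maps:keys(Subs)],
    S#{links := Links#{NewPid => Seed}}.
```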

Summary

Functions

child_spec(Id, Seeds, Opts)

OTP child spec — drop the pool into a caller's supervision tree. Id is the supervisor child id.

close(Pool)

Stop the pool. Every subscriber receives a final {macula_event_gone, SubRef, pool_closed} message; every link terminates with the pool.

connect(Seeds, Opts)

Spawn a pool with one link per seed. Returns immediately; link handshakes complete asynchronously. Calls to publish/5 and subscribe/5 block until at least one link is connected; on the publish path the call fails with {error, {transient, no_healthy_station}} when no link is available.

publish(Pool, Realm, Topic, Payload, Opts)

Publish a frame to replication_factor currently-spawned links. Partial success = success. Realm is per-call (32 bytes).

subscribe(Pool, Realm, Topic, Subscriber, Opts)

Subscribe Subscriber to (Realm, Topic). The pool subscribes every currently-spawned link and dedupes inbound events before fan-out. Returns {ok, SubRef}; Subscriber receives {macula_event, SubRef, Topic, Payload, Meta} for each delivered event and {macula_event_gone, SubRef, Reason} once when the pool closes or the subscriber pid dies.

unsubscribe(Pool, SubRef)

Drop a subscription. Idempotent — unknown SubRef is a no-op. The wire-level link subscription persists for the pool's lifetime (one wire sub per (Realm, Topic) multiplexed across local consumers); Phase 4 will tighten this.

Types

opts/0

-type opts() ::
          #{identity => macula_identity:key_pair(),
            replication_factor => pos_integer(),
            capabilities => non_neg_integer(),
            alpn => [binary()],
            connect_timeout_ms => pos_integer(),
            dedup_window_ms => non_neg_integer(),
            dedup_sweep_ms => pos_integer()}.

pool/0

-type pool() :: pid().

seed/0

-type seed() :: binary() | string() | #{host := binary() | string(), port := inet:port_number()}.

Functions

child_spec(Id, Seeds, Opts)

-spec child_spec(term(), [seed()], opts()) -> supervisor:child_spec().

OTP child spec — drop the pool into a caller's supervision tree. Id is the supervisor child id.
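A typical embedding in a caller's supervisor. The child id, the seed, and the supervisor flags are placeholders chosen by the caller, not mandated by this API:

```erlang
%% In the caller's supervisor module; macula_pool and the seed are
%% placeholders.
init([]) ->
    PoolSpec = macula_client:child_spec(macula_pool,
                                        [#{host => <<"station-a.example">>,
                                           port => 4433}],
                                        #{replication_factor => 2}),
    {ok, {#{strategy => one_for_one, intensity => 5, period => 10},
          [PoolSpec]}}.
```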

close(Pool)

-spec close(pool()) -> ok.

Stop the pool. Every subscriber receives a final {macula_event_gone, SubRef, pool_closed} message; every link terminates with the pool.

code_change(OldVsn, S, Extra)

connect(Seeds, Opts)

-spec connect([seed()], opts()) -> {ok, pool()} | {error, term()}.

Spawn a pool with one link per seed. Returns immediately; link handshakes complete asynchronously. Calls to publish/5 and subscribe/5 block until at least one link is connected; on the publish path the call fails with {error, {transient, no_healthy_station}} when no link is available.
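For example, mixing the seed() forms from this page. The hostnames are placeholders, and the host:port syntax in the binary/string forms is an assumption — only the map form's shape is specified here:

```erlang
Seeds = [<<"station-a.example:4433">>,          % binary seed (syntax assumed)
         "station-b.example:4433",              % string seed (syntax assumed)
         #{host => <<"station-c.example">>, port => 4433}],
{ok, Pool} = macula_client:connect(Seeds,
                                   #{replication_factor => 2,
                                     connect_timeout_ms => 5000,
                                     dedup_window_ms => 60000}).
```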

handle_call(Req, From, S)

handle_cast(Msg, S)

handle_info(Other, S)

init(_)

publish(Pool, Realm, Topic, Payload, Opts)

-spec publish(pool(), <<_:256>>, binary(), term(), map()) -> ok | {error, term()}.

Publish a frame to replication_factor currently-spawned links. Partial success = success. Realm is per-call (32 bytes).

subscribe(Pool, Realm, Topic, Subscriber, Opts)

-spec subscribe(pool(), <<_:256>>, binary(), pid(), map()) -> {ok, reference()}.

Subscribe Subscriber to (Realm, Topic). The pool subscribes every currently-spawned link and dedupes inbound events before fan-out. Returns {ok, SubRef}; Subscriber receives {macula_event, SubRef, Topic, Payload, Meta} for each delivered event and {macula_event_gone, SubRef, Reason} once when the pool closes or the subscriber pid dies.
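A minimal consumer for the two documented message shapes (subscriber_sketch is an illustrative name, not part of the library):

```erlang
-module(subscriber_sketch).
-export([loop/1]).

%% Handle events for one SubRef until the pool signals it's gone.
loop(SubRef) ->
    receive
        {macula_event, SubRef, Topic, Payload, _Meta} ->
            io:format("event on ~p: ~p~n", [Topic, Payload]),
            loop(SubRef);
        {macula_event_gone, SubRef, Reason} ->
            %% delivered exactly once: pool closed or this pid was reaped
            {stopped, Reason}
    end.
```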

terminate(Reason, State)

unsubscribe(Pool, SubRef)

-spec unsubscribe(pool(), reference()) -> ok.

Drop a subscription. Idempotent — unknown SubRef is a no-op. The wire-level link subscription persists for the pool's lifetime (one wire sub per (Realm, Topic) multiplexed across local consumers); Phase 4 will tighten this.