macula (macula v0.20.3)
Macula - Main API for distributed workloads on the Macula platform.
This is the ONLY module workload applications should import. It provides a stable, versioned API for all platform operations:
- Mesh networking (connect, publish, subscribe, RPC)
- Platform Layer (leader election, CRDTs, workload registration)
- Service discovery (DHT queries, node identity)
Quick Start
Connect to local platform and publish events:
{ok, Client} = macula:connect_local(#{realm => <<"my.app">>}),
ok = macula:publish(Client, <<"my.events">>, #{type => <<"test">>}).
Architecture
Workload applications run in the same BEAM VM as the Macula platform. Use connect_local/1 to connect via process-to-process communication:
Workload App → macula:connect_local/1 → macula_gateway → Mesh (QUIC/HTTP3)
Platform Layer (v0.10.0+)
Register with platform for coordination features:
{ok, #{leader_node := Leader}} = macula:register_workload(Client, #{
    workload_name => <<"my_app">>
}).
DHT Network Bootstrap
The platform handles DHT bootstrapping via MACULA_BOOTSTRAP_PEERS. Workloads don't need to manage peer discovery.
Summary
Types
Arguments for RPC calls.
Reference to a connected Macula mesh client.
Event payload data. Typically a map that will be JSON-encoded.
Connection or operation options.
RPC procedure name. Example: "my.app.get_user".
Reference to an active subscription for unsubscribe operations.
Topic name for pub/sub operations. Topics should describe event types, not entity IDs. Example: "my.app.user.registered" (good), not "my.app.user.123.registered" (bad - ID belongs in payload).
Functions
Advertise a service that this client provides.
Advertise a service with options.
Make a synchronous RPC call.
Make an RPC call with options.
Make an RPC call to a specific target node.
Make an RPC call to a specific target node with options.
Connect to a Macula mesh network.
Connect to the local Macula gateway (for in-VM workloads).
Disconnect from the Macula mesh.
Discover subscribers to a topic via DHT query.
Ensure this node is running in distributed mode.
Get the Erlang cookie for the cluster.
Get the current Platform Layer leader node.
Get the node ID of this client.
Subscribe to node up/down events.
Propose a CRDT update to Platform Layer shared state.
Propose a CRDT update with specific CRDT type.
Publish an event to a topic.
Publish an event with options.
Read the current value of a CRDT-managed shared state entry.
Register this workload with the Platform Layer.
Set the Erlang cookie for this node and persist it.
Subscribe to a topic.
Subscribe to Platform Layer leader change notifications.
Stop advertising a service.
Unsubscribe from node up/down events.
Unsubscribe from a topic.
Types
Arguments for RPC calls.
-type client() :: pid().
Reference to a connected Macula mesh client.
Event payload data. Typically a map that will be JSON-encoded.
-type options() :: map().
Connection or operation options.
-type procedure() :: binary().
RPC procedure name. Example: "my.app.get_user".
-type subscription_ref() :: reference().
Reference to an active subscription for unsubscribe operations.
-type topic() :: binary().
Topic name for pub/sub operations. Topics should describe event types, not entity IDs. Example: "my.app.user.registered" (good), not "my.app.user.123.registered" (bad - ID belongs in payload).
Functions
-spec advertise(Client :: client(), Procedure :: procedure(), Handler :: macula_service_registry:handler_fn()) -> {ok, reference()} | {error, Reason :: term()}.
Advertise a service that this client provides.
Registers a handler function for the specified procedure and advertises it to the DHT so other clients can discover and call it.
The handler function receives a map of arguments and must return {ok, Result} or {error, Reason}.
Options
- ttl - Advertisement TTL in seconds (default: 300)
- metadata - Custom metadata map (default: #{})
Examples
%% Define a handler function
Handler = fun(#{user_id := UserId}) ->
    {ok, #{user_id => UserId, name => <<"Alice">>}}
end.
%% Advertise the service
{ok, Ref} = macula:advertise(
    Client,
    <<"my.app.get_user">>,
    Handler
).
%% Other clients can now call:
%% {ok, User} = macula:call(OtherClient, <<"my.app.get_user">>,
%%     #{user_id => <<"user-123">>}).
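The handler contract above only says the function receives a map and returns {ok, Result} or {error, Reason}. The following is a minimal sketch of a handler that also covers calls with missing arguments, instead of failing with a function_clause error. The module name handler_example and the missing_user_id reason are hypothetical, and how the registry treats a crashing handler is not described in this page.

```erlang
-module(handler_example).
-export([get_user_handler/0]).

%% Returns a fun suitable as the Handler argument of macula:advertise/3.
%% The user data is hard-coded for illustration.
get_user_handler() ->
    fun(#{user_id := UserId}) when is_binary(UserId) ->
            {ok, #{user_id => UserId, name => <<"Alice">>}};
       (_Other) ->
            %% Assumed error shape; the docs only require {error, Reason}.
            {error, missing_user_id}
    end.
```

It can be passed directly: macula:advertise(Client, <<"my.app.get_user">>, handler_example:get_user_handler()).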
-spec advertise(Client :: client(), Procedure :: procedure(), Handler :: macula_service_registry:handler_fn(), Opts :: options()) -> {ok, reference()} | {error, Reason :: term()}.
Advertise a service with options.
-spec call(Client :: client(), Procedure :: procedure(), Args :: args()) -> {ok, Result :: term()} | {error, Reason :: term()}.
Make a synchronous RPC call.
Calls a remote procedure and waits for the result.
Examples
%% Simple RPC call
{ok, User} = macula:call(Client, <<"my.app.get_user">>, #{
    user_id => <<"user-123">>
}).
%% With timeout
{ok, Result} = macula:call(Client, <<"my.app.process">>,
    #{data => <<"large">>},
    #{timeout => 30000}).
-spec call(Client :: client(), Procedure :: procedure(), Args :: args(), Opts :: options()) -> {ok, Result :: term()} | {error, Reason :: term()}.
Make an RPC call with options.
-spec call_to(Client :: client(), TargetNodeId :: binary(), Procedure :: procedure(), Args :: args()) -> {ok, Result :: term()} | {error, Reason :: term()}.
Make an RPC call to a specific target node.
Unlike call/4, which discovers any provider via DHT, this function sends the RPC directly to the specified target node. Use it when you already know which node provides the service (e.g., from a previous DHT discovery, a specific publisher node, or a direct node advertisement).
The message is still routed via DHT infrastructure (for NAT traversal, relay fallback, etc.), but it targets a specific node rather than discovering one.
Examples
%% Call a specific node (e.g., publisher node for package pull)
PublisherNodeId = <<...32 bytes...>>,
{ok, Manifest} = macula:call_to(Client, PublisherNodeId,
    <<"packages.manifest.fetch">>,
    #{image_ref => <<"my.app:1.0.0">>}).
%% With timeout
{ok, Result} = macula:call_to(Client, TargetNodeId,
    <<"my.procedure">>, Args,
    #{timeout => 30000}).
-spec call_to(Client :: client(), TargetNodeId :: binary(), Procedure :: procedure(), Args :: args(), Opts :: options()) -> {ok, Result :: term()} | {error, Reason :: term()}.
Make an RPC call to a specific target node with options.
Connect to a Macula mesh network.
Creates a new HTTP/3 (QUIC) connection to the specified mesh endpoint.
Options
- realm - Required. Binary realm identifier (e.g., <<"my.app.realm">>)
- auth - Optional. Authentication map with api_key or other auth methods
- timeout - Optional. Connection timeout in milliseconds (default: 5000)
- node_id - Optional. 32-byte node ID (generated if not provided)
Examples
%% Basic connection
{ok, Client} = macula:connect(<<"https://mesh.local:443">>, #{
    realm => <<"my.realm">>
}).
%% With API key authentication
{ok, Client} = macula:connect(<<"https://mesh.local:443">>, #{
    realm => <<"my.realm">>,
    auth => #{api_key => <<"secret-key">>}
}).
Connect to the local Macula gateway (for in-VM workloads).
This function is used by applications running in the same BEAM VM as the Macula platform. Instead of creating a QUIC connection to localhost, it connects directly to the local macula_gateway process via process-to-process communication.
Architecture
Phoenix/Elixir App → macula_local_client → macula_gateway
                                                 ↓ (QUIC)
                                            Other Peers
When to Use
- ✅ Use connect_local/1 when your application runs in the same VM as Macula
- ✅ Phoenix applications deployed with Macula in the same container
- ❌ Do NOT use connect/2 with a localhost URL - it creates unnecessary QUIC overhead
Options
- realm - Required. Binary realm identifier (e.g., <<"my.app.realm">>)
- event_handler - Optional. PID to receive events (default: caller PID)
Examples
# Elixir Phoenix application
{:ok, client} = :macula.connect_local(%{
  realm: "macula.arcade.dev"
})
%% Erlang application
{ok, Client} = macula:connect_local(#{
    realm => <<"my.app.realm">>
}).
Disconnect from the Macula mesh.
Cleanly closes the HTTP/3 connection and cleans up all subscriptions.
-spec discover_subscribers(Client :: client(), Topic :: topic()) -> {ok, [#{node_id := binary(), endpoint := binary()}]} | {error, Reason :: term()}.
Discover subscribers to a topic via DHT query.
Queries the DHT for all nodes subscribed to the given topic. Returns a list of subscriber nodes with their node IDs and endpoints.
This is used for P2P discovery before sending direct messages.
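As a rough illustration of the discover-then-send flow described above, the sketch below extracts node IDs from the discovery result and sends one RPC per node with call_to/4. It assumes that every subscriber also serves the given procedure, which this page does not state. The module and function names are hypothetical.

```erlang
-module(discover_example).
-export([node_ids/1, call_each/4]).

%% Extracts the node IDs from the list returned by
%% macula:discover_subscribers/2.
-spec node_ids([#{node_id := binary(), endpoint := binary()}]) -> [binary()].
node_ids(Subscribers) ->
    [NodeId || #{node_id := NodeId} <- Subscribers].

%% Discovers the subscribers of Topic, then calls Procedure on each
%% node in turn. Returns {ok, [{NodeId, Reply}]} where Reply is
%% whatever macula:call_to/4 returned for that node.
call_each(Client, Topic, Procedure, Args) ->
    case macula:discover_subscribers(Client, Topic) of
        {ok, Subscribers} ->
            {ok, [{Id, macula:call_to(Client, Id, Procedure, Args)}
                  || Id <- node_ids(Subscribers)]};
        {error, Reason} ->
            {error, Reason}
    end.
```

The calls run sequentially here to keep the sketch short; a real caller may prefer to fan out across processes.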
-spec ensure_distributed() -> ok | {error, term()}.
Ensure this node is running in distributed mode.
If the node is already distributed, returns ok immediately. Otherwise, starts distribution with a generated node name.
This function is used by bc_gitops to delegate cluster setup to the Macula platform when available.
Examples:
ok = macula:ensure_distributed().
-spec get_cookie() -> atom().
Get the Erlang cookie for the cluster.
Resolves the cookie from various sources in priority order:
1. Application env: {macula, [{cookie, CookieValue}]}
2. Environment variable: MACULA_COOKIE or RELEASE_COOKIE
3. User's ~/.erlang.cookie file
4. Auto-generated (persisted to ~/.erlang.cookie)
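For the highest-priority source, the application env entry would typically live in the release's sys.config. The fragment below is a sketch: the cookie value is a placeholder, and an atom is assumed only because get_cookie/0 returns atom(); this page does not say which value types the setting accepts.

```erlang
%% sys.config (fragment)
[
 {macula, [
   %% Placeholder value; assumed to be an atom.
   {cookie, my_cluster_cookie}
 ]}
].
```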
Examples:
Cookie = macula:get_cookie().
Get the current Platform Layer leader node.
Queries the Platform Layer for the current leader node ID. The leader is elected via Raft consensus and handles coordination tasks.
Returns {error, no_leader} if leader election is in progress.
Examples
case macula:get_leader(Client) of
    {ok, LeaderNodeId} ->
        %% Check if we're the leader
        {ok, OurNodeId} = macula:get_node_id(Client),
        case LeaderNodeId == OurNodeId of
            true -> coordinate_globally();
            false -> defer_to_leader()
        end;
    {error, no_leader} ->
        wait_for_leader_election()
end.
Get the node ID of this client.
Returns the 32-byte node ID assigned to this client.
-spec monitor_nodes() -> ok.
Subscribe to node up/down events.
After calling this function, the calling process will receive {nodeup, Node} and {nodedown, Node} messages when nodes join or leave the cluster.
Examples:
ok = macula:monitor_nodes().
receive
    {nodeup, Node} -> handle_node_up(Node);
    {nodedown, Node} -> handle_node_down(Node)
end.
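The receive above handles a single message. A process that wants everything currently queued could drain its mailbox in a loop, as in this sketch. The module name and the {up, Node} / {down, Node} result shape are illustrative choices, not part of the Macula API.

```erlang
-module(node_events_example).
-export([drain/0]).

%% Collects the node events already waiting in the caller's mailbox,
%% oldest first, and returns immediately when none are left.
-spec drain() -> [{up | down, node()}].
drain() ->
    drain([]).

drain(Acc) ->
    receive
        {nodeup, Node}   -> drain([{up, Node} | Acc]);
        {nodedown, Node} -> drain([{down, Node} | Acc])
    after 0 ->
        lists:reverse(Acc)
    end.
```

Call it from the same process that called macula:monitor_nodes/0, since that is the process receiving the messages.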
-spec propose_crdt_update(Client :: client(), Key :: binary(), Value :: term()) -> ok | {error, Reason :: term()}.
Propose a CRDT update to Platform Layer shared state.
Updates platform-managed shared state using Conflict-Free Replicated Data Types (CRDTs) for automatic conflict resolution across nodes.
Default CRDT type is lww_register (Last-Write-Wins Register). See propose_crdt_update/4 for other CRDT types.
Examples
%% Store simple value (LWW-Register)
ok = macula:propose_crdt_update(
    Client,
    <<"my.app.config.max_users">>,
    1000
).
%% Later read it back
{ok, 1000} = macula:read_crdt(Client, <<"my.app.config.max_users">>).
-spec propose_crdt_update(Client :: client(), Key :: binary(), Value :: term(), Opts :: options()) -> ok | {error, Reason :: term()}.
Propose a CRDT update with specific CRDT type.
Updates platform-managed shared state using the specified CRDT type for automatic conflict resolution.
CRDT Types
- lww_register - Last-Write-Wins Register (default)
  - Use for: Configuration values, latest status
  - Conflict resolution: Latest timestamp wins
- g_counter - Grow-Only Counter
  - Use for: Metrics, totals (never decrease)
  - Operations: increment only
- pn_counter - Positive-Negative Counter
  - Use for: Bidirectional counters (can increase/decrease)
  - Operations: increment, decrement
- g_set - Grow-Only Set
  - Use for: Accumulating collections (never remove)
  - Operations: add elements only
- or_set - Observed-Remove Set
  - Use for: Sets with add/remove operations
  - Operations: add, remove elements
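To make the conflict-resolution rules concrete, here is a small textbook-style sketch of how two of these types converge. It is not Macula's implementation; the module name, the {Timestamp, Value} representation and the per-node counter map are assumptions chosen for illustration.

```erlang
-module(crdt_sketch).
-export([lww_merge/2, g_counter_merge/2, g_counter_value/1]).

%% LWW register modelled as {Timestamp, Value}. Erlang compares tuples
%% element by element, so max/2 picks the later timestamp and breaks
%% ties on the value, which keeps the merge commutative.
lww_merge(A, B) ->
    max(A, B).

%% G-counter modelled as a map of NodeId => Count. Merging takes the
%% per-node maximum, so repeated or reordered merges never lose or
%% double-count increments.
g_counter_merge(A, B) ->
    maps:fold(
      fun(Node, Count, Acc) ->
              maps:update_with(Node,
                               fun(Old) -> max(Old, Count) end,
                               Count, Acc)
      end, A, B).

%% The counter's value is the sum over all nodes.
g_counter_value(Counter) ->
    lists:sum(maps:values(Counter)).
```

Both merges give the same result regardless of argument order, which is the property that lets replicas converge without coordination.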
Examples
%% Increment a counter
ok = macula:propose_crdt_update(
    Client,
    <<"my.app.active_games">>,
    {increment, 1},
    #{crdt_type => pn_counter}
).
%% Add to a set
ok = macula:propose_crdt_update(
    Client,
    <<"my.app.player_ids">>,
    {add, <<"player123">>},
    #{crdt_type => or_set}
).
-spec publish(Client :: client(), Topic :: topic(), Data :: event_data()) -> ok | {error, Reason :: term()}.
Publish an event to a topic.
Publishes data to the specified topic. All subscribers to this topic will receive the event.
Topic Design
Topics should describe EVENT TYPES, not entity instances:
- Good: <<"my.app.user.registered">> (event type)
- Bad: <<"my.app.user.123.registered">> (entity ID in topic)
Entity IDs belong in the event payload, not the topic name.
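One way to keep this rule hard to break is to build the topic and payload in a single helper, so the entity ID can only end up in the payload. The sketch below assumes a hypothetical module topic_design_example; only macula:publish/3 comes from this API.

```erlang
-module(topic_design_example).
-export([user_registered/1, publish_user_registered/2]).

%% Builds the topic/payload pair for a "user registered" event.
%% The topic names the event type only; the user ID goes in the payload.
-spec user_registered(binary()) -> {binary(), map()}.
user_registered(UserId) when is_binary(UserId) ->
    Topic = <<"my.app.user.registered">>,
    Payload = #{user_id => UserId},
    {Topic, Payload}.

%% Publishes the event through macula:publish/3.
-spec publish_user_registered(pid(), binary()) -> ok | {error, term()}.
publish_user_registered(Client, UserId) ->
    {Topic, Payload} = user_registered(UserId),
    macula:publish(Client, Topic, Payload).
```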
Examples
%% Publish with default options
ok = macula:publish(Client, <<"my.app.events">>, #{
    type => <<"user.registered">>,
    user_id => <<"user-123">>,
    email => <<"user@example.com">>
}).
%% Publish with options
ok = macula:publish(Client, <<"my.app.events">>, #{
    data => <<"important">>
}, #{acknowledge => true}).
-spec publish(Client :: client(), Topic :: topic(), Data :: event_data(), Opts :: options()) -> ok | {error, Reason :: term()}.
Publish an event with options.
This is fire-and-forget - returns ok immediately without blocking. Uses gen_server:cast to avoid blocking the caller (prevents LiveView freezes). Both macula_local_client and macula_peer handle {publish_async, ...} casts.
Read the current value of a CRDT-managed shared state entry.
Reads from the local CRDT replica. The value reflects all converged updates from across the platform cluster.
Returns {error, not_found} if the key has never been written.
Examples
%% Read LWW-Register value
{ok, MaxUsers} = macula:read_crdt(Client, <<"my.app.config.max_users">>).
%% Read counter value
{ok, GameCount} = macula:read_crdt(Client, <<"my.app.active_games">>).
%% Read set value
{ok, PlayerSet} = macula:read_crdt(Client, <<"my.app.player_ids">>).
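The examples above assume the key already exists. Since a key that was never written yields {error, not_found}, a caller may want a fallback value, as in the sketch below. The module name, the helper names and the fallback of 1000 are hypothetical; any other error reason is deliberately left to crash.

```erlang
-module(read_crdt_example).
-export([value_or_default/2, max_users/1]).

%% Maps the documented read_crdt/2 results to a plain value.
%% Other error reasons are not matched, so they fail loudly.
value_or_default({ok, Value}, _Default) -> Value;
value_or_default({error, not_found}, Default) -> Default.

%% Reads the configured limit, falling back to 1000 until some node
%% has written the key.
max_users(Client) ->
    Reply = macula:read_crdt(Client, <<"my.app.config.max_users">>),
    value_or_default(Reply, 1000).
```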
-spec register_workload(Client :: client(), Opts :: options()) -> {ok, map()} | {error, Reason :: term()}.
Register this workload with the Platform Layer.
Registers the workload application with Macula's Platform Layer and returns information about the current platform cluster state, including the current leader node.
Options
- workload_name - Required. Binary name identifying this workload type (e.g., <<"macula_arcade">>, <<"my_app">>)
- capabilities - Optional. List of atoms describing workload capabilities (e.g., [coordinator, game_server])
Returns
- leader_node - Binary node ID of the current Platform Layer leader
- cluster_size - Integer count of nodes in the platform cluster
- platform_version - Binary version string (e.g., <<"0.10.0">>)
Examples
{ok, Client} = macula:connect_local(#{realm => <<"my.app">>}),
{ok, Info} = macula:register_workload(Client, #{
    workload_name => <<"my_app_coordinator">>,
    capabilities => [coordinator, matchmaking]
}),
#{leader_node := Leader, cluster_size := Size} = Info.
Set the Erlang cookie for this node and persist it.
Sets the cookie for the current node and attempts to persist it to ~/.erlang.cookie for future sessions.
Examples:
ok = macula:set_cookie(my_secret_cookie).
-spec subscribe(Client :: client(), Topic :: topic(), Callback :: fun((event_data()) -> ok)) -> {ok, subscription_ref()} | {error, Reason :: term()}.
Subscribe to a topic.
Subscribes to events on the specified topic. The callback function will be invoked for each event received.
Callback Function
The callback receives the event data and should return ok.
Examples
%% Simple subscription
{ok, SubRef} = macula:subscribe(Client, <<"my.app.events">>,
    fun(EventData) ->
        io:format("Event: ~p~n", [EventData]),
        ok
    end).
%% Unsubscribe later
ok = macula:unsubscribe(Client, SubRef).
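This page does not say which process runs the callback, so a cautious pattern is to keep the callback tiny and forward each event to a process you own. The sketch below assumes a hypothetical module subscribe_example and a hypothetical {macula_event, EventData} message tag.

```erlang
-module(subscribe_example).
-export([forwarder/1, subscribe_to/3]).

%% Builds a callback that forwards each event to Pid and returns ok,
%% as the callback contract requires.
forwarder(Pid) when is_pid(Pid) ->
    fun(EventData) ->
            Pid ! {macula_event, EventData},
            ok
    end.

%% Subscribes to Topic and delivers events to Pid's mailbox.
%% Returns whatever macula:subscribe/3 returns.
subscribe_to(Client, Topic, Pid) ->
    macula:subscribe(Client, Topic, forwarder(Pid)).
```

The receiving process can then handle {macula_event, EventData} in its own receive loop or in handle_info/2.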
-spec subscribe_leader_changes(Client :: client(), Callback :: fun((map()) -> ok)) -> {ok, subscription_ref()} | {error, Reason :: term()}.
Subscribe to Platform Layer leader change notifications.
Registers a callback function to be invoked whenever the Platform Layer leader changes due to election or node failure.
The callback receives a map with:
- old_leader - Previous leader node ID (may be undefined)
- new_leader - New leader node ID
- term - Raft term number (monotonically increasing)
Examples
{ok, SubRef} = macula:subscribe_leader_changes(Client,
    fun(#{old_leader := Old, new_leader := New}) ->
        io:format("Leader changed: ~p -> ~p~n", [Old, New]),
        handle_leadership_transition(New),
        ok
    end).
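Because the term only ever increases, it can be used to discard stale or duplicate notifications. The sketch below tracks the last applied term and leader. Whether notifications can actually arrive out of order is an assumption, and the module name and {Term, Leader} state shape are hypothetical.

```erlang
-module(leader_change_example).
-export([apply_change/2]).

%% State is {LastTerm, Leader}. A notification is applied only when
%% its term is higher than the last one seen; anything else leaves
%% the state unchanged.
apply_change(#{term := Term, new_leader := New}, {LastTerm, _Old})
  when Term > LastTerm ->
    {Term, New};
apply_change(_Change, State) ->
    State.
```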
Stop advertising a service.
Removes the local handler and stops advertising to the DHT.
Examples
ok = macula:unadvertise(Client, <<"my.app.get_user">>).
-spec unmonitor_nodes() -> ok.
Unsubscribe from node up/down events.
Stops the calling process from receiving nodeup/nodedown messages.
Examples:
ok = macula:unmonitor_nodes().
-spec unsubscribe(Client :: client(), SubRef :: subscription_ref()) -> ok | {error, Reason :: term()}.
Unsubscribe from a topic.
Removes the subscription identified by the subscription reference.