macula (macula v0.14.3)
Macula - Main API for distributed workloads on the Macula platform.
This is the ONLY module workload applications should import. It provides a stable, versioned API for all platform operations:
- Mesh networking (connect, publish, subscribe, RPC)
- Platform Layer (leader election, CRDTs, workload registration)
- Service discovery (DHT queries, node identity)
Quick Start
Connect to local platform and publish events:
{ok, Client} = macula:connect_local(#{realm => <<"my.app">>}),
ok = macula:publish(Client, <<"my.events">>, #{type => <<"test">>}).
Architecture
Workload applications run in the same BEAM VM as the Macula platform. Use connect_local/1 to connect via process-to-process communication:
Workload App → macula:connect_local/1 → macula_gateway → Mesh (QUIC/HTTP3)
Platform Layer (v0.10.0+)
Register with platform for coordination features:
{ok, #{leader_node := Leader}} = macula:register_workload(Client, #{
    workload_name => <<"my_app">>
}).
DHT Network Bootstrap
The platform handles DHT bootstrapping via MACULA_BOOTSTRAP_PEERS. Workloads do not need to manage peer discovery.
Types
Arguments for RPC calls.
-type client() :: pid().
Reference to a connected Macula mesh client.
Event payload data. Typically a map that will be JSON-encoded.
-type options() :: map().
Connection or operation options.
-type procedure() :: binary().
RPC procedure name. Example: "my.app.get_user".
-type subscription_ref() :: reference().
Reference to an active subscription for unsubscribe operations.
-type topic() :: binary().
Topic name for pub/sub operations. Topics should describe event types, not entity IDs. Example: "my.app.user.registered" (good), not "my.app.user.123.registered" (bad - ID belongs in payload).
Functions
-spec advertise(Client :: client(), Procedure :: procedure(), Handler :: macula_service_registry:handler_fn()) -> {ok, reference()} | {error, Reason :: term()}.
Advertise a service that this client provides.
Registers a handler function for the specified procedure and advertises it to the DHT so other clients can discover and call it.
The handler function receives a map of arguments and must return {ok, Result} or {error, Reason}.
Options
- ttl - Advertisement TTL in seconds (default: 300)
- metadata - Custom metadata map (default: #{})
Examples
%% Define a handler function
Handler = fun(#{user_id := UserId}) ->
    {ok, #{user_id => UserId, name => <<"Alice">>}}
end.
%% Advertise the service
{ok, Ref} = macula:advertise(
    Client,
    <<"my.app.get_user">>,
    Handler
).
%% Other clients can now call:
%% {ok, User} = macula:call(OtherClient, <<"my.app.get_user">>,
%%     #{user_id => <<"user-123">>}).
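The handler contract described above (a map of arguments in, {ok, Result} or {error, Reason} out) also applies to failure cases. The following is a minimal sketch of a handler with explicit error clauses. The module name user_handlers, the hard-coded user, and the error reasons not_found and bad_args are all assumptions made for illustration; Macula does not define them.

```erlang
-module(user_handlers).
-export([get_user/1]).

%% Hypothetical handler for <<"my.app.get_user">>.
%% Receives the argument map and returns {ok, Result} or {error, Reason}.
-spec get_user(map()) -> {ok, map()} | {error, term()}.
get_user(#{user_id := <<"user-123">> = UserId}) ->
    %% Illustrative hard-coded record; a real handler would query a store.
    {ok, #{user_id => UserId, name => <<"Alice">>}};
get_user(#{user_id := _Unknown}) ->
    %% Well-formed request, but no such user (made-up reason).
    {error, not_found};
get_user(_Args) ->
    %% The user_id key is missing from the argument map (made-up reason).
    {error, bad_args}.
```

Such a function could be passed as the handler, for example macula:advertise(Client, <<"my.app.get_user">>, fun user_handlers:get_user/1).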
-spec advertise(Client :: client(), Procedure :: procedure(), Handler :: macula_service_registry:handler_fn(), Opts :: options()) -> {ok, reference()} | {error, Reason :: term()}.
Advertise a service with options.
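As a rough illustration of the four-argument form, the sketch below passes the two documented options, ttl and metadata. The module name advertise_opts_example, the procedure name <<"my.app.echo">>, and the metadata contents are hypothetical; only the option keys come from the documentation above.

```erlang
-module(advertise_opts_example).
-export([opts/0, advertise_echo/1]).

%% Options documented for advertise/4: ttl (seconds) and metadata (map).
%% The concrete values here are illustrative.
opts() ->
    #{ttl => 60, metadata => #{version => <<"1.0">>}}.

%% Advertise a trivial echo handler with a 60-second TTL.
%% Client is assumed to come from macula:connect/2 or macula:connect_local/1.
advertise_echo(Client) ->
    Handler = fun(Args) -> {ok, Args} end,
    macula:advertise(Client, <<"my.app.echo">>, Handler, opts()).
```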
-spec call(Client :: client(), Procedure :: procedure(), Args :: args()) -> {ok, Result :: term()} | {error, Reason :: term()}.
Make a synchronous RPC call.
Calls a remote procedure and waits for the result.
Examples
%% Simple RPC call
{ok, User} = macula:call(Client, <<"my.app.get_user">>, #{
    user_id => <<"user-123">>
}).
%% With timeout
{ok, Result} = macula:call(Client, <<"my.app.process">>,
    #{data => <<"large">>},
    #{timeout => 30000}).
-spec call(Client :: client(), Procedure :: procedure(), Args :: args(), Opts :: options()) -> {ok, Result :: term()} | {error, Reason :: term()}.
Make an RPC call with options.
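Because call/4 returns either {ok, Result} or {error, Reason}, callers that should not crash on failure can match both shapes instead of asserting {ok, _}. A minimal sketch, assuming a hypothetical wrapper module call_example; the success and failed tags are made up for the example, and the set of possible error reasons is not specified here, so Reason is passed through untouched.

```erlang
-module(call_example).
-export([describe/1, get_user/2]).

%% Turn a call result into a tagged tuple suitable for logging.
describe({ok, Result}) -> {success, Result};
describe({error, Reason}) -> {failed, Reason}.

%% Call the remote procedure with a 5-second timeout and classify the outcome.
get_user(Client, UserId) ->
    Result = macula:call(Client, <<"my.app.get_user">>,
                         #{user_id => UserId},
                         #{timeout => 5000}),
    describe(Result).
```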
Connect to a Macula mesh network.
Creates a new HTTP/3 (QUIC) connection to the specified mesh endpoint.
Options
- realm - Required. Binary realm identifier (e.g., <<"my.app.realm">>)
- auth - Optional. Authentication map with api_key or other auth methods
- timeout - Optional. Connection timeout in milliseconds (default: 5000)
- node_id - Optional. 32-byte node ID (generated if not provided)
Examples
%% Basic connection
{ok, Client} = macula:connect(<<"https://mesh.local:443">>, #{
    realm => <<"my.realm">>
}).
%% With API key authentication
{ok, Client} = macula:connect(<<"https://mesh.local:443">>, #{
    realm => <<"my.realm">>,
    auth => #{api_key => <<"secret-key">>}
}).
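The remaining documented options, timeout and node_id, can be combined in the same options map. The sketch below is a hypothetical helper module (connect_example) that builds such a map and closes the connection when the caller is done. Two things are assumptions rather than documented facts: that connect/2 reports failure as {error, Reason}, and that the disconnect function takes the client as its only argument.

```erlang
-module(connect_example).
-export([opts/1, with_connection/2]).

%% Build connection options: a 10-second timeout and an explicit
%% random 32-byte node ID (normally generated if omitted).
opts(Realm) ->
    #{realm => Realm,
      timeout => 10000,
      node_id => crypto:strong_rand_bytes(32)}.

%% Connect, run Fun(Client), and always disconnect afterwards.
with_connection(Url, Fun) ->
    case macula:connect(Url, opts(<<"my.realm">>)) of
        {ok, Client} ->
            try
                Fun(Client)
            after
                %% Assumption: disconnect takes the client as its only argument.
                macula:disconnect(Client)
            end;
        {error, _Reason} = Error ->
            %% Assumption: failures are returned as {error, Reason}.
            Error
    end.
```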
Connect to the local Macula gateway (for in-VM workloads).
This function is used by applications running in the same BEAM VM as the Macula platform. Instead of creating a QUIC connection to localhost, it connects directly to the local macula_gateway process via process-to-process communication.
Architecture
Phoenix/Elixir App → macula_local_client → macula_gateway
                                                 ↓ (QUIC)
                                            Other Peers
When to Use
- ✅ Use connect_local/1 when your application runs in the same VM as Macula
- ✅ Phoenix applications deployed with Macula in the same container
- ❌ Do NOT use connect/2 with localhost URL - it creates unnecessary QUIC overhead
Options
- realm - Required. Binary realm identifier (e.g., <<"my.app.realm">>)
- event_handler - Optional. PID to receive events (default: caller PID)
Examples
%% Elixir Phoenix application
{:ok, client} = :macula.connect_local(%{
    realm: "macula.arcade.dev"
})
%% Erlang application
{ok, Client} = macula:connect_local(#{
    realm => <<"my.app.realm">>
}).
Disconnect from the Macula mesh.
Cleanly closes the HTTP/3 connection and cleans up all subscriptions.
-spec discover_subscribers(Client :: client(), Topic :: topic()) -> {ok, [#{node_id := binary(), endpoint := binary()}]} | {error, Reason :: term()}.
Discover subscribers to a topic via DHT query.
Queries the DHT for all nodes subscribed to the given topic. Returns a list of subscriber nodes with their node IDs and endpoints.
This is used for P2P discovery before sending direct messages.
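As a sketch of how the result might be consumed, the helper below pulls the endpoint out of each subscriber map in the shape given by the spec (maps with node_id and endpoint keys). The module name subscriber_example is hypothetical, and what is done with the endpoints afterwards is left to the caller.

```erlang
-module(subscriber_example).
-export([endpoints/1, list_endpoints/2]).

%% Extract the endpoint from each subscriber map.
%% Entries without an endpoint key are skipped by the generator pattern.
endpoints(Subscribers) ->
    [Endpoint || #{endpoint := Endpoint} <- Subscribers].

%% Query the DHT for a topic and return only the endpoints.
list_endpoints(Client, Topic) ->
    case macula:discover_subscribers(Client, Topic) of
        {ok, Subscribers} -> {ok, endpoints(Subscribers)};
        {error, _Reason} = Error -> Error
    end.
```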
Get the current Platform Layer leader node.
Queries the Platform Layer for the current leader node ID. The leader is elected via Raft consensus and handles coordination tasks.
Returns {error, no_leader} if leader election is in progress.
Examples
case macula:get_leader(Client) of
    {ok, LeaderNodeId} ->
        %% Check if we're the leader
        {ok, OurNodeId} = macula:get_node_id(Client),
        case LeaderNodeId == OurNodeId of
            true -> coordinate_globally();
            false -> defer_to_leader()
        end;
    {error, no_leader} ->
        wait_for_leader_election()
end.
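Since {error, no_leader} only signals that an election is still in progress, one plausible way to wait is a bounded retry loop. The sketch below is a hypothetical helper (module leader_wait) that takes the lookup as a zero-arity fun, so it does not depend on any Macula internals; the retry count and delay are arbitrary illustrative values.

```erlang
-module(leader_wait).
-export([await_leader/3]).

%% Call GetLeader (a zero-arity fun) until it returns {ok, NodeId}.
%% On {error, no_leader}, sleep DelayMs and retry, at most Retries more times.
%% Any other error, or no_leader once retries are exhausted, is returned as-is.
await_leader(GetLeader, Retries, DelayMs) ->
    case GetLeader() of
        {ok, LeaderNodeId} ->
            {ok, LeaderNodeId};
        {error, no_leader} when Retries > 0 ->
            timer:sleep(DelayMs),
            await_leader(GetLeader, Retries - 1, DelayMs);
        {error, _Reason} = Error ->
            Error
    end.
```

A call could look like leader_wait:await_leader(fun() -> macula:get_leader(Client) end, 5, 200).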
Get the node ID of this client.
Returns the 32-byte node ID assigned to this client.
-spec propose_crdt_update(Client :: client(), Key :: binary(), Value :: term()) -> ok | {error, Reason :: term()}.
Propose a CRDT update to Platform Layer shared state.
Updates platform-managed shared state using Conflict-Free Replicated Data Types (CRDTs) for automatic conflict resolution across nodes.
Default CRDT type is lww_register (Last-Write-Wins Register). See propose_crdt_update/4 for other CRDT types.
Examples
%% Store simple value (LWW-Register)
ok = macula:propose_crdt_update(
    Client,
    <<"my.app.config.max_users">>,
    1000
).
%% Later read it back
{ok, 1000} = macula:read_crdt(Client, <<"my.app.config.max_users">>).
-spec propose_crdt_update(Client :: client(), Key :: binary(), Value :: term(), Opts :: options()) -> ok | {error, Reason :: term()}.
Propose a CRDT update with specific CRDT type.
Updates platform-managed shared state using the specified CRDT type for automatic conflict resolution.
CRDT Types
- lww_register - Last-Write-Wins Register (default)
  - Use for: Configuration values, latest status
  - Conflict resolution: Latest timestamp wins
- g_counter - Grow-Only Counter
  - Use for: Metrics, totals (never decrease)
  - Operations: increment only
- pn_counter - Positive-Negative Counter
  - Use for: Bidirectional counters (can increase/decrease)
  - Operations: increment, decrement
- g_set - Grow-Only Set
  - Use for: Accumulating collections (never remove)
  - Operations: add elements only
- or_set - Observed-Remove Set
  - Use for: Sets with add/remove operations
  - Operations: add, remove elements
Examples
%% Increment a counter
ok = macula:propose_crdt_update(
    Client,
    <<"my.app.active_games">>,
    {increment, 1},
    #{crdt_type => pn_counter}
).
%% Add to a set
ok = macula:propose_crdt_update(
    Client,
    <<"my.app.player_ids">>,
    {add, <<"player123">>},
    #{crdt_type => or_set}
).
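To make the conflict-resolution rules for the CRDT types listed above more concrete, here is a textbook-style sketch of two of them in plain Erlang. It is not Macula's implementation: the module crdt_sketch, the {Timestamp, Value} representation, the per-node counter map, and the tie-breaking choice are all assumptions chosen for illustration.

```erlang
-module(crdt_sketch).
-export([lww_merge/2, g_counter_inc/3, g_counter_merge/2, g_counter_value/1]).

%% LWW-Register: each replica holds {Timestamp, Value}.
%% Merge keeps the entry with the later timestamp (ties keep the first argument).
lww_merge({T1, _} = A, {T2, _}) when T1 >= T2 -> A;
lww_merge(_, B) -> B.

%% G-Counter: a map of NodeId => count. A node only ever raises its own slot.
g_counter_inc(Node, N, Counter) when is_integer(N), N >= 0 ->
    maps:update_with(Node, fun(C) -> C + N end, N, Counter).

%% Merge takes the per-node maximum, so the result is the same regardless
%% of merge order or of how often the same state is merged.
g_counter_merge(A, B) ->
    maps:fold(
        fun(Node, Count, Acc) ->
            maps:update_with(Node, fun(C) -> max(C, Count) end, Count, Acc)
        end,
        A,
        B).

%% The counter's value is the sum over all nodes.
g_counter_value(Counter) ->
    lists:sum(maps:values(Counter)).
```

The per-node maximum is what lets replicas exchange state in any order and still converge, which is the property the platform relies on when it describes conflict resolution as automatic.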
-spec publish(Client :: client(), Topic :: topic(), Data :: event_data()) -> ok | {error, Reason :: term()}.
Publish an event to a topic.
Publishes data to the specified topic. All subscribers to this topic will receive the event.
Topic Design
Topics should describe EVENT TYPES, not entity instances:
- Good: <<"my.app.user.registered">> (event type)
- Bad: <<"my.app.user.123.registered">> (entity ID in topic)
Entity IDs belong in the event payload, not the topic name.
Examples
%% Publish with default options
ok = macula:publish(Client, <<"my.app.events">>, #{
    type => <<"user.registered">>,
    user_id => <<"user-123">>,
    email => <<"user@example.com">>
}).
%% Publish with options
ok = macula:publish(Client, <<"my.app.events">>, #{
    data => <<"important">>
}, #{acknowledge => true}).
-spec publish(Client :: client(), Topic :: topic(), Data :: event_data(), Opts :: options()) -> ok | {error, Reason :: term()}.
Publish an event with options.
This is fire-and-forget - returns ok immediately without blocking. Uses gen_server:cast to avoid blocking the caller (prevents LiveView freezes). Both macula_local_client and macula_peer handle {publish_async, ...} casts.
Read the current value of a CRDT-managed shared state entry.
Reads from the local CRDT replica. The value reflects all converged updates from across the platform cluster.
Returns {error, not_found} if the key has never been written.
Examples
%% Read LWW-Register value
{ok, MaxUsers} = macula:read_crdt(Client, <<"my.app.config.max_users">>).
%% Read counter value
{ok, GameCount} = macula:read_crdt(Client, <<"my.app.active_games">>).
%% Read set value
{ok, PlayerSet} = macula:read_crdt(Client, <<"my.app.player_ids">>).
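Because a key that has never been written yields {error, not_found}, reads of optional settings often want a fallback value. A minimal sketch, assuming a hypothetical helper module crdt_read; any error other than not_found is deliberately left unmatched so that it crashes the caller rather than being hidden behind the default.

```erlang
-module(crdt_read).
-export([value_or/2, read_or/3]).

%% Map a read_crdt result to a plain value, using Default for unwritten keys.
value_or({ok, Value}, _Default) -> Value;
value_or({error, not_found}, Default) -> Default.

%% Read Key from the local CRDT replica, falling back to Default.
read_or(Client, Key, Default) ->
    value_or(macula:read_crdt(Client, Key), Default).
```

For example, crdt_read:read_or(Client, <<"my.app.config.max_users">>, 100) would return 100 until some node writes that key.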
-spec register_workload(Client :: client(), Opts :: options()) -> {ok, map()} | {error, Reason :: term()}.
Register this workload with the Platform Layer.
Registers the workload application with Macula's Platform Layer and returns information about the current platform cluster state, including the current leader node.
Options
- workload_name - Required. Binary name identifying this workload type (e.g., <<"macula_arcade">>, <<"my_app">>)
- capabilities - Optional. List of atoms describing workload capabilities (e.g., [coordinator, game_server])
Returns
- leader_node - Binary node ID of the current Platform Layer leader
- cluster_size - Integer count of nodes in the platform cluster
- platform_version - Binary version string (e.g., <<"0.10.0">>)
Examples
{ok, Client} = macula:connect_local(#{realm => <<"my.app">>}),
{ok, Info} = macula:register_workload(Client, #{
    workload_name => <<"my_app_coordinator">>,
    capabilities => [coordinator, matchmaking]
}),
#{leader_node := Leader, cluster_size := Size} = Info.
-spec subscribe(Client :: client(), Topic :: topic(), Callback :: fun((event_data()) -> ok)) -> {ok, subscription_ref()} | {error, Reason :: term()}.
Subscribe to a topic.
Subscribes to events on the specified topic. The callback function will be invoked for each event received.
Callback Function
The callback receives the event data and should return ok.
Examples
%% Simple subscription
{ok, SubRef} = macula:subscribe(Client, <<"my.app.events">>,
    fun(EventData) ->
        io:format("Event: ~p~n", [EventData]),
        ok
    end).
%% Unsubscribe later
ok = macula:unsubscribe(Client, SubRef).
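A callback that only needs to hand events to another process can be kept very small. The sketch below builds such a callback; the module name event_forwarder and the {macula_event, EventData} message shape are hypothetical choices for the example, not something Macula prescribes.

```erlang
-module(event_forwarder).
-export([callback/1]).

%% Build a subscribe/3 callback that forwards each event to Pid
%% as {macula_event, EventData} and returns ok, as the callback contract asks.
callback(Pid) when is_pid(Pid) ->
    fun(EventData) ->
        Pid ! {macula_event, EventData},
        ok
    end.
```

It might be used as macula:subscribe(Client, <<"my.app.events">>, event_forwarder:callback(self())), with the receiving process handling the forwarded messages in its own loop.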
-spec subscribe_leader_changes(Client :: client(), Callback :: fun((map()) -> ok)) -> {ok, subscription_ref()} | {error, Reason :: term()}.
Subscribe to Platform Layer leader change notifications.
Registers a callback function to be invoked whenever the Platform Layer leader changes due to election or node failure.
The callback receives a map with:
- old_leader - Previous leader node ID (may be undefined)
- new_leader - New leader node ID
- term - Raft term number (monotonically increasing)
Examples
{ok, SubRef} = macula:subscribe_leader_changes(Client,
    fun(#{old_leader := Old, new_leader := New}) ->
        io:format("Leader changed: ~p -> ~p~n", [Old, New]),
        handle_leadership_transition(New),
        ok
    end).
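One way to act on the notification map is to compare new_leader with this client's own node ID and derive a local role. The sketch below is illustrative only: the module leader_role, the leader and follower atoms, and the {role_changed, Role} message are made-up names, and the node ID is assumed to have been fetched earlier (for instance via macula:get_node_id(Client)).

```erlang
-module(leader_role).
-export([role/2, callback/2]).

%% Classify this node from a leader-change map.
%% The first clause matches only when new_leader equals OurNodeId.
role(OurNodeId, #{new_leader := OurNodeId}) -> leader;
role(_OurNodeId, #{new_leader := _Other}) -> follower.

%% Build a leader-change callback that reports the new role to Pid.
callback(OurNodeId, Pid) when is_pid(Pid) ->
    fun(Change) ->
        Pid ! {role_changed, role(OurNodeId, Change)},
        ok
    end.
```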
Stop advertising a service.
Removes the local handler and stops advertising to the DHT.
Examples
ok = macula:unadvertise(Client, <<"my.app.get_user">>).
-spec unsubscribe(Client :: client(), SubRef :: subscription_ref()) -> ok | {error, Reason :: term()}.
Unsubscribe from a topic.
Removes the subscription identified by the subscription reference.