macula (macula v0.20.3)

Macula - Main API for distributed workloads on the Macula platform.

This is the ONLY module workload applications should import. It provides a stable, versioned API for all platform operations:

  • Mesh networking (connect, publish, subscribe, RPC)
  • Platform Layer (leader election, CRDTs, workload registration)
  • Service discovery (DHT queries, node identity)

Quick Start

Connect to the local platform and publish an event:

   {ok, Client} = macula:connect_local(#{realm => <<"my.app">>}),
   ok = macula:publish(Client, <<"my.events">>, #{type => <<"test">>}).

Architecture

Workload applications run in the same BEAM VM as the Macula platform. Use connect_local/1 to connect via process-to-process communication:

   Workload App → macula:connect_local/1 → macula_gateway → Mesh (QUIC/HTTP3)

Platform Layer (v0.10.0+)

Register with the platform for coordination features:

   {ok, #{leader_node := Leader}} = macula:register_workload(Client, #{
       workload_name => <<"my_app">>
   }).

DHT Network Bootstrap

The platform handles DHT bootstrapping via MACULA_BOOTSTRAP_PEERS. Workloads don't need to manage peer discovery.

Types

args/0

-type args() :: map() | list() | binary().

Arguments for RPC calls.

client/0

-type client() :: pid().

Reference to a connected Macula mesh client.

event_data/0

-type event_data() :: map() | binary().

Event payload data. Typically a map that will be JSON-encoded.

options/0

-type options() :: map().

Connection or operation options.

procedure/0

-type procedure() :: binary().

RPC procedure name. Example: "my.app.get_user".

subscription_ref/0

-type subscription_ref() :: reference().

Reference to an active subscription for unsubscribe operations.

topic/0

-type topic() :: binary().

Topic name for pub/sub operations. Topics should describe event types, not entity IDs. Example: "my.app.user.registered" (good), not "my.app.user.123.registered" (bad - ID belongs in payload).

Functions

advertise(Client, Procedure, Handler)

-spec advertise(Client :: client(),
                Procedure :: procedure(),
                Handler :: macula_service_registry:handler_fn()) ->
                   {ok, reference()} | {error, Reason :: term()}.

Advertise a service that this client provides.

Registers a handler function for the specified procedure and advertises it to the DHT so other clients can discover and call it.

The handler function receives a map of arguments and must return {ok, Result} or {error, Reason}.

Options

  • ttl - Advertisement TTL in seconds (default: 300)
  • metadata - Custom metadata map (default: #{})

Examples

  %% Define a handler function
  Handler = fun(#{user_id := UserId}) ->
      {ok, #{user_id => UserId, name => <<"Alice">>}}
  end.
 
  %% Advertise the service
  {ok, Ref} = macula:advertise(
      Client,
      <<"my.app.get_user">>,
      Handler
  ).
 
  %% Other clients can now call:
  %% {ok, User} = macula:call(OtherClient, <<"my.app.get_user">>,
  %%     #{user_id => <<"user-123">>}).
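
The handler above only matches argument maps that contain user_id, so a call that omits the key would fail to match. A minimal sketch of a more defensive handler follows. The module name user_handler, the lookup/1 stand-in, and the invalid_args and not_found reasons are assumptions for illustration, not part of Macula.

```erlang
-module(user_handler).
-export([get_user/1, advertise/1]).

%% Hypothetical handler. Returns {ok, Result} or {error, Reason},
%% the contract stated for macula:advertise/3 handlers.
get_user(#{user_id := UserId}) when is_binary(UserId) ->
    case lookup(UserId) of
        {ok, Name} -> {ok, #{user_id => UserId, name => Name}};
        error -> {error, not_found}
    end;
get_user(_Args) ->
    %% Arguments without a binary user_id are rejected instead of crashing.
    {error, invalid_args}.

%% Registers the handler with an already connected client.
advertise(Client) ->
    macula:advertise(Client, <<"my.app.get_user">>, fun get_user/1).

%% Stand-in for a real data store (assumption for illustration only).
lookup(<<"user-123">>) -> {ok, <<"Alice">>};
lookup(_) -> error.
```

Because get_user/1 is a plain function, it can be unit tested without a mesh connection.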

advertise(Client, Procedure, Handler, Opts)

-spec advertise(Client :: client(),
                Procedure :: procedure(),
                Handler :: macula_service_registry:handler_fn(),
                Opts :: options()) ->
                   {ok, reference()} | {error, Reason :: term()}.

Advertise a service with options. Opts takes the ttl and metadata keys listed under advertise/3.

call(Client, Procedure, Args)

-spec call(Client :: client(), Procedure :: procedure(), Args :: args()) ->
              {ok, Result :: term()} | {error, Reason :: term()}.

Make a synchronous RPC call.

Calls a remote procedure and waits for the result.

Examples

  %% Simple RPC call
  {ok, User} = macula:call(Client, <<"my.app.get_user">>, #{
      user_id => <<"user-123">>
  }).
 
  %% With timeout
  {ok, Result} = macula:call(Client, <<"my.app.process">>,
      #{data => <<"large">>},
      #{timeout => 30000}).
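
Both examples above assert on {ok, _} and crash on any error. Where a transient failure is expected, a small retry wrapper is one option. This is a sketch only: the rpc_retry module and its functions are hypothetical, and it retries on every {error, _} without distinguishing reasons, because the error reasons returned by macula:call/3 are not listed here.

```erlang
-module(rpc_retry).
-export([call/4, with_retry/2]).

%% Calls Procedure up to Attempts times (hypothetical helper).
call(Client, Procedure, Args, Attempts) ->
    with_retry(fun() -> macula:call(Client, Procedure, Args) end, Attempts).

%% Runs Fun until it returns {ok, _} or the attempts are used up.
%% Fun must return {ok, Result} | {error, Reason}.
with_retry(Fun, Attempts)
  when is_function(Fun, 0), is_integer(Attempts), Attempts >= 1 ->
    case Fun() of
        {ok, _} = Ok -> Ok;
        {error, _} when Attempts > 1 -> with_retry(Fun, Attempts - 1);
        {error, _} = Error -> Error
    end.
```

Retrying is only safe for procedures that are idempotent; a call that failed after the remote side had already acted would otherwise run twice.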

call(Client, Procedure, Args, Opts)

-spec call(Client :: client(), Procedure :: procedure(), Args :: args(), Opts :: options()) ->
              {ok, Result :: term()} | {error, Reason :: term()}.

Make an RPC call with options.

call_to(Client, TargetNodeId, Procedure, Args)

-spec call_to(Client :: client(), TargetNodeId :: binary(), Procedure :: procedure(), Args :: args()) ->
                 {ok, Result :: term()} | {error, Reason :: term()}.

Make an RPC call to a specific target node.

Unlike call/3 and call/4, which discover any provider via the DHT, this function sends the RPC directly to the specified target node. Use it when you already know which node provides the service (e.g., from a previous DHT discovery, a specific publisher node, or direct node advertisement).

The message is still routed via DHT infrastructure (for NAT traversal, relay fallback, etc.), but it targets a specific node rather than discovering one.

Examples

  %% Call a specific node (e.g., publisher node for package pull)
  PublisherNodeId = <<...32 bytes...>>,
  {ok, Manifest} = macula:call_to(Client, PublisherNodeId,
      <<"packages.manifest.fetch">>,
      #{image_ref => <<"my.app:1.0.0">>}).
 
  %% With timeout
  {ok, Result} = macula:call_to(Client, TargetNodeId,
      <<"my.procedure">>, Args,
      #{timeout => 30000}).

call_to(Client, TargetNodeId, Procedure, Args, Opts)

-spec call_to(Client :: client(),
              TargetNodeId :: binary(),
              Procedure :: procedure(),
              Args :: args(),
              Opts :: options()) ->
                 {ok, Result :: term()} | {error, Reason :: term()}.

Make an RPC call to a specific target node with options.

connect(Url, Opts)

-spec connect(Url :: binary(), Opts :: options()) -> {ok, client()} | {error, Reason :: term()}.

Connect to a Macula mesh network.

Creates a new HTTP/3 (QUIC) connection to the specified mesh endpoint.

Options

  • realm - Required. Binary realm identifier (e.g., <<"my.app.realm">>)
  • auth - Optional. Authentication map with api_key or other auth methods
  • timeout - Optional. Connection timeout in milliseconds (default: 5000)
  • node_id - Optional. 32-byte node ID (generated if not provided)

Examples

  %% Basic connection
  {ok, Client} = macula:connect(<<"https://mesh.local:443">>, #{
      realm => <<"my.realm">>
  }).
 
  %% With API key authentication
  {ok, Client} = macula:connect(<<"https://mesh.local:443">>, #{
      realm => <<"my.realm">>,
      auth => #{api_key => <<"secret-key">>}
  }).
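
The options listed above can be checked before a connection attempt. The sketch below validates only what this page states (realm is a required binary, timeout is in milliseconds, node_id is 32 bytes). The connect_opts module and its error atoms are hypothetical, and macula:connect/2 may perform its own, different validation.

```erlang
-module(connect_opts).
-export([validate/1]).

%% Checks the documented connect/2 options and returns
%% {ok, Opts} or {error, Reason}.
validate(Opts) when is_map(Opts) ->
    check([fun realm/1, fun timeout/1, fun node_id/1], Opts);
validate(_) ->
    {error, opts_not_a_map}.

%% Runs each check in turn and stops at the first failure.
check([], Opts) -> {ok, Opts};
check([F | Rest], Opts) ->
    case F(Opts) of
        ok -> check(Rest, Opts);
        {error, _} = Error -> Error
    end.

%% realm is required and must be a non-empty binary.
realm(#{realm := R}) when is_binary(R), R =/= <<>> -> ok;
realm(_) -> {error, invalid_realm}.

%% timeout is optional; when present it must be a positive integer (ms).
timeout(#{timeout := T}) when is_integer(T), T > 0 -> ok;
timeout(#{timeout := _}) -> {error, invalid_timeout};
timeout(_) -> ok.

%% node_id is optional; when present it must be exactly 32 bytes.
node_id(#{node_id := <<_:32/binary>>}) -> ok;
node_id(#{node_id := _}) -> {error, invalid_node_id};
node_id(_) -> ok.
```

A caller could pass the validated map straight on, for example `{ok, Opts} = connect_opts:validate(RawOpts), macula:connect(Url, Opts)`.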

connect_local(Opts)

(since v0.8.9)
-spec connect_local(Opts :: options()) -> {ok, client()} | {error, Reason :: term()}.

Connect to the local Macula gateway (for in-VM workloads).

This function is used by applications running in the same BEAM VM as the Macula platform. Instead of creating a QUIC connection to localhost, it connects directly to the local macula_gateway process via process-to-process communication.

Architecture

  Phoenix/Elixir App → macula_local_client → macula_gateway
                                                  ↓ (QUIC)
                                             Other Peers

When to Use

  • ✅ Use connect_local/1 when your application runs in the same VM as Macula
  • ✅ Phoenix applications deployed with Macula in the same container
  • ❌ Do NOT use connect/2 with a localhost URL - it creates unnecessary QUIC overhead

Options

  • realm - Required. Binary realm identifier (e.g., <<"my.app.realm">>)
  • event_handler - Optional. PID to receive events (default: caller PID)

Examples

  %% Elixir Phoenix application
  {:ok, client} = :macula.connect_local(%{
      realm: "macula.arcade.dev"
  })
 
  %% Erlang application
  {ok, Client} = macula:connect_local(#{
      realm => <<"my.app.realm">>
  }).

disconnect(Client)

-spec disconnect(Client :: client()) -> ok | {error, Reason :: term()}.

Disconnect from the Macula mesh.

Cleanly closes the HTTP/3 connection and cleans up all subscriptions.

discover_subscribers(Client, Topic)

-spec discover_subscribers(Client :: client(), Topic :: topic()) ->
                              {ok, [#{node_id := binary(), endpoint := binary()}]} |
                              {error, Reason :: term()}.

Discover subscribers to a topic via DHT query.

Queries the DHT for all nodes subscribed to the given topic. Returns a list of subscriber nodes with their node IDs and endpoints.

This is used for P2P discovery before sending direct messages.
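
A sketch of the discover-then-send-directly pattern described above. The direct_notify module and both functions are hypothetical. It assumes the node_id values returned by discover_subscribers/2 can be passed as the TargetNodeId of call_to/4, which the specs suggest (both are binaries) but do not state.

```erlang
-module(direct_notify).
-export([node_ids/1, call_subscribers/4]).

%% Extracts the node IDs from a discover_subscribers/2 result list.
node_ids(Subscribers) ->
    [NodeId || #{node_id := NodeId} <- Subscribers].

%% Discovers the subscribers of Topic and calls Procedure on each one.
%% Returns a list of {NodeId, CallResult} pairs, or the discovery error.
call_subscribers(Client, Topic, Procedure, Args) ->
    case macula:discover_subscribers(Client, Topic) of
        {ok, Subscribers} ->
            [{Id, macula:call_to(Client, Id, Procedure, Args)}
             || Id <- node_ids(Subscribers)];
        {error, _} = Error ->
            Error
    end.
```

The calls run one after another, so a slow node delays the rest; spawning one process per target would be a natural refinement.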

ensure_distributed()

(since v0.16.3)
-spec ensure_distributed() -> ok | {error, term()}.

Ensure this node is running in distributed mode.

If the node is already distributed, returns ok immediately. Otherwise, starts distribution with a generated node name.

This function is used by bc_gitops to delegate cluster setup to the Macula platform when available.

Examples:

  ok = macula:ensure_distributed().

get_cookie()

(since v0.16.3)
-spec get_cookie() -> atom().

Get the Erlang cookie for the cluster.

Resolves the cookie from various sources in priority order:

  1. Application env: {macula, [{cookie, CookieValue}]}
  2. Environment variable: MACULA_COOKIE or RELEASE_COOKIE
  3. User's ~/.erlang.cookie file
  4. Auto-generated (persisted to ~/.erlang.cookie)

Examples:

  Cookie = macula:get_cookie().
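
The priority order can be read as "first source that yields a value wins". The sketch below illustrates that idea only and is not Macula's implementation. The cookie_priority module is hypothetical, it covers just the application env and environment variable sources, and it assumes MACULA_COOKIE is consulted before RELEASE_COOKIE.

```erlang
-module(cookie_priority).
-export([resolve/1, sources/0]).

%% Takes zero-arity funs, highest priority first. Each returns
%% {ok, Cookie} or undefined. The first {ok, Cookie} wins.
resolve([]) -> undefined;
resolve([Source | Rest]) ->
    case Source() of
        {ok, Cookie} -> {ok, Cookie};
        undefined -> resolve(Rest)
    end.

%% The first two documented sources. The ~/.erlang.cookie lookup and
%% auto-generation steps are omitted from this sketch.
sources() ->
    [fun() -> application:get_env(macula, cookie) end,
     fun() -> env_var("MACULA_COOKIE") end,
     fun() -> env_var("RELEASE_COOKIE") end].

%% Reads an environment variable; unset or empty counts as undefined.
env_var(Name) ->
    case os:getenv(Name) of
        false -> undefined;
        "" -> undefined;
        Value -> {ok, list_to_atom(Value)}
    end.
```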

get_leader(Client)

(since v0.10.0)
-spec get_leader(Client :: client()) -> {ok, binary()} | {error, no_leader | term()}.

Get the current Platform Layer leader node.

Queries the Platform Layer for the current leader node ID. The leader is elected via Raft consensus and handles coordination tasks.

Returns {error, no_leader} if leader election is in progress.

Examples

  case macula:get_leader(Client) of
      {ok, LeaderNodeId} ->
          %% Check if we're the leader
          {ok, OurNodeId} = macula:get_node_id(Client),
          case LeaderNodeId == OurNodeId of
              true -> coordinate_globally();
              false -> defer_to_leader()
          end;
      {error, no_leader} ->
          wait_for_leader_election()
  end.

get_node_id(Client)

-spec get_node_id(Client :: client()) -> {ok, binary()} | {error, Reason :: term()}.

Get the node ID of this client.

Returns the 32-byte node ID assigned to this client.

monitor_nodes()

(since v0.16.3)
-spec monitor_nodes() -> ok.

Subscribe to node up/down events.

After calling this function, the calling process will receive {nodeup, Node} and {nodedown, Node} messages when nodes join or leave the cluster.

Examples:

  ok = macula:monitor_nodes().
  receive
      {nodeup, Node} -> handle_node_up(Node);
      {nodedown, Node} -> handle_node_down(Node)
  end.
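
The receive above handles a single message. A process that wants to gather every pending node event can loop with a timeout, as in this sketch. The node_events module is hypothetical; only the {nodeup, Node} and {nodedown, Node} message shapes come from the description above.

```erlang
-module(node_events).
-export([collect/1]).

%% Collects nodeup/nodedown messages from the caller's mailbox until no
%% such message arrives for TimeoutMs. Returns them oldest first.
collect(TimeoutMs) ->
    collect(TimeoutMs, []).

collect(TimeoutMs, Acc) ->
    receive
        {nodeup, Node} -> collect(TimeoutMs, [{up, Node} | Acc]);
        {nodedown, Node} -> collect(TimeoutMs, [{down, Node} | Acc])
    after TimeoutMs ->
        lists:reverse(Acc)
    end.
```

Call macula:monitor_nodes() first, in the same process, so that the messages are delivered to the mailbox being drained.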

propose_crdt_update(Client, Key, Value)

(since v0.10.0)
-spec propose_crdt_update(Client :: client(), Key :: binary(), Value :: term()) ->
                             ok | {error, Reason :: term()}.

Propose a CRDT update to Platform Layer shared state.

Updates platform-managed shared state using Conflict-Free Replicated Data Types (CRDTs) for automatic conflict resolution across nodes.

Default CRDT type is lww_register (Last-Write-Wins Register). See propose_crdt_update/4 for other CRDT types.

Examples

  %% Store simple value (LWW-Register)
  ok = macula:propose_crdt_update(
      Client,
      <<"my.app.config.max_users">>,
      1000
  ).
 
  %% Later read it back
  {ok, 1000} = macula:read_crdt(Client, <<"my.app.config.max_users">>).

propose_crdt_update(Client, Key, Value, Opts)

(since v0.10.0)
-spec propose_crdt_update(Client :: client(), Key :: binary(), Value :: term(), Opts :: options()) ->
                             ok | {error, Reason :: term()}.

Propose a CRDT update with specific CRDT type.

Updates platform-managed shared state using the specified CRDT type for automatic conflict resolution.

CRDT Types

  • lww_register - Last-Write-Wins Register (default)
    • Use for: Configuration values, latest status
    • Conflict resolution: Latest timestamp wins
  • g_counter - Grow-Only Counter
    • Use for: Metrics, totals (never decrease)
    • Operations: increment only
  • pn_counter - Positive-Negative Counter
    • Use for: Bidirectional counters (can increase/decrease)
    • Operations: increment, decrement
  • g_set - Grow-Only Set
    • Use for: Accumulating collections (never remove)
    • Operations: add elements only
  • or_set - Observed-Remove Set
    • Use for: Sets with add/remove operations
    • Operations: add, remove elements

Examples

  %% Increment a counter
  ok = macula:propose_crdt_update(
      Client,
      <<"my.app.active_games">>,
      {increment, 1},
      #{crdt_type => pn_counter}
  ).
 
  %% Add to a set
  ok = macula:propose_crdt_update(
      Client,
      <<"my.app.player_ids">>,
      {add, <<"player123">>},
      #{crdt_type => or_set}
  ).
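
To make "automatic conflict resolution" concrete, the sketch below shows textbook merge functions for two of the types listed above. It is not Macula's implementation, and the data layouts ({Timestamp, Value} pairs, maps of node to count) and the crdt_sketch module are assumptions chosen for illustration.

```erlang
-module(crdt_sketch).
-export([lww_merge/2, g_counter_merge/2, g_counter_value/1]).

%% LWW-Register: a state is a {Timestamp, Value} pair and the later
%% timestamp wins. On equal timestamps the larger term is chosen so
%% that both replicas pick the same winner.
lww_merge({T1, _} = A, {T2, _}) when T1 > T2 -> A;
lww_merge({T1, _}, {T2, _} = B) when T2 > T1 -> B;
lww_merge(A, B) -> max(A, B).

%% G-Counter: a state is a map of Node => Count. Merging takes the
%% per-node maximum, so merges commute and repeated merges are harmless.
g_counter_merge(A, B) ->
    maps:fold(
      fun(Node, Count, Acc) ->
              maps:update_with(Node, fun(Old) -> max(Old, Count) end, Count, Acc)
      end, A, B).

%% The counter's value is the sum over all nodes.
g_counter_value(Counter) ->
    lists:sum(maps:values(Counter)).
```

Merging #{n1 => 3, n2 => 1} with #{n2 => 4, n3 => 2} gives #{n1 => 3, n2 => 4, n3 => 2} in either order, which is why replicas converge without coordination.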

publish(Client, Topic, Data)

-spec publish(Client :: client(), Topic :: topic(), Data :: event_data()) ->
                 ok | {error, Reason :: term()}.

Publish an event to a topic.

Publishes data to the specified topic. All subscribers to this topic will receive the event.

Topic Design

Topics should describe EVENT TYPES, not entity instances:

  • Good: <<"my.app.user.registered">> (event type)
  • Bad: <<"my.app.user.123.registered">> (entity ID in topic)

Entity IDs belong in the event payload, not the topic name.

Examples

  %% Publish with default options
  ok = macula:publish(Client, <<"my.app.events">>, #{
      type => <<"user.registered">>,
      user_id => <<"user-123">>,
      email => <<"user@example.com">>
  }).
 
  %% Publish with options
  ok = macula:publish(Client, <<"my.app.events">>, #{
      data => <<"important">>
  }, #{acknowledge => true}).
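
One way to follow the topic design rule is to build the topic and the payload in one place, so the entity ID can only end up in the payload. The user_events module and its functions below are hypothetical.

```erlang
-module(user_events).
-export([registered/1, publish_registered/2]).

%% The topic names the event type; the entity ID travels in the payload.
registered(UserId) when is_binary(UserId) ->
    {<<"my.app.user.registered">>, #{user_id => UserId}}.

%% Publishes the event through an already connected client.
publish_registered(Client, UserId) ->
    {Topic, Payload} = registered(UserId),
    macula:publish(Client, Topic, Payload).
```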

publish(Client, Topic, Data, Opts)

-spec publish(Client :: client(), Topic :: topic(), Data :: event_data(), Opts :: options()) ->
                 ok | {error, Reason :: term()}.

Publish an event with options.

This is fire-and-forget - returns ok immediately without blocking. Uses gen_server:cast to avoid blocking the caller (prevents LiveView freezes). Both macula_local_client and macula_peer handle {publish_async, ...} casts.

read_crdt(Client, Key)

(since v0.10.0)
-spec read_crdt(Client :: client(), Key :: binary()) -> {ok, term()} | {error, not_found | term()}.

Read the current value of a CRDT-managed shared state entry.

Reads from the local CRDT replica. The value reflects all converged updates from across the platform cluster.

Returns {error, not_found} if the key has never been written.

Examples

  %% Read LWW-Register value
  {ok, MaxUsers} = macula:read_crdt(Client, <<"my.app.config.max_users">>).
 
  %% Read counter value
  {ok, GameCount} = macula:read_crdt(Client, <<"my.app.active_games">>).
 
  %% Read set value
  {ok, PlayerSet} = macula:read_crdt(Client, <<"my.app.player_ids">>).
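
The examples above crash with a badmatch when the key has never been written. A sketch of a wrapper that substitutes a default in that case follows; the crdt_read module and its functions are hypothetical.

```erlang
-module(crdt_read).
-export([read_or_default/3, with_default/2]).

%% Reads Key and falls back to Default if it has never been written.
read_or_default(Client, Key, Default) ->
    with_default(macula:read_crdt(Client, Key), Default).

%% Maps {error, not_found} to {ok, Default}; other results pass through.
with_default({ok, Value}, _Default) -> {ok, Value};
with_default({error, not_found}, Default) -> {ok, Default};
with_default({error, _} = Error, _Default) -> Error.
```

Other errors are passed through unchanged so that a real failure is not mistaken for a missing key.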

register_workload(Client, Opts)

(since v0.10.0)
-spec register_workload(Client :: client(), Opts :: options()) ->
                           {ok, map()} | {error, Reason :: term()}.

Register this workload with the Platform Layer.

Registers the workload application with Macula's Platform Layer and returns information about the current platform cluster state, including the current leader node.

Options

  • workload_name - Required. Binary name identifying this workload type (e.g., <<"macula_arcade">>, <<"my_app">>)
  • capabilities - Optional. List of atoms describing workload capabilities (e.g., [coordinator, game_server])

Returns

  • leader_node - Binary node ID of the current Platform Layer leader
  • cluster_size - Integer count of nodes in the platform cluster
  • platform_version - Binary version string (e.g., <<"0.10.0">>)

Examples

  {ok, Client} = macula:connect_local(#{realm => <<"my.app">>}),
  {ok, Info} = macula:register_workload(Client, #{
      workload_name => <<"my_app_coordinator">>,
      capabilities => [coordinator, matchmaking]
  }),
  #{leader_node := Leader, cluster_size := Size} = Info.

set_cookie(Cookie)

(since v0.16.3)
-spec set_cookie(atom() | binary()) -> ok.

Set the Erlang cookie for this node and persist it.

Sets the cookie for the current node and attempts to persist it to ~/.erlang.cookie for future sessions.

Examples:

  ok = macula:set_cookie(my_secret_cookie).

subscribe(Client, Topic, Callback)

-spec subscribe(Client :: client(), Topic :: topic(), Callback :: fun((event_data()) -> ok)) ->
                   {ok, subscription_ref()} | {error, Reason :: term()}.

Subscribe to a topic.

Subscribes to events on the specified topic. The callback function will be invoked for each event received.

Callback Function

The callback receives the event data and should return ok.

Examples

  %% Simple subscription
  {ok, SubRef} = macula:subscribe(Client, <<"my.app.events">>,
      fun(EventData) ->
          io:format("Event: ~p~n", [EventData]),
          ok
      end).
 
  %% Unsubscribe later
  ok = macula:unsubscribe(Client, SubRef).
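
This page does not say which process runs the callback, so a callback that touches the subscriber's state directly may not behave as expected. A cautious pattern is to forward each event to a known process as a message. This is a sketch; the event_forwarder module and the {macula_event, EventData} message shape are assumptions.

```erlang
-module(event_forwarder).
-export([callback/1, subscribe/3]).

%% Builds a subscribe/3 callback that forwards each event to Pid as
%% {macula_event, EventData} and returns ok, as the callback must.
callback(Pid) when is_pid(Pid) ->
    fun(EventData) ->
        Pid ! {macula_event, EventData},
        ok
    end.

%% Subscribes to Topic and delivers its events to Pid's mailbox.
subscribe(Client, Topic, Pid) ->
    macula:subscribe(Client, Topic, callback(Pid)).
```

A gen_server can then handle the events in handle_info/2 alongside its other messages.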

subscribe_leader_changes(Client, Callback)

(since v0.10.0)
-spec subscribe_leader_changes(Client :: client(), Callback :: fun((map()) -> ok)) ->
                                  {ok, subscription_ref()} | {error, Reason :: term()}.

Subscribe to Platform Layer leader change notifications.

Registers a callback function to be invoked whenever the Platform Layer leader changes due to election or node failure.

The callback receives a map with:

  • old_leader - Previous leader node ID (may be undefined)
  • new_leader - New leader node ID
  • term - Raft term number (monotonically increasing)

Examples

  {ok, SubRef} = macula:subscribe_leader_changes(Client,
      fun(#{old_leader := Old, new_leader := New}) ->
          io:format("Leader changed: ~p -> ~p~n", [Old, New]),
          handle_leadership_transition(New),
          ok
      end).
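
Because term increases monotonically, a subscriber can use it to discard a notification that is older than one it has already seen. Whether notifications can actually arrive out of order is not stated here, so treat the guard below as a precaution. The leader_tracker module and its state layout are hypothetical.

```erlang
-module(leader_tracker).
-export([new/0, apply_change/2]).

%% Initial state: no known leader, and a term lower than any real term.
new() ->
    #{leader => undefined, term => -1}.

%% Applies a leader change map only if its term is newer than the last
%% one seen. Stale or malformed notifications leave the state unchanged.
apply_change(#{term := Seen} = State, #{new_leader := New, term := Term})
  when is_integer(Term), Term > Seen ->
    State#{leader := New, term := Term};
apply_change(State, _StaleOrMalformed) ->
    State.
```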

unadvertise(Client, Procedure)

-spec unadvertise(Client :: client(), Procedure :: procedure()) -> ok | {error, Reason :: term()}.

Stop advertising a service.

Removes the local handler and stops advertising to the DHT.

Examples

  ok = macula:unadvertise(Client, <<"my.app.get_user">>).

unmonitor_nodes()

(since v0.16.3)
-spec unmonitor_nodes() -> ok.

Unsubscribe from node up/down events.

Stops the calling process from receiving nodeup/nodedown messages.

Examples:

  ok = macula:unmonitor_nodes().

unsubscribe(Client, SubRef)

-spec unsubscribe(Client :: client(), SubRef :: subscription_ref()) -> ok | {error, Reason :: term()}.

Unsubscribe from a topic.

Removes the subscription identified by the subscription reference.