macula (macula v0.14.3)

View Source

Macula - Main API for distributed workloads on Macula platform.

This is the ONLY module workload applications should import. It provides a stable, versioned API for all platform operations:

- Mesh networking (connect, publish, subscribe, RPC) - Platform Layer (leader election, CRDTs, workload registration) - Service discovery (DHT queries, node identity)

Quick Start

Connect to local platform and publish events:

   {ok, Client} = macula:connect_local(#{realm => <<"my.app">>}),
   ok = macula:publish(Client, <<"my.events">>, #{type => <<"test">>}).

Architecture

Workload applications run in the same BEAM VM as the Macula platform. Use connect_local/1` to connect via process-to-process communication: ``` Workload App → macula:connect_local/1 → macula_gateway → Mesh (QUIC/HTTP3)''

Platform Layer (v0.10.0+)

Register with platform for coordination features:

   {ok, #{leader_node := Leader}} = macula:register_workload(Client, #{
       workload_name => <<"my_app">>
   }).

DHT Network Bootstrap

The platform handles DHT bootstrapping via MACULA_BOOTSTRAP_PEERS`. Workloads dont need to manage peer discovery.

Summary

Types

Arguments for RPC calls.

Reference to a connected Macula mesh client.

Event payload data. Typically a map that will be JSON-encoded.

Connection or operation options.

RPC procedure name. Example: "my.app.get_user".

Reference to an active subscription for unsubscribe operations.

Topic name for pub/sub operations. Topics should describe event types, not entity IDs. Example: "my.app.user.registered" (good), not "my.app.user.123.registered" (bad - ID belongs in payload).

Functions

Advertise a service that this client provides.

Advertise a service with options.

Make a synchronous RPC call.

Make an RPC call with options.

Connect to a Macula mesh network.

Connect to the local Macula gateway (for in-VM workloads).

Disconnect from the Macula mesh.

Discover subscribers to a topic via DHT query.

Get the current Platform Layer leader node.

Get the node ID of this client.

Propose a CRDT update to Platform Layer shared state.

Propose a CRDT update with specific CRDT type.

Publish an event to a topic.

Publish an event with options.

Read the current value of a CRDT-managed shared state entry.

Register this workload with the Platform Layer.

Subscribe to a topic.

Subscribe to Platform Layer leader change notifications.

Stop advertising a service.

Unsubscribe from a topic.

Types

args/0

-type args() :: map() | list() | binary().

Arguments for RPC calls.

client/0

-type client() :: pid().

Reference to a connected Macula mesh client.

event_data/0

-type event_data() :: map() | binary().

Event payload data. Typically a map that will be JSON-encoded.

options/0

-type options() :: map().

Connection or operation options.

procedure/0

-type procedure() :: binary().

RPC procedure name. Example: "my.app.get_user".

subscription_ref/0

-type subscription_ref() :: reference().

Reference to an active subscription for unsubscribe operations.

topic/0

-type topic() :: binary().

Topic name for pub/sub operations. Topics should describe event types, not entity IDs. Example: "my.app.user.registered" (good), not "my.app.user.123.registered" (bad - ID belongs in payload).

Functions

advertise(Client, Procedure, Handler)

-spec advertise(Client :: client(),
                Procedure :: procedure(),
                Handler :: macula_service_registry:handler_fn()) ->
                   {ok, reference()} | {error, Reason :: term()}.

Advertise a service that this client provides.

Registers a handler function for the specified procedure and advertises it to the DHT so other clients can discover and call it.

The handler function receives a map of arguments and must return {ok, Result} or {error, Reason}.

Options

  • ttl - Advertisement TTL in seconds (default: 300)
  • metadata - Custom metadata map (default: #{})

Examples

  %% Define a handler function
  Handler = fun(#{user_id := UserId}) ->
      {ok, #{user_id => UserId, name => <<"Alice">>}}
  end.
 
  %% Advertise the service
  {ok, Ref} = macula:advertise(
      Client,
      <<"my.app.get_user">>,
      Handler
  ).
 
  %% Other clients can now call:
  %% {ok, User} = macula:call(OtherClient, <<"my.app.get_user">>,
  %%     #{user_id => <<"user-123">>}).

advertise(Client, Procedure, Handler, Opts)

-spec advertise(Client :: client(),
                Procedure :: procedure(),
                Handler :: macula_service_registry:handler_fn(),
                Opts :: options()) ->
                   {ok, reference()} | {error, Reason :: term()}.

Advertise a service with options.

call(Client, Procedure, Args)

-spec call(Client :: client(), Procedure :: procedure(), Args :: args()) ->
              {ok, Result :: term()} | {error, Reason :: term()}.

Make a synchronous RPC call.

Calls a remote procedure and waits for the result.

Examples

  %% Simple RPC call
  {ok, User} = macula:call(Client, <<"my.app.get_user">>, #{
      user_id => <<"user-123">>
  }).
 
  %% With timeout
  {ok, Result} = macula:call(Client, <<"my.app.process">>,
      #{data => <<"large">>},
      #{timeout => 30000}).

call(Client, Procedure, Args, Opts)

-spec call(Client :: client(), Procedure :: procedure(), Args :: args(), Opts :: options()) ->
              {ok, Result :: term()} | {error, Reason :: term()}.

Make an RPC call with options.

connect(Url, Opts)

-spec connect(Url :: binary(), Opts :: options()) -> {ok, client()} | {error, Reason :: term()}.

Connect to a Macula mesh network.

Creates a new HTTP/3 (QUIC) connection to the specified mesh endpoint.

Options

  • realm - Required. Binary realm identifier (e.g., <<"my.app.realm">>)
  • auth - Optional. Authentication map with api_key or other auth methods
  • timeout - Optional. Connection timeout in milliseconds (default: 5000)
  • node_id - Optional. 32-byte node ID (generated if not provided)

Examples

  %% Basic connection
  {ok, Client} = macula:connect(<<"https://mesh.local:443">>, #{
      realm => <<"my.realm">>
  }).
 
  %% With API key authentication
  {ok, Client} = macula:connect(<<"https://mesh.local:443">>, #{
      realm => <<"my.realm">>,
      auth => #{api_key => <<"secret-key">>}
  }).

connect_local(Opts)

(since v0.8.9)
-spec connect_local(Opts :: options()) -> {ok, client()} | {error, Reason :: term()}.

Connect to the local Macula gateway (for in-VM workloads).

This function is used by applications running in the same BEAM VM as the Macula platform. Instead of creating a QUIC connection to localhost, it connects directly to the local macula_gateway process via process-to-process communication.

Architecture

  Phoenix/Elixir App  macula_local_client  macula_gateway
                                               (QUIC)
                                         Other Peers

When to Use

  • ✅ Use connect_local/1 when your application runs in the same VM as Macula
  • ✅ Phoenix applications deployed with Macula in the same container
  • ❌ Do NOT use connect/2 with localhost URL - it creates unnecessary QUIC overhead

Options

  • realm - Required. Binary realm identifier (e.g., <<"my.app.realm">>)
  • event_handler - Optional. PID to receive events (default: caller PID)

Examples

  %% Elixir Phoenix application
  {:ok, client} = :macula.connect_local(%{
      realm: "macula.arcade.dev"
  })
 
  %% Erlang application
  {ok, Client} = macula:connect_local(#{
      realm => <<"my.app.realm">>
  }).

disconnect(Client)

-spec disconnect(Client :: client()) -> ok | {error, Reason :: term()}.

Disconnect from the Macula mesh.

Cleanly closes the HTTP/3 connection and cleans up all subscriptions.

discover_subscribers(Client, Topic)

-spec discover_subscribers(Client :: client(), Topic :: topic()) ->
                              {ok, [#{node_id := binary(), endpoint := binary()}]} |
                              {error, Reason :: term()}.

Discover subscribers to a topic via DHT query.

Queries the DHT for all nodes subscribed to the given topic. Returns a list of subscriber nodes with their node IDs and endpoints.

This is used for P2P discovery before sending direct messages.

get_leader(Client)

(since v0.10.0)
-spec get_leader(Client :: client()) -> {ok, binary()} | {error, no_leader | term()}.

Get the current Platform Layer leader node.

Queries the Platform Layer for the current leader node ID. The leader is elected via Raft consensus and handles coordination tasks.

Returns {error, no_leader} if leader election is in progress.

Examples

  case macula:get_leader(Client) of
      {ok, LeaderNodeId} ->
          %% Check if we're the leader
          {ok, OurNodeId} = macula:get_node_id(Client),
          case LeaderNodeId == OurNodeId of
              true -> coordinate_globally();
              false -> defer_to_leader()
          end;
      {error, no_leader} ->
          wait_for_leader_election()
  end.

get_node_id(Client)

-spec get_node_id(Client :: client()) -> {ok, binary()} | {error, Reason :: term()}.

Get the node ID of this client.

Returns the 32-byte node ID assigned to this client.

propose_crdt_update(Client, Key, Value)

(since v0.10.0)
-spec propose_crdt_update(Client :: client(), Key :: binary(), Value :: term()) ->
                             ok | {error, Reason :: term()}.

Propose a CRDT update to Platform Layer shared state.

Updates platform-managed shared state using Conflict-Free Replicated Data Types (CRDTs) for automatic conflict resolution across nodes.

Default CRDT type is lww_register (Last-Write-Wins Register). See propose_crdt_update/4 for other CRDT types.

Examples

  %% Store simple value (LWW-Register)
  ok = macula:propose_crdt_update(
      Client,
      <<"my.app.config.max_users">>,
      1000
  ).
 
  %% Later read it back
  {ok, 1000} = macula:read_crdt(Client, <<"my.app.config.max_users">>).

propose_crdt_update(Client, Key, Value, Opts)

(since v0.10.0)
-spec propose_crdt_update(Client :: client(), Key :: binary(), Value :: term(), Opts :: options()) ->
                             ok | {error, Reason :: term()}.

Propose a CRDT update with specific CRDT type.

Updates platform-managed shared state using the specified CRDT type for automatic conflict resolution.

CRDT Types

  • lww_register - Last-Write-Wins Register (default)
    • Use for: Configuration values, latest status
    • Conflict resolution: Latest timestamp wins
  • g_counter - Grow-Only Counter
    • Use for: Metrics, totals (never decrease)
    • Operations: increment only
  • pn_counter - Positive-Negative Counter
    • Use for: Bidirectional counters (can increase/decrease)
    • Operations: increment, decrement
  • g_set - Grow-Only Set
    • Use for: Accumulating collections (never remove)
    • Operations: add elements only
  • or_set - Observed-Remove Set
    • Use for: Sets with add/remove operations
    • Operations: add, remove elements

Examples

  %% Increment a counter
  ok = macula:propose_crdt_update(
      Client,
      <<"my.app.active_games">>,
      {increment, 1},
      #{crdt_type => pn_counter}
  ).
 
  %% Add to a set
  ok = macula:propose_crdt_update(
      Client,
      <<"my.app.player_ids">>,
      {add, <<"player123">>},
      #{crdt_type => or_set}
  ).

publish(Client, Topic, Data)

-spec publish(Client :: client(), Topic :: topic(), Data :: event_data()) ->
                 ok | {error, Reason :: term()}.

Publish an event to a topic.

Publishes data to the specified topic. All subscribers to this topic will receive the event.

Topic Design

Topics should describe EVENT TYPES, not entity instances:

  • Good: <<"my.app.user.registered">> (event type)
  • Bad: <<"my.app.user.123.registered">> (entity ID in topic)

Entity IDs belong in the event payload, not the topic name.

Examples

  %% Publish with default options
  ok = macula:publish(Client, <<"my.app.events">>, #{
      type => <<"user.registered">>,
      user_id => <<"user-123">>,
      email => <<"user@example.com">>
  }).
 
  %% Publish with options
  ok = macula:publish(Client, <<"my.app.events">>, #{
      data => <<"important">>
  }, #{acknowledge => true}).

publish(Client, Topic, Data, Opts)

-spec publish(Client :: client(), Topic :: topic(), Data :: event_data(), Opts :: options()) ->
                 ok | {error, Reason :: term()}.

Publish an event with options.

This is fire-and-forget - returns ok immediately without blocking. Uses gen_server:cast to avoid blocking the caller (prevents LiveView freezes). Both macula_local_client and macula_peer handle {publish_async, ...} casts.

read_crdt(Client, Key)

(since v0.10.0)
-spec read_crdt(Client :: client(), Key :: binary()) -> {ok, term()} | {error, not_found | term()}.

Read the current value of a CRDT-managed shared state entry.

Reads from the local CRDT replica. The value reflects all converged updates from across the platform cluster.

Returns {error, not_found} if the key has never been written.

Examples

  %% Read LWW-Register value
  {ok, MaxUsers} = macula:read_crdt(Client, <<"my.app.config.max_users">>).
 
  %% Read counter value
  {ok, GameCount} = macula:read_crdt(Client, <<"my.app.active_games">>).
 
  %% Read set value
  {ok, PlayerSet} = macula:read_crdt(Client, <<"my.app.player_ids">>).

register_workload(Client, Opts)

(since v0.10.0)
-spec register_workload(Client :: client(), Opts :: options()) ->
                           {ok, map()} | {error, Reason :: term()}.

Register this workload with the Platform Layer.

Registers the workload application with Macula's Platform Layer and returns information about the current platform cluster state, including the current leader node.

Options

  • workload_name - Required. Binary name identifying this workload type (e.g., <<"macula_arcade">>, <<"my_app">>)
  • capabilities - Optional. List of atoms describing workload capabilities (e.g., [coordinator, game_server])

Returns

  • leader_node - Binary node ID of the current Platform Layer leader
  • cluster_size - Integer count of nodes in the platform cluster
  • platform_version - Binary version string (e.g., <<"0.10.0">>)

Examples

  {ok, Client} = macula:connect_local(#{realm => <<"my.app">>}),
  {ok, Info} = macula:register_workload(Client, #{
      workload_name => <<"my_app_coordinator">>,
      capabilities => [coordinator, matchmaking]
  }),
  #{leader_node := Leader, cluster_size := Size} = Info.

subscribe(Client, Topic, Callback)

-spec subscribe(Client :: client(), Topic :: topic(), Callback :: fun((event_data()) -> ok)) ->
                   {ok, subscription_ref()} | {error, Reason :: term()}.

Subscribe to a topic.

Subscribes to events on the specified topic. The callback function will be invoked for each event received.

Callback Function

The callback receives the event data and should return ok.

Examples

  %% Simple subscription
  {ok, SubRef} = macula:subscribe(Client, <<"my.app.events">>,
      fun(EventData) ->
          io:format("Event: ~p~n", [EventData]),
          ok
      end).
 
  %% Unsubscribe later
  ok = macula:unsubscribe(Client, SubRef).

subscribe_leader_changes(Client, Callback)

(since v0.10.0)
-spec subscribe_leader_changes(Client :: client(), Callback :: fun((map()) -> ok)) ->
                                  {ok, subscription_ref()} | {error, Reason :: term()}.

Subscribe to Platform Layer leader change notifications.

Registers a callback function to be invoked whenever the Platform Layer leader changes due to election or node failure.

The callback receives a map with:

  • old_leader - Previous leader node ID (may be undefined)
  • new_leader - New leader node ID
  • term - Raft term number (monotonically increasing)

Examples

  {ok, SubRef} = macula:subscribe_leader_changes(Client,
      fun(#{old_leader := Old, new_leader := New}) ->
          io:format("Leader changed: ~p -> ~p~n", [Old, New]),
          handle_leadership_transition(New),
          ok
      end).

unadvertise(Client, Procedure)

-spec unadvertise(Client :: client(), Procedure :: procedure()) -> ok | {error, Reason :: term()}.

Stop advertising a service.

Removes the local handler and stops advertising to the DHT.

Examples

  ok = macula:unadvertise(Client, <<"my.app.get_user">>).

unsubscribe(Client, SubRef)

-spec unsubscribe(Client :: client(), SubRef :: subscription_ref()) -> ok | {error, Reason :: term()}.

Unsubscribe from a topic.

Removes the subscription identified by the subscription reference.