Group (Group v0.2.0)


Distributed process groups, registry, lifecycle monitoring, and isolated subclusters.

This module provides:

  • Distributed registry: Unique key => process mapping across all nodes
  • Process groups: Allow processes to join/leave keys (many processes per key)
  • Isolated subclusters: Partition groups and registries into named subclusters for isolated messaging
  • Lifecycle monitoring: Monitor lifecycle events for registry and group changes

Consistency Model

All operations are eventually consistent. The built-in replication layer uses Erlang distribution to propagate state across nodes, which means:

  • Writes (register, join, etc.) return immediately after local update
  • Other nodes receive updates asynchronously via Erlang distribution
  • During network partitions, nodes may have divergent views
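  • When partitions heal, duplicate registrations of the same key are resolved: one process wins and the losing process is killed with {:group_registry_conflict, key, meta}. The :resolve_registry_conflict option of child_spec/1 lets you supply the callback that picks the winner.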
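A process that monitors the losing process can tell a conflict kill apart from other exits by its reason. The sketch below does not use Group at all. It assumes the loser is terminated by an exit signal whose reason is the tuple above (the text says "killed with" but does not spell out the mechanism) and simulates that with Process.exit/2. The module name, key, and metadata are made up for illustration.

```elixir
defmodule ConflictExitSketch do
  # Spawns a stand-in "loser" process, monitors it, then simulates the
  # conflict kill. Returns the classified reason seen in the :DOWN message.
  def simulate(key, meta) do
    pid = spawn(fn -> Process.sleep(:infinity) end)
    ref = Process.monitor(pid)

    # Assumption: Group delivers the tuple as an ordinary exit reason.
    Process.exit(pid, {:group_registry_conflict, key, meta})

    receive do
      {:DOWN, ^ref, :process, ^pid, reason} -> classify(reason)
    after
      1_000 -> :timeout
    end
  end

  # Separates conflict kills from other exits so the caller can decide
  # whether to restart, back off, or look up the winner instead.
  def classify({:group_registry_conflict, key, meta}), do: {:lost_conflict, key, meta}
  def classify(other), do: {:other_exit, other}
end
```

A supervisor-like process could use a classification of this kind to avoid immediately restarting a process that lost a conflict, since the winner is still running elsewhere.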

Clusters

By default, all operations use the default cluster (nil). You can optionally create isolated subclusters where only connected nodes receive events.

Named clusters use string names (e.g., "game_servers").

connect/3 supports ttl: milliseconds for named clusters. This is useful when a node only needs a cluster while it has local interest in that cluster. The initial connect still does the normal ETS-first membership check, so repeated connect/3 calls while already connected stay a cheap noop and do not refresh the TTL. When a lease expires, Group only disconnects if the local node has no cluster-scoped monitors, no local registrations, and no local group memberships in that named cluster.
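The lease rules in the paragraph above can be modelled with a few pure functions. This is only a sketch of the documented behaviour, not Group's implementation: the module name, the lease representation, and the map keys are all hypothetical, and it does not say what happens to a lease that expires while local interest remains.

```elixir
defmodule LeaseSketch do
  # A lease is nil (not connected) or a map holding its expiry time in ms.
  # Only a new connect creates a lease.
  def connect(nil, now_ms, ttl_ms), do: %{expires_at: now_ms + ttl_ms}

  # Already connected: a noop that does not refresh the TTL.
  def connect(%{expires_at: _} = lease, _now_ms, _ttl_ms), do: lease

  # On expiry, disconnect only when nothing local still uses the cluster.
  def on_expiry(%{monitors: 0, registrations: 0, memberships: 0}), do: :disconnect
  def on_expiry(%{monitors: _, registrations: _, memberships: _}), do: :stay_connected
end
```

For example, a lease created at time 0 with ttl: 30_000 still expires at 30_000 even if connect is called again at 10_000.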

Important: DurableServer Registration

DurableServers always register in the default cluster to ensure global uniqueness via the distributed locking mechanism. Named clusters exist purely to isolate your own registries, process groups, and subscriptions into subclusters. If a DurableServer wants to participate in an isolated cluster, it can call connect/2 and join/4 inside its init callback.

Core Concepts

Monitoring vs Memberships

  • Monitoring (monitor/2, demonitor/2): Receive events in your mailbox when DurableServers or other processes register/join matching keys anywhere in the cluster. Supports pattern matching on keys.

  • Memberships (join/3, leave/2): Make your process discoverable cluster-wide via members/2. Triggers :joined/:left events to monitors.

These are independent - joining a key does NOT automatically monitor events, and monitoring does NOT make you discoverable via members/2.

Event Types

Events are delivered as {:group, events, info} tuples:

{:group, [
  %Group.Event{
    type: event_type,
    supervisor: name,
    cluster: cluster_name,  # nil for default cluster
    key: key,
    pid: pid,
    meta: meta,             # always user-provided meta (internal keys stripped)
    previous_meta: ...,     # nil for new, old meta for re-register/re-join
    reason: ...             # set on :unregistered/:left events
  }
], %{name: name}}

Event          Trigger                           Extra Fields
:registered    register/3 (new or re-register)   :previous_meta
:unregistered  Process unregistered or died      :reason
:joined        join/3 (new or re-join)           :previous_meta
:left          Process left group or died        :reason

:previous_meta is nil for new registrations/joins, or the old metadata map when re-registering/re-joining.
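As a minimal sketch of how a handler might use these fields, the function below classifies an event as new, updated, or gone. It matches on plain map keys, so it would accept %Group.Event{} structs (structs are maps) as well as the bare maps used here to keep the example runnable without the library. The module name and return tuples are illustrative, and the :reason values Group actually sets are not specified in this section.

```elixir
defmodule EventKindSketch do
  # New registration or join: there is no previous metadata.
  def describe(%{type: type, previous_meta: nil}) when type in [:registered, :joined],
    do: {:new, type}

  # Re-register or re-join: previous_meta holds the old metadata map.
  def describe(%{type: type, previous_meta: old, meta: new}) when type in [:registered, :joined],
    do: {:updated, type, old, new}

  # Unregistered or left: the reason field is set.
  def describe(%{type: type, reason: reason}) when type in [:unregistered, :left],
    do: {:gone, type, reason}
end
```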

DurableServers automatically register/unregister during their lifecycle, so these events can be used to track DurableServer start/stop.

Pattern Types

Monitors support three pattern types:

  • "user/123" - Exact match, only events for this specific key
  • "user/" - Prefix match, all keys starting with "user/"
  • :all - All events for this supervisor
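The three pattern types can be modelled as a small predicate. This is a sketch of the matching semantics described above, not Group's actual matcher. It assumes a trailing "/" is what distinguishes a prefix pattern from an exact one, and the module and function names are hypothetical.

```elixir
defmodule PatternSketch do
  # :all matches every key.
  def matches?(:all, _key), do: true

  def matches?(pattern, key) when is_binary(pattern) and is_binary(key) do
    if String.ends_with?(pattern, "/") do
      # Prefix pattern: the key must start with the whole pattern,
      # including the trailing slash.
      String.starts_with?(key, pattern)
    else
      # Exact pattern: only the identical key matches.
      pattern == key
    end
  end
end
```

Under this model "user/" matches "user/123" but not "users/1", because the slash is part of the prefix.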

Self-Events

A process that monitors a pattern and then joins a matching key will receive its own :joined event. Similarly for :left when leaving. Filter these in your handler if needed:

def handle_info({:group, events, _info}, state) do
  Enum.each(events, fn
    %Group.Event{type: :joined, pid: pid} when pid != self() ->
      # Handle other processes' join events
      :ok
    _ -> :ok
  end)
  {:noreply, state}
end

Examples

Basic Monitoring

# Monitor all events for a specific key
:ok = Group.monitor(MySup, "user/123")

# Monitor all keys under a prefix
:ok = Group.monitor(MySup, "chat/")

# Monitor all events
:ok = Group.monitor(MySup, :all)

# Handle events in a GenServer
def handle_info({:group, events, _info}, state) do
  Enum.each(events, fn
    %Group.Event{type: :registered, key: key} ->
      IO.puts("DurableServer started: #{key}")
    %Group.Event{type: :unregistered, key: key, reason: reason} ->
      IO.puts("DurableServer stopped: #{key}, reason: #{inspect(reason)}")
    _ -> :ok
  end)
  {:noreply, state}
end

Joining as a Member

# Join a key to be discoverable by other processes
:ok = Group.join(MySup, "game/room/42", %{role: :spectator})

# Query all members of a key (joined processes only)
members = Group.members(MySup, "game/room/42")
# => [{#PID<0.200.0>, %{role: :spectator}}]

# Leave when done
:ok = Group.leave(MySup, "game/room/42")

Using Named Clusters

# Connect this node to a named cluster
:ok = Group.connect(MySup, "game_servers")

# Or lease it while this node still has local interest
:ok = Group.connect(MySup, "game_servers", ttl: 30_000)

# Join a group in the named cluster
:ok = Group.join(MySup, "room/123", %{role: :member}, cluster: "game_servers")

# Monitor events in the named cluster
:ok = Group.monitor(MySup, :all, cluster: "game_servers")

# Members and dispatch also support cluster option
Group.members(MySup, "room/123", cluster: "game_servers")
Group.dispatch(MySup, "room/123", {:msg, "hi"}, cluster: "game_servers")

Architecture Notes

  • Events are cluster-wide: Replication callbacks fire on ALL nodes in the cluster. This means a monitor on Node A receives events when a DurableServer registers on Node B.

  • Monitors are stored per-node in an Elixir Registry, enabling pattern matching and automatic cleanup when monitoring processes die.

  • Memberships use built-in process groups for cluster-wide distribution and automatic cleanup when member processes die.

Summary

Functions

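  • child_spec(opts) - Returns a child spec for starting Group under a supervisor.
  • connect(name, cluster_or_clusters, opts \\ []) - Connect the local node to a named cluster.
  • connected?(name, cluster_name) - Check if the local node is connected to a named cluster.
  • demonitor(name, pattern_string, opts \\ []) - Stop monitoring lifecycle events for the given pattern.
  • disconnect(name, cluster_or_clusters, opts \\ []) - Disconnect the local node from one or more named clusters.
  • dispatch(name, key, message, opts \\ []) - Dispatch a message to all members of a key.
  • dispatch_local(name, key, message, opts \\ []) - Like dispatch/4, but only sends to members on the local node.
  • join(name, group, meta \\ %{}, opts \\ []) - Join a group as a member.
  • leave(name, group, opts \\ []) - Leave a group that was previously joined.
  • local_entries(name) - List all entries owned by the local node across all shards.
  • local_member_count(name, group, opts \\ []) - Count processes in a group on the local node.
  • local_registry_count(name, opts \\ []) - Count processes registered in the local node's registry.
  • log_level(name, level) - Set the log level at runtime. Accepts :info, :verbose, or false.
  • lookup(name, key, opts \\ []) - Look up a registered process by key.
  • member_count(name, group, opts \\ []) - Count group members in this node's local replicated ETS view.
  • members(name, group, opts \\ []) - List all members of a group (process group entries only).
  • monitor(name, pattern_string, opts \\ []) - Monitor lifecycle events matching the given pattern.
  • nodes(name) - List all nodes running this Group instance.
  • nodes(name, cluster_name) - List all nodes in a named cluster.
  • register(name, key, meta, opts \\ []) - Register the calling process in the cluster registry.
  • registry_count(name, opts \\ []) - Count processes registered in this node's local replicated ETS view.
  • unregister(name, key, opts \\ []) - Unregister a process from the cluster registry.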

Functions

child_spec(opts)

Returns a child spec for starting Group under a supervisor.

Options

  • :name (required) — atom identifying this Group instance
  • :shards — number of GenServer shards (default: 8). Must match across all nodes
  • :log — logging level. :info (default) logs peer discovery, node events, and cluster membership changes. :verbose additionally logs per-shard replication messages. false disables all Group log output.
  • :resolve_registry_conflict — {module, function, extra_args} callback invoked when two nodes hold the same registry key (partition heal or concurrent registration). Called as apply(module, function, [name, key, {pid1, meta1, time1}, {pid2, meta2, time2} | extra_args]). Must return the winner pid. The loser is killed with {:group_registry_conflict, key, meta}. Important: This callback runs synchronously inside the shard GenServer, so it must return quickly and never block. Any information needed for the decision should be carried in the registration metadata, not fetched at resolution time.
  • :extract_meta — {module, function, args} to transform metadata on reads
  • :replicated_pg_receiver_buffer_size — max buffered replicated PG join/leave ops per shard before the receiver flushes immediately (default: 64)
  • :replicated_pg_receiver_flush_interval — max time in milliseconds a shard will buffer replicated PG join/leave ops before flushing (default: 5)
  • :replicated_registry_receiver_buffer_size — max buffered replicated registry register/unregister ops per shard before the receiver flushes immediately (default: 64)
  • :replicated_registry_receiver_flush_interval — max time in milliseconds a shard will buffer replicated register/unregister ops before flushing (default: 5)
  • :replicated_sender_buffer_size — max buffered replicated outbound ops per shard before the sender flushes immediately (default: 64)
  • :replicated_sender_flush_interval — max time in milliseconds a shard will buffer replicated outbound ops before flushing during idle periods. Sender buffers also flush on size, overdue enqueue, and control/routing barriers (default: 5)
  • :busy_dist_retry_attempts — max reconnect attempts after a shard hits send_nosuspend == false to a remote node and forces a disconnect (default: 300)
  • :busy_dist_retry_interval — interval in milliseconds between reconnect attempts after a busy-dist disconnect (default: 1_000)
  • :replicated_pg_receiver_local_request_quota — max queued local PG shard requests drained after each replicated PG flush before yielding (default: 8)
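As an illustration of the :resolve_registry_conflict contract, here is a minimal resolver that keeps the earlier registration. It is a sketch under assumptions: MyApp.ConflictResolver and MyApp.Group are hypothetical names, extra_args is taken to be [], and the time values are assumed to be comparable terms where a smaller value means earlier (the option text does not state their unit or format).

```elixir
defmodule MyApp.ConflictResolver do
  # Called as resolve(name, key, entry1, entry2) when extra_args is [].
  # Each entry is {pid, meta, time}. The function is pure and never blocks.
  def resolve(_name, _key, {pid1, _meta1, time1}, {pid2, _meta2, time2}) do
    # Earlier time wins. Ties are broken by node name so that the outcome
    # does not depend on argument order when the pids live on different nodes.
    if {time1, node(pid1)} <= {time2, node(pid2)}, do: pid1, else: pid2
  end
end

# Hypothetical wiring in a supervision tree:
#   {Group, name: MyApp.Group,
#    resolve_registry_conflict: {MyApp.ConflictResolver, :resolve, []}}
```

Everything the decision needs is taken from the arguments, which follows the advice above to carry decision data in the registration rather than fetch it at resolution time.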

connect(name, cluster_or_clusters, opts \\ [])

Connect the local node to a named cluster.

This adds the current node to the cluster, allowing it to send and receive process group events within that cluster.

Parameters

  • name - The Group name
  • cluster_or_clusters - The name of the cluster to connect to (binary string)
  • opts - Keyword list of options

Options

  • :timeout - timeout passed to the shard request (default: 60_000)
  • :ttl - for named clusters, keep the local connection leased for this many milliseconds. The lease is only created on a new connect; repeated connect/3 calls while already connected stay a cheap noop and do not refresh the TTL. Expired leases only disconnect if the local node has no cluster-scoped monitors, no local registrations, and no local group memberships in that cluster.

Returns

  • :ok on success

connected?(name, cluster_name)

Check if the local node is connected to a named cluster.

Parameters

  • name - The Group name
  • cluster_name - The name of the cluster to check (binary string)

Returns

  • true if connected
  • false if not connected

demonitor(name, pattern_string, opts \\ [])

Stop monitoring lifecycle events for the given pattern.

Options

  • :cluster - The cluster to demonitor from (default: nil for default cluster)

Returns

  • :ok always (demonitoring a non-existent monitor is a no-op)

disconnect(name, cluster_or_clusters, opts \\ [])

Disconnect the local node from one or more named clusters.

Accepts a single cluster name (binary) or a list of cluster names.

Parameters

  • name - The Group name
  • cluster_or_clusters - The cluster name or list of cluster names to disconnect from
  • opts - Keyword list of options

Options

  • :timeout - timeout passed to the shard request (default: 60_000)

dispatch(name, key, message, opts \\ [])

Dispatch a message to all members of a key.

Sends message to all processes that have joined the key via join/3, as well as any DurableServer registered at that key. This is useful for application-level messaging between a DurableServer and connected clients (e.g., Phoenix Channels).

Dispatch vs Monitor

There are two ways to receive messages in this module:

  • monitor/2 - Receive lifecycle events (:registered, :unregistered, etc.) when DurableServers or processes join/leave keys matching a pattern. These are system-generated events.

  • dispatch/3 - Receive application messages sent explicitly by your code. Only members of the exact key receive the message.

Use monitor to react to lifecycle changes. Use dispatch to send your own messages to members.
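The distinction can be seen in a single GenServer that handles both kinds of message. This sketch runs without Group: the test messages are sent by hand. It assumes dispatched messages arrive unwrapped, exactly as passed to dispatch (which the send/2 loop under "Filtering by Metadata" suggests but does not state), and the module name and the {:msg, text} shape are illustrative.

```elixir
defmodule InboxSketch do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)
  def counts(server), do: GenServer.call(server, :counts)

  @impl true
  def init(:ok), do: {:ok, %{lifecycle: 0, app: 0}}

  @impl true
  def handle_call(:counts, _from, state), do: {:reply, state, state}

  # Lifecycle events from a monitor arrive wrapped in a :group tuple.
  @impl true
  def handle_info({:group, events, _info}, state) when is_list(events) do
    {:noreply, %{state | lifecycle: state.lifecycle + length(events)}}
  end

  # Assumption: application messages arrive exactly as they were dispatched.
  def handle_info({:msg, _text}, state) do
    {:noreply, %{state | app: state.app + 1}}
  end

  # Ignore anything else so unexpected messages do not crash the server.
  def handle_info(_other, state), do: {:noreply, state}
end
```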

Filtering by Metadata

dispatch/3 sends to all members. If you need to filter by metadata (e.g., only send to members with %{type: :channel}), use members/2 directly:

for {pid, %{type: :channel}} <- Group.members(sup, key) do
  send(pid, message)
end

Options

  • :cluster - Dispatch to a named cluster instead of the default cluster

Returns

  • :ok always

dispatch_local(name, key, message, opts \\ [])

Like dispatch/4, but only sends to members on the local node.

Skips all cross-node messaging. Useful when you know remote nodes will handle their own local dispatch (e.g., a broadcast originating on each node).

Options

  • :cluster - Dispatch to a named cluster instead of the default cluster

Returns

  • :ok always

join(name, group, meta \\ %{}, opts \\ [])

Join a group as a member.

The process will:

  • Be discoverable via members/2
  • Be automatically removed when it dies

Re-joining an already-joined group updates the metadata in place.

Note: Joining does NOT automatically monitor events. Call monitor/2 separately if you want to receive events.

Parameters

  • name - The Group name
  • group - The group to join (string)
  • meta - Metadata map (default: %{})
  • opts - Keyword list of options

Options

  • :cluster - Join a named cluster instead of the default cluster
  • :timeout - timeout passed to the shard request (default: 5_000)

Returns

  • :ok on success
  • {:error, reason} on failure

leave(name, group, opts \\ [])

Leave a group that was previously joined.

Parameters

  • name - The Group name
  • group - The group to leave (string)
  • opts - Keyword list of options

Options

  • :cluster - Leave from a named cluster instead of the default cluster
  • :timeout - timeout passed to the shard request (default: 5_000)

Returns

  • :ok on success
  • {:error, :not_in_group} if not a member

local_entries(name)

List all entries owned by the local node across all shards.

Returns a flat tagged list containing both registry and process-group entries:

{:registry, cluster, key, pid, meta}
{:pg, cluster, key, pid, meta}

This includes only entries whose owner node is the local node, not the full replicated cluster view. Metadata is passed through the configured extract_meta callback, if any.

Parameters

  • name - The Group name

Returns

  • List of {:registry | :pg, cluster, key, pid, meta} tuples
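
Because the result is a flat tagged list, it is easy to summarize. The helper below is a sketch (the LocalEntriesSketch module is hypothetical) that counts entries per kind and cluster, with sample tuples shaped like the documented return value.

```elixir
defmodule LocalEntriesSketch do
  # Counts entries per {kind, cluster}. The cluster is nil for the
  # default cluster and a string for a named cluster.
  def summarize(entries) do
    Enum.frequencies_by(entries, fn {kind, cluster, _key, _pid, _meta}
                                    when kind in [:registry, :pg] ->
      {kind, cluster}
    end)
  end
end
```

With the library loaded, the input would come from Group.local_entries(name) rather than a hand-built list.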

local_member_count(name, group, opts \\ [])

Count processes in a group on the local node.

Supports prefix matching: if group ends with "/", counts all local members whose group key starts with that prefix.

Parameters

  • name - The Group name
  • group - The group to count (string). Append "/" for prefix matching.
  • opts - Keyword list of options

Options

  • :cluster - Count in a named cluster instead of the default cluster

Returns

  • Integer count

local_registry_count(name, opts \\ [])

Count processes registered in the local node's registry.

This counts only processes registered via register/4 on the local node.

Parameters

  • name - The Group name
  • opts - Keyword list of options

Options

  • :cluster - Count in a named cluster instead of the default cluster

Returns

  • Integer count

log_level(name, level)

Set the log level at runtime. Accepts :info, :verbose, or false.

Updates the persistent_term config so the change takes effect immediately on all shards without restart.

This calls :persistent_term.put/2, so treat it as an admin operation, not something to invoke in a hot loop.

lookup(name, key, opts \\ [])

Look up a registered process by key.

Returns the pid and metadata for the process registered at the given key, or nil if no process is registered.

Parameters

  • name - The Group name
  • key - The key to look up
  • opts - Keyword list of options

Options

  • :cluster - Look up in a named cluster instead of the default cluster
  • :extract_meta - Override the configured extract_meta callback for this call

Returns

  • {pid, meta} if a process is registered at the key
  • nil if no process is registered

member_count(name, group, opts \\ [])

Count group members in this node's local replicated ETS view.

This returns the caller node's current view of process group membership. It is eventually consistent across nodes and may differ briefly during propagation, partitions, or healing.

Supports prefix matching: if group ends with "/", counts all members whose group key starts with that prefix.

Parameters

  • name - The Group name
  • group - The group to count (string). Append "/" for prefix matching.
  • opts - Keyword list of options

Options

  • :cluster - Count in a named cluster instead of the default cluster

Returns

  • Integer count

members(name, group, opts \\ [])

List all members of a group (process group entries only).

Returns processes that have joined via join/3. Registry entries (via register/3) are not included — use lookup/3 for those.

Supports prefix matching: if group ends with "/", returns all members whose group key starts with that prefix. Prefix queries scan all shards. Exact queries hit a single shard.

Parameters

  • name - The Group name
  • group - The group to query (string). Append "/" for prefix matching.
  • opts - Keyword list of options

Options

  • :cluster - Query a named cluster instead of the default cluster
  • :extract_meta - Override the configured extract_meta callback for this call

Returns

  • List of {pid, meta} tuples

monitor(name, pattern_string, opts \\ [])

Monitor lifecycle events matching the given pattern.

The calling process will receive {:group, events, info} tuples containing matching events:

{:group, [%Group.Event{type: :registered, ...}], %{name: name}}

Patterns

  • "exact/key" - exact key match
  • "prefix/" - all keys starting with "prefix/"
  • :all - all keys

Options

  • :cluster - Monitor events from a named cluster (default: nil for default cluster)

Returns

  • :ok on success
  • {:error, reason} on failure

nodes(name)

List all nodes running this Group instance.

Returns nodes that have completed the peer discovery handshake, excluding the local node. Unlike Node.list(), this only includes nodes actually running this Group instance, not all connected Erlang nodes.

Parameters

  • name - The Group name

Returns

  • List of node atoms

nodes(name, cluster_name)

List all nodes in a named cluster.

Parameters

  • name - The Group name
  • cluster_name - The cluster name (binary string)

Returns

  • List of node atoms

register(name, key, meta, opts \\ [])

Register the calling process in the cluster registry.

This registers a process with a unique key in the cluster. Only one process can be registered with a given key at a time (cluster-wide uniqueness).

Use register/4 when you need exactly one process per key. Use join/4 when multiple processes should be able to share the same key.

Parameters

  • name - The Group name
  • key - The unique key to register under
  • meta - Metadata map to associate with the registration
  • opts - Keyword list of options

Options

  • :cluster - Register in a named cluster instead of the default cluster
  • :timeout - timeout passed to the shard request (default: 5_000)

Returns

  • :ok on success
  • {:error, :taken} if another process is already registered with this key
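
One way to act on these return values is a "claim or find" helper. In this sketch the register and lookup calls are injected as zero-arity functions so the example runs without Group; in real use they would wrap Group.register/3 and Group.lookup/2. The ClaimSketch module and its return tuples are hypothetical.

```elixir
defmodule ClaimSketch do
  # register_fun returns :ok or {:error, :taken}.
  # lookup_fun returns {pid, meta} or nil.
  def claim_or_find(register_fun, lookup_fun) do
    case register_fun.() do
      :ok ->
        {:owner, self()}

      {:error, :taken} ->
        case lookup_fun.() do
          {pid, meta} ->
            {:existing, pid, meta}

          # The local view is eventually consistent, so the current owner
          # may not be visible here yet, or may already be gone.
          nil ->
            :retry_later
        end
    end
  end
end
```

A caller might use it as ClaimSketch.claim_or_find(fn -> Group.register(name, key, meta) end, fn -> Group.lookup(name, key) end).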

registry_count(name, opts \\ [])

Count processes registered in this node's local replicated ETS view.

This returns the caller node's current view of the cluster registry. It is eventually consistent across nodes and may differ briefly during propagation, partitions, or healing.

Parameters

  • name - The Group name
  • opts - Keyword list of options

Options

  • :cluster - Count in a named cluster instead of the default cluster

Returns

  • Integer count

start_link(opts)

unregister(name, key, opts \\ [])

Unregister a process from the cluster registry.

This removes a process registration. Typically not needed as registrations are automatically cleaned up when the process dies.

Parameters

  • name - The Group name
  • key - The key to unregister
  • opts - Keyword list of options

Options

  • :cluster - Unregister from a named cluster instead of the default cluster
  • :timeout - timeout passed to the shard request (default: 5_000)

Returns

  • :ok on success
  • {:error, reason} on failure