Grove.PubSub (Grove v0.1.1)

Phoenix.PubSub integration for CRDT delta synchronization.

This module provides a thin wrapper around Phoenix.PubSub for broadcasting CRDT deltas across a cluster. It's an alternative to the default :pg-based broadcasting used by Grove.Cluster.Membership.

Requirements

Add phoenix_pubsub to your dependencies:

{:phoenix_pubsub, "~> 2.1"}

Usage with Replication.Server

# Start a PubSub (typically in your supervision tree)
Phoenix.PubSub.Supervisor.start_link(name: MyApp.PubSub)

# Start a replication server using PubSub
Grove.Replication.Server.start_link(
  crdt_module: Grove.Counter.GCounter,
  actor: :node_a,
  group: {:counter, "page_views"},
  pubsub: MyApp.PubSub
)

Topic Naming

Topics are derived from the group using topic_for_group/1:

{:counter, "views"} -> "grove:counter:views"
{:set, "users"}     -> "grove:set:users"
"my_topic"          -> "grove:my_topic"
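
The mapping above can be sketched in plain Elixir. This is a minimal illustration inferred from the examples on this page, not Grove's actual implementation: the module name TopicSketch is hypothetical, and falling back to inspect/1 for identifiers that are neither atoms nor strings is an assumption.

```elixir
defmodule TopicSketch do
  # Hypothetical stand-in for topic_for_group/1, reconstructed from the
  # documented examples only.
  @prefix "grove"

  # {type, id} groups become "grove:<type>:<id>".
  def topic_for_group({type, id}) when is_atom(type) do
    "#{@prefix}:#{type}:#{format_id(id)}"
  end

  # Bare atoms and strings become "grove:<group>".
  def topic_for_group(group) when is_atom(group), do: "#{@prefix}:#{group}"
  def topic_for_group(group) when is_binary(group), do: "#{@prefix}:#{group}"

  defp format_id(id) when is_binary(id), do: id
  defp format_id(id) when is_atom(id), do: Atom.to_string(id)
  # Assumption: any other term is rendered with inspect/1.
  defp format_id(id), do: inspect(id)
end
```

With this sketch, TopicSketch.topic_for_group({:counter, "views"}) yields "grove:counter:views", so every replica that names the same group ends up on the same topic.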

Message Format

Messages are broadcast as {:grove_delta, delta, from_pid} tuples, matching the format used by the :pg-based membership system.
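
A subscribed process receives these tuples as ordinary messages. The sketch below shows one way a GenServer might handle them. The module DeltaReceiverSketch is hypothetical, and the state is a plain map of actor to count merged by per-actor maximum, which stands in for a real CRDT merge and is an assumption rather than Grove's API.

```elixir
defmodule DeltaReceiverSketch do
  use GenServer

  # Assumption: the state is a plain %{actor => count} map standing in for a
  # real CRDT such as a G-Counter.
  def start_link(initial \\ %{}), do: GenServer.start_link(__MODULE__, initial)

  def value(pid), do: GenServer.call(pid, :value)

  @impl true
  def init(initial), do: {:ok, initial}

  @impl true
  def handle_call(:value, _from, state), do: {:reply, state, state}

  @impl true
  def handle_info({:grove_delta, delta, _from_pid}, state) do
    # G-Counter-style merge: keep the larger count for each actor, so
    # duplicated or reordered deltas leave the state unchanged.
    merged = Map.merge(state, delta, fn _actor, a, b -> max(a, b) end)
    {:noreply, merged}
  end

  # Ignore anything that is not a delta message.
  def handle_info(_other, state), do: {:noreply, state}
end
```

Because the merge is idempotent and commutative, the receiver does not need to track which deltas it has already seen.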

Summary

Functions

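broadcast(pubsub, topic, message)

Broadcasts a message to all subscribers of a topic.

broadcast_delta(pubsub, group, delta)

Broadcasts a CRDT delta to all replicas subscribed to the group's topic.

broadcast_from(pubsub, from_pid, topic, message)

Broadcasts a message to every subscriber of a topic except from_pid.

subscribe(pubsub, topic)

Subscribes the current process to a topic.

subscribe_to_group(pubsub, group)

Subscribes the current process to a group's topic.

topic_for_group(group)

Converts a group identifier to a PubSub topic string.

unsubscribe(pubsub, topic)

Unsubscribes the current process from a topic.

unsubscribe_from_group(pubsub, group)

Unsubscribes the current process from a group's topic.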

Types

group()

@type group() :: atom() | String.t() | {atom(), term()}

message()

@type message() :: term()

pubsub()

@type pubsub() :: atom()

topic()

@type topic() :: String.t()

Functions

broadcast(pubsub, topic, message)

@spec broadcast(pubsub(), topic(), message()) :: :ok | {:error, term()}

Broadcasts a message to all subscribers of a topic.

Returns :ok on success, or {:error, reason} if the broadcast fails.

Example

Grove.PubSub.broadcast(MyApp.PubSub, "grove:counter:views", {:grove_delta, delta, self()})

broadcast_delta(pubsub, group, delta)

@spec broadcast_delta(pubsub(), group(), term()) :: :ok | {:error, term()}

Broadcasts a CRDT delta to all replicas subscribed to the group's topic.

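This is a convenience function that combines topic_for_group/1 and broadcast_from/4, wrapping the delta in the standard {:grove_delta, delta, from_pid} message. Because it goes through broadcast_from/4, the sending pid does not receive the message.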

Example

Grove.PubSub.broadcast_delta(MyApp.PubSub, {:counter, "views"}, delta)

broadcast_from(pubsub, from_pid, topic, message)

@spec broadcast_from(pubsub(), pid(), topic(), message()) :: :ok | {:error, term()}

Broadcasts a message to every subscriber of a topic except from_pid.

Use this when the sender is itself subscribed to the topic and should not receive its own message.
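
The difference between broadcast/3 and broadcast_from/4 can be illustrated on a single node with the standard library Registry. This is only an analogy for the exclusion semantics, not how Phoenix.PubSub or Grove implement them. The module LocalPubSubSketch and its function names are hypothetical.

```elixir
defmodule LocalPubSubSketch do
  # Hypothetical single-node stand-in for a PubSub, built on Registry
  # with duplicate keys so that many processes can share one topic.
  def start_link(name), do: Registry.start_link(keys: :duplicate, name: name)

  # Registers the calling process under the topic.
  def subscribe(name, topic) do
    {:ok, _owner} = Registry.register(name, topic, nil)
    :ok
  end

  # Sends the message to every process registered under the topic.
  def broadcast(name, topic, message) do
    Registry.dispatch(name, topic, fn entries ->
      for {pid, _value} <- entries, do: send(pid, message)
    end)
  end

  # Same as broadcast/3, but skips from_pid.
  def broadcast_from(name, from_pid, topic, message) do
    Registry.dispatch(name, topic, fn entries ->
      for {pid, _value} <- entries, pid != from_pid, do: send(pid, message)
    end)
  end
end
```

In this sketch a replica that is subscribed to its own topic can call broadcast_from/4 with self() and will not find its own delta in its mailbox, while every other subscriber still receives it.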

Example

Grove.PubSub.broadcast_from(MyApp.PubSub, self(), "grove:counter:views", {:grove_delta, delta, self()})

subscribe(pubsub, topic)

@spec subscribe(pubsub(), topic()) :: :ok | {:error, term()}

Subscribes the current process to a topic.

Returns :ok on success, or {:error, reason} if the subscription fails.

Example

Grove.PubSub.subscribe(MyApp.PubSub, "grove:counter:views")

subscribe_to_group(pubsub, group)

@spec subscribe_to_group(pubsub(), group()) :: :ok | {:error, term()}

Subscribes the current process to a group's topic.

This is a convenience function that combines topic_for_group/1 and subscribe/2.

Example

Grove.PubSub.subscribe_to_group(MyApp.PubSub, {:counter, "views"})

topic_for_group(group)

@spec topic_for_group(group()) :: topic()

Converts a group identifier to a PubSub topic string.

Examples

iex> Grove.PubSub.topic_for_group({:counter, "views"})
"grove:counter:views"

iex> Grove.PubSub.topic_for_group({:set, :users})
"grove:set:users"

iex> Grove.PubSub.topic_for_group("my_custom_topic")
"grove:my_custom_topic"

iex> Grove.PubSub.topic_for_group(:simple_group)
"grove:simple_group"

unsubscribe(pubsub, topic)

@spec unsubscribe(pubsub(), topic()) :: :ok

Unsubscribes the current process from a topic.

Returns :ok.

Example

Grove.PubSub.unsubscribe(MyApp.PubSub, "grove:counter:views")

unsubscribe_from_group(pubsub, group)

@spec unsubscribe_from_group(pubsub(), group()) :: :ok

Unsubscribes the current process from a group's topic.

This is a convenience function that combines topic_for_group/1 and unsubscribe/2.

Example

Grove.PubSub.unsubscribe_from_group(MyApp.PubSub, {:counter, "views"})