Grove.PubSub (Grove v0.1.1)
View SourcePhoenix.PubSub integration for CRDT delta synchronization.
This module provides a thin wrapper around Phoenix.PubSub for broadcasting
CRDT deltas across a cluster. It's an alternative to the default :pg-based
broadcasting used by Grove.Cluster.Membership.
Requirements
Add phoenix_pubsub to your dependencies:
{:phoenix_pubsub, "~> 2.1"}Usage with Replication.Server
# Start a PubSub (typically in your supervision tree)
Phoenix.PubSub.Supervisor.start_link(name: MyApp.PubSub)
# Start a replication server using PubSub
Grove.Replication.Server.start_link(
crdt_module: Grove.Counter.GCounter,
actor: :node_a,
group: {:counter, "page_views"},
pubsub: MyApp.PubSub
)Topic Naming
Topics are derived from the group using topic_for_group/1:
{:counter, "views"} -> "grove:counter:views"
{:set, "users"} -> "grove:set:users"
"my_topic" -> "grove:my_topic"Message Format
Messages are broadcast as {:grove_delta, delta, from_pid} tuples,
matching the format used by the :pg-based membership system.
Summary
Functions
Broadcasts a message to all subscribers of a topic.
Broadcasts a CRDT delta to all replicas subscribed to the group's topic.
Broadcasts a message from a specific pid (excluded from receiving).
Subscribes the current process to a topic.
Subscribes the current process to a group's topic.
Converts a group identifier to a PubSub topic string.
Unsubscribes the current process from a topic.
Unsubscribes the current process from a group's topic.
Types
Functions
Broadcasts a message to all subscribers of a topic.
Returns :ok on success.
Example
Grove.PubSub.broadcast(MyApp.PubSub, "grove:counter:views", {:grove_delta, delta, self()})
Broadcasts a CRDT delta to all replicas subscribed to the group's topic.
This is a convenience function that combines topic_for_group/1 and broadcast_from/4
with the standard Grove delta message format.
Example
Grove.PubSub.broadcast_delta(MyApp.PubSub, {:counter, "views"}, delta)
Broadcasts a message from a specific pid (excluded from receiving).
The from_pid will not receive the broadcast message.
Example
Grove.PubSub.broadcast_from(MyApp.PubSub, self(), "grove:counter:views", {:grove_delta, delta, self()})
Subscribes the current process to a topic.
Returns :ok on success.
Example
Grove.PubSub.subscribe(MyApp.PubSub, "grove:counter:views")
Subscribes the current process to a group's topic.
This is a convenience function that combines topic_for_group/1 and subscribe/2.
Example
Grove.PubSub.subscribe_to_group(MyApp.PubSub, {:counter, "views"})
Converts a group identifier to a PubSub topic string.
Examples
iex> Grove.PubSub.topic_for_group({:counter, "views"})
"grove:counter:views"
iex> Grove.PubSub.topic_for_group({:set, :users})
"grove:set:users"
iex> Grove.PubSub.topic_for_group("my_custom_topic")
"grove:my_custom_topic"
iex> Grove.PubSub.topic_for_group(:simple_group)
"grove:simple_group"
Unsubscribes the current process from a topic.
Returns :ok on success.
Example
Grove.PubSub.unsubscribe(MyApp.PubSub, "grove:counter:views")
Unsubscribes the current process from a group's topic.
This is a convenience function that combines topic_for_group/1 and unsubscribe/2.
Example
Grove.PubSub.unsubscribe_from_group(MyApp.PubSub, {:counter, "views"})