Combo.PubSub (combo_pubsub v0.1.0)
View SourceDistributed Pub/Sub system.
Getting started
To start a pubsub server, add Combo.PubSub to your supervision tree:
children = [
# ...
{Combo.PubSub, name: :my_pubsub}
]You can now use the functions in this module to subscribe and broadcast messages:
iex> alias Combo.PubSub
iex> PubSub.subscribe(:my_pubsub, "user:123")
:ok
iex> Process.info(self(), :messages)
{:messages, []}
iex> PubSub.broadcast(:my_pubsub, "user:123", {:user_update, %{id: 123, name: "Shane"}})
:ok
iex> Process.info(self(), :messages)
{:messages, [{:user_update, %{id: 123, name: "Shane"}}]}Adapters
Combo PubSub was designed to be flexible and support multiple backends. There is one built-in backend:
Combo.PubSub.PG- the default adapter that ships as part ofCombo.PubSub. It runs on Distributed Erlang, directly exchanging notifications between servers. It supports a:pool_sizeoption to be given alongside the name, defaults to1. Note the:pool_sizemust be the same throughout the cluster, therefore don't configure the pool size based onSystem.schedulers_online/0, especially if you are using machines with different specs.
See Combo.PubSub.Adapter to implement a custom adapter.
Custom dispatching
Combo.PubSub allows developers to perform custom dispatching by passing a
dispatcher module which is responsible for local message deliveries.
The dispatcher must be available on all nodes running the PubSub system.
The dispatch/3 function of the given module will be invoked with the
subscriptions entries, the broadcaster identifier (either a pid or :none)
, and the message to broadcast.
You may want to use the dispatcher to perform special delivery for certain
subscriptions. This can be done by passing the :metadata option during
subscriptions. For instance, Combo Channels use a custom value to provide
"fastlaning", allowing messages broadcast to thousands or even millions of
users to be encoded once and written directly to sockets instead of being
encoded per channel.
Safe pool size migration (when using Combo.PubSub.PG adapter)
When you need to change the pool size in a running cluster, you can use the
broadcast_pool_size option to ensure no messages are lost during deployment.
This is particularly important when increasing the pool size.
Here's how to safely increase the pool size from 1 to 2:
- Initial state - Current configuration with
pool_size: 1:{Combo.PubSub, name: :my_pubsub, pool_size: 1}
graph TD
subgraph "Initial State"
subgraph "Node 1"
A1[Shard 1<br/>Broadcast & Receive]
end
subgraph "Node 2"
B1[Shard 1<br/>Broadcast & Receive]
end
A1 <--> B1
end- First deployment - Set the new pool size but keep broadcasting on the old size:
{Combo.PubSub, name: :my_pubsub, pool_size: 2, broadcast_pool_size: 1}
graph TD
subgraph "First Deployment"
subgraph "Node 1"
A1[Shard 1<br/>Broadcast & Receive]
A2[Shard 2<br/>Broadcast & Receive]
end
subgraph "Node 2"
B1[Shard 1<br/>Broadcast & Receive]
B2[Shard 2<br/>Receive Only]
end
A1 <--> B1
A2 --> B2
end- Final deployment - All nodes running with new pool size:
{Combo.PubSub, name: :my_pubsub, pool_size: 2}
graph TD
subgraph "Final State"
subgraph "Node 1"
A1[Shard 1<br/>Broadcast & Receive]
A2[Shard 2<br/>Broadcast & Receive]
end
subgraph "Node 2"
B1[Shard 1<br/>Broadcast & Receive]
B2[Shard 2<br/>Broadcast & Receive]
end
A1 <--> B1
A2 <--> B2
endThis two-step process ensures that:
- All nodes can receive messages from both old and new pool sizes.
- No messages are lost during the transition.
- The cluster remains fully functional throughout the deployment.
To decrease the pool size, follow the same process in reverse order.
Summary
Functions
Broadcasts message on given topic across the whole cluster.
Raising version of broadcast/4.
Broadcasts message on given topic from the given process across the whole cluster.
Raising version of broadcast_from/5.
Returns a child specification for pubsub with the given options.
Broadcasts message on given topic to a given node.
Raising version of direct_broadcast/5.
Broadcasts message on given topic only for the current node.
Broadcasts message on given topic from a given process only for the current node.
Returns the node name of the pubsub server.
Subscribes the caller to the topic on the pubsub server.
Unsubscribes the caller from the topic on the pubsub server.
Types
Functions
@spec broadcast(t(), topic(), message(), dispatcher()) :: :ok | {:error, term()}
Broadcasts message on given topic across the whole cluster.
Arguments
pubsub- the name of the pubsub server.topic- the topic to broadcast to, such as"users:123".message- the payload of the broadcast.
A custom dispatcher may also be given as a fourth, optional argument. See the "Custom dispatching" section in the module documentation.
@spec broadcast!(t(), topic(), message(), dispatcher()) :: :ok
Raising version of broadcast/4.
Broadcasts message on given topic from the given process across the whole cluster.
Arguments
pubsub- the name of the pubsub server.from- the pid to send the message.topic- the topic to broadcast to, such as"users:123".message- the payload of the broadcast.
The default dispatcher will broadcast the message to all subscribers except for the process that initiated the broadcast.
A custom dispatcher may also be given as a fifth, optional argument. See the "Custom dispatching" section in the module documentation.
@spec broadcast_from!(t(), pid(), topic(), message(), dispatcher()) :: :ok
Raising version of broadcast_from/5.
@spec child_spec(keyword()) :: Supervisor.child_spec()
Returns a child specification for pubsub with the given options.
The :name is required as part of options. The remaining options
are described below.
Options
:name- the name of the pubsub server to be started.:adapter- the adapter to use. Defaults toCombo.PubSub.PG.:pool_size- the number of pubsub partitions to launch. Defaults to one partition for every 4 cores.:registry_size- the number ofRegistrypartitions to launch. This controls the number of Registry partitions used for storing subscriptions and can be tuned independently from:pool_sizefor better performance characteristics. Defaults to the value of:pool_size.:broadcast_pool_size- the number of pubsub partitions used for broadcasting messages. This option is used during pool size migrations to ensure no messages are lost. See the "Safe Pool Size Migration" section in the module documentation. Defaults to the value of:pool_size.
@spec direct_broadcast(node_name(), t(), topic(), message(), dispatcher()) :: :ok | {:error, term()}
Broadcasts message on given topic to a given node.
Arguments
node_name- the name of the target node.pubsub- the name of the pubsub server.topic- the topic to broadcast to, such as"users:123".message- the payload of the broadcast.
DO NOT use this function if you wish to broadcast to the current
node, as it is always serialized, use local_broadcast/4 instead.
A custom dispatcher may also be given as a fifth, optional argument. See the "Custom dispatching" section in the module documentation.
@spec direct_broadcast!(node_name(), t(), topic(), message(), dispatcher()) :: :ok
Raising version of direct_broadcast/5.
@spec local_broadcast(t(), topic(), message(), dispatcher()) :: :ok
Broadcasts message on given topic only for the current node.
Arguments
pubsub- the name of the pubsub server.topic- the topic to broadcast to, such as"users:123".message- the payload of the broadcast.
A custom dispatcher may also be given as a fourth, optional argument. See the "Custom dispatching" section in the module documentation.
@spec local_broadcast_from(t(), pid(), topic(), message(), dispatcher()) :: :ok
Broadcasts message on given topic from a given process only for the current node.
Arguments
pubsub- the name of the pubsub server.from- the pid to send the message.topic- the topic to broadcast to, such as"users:123".message- the payload of the broadcast.
The default dispatcher will broadcast the message to all subscribers except for the process that initiated the broadcast.
A custom dispatcher may also be given as a fifth, optional argument. See the "Custom dispatching" section in the module documentation.
Returns the node name of the pubsub server.
Subscribes the caller to the topic on the pubsub server.
Arguments
pubsub- the name of the pubsub server.topic- the topic to subscribe to, such as"users:123".opts- the optional list of options. See below.
Duplicate Subscriptions
Callers should only subscribe to a given topic a single time. Duplicate
subscriptions for a Pid/topic pair are allowed and will cause duplicate
events to be sent. However, when using Combo.PubSub.unsubscribe/2, all
duplicate subscriptions will be dropped.
Options
:metadata- provides metadata to be attached to this subscription. The metadata can be used by custom dispatching mechanisms. See the "Custom dispatching" section in the module documentation.
Unsubscribes the caller from the topic on the pubsub server.