Combo.PubSub (combo_pubsub v0.1.0)

Distributed Pub/Sub system.

Getting started

To start a pubsub server, add Combo.PubSub to your supervision tree:

children = [
  # ...
  {Combo.PubSub, name: :my_pubsub}
]

You can now use the functions in this module to subscribe and broadcast messages:

iex> alias Combo.PubSub

iex> PubSub.subscribe(:my_pubsub, "user:123")
:ok
iex> Process.info(self(), :messages)
{:messages, []}

iex> PubSub.broadcast(:my_pubsub, "user:123", {:user_update, %{id: 123, name: "Shane"}})
:ok
iex> Process.info(self(), :messages)
{:messages, [{:user_update, %{id: 123, name: "Shane"}}]}

Adapters

Combo.PubSub was designed to be flexible and support multiple backends. There is one built-in backend:

  • Combo.PubSub.PG - the default adapter that ships as part of Combo.PubSub. It runs on Distributed Erlang, directly exchanging notifications between servers. It supports a :pool_size option, given alongside the name, which defaults to 1. Note that :pool_size must be the same throughout the cluster, so do not configure it based on System.schedulers_online/0, especially if your machines have different specs.

See Combo.PubSub.Adapter to implement a custom adapter.

Custom dispatching

Combo.PubSub allows developers to perform custom dispatching by passing a dispatcher module which is responsible for local message deliveries.

The dispatcher must be available on all nodes running the PubSub system. The dispatch/3 function of the given module will be invoked with the subscription entries, the broadcaster identifier (either a pid or :none), and the message to broadcast.

You may want to use the dispatcher to perform special delivery for certain subscriptions. This can be done by passing the :metadata option when subscribing. For instance, Combo Channels use a custom value to provide "fastlaning", allowing messages broadcast to thousands or even millions of users to be encoded once and written directly to sockets instead of being encoded per channel.
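As an illustration of the dispatch/3 contract described above, here is a minimal sketch of a custom dispatcher. The module name MyApp.TaggingDispatcher and the :tagged metadata value are hypothetical, and the sketch assumes each subscription entry is a {pid, metadata} tuple, which this page does not spell out.

```elixir
defmodule MyApp.TaggingDispatcher do
  # Hypothetical dispatcher, not part of Combo.PubSub.
  # Assumption: `entries` is a list of {pid, metadata} tuples, where
  # `metadata` is whatever was passed as :metadata when subscribing.
  # `from` is either the broadcasting pid or :none.
  def dispatch(entries, from, message) do
    # Skip the broadcaster itself; when `from` is :none nothing is skipped.
    for {pid, metadata} <- entries, pid != from do
      deliver(pid, metadata, message)
    end

    :ok
  end

  # Subscriptions marked with the (hypothetical) :tagged metadata get a wrapped message.
  defp deliver(pid, :tagged, message), do: send(pid, {:tagged, message})

  # Every other subscription receives the message unchanged.
  defp deliver(pid, _other, message), do: send(pid, message)
end
```

Under those assumptions, a process would opt in with PubSub.subscribe(:my_pubsub, "user:123", metadata: :tagged), and a broadcaster would pass the module as the last argument, for example PubSub.broadcast(:my_pubsub, "user:123", message, MyApp.TaggingDispatcher).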

Safe pool size migration (when using Combo.PubSub.PG adapter)

When you need to change the pool size in a running cluster, you can use the :broadcast_pool_size option to ensure no messages are lost during deployment. This is particularly important when increasing the pool size.

Here's how to safely increase the pool size from 1 to 2:

  1. Initial state - Current configuration with pool_size: 1:
    {Combo.PubSub, name: :my_pubsub, pool_size: 1}
graph TD
    subgraph "Initial State"
        subgraph "Node 1"
            A1[Shard 1<br/>Broadcast & Receive]
        end
        subgraph "Node 2"
            B1[Shard 1<br/>Broadcast & Receive]
        end
        A1 <--> B1
    end
  2. First deployment - Set the new pool size but keep broadcasting on the old size:
    {Combo.PubSub, name: :my_pubsub, pool_size: 2, broadcast_pool_size: 1}
graph TD
    subgraph "First Deployment"
        subgraph "Node 1"
            A1[Shard 1<br/>Broadcast & Receive]
            A2[Shard 2<br/>Broadcast & Receive]
        end
        subgraph "Node 2"
            B1[Shard 1<br/>Broadcast & Receive]
            B2[Shard 2<br/>Receive Only]
        end
        A1 <--> B1
        A2 --> B2
    end
  3. Final deployment - All nodes running with new pool size:
    {Combo.PubSub, name: :my_pubsub, pool_size: 2}
graph TD
    subgraph "Final State"
        subgraph "Node 1"
            A1[Shard 1<br/>Broadcast & Receive]
            A2[Shard 2<br/>Broadcast & Receive]
        end
        subgraph "Node 2"
            B1[Shard 1<br/>Broadcast & Receive]
            B2[Shard 2<br/>Broadcast & Receive]
        end
        A1 <--> B1
        A2 <--> B2
    end

This two-step process ensures that:

  • All nodes can receive messages from both old and new pool sizes.
  • No messages are lost during the transition.
  • The cluster remains fully functional throughout the deployment.

To decrease the pool size, follow the same process in reverse order.
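To make the reverse direction concrete, the sketch below spells out one reading of "the same process in reverse order" for shrinking the pool from 2 to 1. The module MyApp.PubSubConfig and its phase names are hypothetical; only the option names come from this page, and the assumption is that the intermediate deployment keeps listening on the larger pool while broadcasting on the smaller one.

```elixir
defmodule MyApp.PubSubConfig do
  # Hypothetical helper: returns the supervision tree entry to deploy
  # in each phase of shrinking the pool from 2 shards to 1.

  # Starting point: every node broadcasts and receives on 2 shards.
  def child(:before), do: {Combo.PubSub, name: :my_pubsub, pool_size: 2}

  # First deployment: keep receiving on 2 shards, broadcast on 1 only.
  def child(:first_deployment),
    do: {Combo.PubSub, name: :my_pubsub, pool_size: 2, broadcast_pool_size: 1}

  # Final deployment: once all nodes run the previous phase, drop to 1 shard.
  def child(:final_deployment), do: {Combo.PubSub, name: :my_pubsub, pool_size: 1}
end
```

Each phase would be rolled out to the whole cluster before moving on to the next one, mirroring the steps shown above for increasing the pool size.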

Summary

Functions

  • broadcast/4 - Broadcasts message on given topic across the whole cluster.
  • broadcast_from/5 - Broadcasts message on given topic from the given process across the whole cluster.
  • child_spec/1 - Returns a child specification for pubsub with the given options.
  • direct_broadcast/5 - Broadcasts message on given topic to a given node.
  • local_broadcast/4 - Broadcasts message on given topic only for the current node.
  • local_broadcast_from/5 - Broadcasts message on given topic from a given process only for the current node.
  • node_name/1 - Returns the node name of the pubsub server.
  • subscribe/3 - Subscribes the caller to the topic on the pubsub server.
  • unsubscribe/2 - Unsubscribes the caller from the topic on the pubsub server.

Types

dispatcher()

@type dispatcher() :: module()

message()

@type message() :: term()

node_name()

@type node_name() :: atom() | binary()

t()

@type t() :: atom()

topic()

@type topic() :: binary()

Functions

broadcast(pubsub, topic, message, dispatcher \\ __MODULE__)

@spec broadcast(t(), topic(), message(), dispatcher()) :: :ok | {:error, term()}

Broadcasts message on given topic across the whole cluster.

Arguments

  • pubsub - the name of the pubsub server.
  • topic - the topic to broadcast to, such as "users:123".
  • message - the payload of the broadcast.

A custom dispatcher may also be given as a fourth, optional argument. See the "Custom dispatching" section in the module documentation.

broadcast!(pubsub, topic, message, dispatcher \\ __MODULE__)

@spec broadcast!(t(), topic(), message(), dispatcher()) :: :ok

Raising version of broadcast/4.

broadcast_from(pubsub, from, topic, message, dispatcher \\ __MODULE__)

@spec broadcast_from(t(), pid(), topic(), message(), dispatcher()) ::
  :ok | {:error, term()}

Broadcasts message on given topic from the given process across the whole cluster.

Arguments

  • pubsub - the name of the pubsub server.
  • from - the pid of the process sending the message.
  • topic - the topic to broadcast to, such as "users:123".
  • message - the payload of the broadcast.

The default dispatcher will broadcast the message to all subscribers except for the process that initiated the broadcast.

A custom dispatcher may also be given as a fifth, optional argument. See the "Custom dispatching" section in the module documentation.

broadcast_from!(pubsub, from, topic, message, dispatcher \\ __MODULE__)

@spec broadcast_from!(t(), pid(), topic(), message(), dispatcher()) :: :ok

Raising version of broadcast_from/5.

child_spec(options)

@spec child_spec(keyword()) :: Supervisor.child_spec()

Returns a child specification for pubsub with the given options.

The :name is required as part of options. The remaining options are described below.

Options

  • :name - the name of the pubsub server to be started.
  • :adapter - the adapter to use. Defaults to Combo.PubSub.PG.
  • :pool_size - the number of pubsub partitions to launch. Defaults to one partition for every 4 cores.
  • :registry_size - the number of Registry partitions to launch. This controls the number of Registry partitions used for storing subscriptions and can be tuned independently from :pool_size for better performance characteristics. Defaults to the value of :pool_size.
  • :broadcast_pool_size - the number of pubsub partitions used for broadcasting messages. This option is used during pool size migrations to ensure no messages are lost. See the "Safe pool size migration" section in the module documentation. Defaults to the value of :pool_size.

direct_broadcast(node_name, pubsub, topic, message, dispatcher \\ __MODULE__)

@spec direct_broadcast(node_name(), t(), topic(), message(), dispatcher()) ::
  :ok | {:error, term()}

Broadcasts message on given topic to a given node.

Arguments

  • node_name - the name of the target node.
  • pubsub - the name of the pubsub server.
  • topic - the topic to broadcast to, such as "users:123".
  • message - the payload of the broadcast.

DO NOT use this function to broadcast to the current node, as the message is always serialized; use local_broadcast/4 instead.

A custom dispatcher may also be given as a fifth, optional argument. See the "Custom dispatching" section in the module documentation.

direct_broadcast!(node_name, pubsub, topic, message, dispatcher \\ __MODULE__)

@spec direct_broadcast!(node_name(), t(), topic(), message(), dispatcher()) :: :ok

Raising version of direct_broadcast/5.

local_broadcast(pubsub, topic, message, dispatcher \\ __MODULE__)

@spec local_broadcast(t(), topic(), message(), dispatcher()) :: :ok

Broadcasts message on given topic only for the current node.

Arguments

  • pubsub - the name of the pubsub server.
  • topic - the topic to broadcast to, such as "users:123".
  • message - the payload of the broadcast.

A custom dispatcher may also be given as a fourth, optional argument. See the "Custom dispatching" section in the module documentation.

local_broadcast_from(pubsub, from, topic, message, dispatcher \\ __MODULE__)

@spec local_broadcast_from(t(), pid(), topic(), message(), dispatcher()) :: :ok

Broadcasts message on given topic from a given process only for the current node.

Arguments

  • pubsub - the name of the pubsub server.
  • from - the pid of the process sending the message.
  • topic - the topic to broadcast to, such as "users:123".
  • message - the payload of the broadcast.

The default dispatcher will broadcast the message to all subscribers except for the process that initiated the broadcast.

A custom dispatcher may also be given as a fifth, optional argument. See the "Custom dispatching" section in the module documentation.

node_name(pubsub)

@spec node_name(t()) :: node_name()

Returns the node name of the pubsub server.

subscribe(pubsub, topic, opts \\ [])

@spec subscribe(t(), topic(), keyword()) :: :ok | {:error, term()}

Subscribes the caller to the topic on the pubsub server.

Arguments

  • pubsub - the name of the pubsub server.
  • topic - the topic to subscribe to, such as "users:123".
  • opts - the optional list of options. See below.

Duplicate Subscriptions

Callers should subscribe to a given topic only once. Duplicate subscriptions for a pid/topic pair are allowed and will cause duplicate events to be sent. However, when using Combo.PubSub.unsubscribe/2, all duplicate subscriptions will be dropped.

Options

  • :metadata - provides metadata to be attached to this subscription. The metadata can be used by custom dispatching mechanisms. See the "Custom dispatching" section in the module documentation.
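Because subscribe/3 subscribes the caller, a long-lived subscriber typically calls it from inside its own process. The following is a minimal sketch of that pattern using a GenServer; the module MyApp.UserWatcher, its state shape, and the {:user_update, user} message are illustrative choices, and it assumes a pubsub server named :my_pubsub is already running.

```elixir
defmodule MyApp.UserWatcher do
  # Hypothetical subscriber process, not part of Combo.PubSub.
  use GenServer

  def start_link(user_id), do: GenServer.start_link(__MODULE__, user_id)

  @impl true
  def init(user_id) do
    # Subscribe from within the process that should receive the messages.
    # Assumes a pubsub server named :my_pubsub has been started.
    :ok = Combo.PubSub.subscribe(:my_pubsub, "user:#{user_id}")
    {:ok, %{user_id: user_id, last_update: nil}}
  end

  @impl true
  def handle_info({:user_update, user}, state) do
    # Broadcast messages arrive as ordinary process messages.
    {:noreply, %{state | last_update: user}}
  end
end
```

Subscribing once in init/1 also avoids the duplicate deliveries described under "Duplicate Subscriptions".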

unsubscribe(pubsub, topic)

@spec unsubscribe(t(), topic()) :: :ok

Unsubscribes the caller from the topic on the pubsub server.