PubSubx (pub_subx v0.2.4)

PubSubx is a simple publish-subscribe (PubSub) system built on top of Elixir's GenServer and Registry.

The module allows processes to subscribe to topics, publish messages to those topics, and manage subscriptions. It efficiently handles message delivery to subscribed processes and automatically cleans up subscriptions when processes terminate.

Features

  • Subscribe/Unsubscribe: Processes can subscribe or unsubscribe from topics.
  • Publish: Messages can be published to a topic, and all subscribers to that topic will receive the message.
  • Dynamic Topics: Topics are dynamically created as they are subscribed to, and they are removed when no subscribers exist.
  • Process Monitoring: Automatically removes subscribers when the process is no longer alive.

Auto Module

The PubSubx.Auto module is a utility that helps developers reduce boilerplate code when defining PubSubx modules.

Example Usage with the Auto Module

Define a PubSubx module:

defmodule MyApp.MyPubSub do
  use PubSubx.Auto, name: MyApp.MyPubSub
end

Include it in your supervisor tree

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      # Start the PubSubx server
      {MyApp.MyPubSub, []}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Now you can use the MyPubSub module:

# If you didn't include it in the supervisor tree, you can start it as follows:
{:ok, _pid} = MyApp.MyPubSub.start_link()

# Subscribe a pid (e.g., self()) to a topic
MyApp.MyPubSub.subscribe(:my_topic, self())

# List subscribers
subscribers = MyApp.MyPubSub.subscribers(:my_topic)

# List topics
topics = MyApp.MyPubSub.topics()

# Publish a message
MyApp.MyPubSub.publish(:my_topic, "Hello, world!")

# Unsubscribe a process from a topic
# This is optional. This happens automatically if the subscribed process dies.
MyApp.MyPubSub.unsubscribe(:my_topic, self())

Distributed Publish

You can broadcast messages in all interconnected nodes with PubSubx.distribute_publish/4 (e.g., when using libcluster).

 MyApp.MyPubSub.distribute_publish(:my_topic, "Hello, world!")

Example Usage

Start the PubSubx server:

{:ok, pid} = PubSubx.start_link(name: :my_pubsub)

Subscribe a process to a topic:

# You can use the pid or the process name
PubSubx.subscribe(:my_pubsub, :my_topic, self())
PubSubx.subscribe(pid, :other_topic, self())

Publish a message to the topic:

PubSubx.publish(:my_pubsub, :my_topic, "Hello, subscribers!")

Get the list of subscribers:

subscribers = PubSubx.subscribers(:my_pubsub, :my_topic)

Unsubscribe a process from a topic:

PubSubx.unsubscribe(:my_pubsub, :my_topic, self())

Summary

Functions

Returns a specification to start this module under a supervisor.

Distributes a publish event across nodes in a distributed system.

Publishes a message to the specified topic.

Starts the PubSubx server.

Subscribes a given process (pid) to a specific topic.

Returns a list of PIDs that are subscribed to the specified topic.

Lists all topics that have active subscribers.

Unsubscribes a given process (pid) from the specified topic.

Types

@type process() :: atom() | pid()
@type topic() :: atom() | binary()

Functions

Link to this function

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

Link to this function

distribute_publish(module, publish_args, node_filter \\ & &1, node_opts \\ [:visible, :this])

Distributes a publish event across nodes in a distributed system.

This uses the PubSubx.Utils.distribute_publish/4 function to send messages to the specified topic across nodes, filtered by node_filter.

Parameters:

  • module: The module that handles the PubSubx logic.
  • topic: The topic to which the message is published.
  • message: The message to be published.
  • node_filter: A function to filter nodes (default: all visible nodes).
  • node_opts: Options for selecting nodes, default [:visible, :this].
Link to this function

publish(process, topic, message)

@spec publish(process(), topic(), term()) :: :ok

Publishes a message to the specified topic.

All subscribers to that topic will receive the message.

Link to this function

start_link(opts \\ [])

@spec start_link(Keyword.t()) :: GenServer.on_start()

Starts the PubSubx server.

Options

  • :name - The name to register the GenServer under (default: PubSubx).
  • :registry_name - Option to possible change the inner registry name.
  • :registry_partitions - Option to possible change the inner registry partitions, default: System.schedulers_online()
Link to this function

subscribe(pubsub, topic, pid)

@spec subscribe(process(), topic(), process()) :: :ok

Subscribes a given process (pid) to a specific topic.

Link to this function

subscribers(process, topic)

@spec subscribers(process(), topic()) :: [pid()]

Returns a list of PIDs that are subscribed to the specified topic.

Link to this function

topics(process)

@spec topics(process()) :: [topic()]

Lists all topics that have active subscribers.

Link to this function

unsubscribe(process, topic, pid)

@spec unsubscribe(process(), topic(), process()) :: :ok

Unsubscribes a given process (pid) from the specified topic.