PubSubx (pub_subx v0.2.4)
PubSubx is a simple publish-subscribe (PubSub) system built on top of Elixir's GenServer and Registry.
The module allows processes to subscribe to topics, publish messages to those topics, and manage subscriptions. It efficiently handles message delivery to subscribed processes and automatically cleans up subscriptions when processes terminate.
Features
- Subscribe/Unsubscribe: Processes can subscribe or unsubscribe from topics.
- Publish: Messages can be published to a topic, and all subscribers to that topic will receive the message.
- Dynamic Topics: Topics are dynamically created as they are subscribed to, and they are removed when no subscribers exist.
- Process Monitoring: Automatically removes subscribers when the process is no longer alive.
Auto Module
The PubSubx.Auto module is a utility that helps developers reduce boilerplate code when defining PubSubx modules.
Example Usage with the Auto Module
Define a PubSubx module:
defmodule MyApp.MyPubSub do
use PubSubx.Auto, name: MyApp.MyPubSub
endInclude it in your supervisor tree
defmodule MyApp.Application do
use Application
def start(_type, _args) do
children = [
# Start the PubSubx server
{MyApp.MyPubSub, []}
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
endNow you can use the MyPubSub module:
# If you didn't include it in the supervisor tree, you can start it as follows:
{:ok, _pid} = MyApp.MyPubSub.start_link()
# Subscribe a pid (e.g., self()) to a topic
MyApp.MyPubSub.subscribe(:my_topic, self())
# List subscribers
subscribers = MyApp.MyPubSub.subscribers(:my_topic)
# List topics
topics = MyApp.MyPubSub.topics()
# Publish a message
MyApp.MyPubSub.publish(:my_topic, "Hello, world!")
# Unsubscribe a process from a topic
# This is optional. This happens automatically if the subscribed process dies.
MyApp.MyPubSub.unsubscribe(:my_topic, self())Distributed Publish
You can broadcast messages in all interconnected nodes with PubSubx.distribute_publish/4 (e.g., when using libcluster).
MyApp.MyPubSub.distribute_publish(:my_topic, "Hello, world!")Example Usage
Start the PubSubx server:
{:ok, pid} = PubSubx.start_link(name: :my_pubsub)Subscribe a process to a topic:
# You can use the pid or the process name
PubSubx.subscribe(:my_pubsub, :my_topic, self())
PubSubx.subscribe(pid, :other_topic, self())Publish a message to the topic:
PubSubx.publish(:my_pubsub, :my_topic, "Hello, subscribers!")Get the list of subscribers:
subscribers = PubSubx.subscribers(:my_pubsub, :my_topic)Unsubscribe a process from a topic:
PubSubx.unsubscribe(:my_pubsub, :my_topic, self())
Summary
Functions
Returns a specification to start this module under a supervisor.
Distributes a publish event across nodes in a distributed system.
Publishes a message to the specified topic.
Starts the PubSubx server.
Subscribes a given process (pid) to a specific topic.
Returns a list of PIDs that are subscribed to the specified topic.
Lists all topics that have active subscribers.
Unsubscribes a given process (pid) from the specified topic.
Types
process()
topic()
Functions
child_spec(init_arg)
Returns a specification to start this module under a supervisor.
See Supervisor.
distribute_publish(module, publish_args, node_filter \\ & &1, node_opts \\ [:visible, :this])
Distributes a publish event across nodes in a distributed system.
This uses the PubSubx.Utils.distribute_publish/4 function to send messages to the specified topic across nodes, filtered by node_filter.
Parameters:
module: The module that handles the PubSubx logic.topic: The topic to which the message is published.message: The message to be published.node_filter: A function to filter nodes (default: all visible nodes).node_opts: Options for selecting nodes, default[:visible, :this].
publish(process, topic, message)
Publishes a message to the specified topic.
All subscribers to that topic will receive the message.
start_link(opts \\ [])
@spec start_link(Keyword.t()) :: GenServer.on_start()
Starts the PubSubx server.
Options
subscribe(pubsub, topic, pid)
Subscribes a given process (pid) to a specific topic.
subscribers(process, topic)
Returns a list of PIDs that are subscribed to the specified topic.
topics(process)
Lists all topics that have active subscribers.
unsubscribe(process, topic, pid)
Unsubscribes a given process (pid) from the specified topic.