Resolving Phoenix.PubSub Conflicts
Problem
When multiple applications in the same BEAM VM attempt to start the same Phoenix.PubSub instance, you'll encounter an :already_started error. This commonly occurs when:
- One application includes another as a dependency
- Both applications try to start Phoenix.PubSub with the same name
- Multiple services share the same PubSub instance
Solution: BCUtils.PubSubManager
BCUtils.PubSubManager provides utilities to gracefully handle Phoenix.PubSub conflicts by checking if an instance is already running before attempting to start it.
Quick Start
1. Add Dependencies
# In your mix.exs
defp deps do
[
{:bc_utils, "~> 0.6.0"},
{:phoenix_pubsub, "~> 2.1"}
]
end
2. Update Your Supervision Tree
Before (causes conflicts):
defmodule MyApp.Application do
  # Starts the top-level supervisor with an unconditional PubSub child.
  def start(_type, _args) do
    child_specs = [
      {Phoenix.PubSub, name: :my_pubsub}, # Can cause :already_started error
      # other children...
    ]

    child_specs
    |> Supervisor.start_link(strategy: :one_for_one)
  end
end
After (conflict-free):
defmodule MyApp.Application do
  # Starts the top-level supervisor. The PubSub entry is included only when
  # `maybe_child_spec/1` returns a spec instead of nil.
  def start(_type, _args) do
    candidate_specs = [
      BCUtils.PubSubManager.maybe_child_spec(:my_pubsub),
      # other children...
    ]

    # Remove nil entries
    child_specs = Enum.filter(candidate_specs, fn spec -> spec end)

    Supervisor.start_link(child_specs, strategy: :one_for_one)
  end
end
API Reference
maybe_child_spec(pubsub_name, opts \\ [])
Returns a child spec for Phoenix.PubSub if it is not already started; otherwise returns nil.
Parameters:
- pubsub_name - Atom name for the PubSub instance, or nil
- opts - Additional options to pass to Phoenix.PubSub
Returns:
- {Phoenix.PubSub, keyword()} - Child spec if PubSub needs to be started
- nil - If PubSub is already running or pubsub_name is nil
Example:
# In a supervision tree
children = [
BCUtils.PubSubManager.maybe_child_spec(:my_app_pubsub),
MyApp.SomeWorker,
MyApp.AnotherWorker
]
|> Enum.filter(& &1) # Remove nil entries
Supervisor.start_link(children, strategy: :one_for_one)
already_started?(pubsub_name)
Checks if a PubSub instance is already running.
Example:
if BCUtils.PubSubManager.already_started?(:my_pubsub) do
IO.puts("PubSub is running")
else
IO.puts("PubSub is not running")
end
ensure_started(pubsub_name, opts \\ [])
Ensures a PubSub instance is available, starting it if necessary.
Example:
case BCUtils.PubSubManager.ensure_started(:my_pubsub) do
{:ok, pid} ->
IO.puts("PubSub ready: #{inspect(pid)}")
{:error, reason} ->
IO.puts("Failed to start PubSub: #{inspect(reason)}")
end
health_check(pubsub_name)
Validates that a PubSub instance is healthy and responding.
Example:
case BCUtils.PubSubManager.health_check(:my_pubsub) do
:ok ->
IO.puts("PubSub is healthy")
{:error, :not_started} ->
IO.puts("PubSub is not running")
{:error, :unresponsive} ->
IO.puts("PubSub is not responding")
end
list_running()
Lists all currently running PubSub processes.
Example:
running_pubsubs = BCUtils.PubSubManager.list_running()
IO.puts("Running PubSub instances: #{inspect(running_pubsubs)}")
Common Use Cases
Case 1: Application with Dependency
When your main application includes another application as a dependency, and both need PubSub:
# Main Application
defmodule MainApp.Application do
  # Builds the child list, drops the PubSub entry when `maybe_child_spec/1`
  # returned nil, and starts a :one_for_one supervisor.
  def start(_type, _args) do
    [
      BCUtils.PubSubManager.maybe_child_spec(:shared_pubsub),
      MainApp.SomeService
    ]
    |> Enum.filter(& &1)
    |> Supervisor.start_link(strategy: :one_for_one)
  end
end
# Dependency Application
defmodule DepApp.Application do
  # Requests the same PubSub name as the main application; a nil result from
  # `maybe_child_spec/1` is filtered out before the supervisor starts.
  def start(_type, _args) do
    pubsub_spec = BCUtils.PubSubManager.maybe_child_spec(:shared_pubsub) # Same name!

    child_specs = Enum.filter([pubsub_spec, DepApp.SomeWorker], fn spec -> spec end)

    Supervisor.start_link(child_specs, strategy: :one_for_one)
  end
end
Result: Only one PubSub instance is started, shared by both applications.
Case 2: Microservices Sharing PubSub
When multiple services in the same VM need to share a PubSub instance:
# Service A
defmodule ServiceA.Application do
  # Starts Service A's supervisor with an optional :cluster_pubsub child
  # followed by the message handler.
  def start(_type, _args) do
    candidate_specs = [
      BCUtils.PubSubManager.maybe_child_spec(:cluster_pubsub),
      ServiceA.MessageHandler
    ]

    candidate_specs
    |> Enum.filter(fn spec -> spec end)
    |> Supervisor.start_link(strategy: :one_for_one)
  end
end
# Service B
defmodule ServiceB.Application do
  # Starts Service B's supervisor; the :cluster_pubsub entry is kept only
  # when `maybe_child_spec/1` returns a spec.
  def start(_type, _args) do
    pubsub_spec = BCUtils.PubSubManager.maybe_child_spec(:cluster_pubsub) # Reuses existing

    [pubsub_spec, ServiceB.EventProcessor]
    |> Enum.filter(& &1)
    |> Supervisor.start_link(strategy: :one_for_one)
  end
end
Case 3: Development vs Production
Use different PubSub configurations based on environment:
defmodule MyApp.Application do
  # Reads the PubSub name from the :my_app application environment
  # (falling back to :my_app_pubsub) and starts the supervisor with it.
  def start(_type, _args) do
    configured_name = Application.get_env(:my_app, :pubsub_name, :my_app_pubsub)

    [
      BCUtils.PubSubManager.maybe_child_spec(configured_name),
      MyApp.Worker
    ]
    |> Enum.filter(fn spec -> spec end)
    |> Supervisor.start_link(strategy: :one_for_one)
  end
end
Config:
# config/dev.exs
config :my_app, pubsub_name: :dev_pubsub
# config/prod.exs
config :my_app, pubsub_name: :prod_pubsub
# config/test.exs
config :my_app, pubsub_name: :test_pubsub
Advanced Usage
Custom PubSub Options
Pass additional options to Phoenix.PubSub:
children = [
BCUtils.PubSubManager.maybe_child_spec(
:my_pubsub,
adapter: Phoenix.PubSub.PG2,
pool_size: 10
)
]
|> Enum.filter(& &1)
Health Monitoring
Add PubSub health checks to your application monitoring:
defmodule MyApp.HealthCheck do
  # Runs the PubSub health check for :my_pubsub and converts the result into
  # a status map stamped with the current UTC time.
  def check_pubsub do
    outcome = BCUtils.PubSubManager.health_check(:my_pubsub)
    checked_at = DateTime.utc_now()

    case outcome do
      :ok -> %{status: :healthy, timestamp: checked_at}
      {:error, why} -> %{status: :unhealthy, reason: why, timestamp: checked_at}
    end
  end
end
Conditional Logic
Handle cases where PubSub availability affects application behavior:
defmodule MyApp.Worker do
  # A GenServer that subscribes to "important_events" on :my_pubsub when that
  # PubSub instance is running at start time, and runs without it otherwise.
  #
  # `use GenServer` supplies child_spec/1 (needed to list this module in a
  # supervisor's children) and default implementations of the callbacks
  # that are not defined here.
  use GenServer

  # Starts the worker registered under the module name.
  # The PubSub check happens in the calling process, before the worker starts;
  # the chosen mode is passed to init/1 as a tag.
  def start_link(opts) do
    mode =
      if BCUtils.PubSubManager.already_started?(:my_pubsub) do
        :with_pubsub
      else
        :without_pubsub
      end

    GenServer.start_link(__MODULE__, {mode, opts}, name: __MODULE__)
  end

  # Initial state records whether the worker subscribed to PubSub.
  @impl true
  def init({:with_pubsub, opts}) do
    Phoenix.PubSub.subscribe(:my_pubsub, "important_events")
    {:ok, %{pubsub: true, opts: opts}}
  end

  def init({:without_pubsub, opts}) do
    {:ok, %{pubsub: false, opts: opts}}
  end
end
Testing
Unit Tests
Test PubSub manager functionality:
defmodule BCUtils.PubSubManagerTest do
  # Not async: these tests start a globally named PubSub instance.
  use ExUnit.Case

  test "maybe_child_spec returns nil when PubSub already running" do
    # Phoenix.PubSub 2.x is started through its child spec, not a
    # start_link/1 function. start_supervised!/1 starts it under the test
    # supervisor and stops it when the test ends, so no manual cleanup is needed.
    start_supervised!({Phoenix.PubSub, name: :test_pubsub})

    # Should return nil since it's already running
    assert BCUtils.PubSubManager.maybe_child_spec(:test_pubsub) == nil
  end

  test "maybe_child_spec returns child spec when PubSub not running" do
    # Should return child spec
    child_spec = BCUtils.PubSubManager.maybe_child_spec(:not_running_pubsub)

    assert {Phoenix.PubSub, _opts} = child_spec
  end
end
Integration Tests
Test application startup with shared PubSub:
defmodule MyApp.IntegrationTest do
  # Not async: the applications under test share a globally named PubSub.
  use ExUnit.Case

  test "multiple applications can share PubSub" do
    # Start the application that owns :shared_pubsub
    # (MainApp.Application from "Case 1: Application with Dependency").
    {:ok, _pid1} = MainApp.Application.start(:normal, [])

    # Start the dependency application, which requests the same
    # :shared_pubsub name (should not conflict).
    {:ok, _pid2} = DepApp.Application.start(:normal, [])

    # Verify the shared instance is running and healthy
    assert BCUtils.PubSubManager.already_started?(:shared_pubsub)
    assert :ok = BCUtils.PubSubManager.health_check(:shared_pubsub)
  end
end
Error Handling
When Phoenix.PubSub is Not Available
If phoenix_pubsub is not included in your dependencies:
# Returns nil gracefully
child_spec = BCUtils.PubSubManager.maybe_child_spec(:my_pubsub)
assert child_spec == nil
# Returns error
{:error, :phoenix_pubsub_not_available} =
BCUtils.PubSubManager.ensure_started(:my_pubsub)
Recovery from Failed PubSub
Handle cases where PubSub fails after startup:
defmodule MyApp.PubSubSupervisor do
  # Supervises the optional :my_pubsub instance together with the monitor
  # process that watches it.
  use Supervisor

  # Starts the supervisor registered under the module name.
  def start_link(opts), do: Supervisor.start_link(__MODULE__, opts, name: __MODULE__)

  # Builds the child list, drops the PubSub entry when `maybe_child_spec/1`
  # returned nil, and supervises the rest with :one_for_one.
  def init(opts) do
    [
      BCUtils.PubSubManager.maybe_child_spec(:my_pubsub),
      {MyApp.PubSubMonitor, :my_pubsub}
    ]
    |> Enum.filter(fn spec -> spec end)
    |> Supervisor.init(strategy: :one_for_one)
  end
end
defmodule MyApp.PubSubMonitor do
  # Periodically health-checks a PubSub instance and asks
  # BCUtils.PubSubManager to start it again when the check fails.
  use GenServer

  # Logger.warning/1 is a macro, so the module must be required before use.
  require Logger

  # Delay between health checks: 30 seconds.
  @health_check_interval_ms 30_000

  # Starts the monitor for the given PubSub name.
  def start_link(pubsub_name) do
    GenServer.start_link(__MODULE__, pubsub_name)
  end

  # Schedules the first check and keeps the monitored name as state.
  @impl true
  def init(pubsub_name) do
    schedule_health_check()
    {:ok, %{pubsub_name: pubsub_name}}
  end

  # Runs one health check, attempts a restart on failure, then re-arms the timer.
  @impl true
  def handle_info(:health_check, %{pubsub_name: name} = state) do
    case BCUtils.PubSubManager.health_check(name) do
      :ok ->
        :ok

      {:error, _reason} ->
        Logger.warning("PubSub #{name} unhealthy, attempting restart")
        BCUtils.PubSubManager.ensure_started(name)
    end

    schedule_health_check()
    {:noreply, state}
  end

  # Sends :health_check to this process after the configured interval.
  defp schedule_health_check do
    Process.send_after(self(), :health_check, @health_check_interval_ms)
  end
end
Migration Guide
From Manual Conflict Handling
If you previously handled PubSub conflicts manually:
Before:
# Starts the named PubSub, treating "already started" as success and
# returning any other result unchanged.
def start_pubsub(name) do
  start_result = Phoenix.PubSub.start_link(name: name)

  case start_result do
    {:error, {:already_started, existing_pid}} -> {:ok, existing_pid}
    other -> other
  end
end
After:
# In supervision tree
BCUtils.PubSubManager.maybe_child_spec(name)
# Or for manual start
BCUtils.PubSubManager.ensure_started(name)
From Hard-coded PubSub
Replace direct Phoenix.PubSub references:
Before:
children = [
{Phoenix.PubSub, name: :my_pubsub},
MyApp.Worker
]
After:
children = [
BCUtils.PubSubManager.maybe_child_spec(:my_pubsub),
MyApp.Worker
]
|> Enum.filter(& &1)
Best Practices
1. Consistent Naming
Use consistent PubSub names across your applications:
# Good: Use application-specific names
:my_app_pubsub
:user_service_pubsub
:billing_pubsub
# Better: Use configurable names
Application.get_env(:my_app, :pubsub_name, :my_app_pubsub)
2. Resource Management
Monitor PubSub resource usage:
defmodule MyApp.PubSubMetrics do
  # Collects one metrics map per entry returned by
  # BCUtils.PubSubManager.list_running/0.
  # NOTE(review): assumes list_running/0 returns registered atom names, since
  # each entry is passed to Process.whereis/1 — confirm against the manager.
  def collect_metrics do
    Enum.map(BCUtils.PubSubManager.list_running(), fn name ->
      # Resolve the name once so :pid and :message_count describe the same process.
      pid = Process.whereis(name)

      %{
        name: name,
        status: BCUtils.PubSubManager.health_check(name),
        pid: pid,
        message_count: get_message_count(pid)
      }
    end)
  end

  # Returns the process's message queue length, or 0 when there is no process.
  defp get_message_count(nil), do: 0

  defp get_message_count(pid) when is_pid(pid) do
    # Process.info/2 returns nil when the process has exited in the meantime;
    # report 0 instead of crashing on a failed match.
    case Process.info(pid, :message_queue_len) do
      {:message_queue_len, count} -> count
      nil -> 0
    end
  end
end
3. Graceful Degradation
Design your application to work without PubSub when necessary:
defmodule MyApp.EventBus do
  # Thin wrapper around Phoenix.PubSub for :my_pubsub that logs a warning and
  # returns {:error, :pubsub_unavailable} when the instance is not running.

  # Logger.warning/1 is a macro, so the module must be required before use.
  require Logger

  # Broadcasts `message` on `topic`.
  # Returns the result of Phoenix.PubSub.broadcast/3, or
  # {:error, :pubsub_unavailable} when PubSub is not running (same error shape
  # as subscribe/1).
  def broadcast(topic, message) do
    if BCUtils.PubSubManager.already_started?(:my_pubsub) do
      Phoenix.PubSub.broadcast(:my_pubsub, topic, message)
    else
      Logger.warning("PubSub not available, message not broadcast: #{inspect(message)}")
      {:error, :pubsub_unavailable}
    end
  end

  # Subscribes the calling process to `topic`.
  # Returns the result of Phoenix.PubSub.subscribe/2, or
  # {:error, :pubsub_unavailable} when PubSub is not running.
  def subscribe(topic) do
    if BCUtils.PubSubManager.already_started?(:my_pubsub) do
      Phoenix.PubSub.subscribe(:my_pubsub, topic)
    else
      Logger.warning("PubSub not available, cannot subscribe to: #{topic}")
      {:error, :pubsub_unavailable}
    end
  end
end
Conclusion
BCUtils.PubSubManager provides a robust solution for handling Phoenix.PubSub conflicts in multi-application environments. By using the conditional startup pattern, you can:
- Eliminate :already_started errors
- Share PubSub instances efficiently
- Maintain clean, conflict-free supervision trees
- Add health monitoring and diagnostics
- Ensure graceful degradation when PubSub is unavailable
The utility is designed to be transparent to your application logic while providing the flexibility to handle complex deployment scenarios.