ExESDB PubSub Quick Reference
This is a quick reference guide for the ExESDB PubSub integration with ExESDBGater.
Quick Setup
1. Enable Integration
# config/config.exs
config :ex_esdb,
pubsub_integration: true,
health_broadcast_interval: 30_000,
metrics_broadcast_interval: 60_000
# Optional: Configure message security
config :ex_esdb_gater,
secret_key_base: System.get_env("SECRET_KEY_BASE")
2. Runtime Control
# Enable/disable at runtime
ExESDB.PubSubIntegration.enable!()
ExESDB.PubSubIntegration.disable!()
# Check status
ExESDB.PubSubIntegration.enabled?()
Broadcasting Events
System Events
# System lifecycle
ExESDB.PubSubIntegration.broadcast_system_lifecycle(:started, :ex_esdb, "0.8.0")
# Configuration changes
ExESDB.PubSubIntegration.broadcast_system_config(:store, %{enabled: true})
Health Events
# Component health
ExESDB.PubSubIntegration.broadcast_health_update(:store_worker, :healthy, %{store_id: "main"})
# Cluster health
ExESDB.PubSubIntegration.broadcast_cluster_health(nodes, failed_nodes, :node_down)
Performance Metrics
# Generic metrics
ExESDB.PubSubIntegration.broadcast_metrics(:persistence, %{
operations_count: 1000,
duration_ms: 250,
success_count: 995,
error_count: 5
})
Alerts
# System alerts
ExESDB.PubSubIntegration.broadcast_alert(:node_failure, :critical, "Node down", %{node: :node1})
Subscribing to Events
Basic Subscription
# Subscribe to specific topics
Phoenix.PubSub.subscribe(:ex_esdb_system, "lifecycle")
Phoenix.PubSub.subscribe(:ex_esdb_health, "cluster_health")
Phoenix.PubSub.subscribe(:ex_esdb_metrics, "persistence")
Phoenix.PubSub.subscribe(:ex_esdb_alerts, "node_failure")
LiveView Integration
defmodule MyApp.DashboardLive do
use Phoenix.LiveView
def mount(_params, _session, socket) do
if connected?(socket) do
Phoenix.PubSub.subscribe(:ex_esdb_health, "cluster_health")
Phoenix.PubSub.subscribe(:ex_esdb_alerts, "node_failure")
end
{:ok, assign(socket, :status, %{})}
end
def handle_info({:node_health_updated, payload}, socket) do
{:noreply, assign(socket, :status, payload)}
end
def handle_info({:system_alert, alert}, socket) do
{:noreply, put_flash(socket, :error, alert.message)}
end
end
GenServer Integration
defmodule MyApp.MonitoringWorker do
use GenServer
def init(_) do
Phoenix.PubSub.subscribe(:ex_esdb_metrics, "performance")
Phoenix.PubSub.subscribe(:ex_esdb_alerts, "node_failure")
{:ok, %{}}
end
def handle_info({:performance_metric, metric}, state) do
# Process metric
{:noreply, state}
end
def handle_info({:system_alert, alert}, state) do
# Handle alert
{:noreply, state}
end
end
Message Validation
Secure Messages
def handle_info(raw_message, state) do
case ExESDBGater.Messages.HealthMessages.validate_secure_message(raw_message) do
{:ok, {:node_health_updated, payload}} ->
handle_health_update(payload, state)
{:error, :invalid_signature} ->
Logger.warning("Invalid message signature")
{:noreply, state}
{:error, :no_secret_configured} ->
Logger.warning("No secret configured for validation")
{:noreply, state}
end
end
PubSub Instances Reference
Instance | Topics | Message Types |
---|---|---|
:ex_esdb_system | "lifecycle", "config" | SystemLifecycle, SystemConfig |
:ex_esdb_health | "cluster_health", "component_health" | NodeHealth, ClusterHealth |
:ex_esdb_metrics | "performance", "persistence" | PerformanceMetric, ThroughputMetric |
:ex_esdb_alerts | "node_failure", "persistence_failure" | SystemAlert |
:ex_esdb_audit | "data_change", "admin_action" | DataChange, AdminAction |
:ex_esdb_diagnostics | "debug_trace", "performance_analysis" | DebugTrace |
:ex_esdb_lifecycle | "process_lifecycle" | ProcessLifecycle |
:ex_esdb_security | "auth_event", "access_violation" | AuthEvent |
:ex_esdb_logging | "log_entry", "log_summary" | LogEntry |
Common Message Patterns
Health Messages
# Node health
{:node_health_updated, %HealthMessages.NodeHealth{
node: :node1,
status: :healthy,
checks: %{persistence: :pass, network: :pass},
memory_usage: 45.2,
timestamp: ~U[2024-08-15 12:00:00Z]
}}
# Cluster health
{:cluster_health_updated, %HealthMessages.ClusterHealth{
status: :healthy,
healthy_nodes: 3,
total_nodes: 4,
degraded_nodes: [],
unhealthy_nodes: [:node4],
timestamp: ~U[2024-08-15 12:00:00Z]
}}
Metric Messages
# Performance metric
{:performance_metric, %MetricsMessages.PerformanceMetric{
metric_name: :persistence_operation,
value: 1000,
unit: "operations",
component: :persistence,
node: :node1,
timestamp: ~U[2024-08-15 12:00:00Z]
}}
# Throughput metric
{:throughput_metric, %MetricsMessages.ThroughputMetric{
operation: :event_emission,
count: 1250,
duration_ms: 1000,
rate_per_sec: 1250.0,
node: :node1,
timestamp: ~U[2024-08-15 12:00:00Z]
}}
Alert Messages
# System alert
{:system_alert, %AlertMessages.SystemAlert{
alert_type: :node_failure,
severity: :critical,
message: "Node node2 has become unresponsive",
context: %{failed_node: :node2, cluster_size: 3},
timestamp: ~U[2024-08-15 12:00:00Z]
}}
Troubleshooting
Common Issues
Messages not received
- Check topic names match exactly
- Verify PubSub instance names
- Ensure processes are still alive
Invalid signature errors
- Check SECRET_KEY_BASE configuration
- Verify same secret across all nodes
- Check clock synchronization
High memory usage
- Reduce broadcast intervals
- Implement selective subscription
- Monitor message queue lengths
Debug Commands
# Check integration status
ExESDB.PubSubIntegration.enabled?()
ExESDB.PubSubIntegration.health_broadcast_interval()
ExESDB.PubSubIntegration.metrics_broadcast_interval()
# List subscribers
Phoenix.PubSub.subscribers(:ex_esdb_health, "cluster_health")
# Check process info
Process.info(self(), [:message_queue_len, :memory])
Performance Tips
- Use topic-based filtering to reduce processing
- Implement message batching for high frequency events
- Set appropriate broadcast intervals
- Monitor subscriber performance
- Use structured logging for debugging