ExESDB Gater PubSub Architecture
Overview
ExESDB Gater implements a comprehensive PubSub (Publish-Subscribe) architecture using multiple dedicated Phoenix PubSub instances. This design provides superior decoupling, observability, and system resilience by segregating different types of events into specialized communication channels.
Architecture Benefits
1. Separation of Concerns
Each PubSub instance handles a specific type of event, preventing cross-contamination and ensuring that consumers only receive the events they need. This reduces noise and improves system clarity.
2. Independent Scaling
Different event types have different volume characteristics and consumer patterns. Separate instances allow for independent tuning, monitoring, and scaling based on specific requirements.
3. Selective Subscription
Consumers can subscribe only to the event types they care about, reducing unnecessary message processing and improving performance.
4. Fault Isolation
Issues with one type of event (e.g., high-volume metrics) won't affect other critical systems (e.g., security alerts), improving overall system resilience.
5. Independent Configuration
Each PubSub instance can have different retention policies, routing strategies, and performance characteristics optimized for its specific use case.
PubSub Instances
Core Event Data
Instance: :ex_esdb_events
Purpose: Handles core business events and domain data flowing through the ExESDB system.
Typical Topics:
event_stream:#{stream_id}
projection_updates:#{projection_name}
aggregate_events:#{aggregate_id}
Use Cases:
- Event sourcing data distribution
- Real-time projection updates
- Inter-service communication for business logic
Volume: High - Core business events Criticality: High - Essential for business operations
System Events
Instance: :ex_esdb_system
Purpose: General system-level events that don't fit into other specific categories.
Typical Topics:
system_status:#{component}
configuration_changes:#{service}
service_discovery:#{node}
Use Cases:
- Service coordination
- Configuration management
- General system notifications
Volume: Medium - Periodic system events Criticality: Medium - Important for system coordination
Log Aggregation
Instance: :ex_esdb_logging
Purpose: Centralized log message distribution and aggregation across the system.
Typical Topics:
logs:#{service}:#{level}
structured_logs:#{component}
error_reports:#{service}
Use Cases:
- Centralized logging
- Log analysis and monitoring
- Debugging and troubleshooting
Volume: Very High - Continuous log streams Criticality: Medium - Important for observability
Health Monitoring
Instance: :ex_esdb_health
Purpose: Health status and monitoring events for system components and processes.
Typical Topics:
subscription_health:#{store_id}:#{subscription_name}
process_health:#{component}
circuit_breaker_status:#{service}
Use Cases:
- Subscription proxy health monitoring
- Circuit breaker status tracking
- Component health dashboards
- Automated health checks
Volume: Medium - Regular health check events Criticality: High - Critical for system reliability
Example Event:
%{
store_id: "production_store",
subscription_name: "user_projection",
event_type: :registration_success,
timestamp: 1672531200000,
metadata: %{
proxy_pid: #PID<0.123.0>,
subscriber_pid: #PID<0.124.0>,
registration_time: 1672531200000
}
}
Performance Metrics
Instance: :ex_esdb_metrics
Purpose: Performance metrics, statistics, and measurement data for system optimization.
Typical Topics:
subscription_metrics:#{store_id}:#{subscription_name}
performance_metrics:#{component}
throughput_stats:#{service}
Use Cases:
- Performance monitoring
- Capacity planning
- SLA compliance tracking
- System optimization
Volume: Very High - Continuous metrics streams Criticality: Medium - Important for optimization
Example Event:
%{
store_id: "production_store",
subscription_name: "user_projection",
event_type: :registration_attempt,
timestamp: 1672531200000,
metadata: %{
result: :ok,
source: :subscription_metrics,
processing_time_ms: 45
}
}
Security Events
Instance: :ex_esdb_security
Purpose: Security-related events including authentication, authorization, and threat detection.
Typical Topics:
security_events:authentication
security_events:authorization
threat_detection:#{service}
access_violations:#{resource}
Use Cases:
- Security monitoring
- Threat detection
- Compliance reporting
- Incident response
Volume: Low to Medium - Security events as they occur Criticality: Very High - Critical for security
Example Event:
%{
event_type: :authentication_failure,
user_id: "suspicious_user",
ip_address: "***REDACTED***",
timestamp: 1672531200000,
reason: :invalid_credentials,
metadata: %{
attempt_count: 5,
user_agent: "curl/7.68.0"
}
}
Audit Trail
Instance: :ex_esdb_audit
Purpose: Audit trail events for compliance, governance, and regulatory requirements.
Typical Topics:
audit_trail:user_actions
audit_trail:admin_operations
compliance_events:#{regulation}
Use Cases:
- Regulatory compliance
- Audit reporting
- Governance tracking
- Legal requirements
Volume: Medium - User and admin actions Criticality: Very High - Required for compliance
Example Event:
%{
event_type: :user_action,
user_id: "admin_user",
action: :delete_subscription,
resource: "subscription_123",
timestamp: 1672531200000,
metadata: %{
store_id: "production_store",
ip_address: "***REDACTED***",
session_id: "sess_abc123"
}
}
Critical Alerts
Instance: :ex_esdb_alerts
Purpose: High-priority alerts and notifications that require immediate attention.
Typical Topics:
system_alerts:critical
system_alerts:warning
incident_alerts:#{severity}
Use Cases:
- Incident management
- Pager duty integration
- Emergency notifications
- SLA breach alerts
Volume: Low - Only critical situations Criticality: Very High - Requires immediate action
Example Event:
%{
event_type: :critical_alert,
severity: :high,
component: :subscription_system,
message: "Circuit breaker opened for multiple subscriptions",
timestamp: 1672531200000,
metadata: %{
affected_subscriptions: 5,
estimated_impact: "50% of real-time projections offline",
escalation_required: true
}
}
Diagnostic Information
Instance: :ex_esdb_diagnostics
Purpose: Deep diagnostic information for debugging, profiling, and system analysis.
Typical Topics:
diagnostics:performance_trace
diagnostics:memory_analysis
diagnostics:connection_pool
Use Cases:
- Performance profiling
- Memory analysis
- Connection monitoring
- Deep system debugging
Volume: Variable - Can be very high during debugging Criticality: Low - Diagnostic purposes only
Process Lifecycle
Instance: :ex_esdb_lifecycle
Purpose: Process lifecycle events including starts, stops, crashes, and supervision tree changes.
Typical Topics:
process_lifecycle:subscription_proxies
process_lifecycle:supervisors
process_lifecycle:#{component}
Use Cases:
- Process monitoring
- Supervision tree analysis
- Crash detection and recovery
- System administration
Volume: Medium - Process lifecycle events Criticality: High - Important for system stability
Example Event:
%{
event_type: :process_started,
process_type: :subscription_proxy,
pid: #PID<0.789.0>,
timestamp: 1672531200000,
metadata: %{
store_id: "production_store",
subscription_name: "user_projection",
supervisor: #PID<0.456.0>,
restart_count: 0
}
}
Consumer Patterns
Health Dashboard Consumer
Phoenix.PubSub.subscribe(:ex_esdb_health, "subscription_health:*")
Phoenix.PubSub.subscribe(:ex_esdb_lifecycle, "process_lifecycle:*")
Security Operations Center (SOC)
Phoenix.PubSub.subscribe(:ex_esdb_security, "security_events:*")
Phoenix.PubSub.subscribe(:ex_esdb_audit, "audit_trail:*")
Phoenix.PubSub.subscribe(:ex_esdb_alerts, "system_alerts:critical")
Performance Monitoring
Phoenix.PubSub.subscribe(:ex_esdb_metrics, "subscription_metrics:*")
Phoenix.PubSub.subscribe(:ex_esdb_diagnostics, "diagnostics:performance_trace")
Incident Response
Phoenix.PubSub.subscribe(:ex_esdb_alerts, "system_alerts:*")
Phoenix.PubSub.subscribe(:ex_esdb_health, "subscription_health:*")
Phoenix.PubSub.subscribe(:ex_esdb_lifecycle, "process_lifecycle:*")
Implementation Details
PubSub Manager
The system uses ExESDBGater.PubSubManager
to ensure that PubSub instances are created only once and shared across multiple supervisors. This prevents duplication and ensures consistent behavior.
Topic Naming Conventions
- Use colons (
:
) to separate topic hierarchy levels - Include relevant identifiers (store_id, subscription_name, etc.)
- Use consistent naming patterns within each instance
Message Format
All events should follow a consistent structure:
{:event_type, %{
# Common fields
timestamp: System.system_time(:millisecond),
# Event-specific data
...
}}
Error Handling
All PubSub operations are wrapped in try-catch blocks to prevent failures from affecting the primary business logic. Failed publications are logged but don't interrupt normal operation.
Monitoring and Observability
Metrics to Track
- Message throughput per instance
- Consumer lag and processing times
- Failed publications
- Instance availability and health
Alerting
- High message volumes that might indicate issues
- Failed publications above threshold
- Instance unavailability
- Consumer disconnections
Best Practices
For Publishers
- Always use appropriate PubSub instance for event type
- Include comprehensive metadata for debugging
- Handle publication failures gracefully
- Use consistent topic naming
For Consumers
- Subscribe only to necessary topics
- Handle message processing failures gracefully
- Implement backpressure mechanisms for high-volume streams
- Use appropriate acknowledgment patterns
For Operations
- Monitor each instance independently
- Set up appropriate alerting thresholds
- Regular health checks for all instances
- Capacity planning based on volume patterns
Migration and Compatibility
When adding new event types or modifying existing ones:
- Use additive changes when possible
- Maintain backward compatibility
- Version event schemas appropriately
- Coordinate changes with consumer teams
Testing
The system includes comprehensive tests for:
- Instance isolation and independence
- Message routing and delivery
- Error handling and resilience
- Performance characteristics
- Typical usage patterns
See test/ex_esdb_gater/pubsub_instances_test.exs
for detailed test coverage.