eventsourcing_glyn
Types
Configuration for Glyn-based distributed event sourcing.
This configuration determines how services discover each other and coordinate event distribution across the cluster.
Fields
pubsub_scope: A unique identifier for the PubSub topic namespace. All services participating in the same event sourcing cluster should use the same scope.pubsub_group: The subscriber group name used for service discovery. Query actors subscribe to this group to receive distributed events.
Example
let config = GlynConfig(
  pubsub_scope: "banking_events",
  pubsub_group: "banking_services"
)
pub type GlynConfig {
  GlynConfig(pubsub_scope: String, pubsub_group: String)
}
      
      Constructors
- 
          
GlynConfig(pubsub_scope: String, pubsub_group: String) 
A Glyn-enhanced event store that wraps an existing event store implementation and adds distributed PubSub capabilities for query actors and event broadcasting.
This opaque type maintains the same interface as the underlying event store while transparently adding distributed features:
- Events committed to this store are automatically broadcast to all query subscribers
 - Query actors across multiple nodes receive events via Glyn PubSub
 - All event store operations are proxied to the underlying implementation
 
Type Parameters
underlying_store: The type of the wrapped event storeentity: The aggregate entity typecommand: The command type for this aggregateevent: The event type for this aggregateerror: The error type for this aggregatetransaction_handle: The transaction handle type of the underlying store
pub opaque type GlynStore(underlying_store, entity, command, event, error, transaction_handle)
      
    Values
pub fn publish(
  glyn_store: GlynStore(
    underlying_store,
    entity,
    command,
    event,
    error,
    transaction_handle,
  ),
  group: String,
  message: eventsourcing.QueryMessage(event),
) -> Result(Int, eventsourcing.EventSourcingError(error))
    
    Manually publish an event message to distributed query subscribers.
This function allows you to send custom messages to query actors without going through the normal event store commit process. This is useful for:
- Broadcasting system events or notifications
 - Triggering query actor maintenance or refresh operations
 - Testing distributed event handling
 
Example
import eventsourcing_glyn
import eventsourcing.{ProcessEvents}
// Send a custom event to all query actors
let message = ProcessEvents("system", [maintenance_event])
case eventsourcing_glyn.publish(glyn_store, "bank_services", message) {
  Ok(count) -> io.println("Notified " <> int.to_string(count) <> " subscribers")
  Error(error) -> io.println("Failed to publish: " <> string.inspect(error))
}
Important Notes
- Normal Usage: Most events should go through 
eventsourcing.execute()for proper persistence - Manual Publishing: Only use this for system events or testing scenarios
 - Group Matching: The group parameter should typically match your 
GlynConfig.pubsub_group - Return Value: The count indicates how many query actors received the message
 
pub fn subscribers(
  glyn_store: GlynStore(
    underlying_store,
    entity,
    command,
    event,
    error,
    transaction_handle,
  ),
) -> List(process.Pid)
    
    Get the list of subscribers to this Glyn event store’s PubSub scope.
This function returns information about all query actors currently subscribed to receive events from this distributed event store across all connected nodes.
Example
let subscriber_info = eventsourcing_glyn.subscribers(glyn_eventstore)
// Use for monitoring or debugging distributed event flow
Use Cases
- Health Monitoring: Check if expected query actors are connected
 - Debugging: Verify event distribution is working correctly
 - Load Balancing: Understand the distribution of query actors
 - Troubleshooting: Identify disconnected or failed nodes
 
pub fn supervised(
  config: GlynConfig,
  underlying_store: eventsourcing.EventStore(
    underlying_store,
    entity,
    command,
    event,
    error,
    transaction_handle,
  ),
  queries: List(
    #(
      process.Name(eventsourcing.QueryMessage(event)),
      fn(String, List(eventsourcing.EventEnvelop(event))) -> Nil,
    ),
  ),
  event_decoder: decode.Decoder(event),
) -> Result(
  #(
    eventsourcing.EventStore(
      GlynStore(
        underlying_store,
        entity,
        command,
        event,
        error,
        transaction_handle,
      ),
      entity,
      command,
      event,
      error,
      transaction_handle,
    ),
    supervision.ChildSpecification(static_supervisor.Supervisor),
  ),
  String,
)
    
    Creates a supervised Glyn-enhanced event store that wraps an existing event store and adds distributed PubSub capabilities for query actors and event broadcasting.
This function creates a complete supervision tree that includes:
- Query actors that automatically subscribe to Glyn PubSub for event notifications
 - The enhanced event store that publishes committed events to all subscribers
 - Integration with Glyn’s distributed coordination system
 - Fault-tolerant supervision of all distributed components
 
Example
import eventsourcing_glyn
import eventsourcing/memory_store
import gleam/otp/static_supervisor
// 1. Set up underlying store
let #(underlying_store, memory_spec) = memory_store.supervised(
  process.new_name("events"),
  process.new_name("snapshots"), 
  static_supervisor.OneForOne
)
// 2. Configure distributed capabilities
let config = eventsourcing_glyn.GlynConfig(
  pubsub_scope: "bank_events",
  pubsub_group: "bank_services"
)
// 3. Define query actors
let queries = [
  #(process.new_name("balance_query"), balance_query_fn),
  #(process.new_name("audit_query"), audit_query_fn),
]
// 4. Create distributed event store
let assert Ok(#(glyn_eventstore, glyn_spec)) = eventsourcing_glyn.supervised(
  config,
  underlying_store,
  queries,
  my_event_decoder()
)
// 5. Start supervision tree
let assert Ok(_) =
  static_supervisor.new(static_supervisor.OneForOne)
  |> static_supervisor.add(memory_spec)
  |> static_supervisor.add(glyn_spec)
  |> static_supervisor.start()
Query Actor Behavior
Each query actor receives a QueryMessage(event) containing:
ProcessEvents(aggregate_id, events): List of events to process
Query functions have the signature:
fn(aggregate_id: String, events: List(EventEnvelop(event))) -> Nil
Distributed Event Flow
- Service A commits events via 
eventsourcing.execute() - Events are persisted to the underlying store
 - Events are automatically broadcast via Glyn PubSub
 - All query actors across all nodes receive and process the events
 - Query actors update their projections/views accordingly