eventsourcing

Types

pub type Aggregate(entity, command, event, error) {
  Aggregate(aggregate_id: String, entity: entity, sequence: Int)
}

Constructors

  • Aggregate(aggregate_id: String, entity: entity, sequence: Int)
pub type AggregateActorState(eventstore, entity, command, event, error, transaction_handle) {
  AggregateActorState(
    aggregate: Aggregate(entity, command, event, error),
    eventsourcing: EventSourcing(
      eventstore,
      entity,
      command,
      event,
      error,
      transaction_handle,
    ),
  )
}

Constructors

  • AggregateActorState(
      aggregate: Aggregate(entity, command, event, error),
      eventsourcing: EventSourcing(
        eventstore,
        entity,
        command,
        event,
        error,
        transaction_handle,
      ),
    )
pub type AggregateId =
  String
pub type AggregateMessage(entity, command, event, error) {
  ExecuteCommand(
    aggregate_id: String,
    command: command,
    metadata: List(#(String, String)),
  )
  RegisterQueryActor(QueryActor(event))
  LoadAggregate(
    aggregate_id: String,
    reply_to: process.Subject(
      Result(
        Aggregate(entity, command, event, error),
        EventSourcingError(error),
      ),
    ),
  )
  LoadAllEvents(
    aggregate_id: String,
    reply_to: process.Subject(
      Result(List(EventEnvelop(event)), EventSourcingError(error)),
    ),
  )
  LoadEvents(
    aggregate_id: String,
    start_from: Int,
    reply_to: process.Subject(
      Result(List(EventEnvelop(event)), EventSourcingError(error)),
    ),
  )
  LoadLatestSnapshot(
    aggregate_id: String,
    reply_to: process.Subject(
      Result(
        option.Option(Snapshot(entity)),
        EventSourcingError(error),
      ),
    ),
  )
  GetSystemStats(reply_to: process.Subject(SystemStats))
  GetAggregateStats(
    aggregate_id: String,
    reply_to: process.Subject(
      Result(AggregateStats, EventSourcingError(error)),
    ),
  )
}

Constructors

pub type AggregateStats {
  AggregateStats(
    aggregate_id: String,
    event_count: Int,
    current_sequence: Int,
    has_snapshot: Bool,
  )
}

Constructors

  • AggregateStats(
      aggregate_id: String,
      event_count: Int,
      current_sequence: Int,
      has_snapshot: Bool,
    )
pub type EventEnvelop(event) {
  MemoryStoreEventEnvelop(
    aggregate_id: String,
    sequence: Int,
    payload: event,
    metadata: List(#(String, String)),
  )
  SerializedEventEnvelop(
    aggregate_id: String,
    sequence: Int,
    payload: event,
    metadata: List(#(String, String)),
    event_type: String,
    event_version: String,
    aggregate_type: String,
  )
}

Constructors

  • MemoryStoreEventEnvelop(
      aggregate_id: String,
      sequence: Int,
      payload: event,
      metadata: List(#(String, String)),
    )
  • SerializedEventEnvelop(
      aggregate_id: String,
      sequence: Int,
      payload: event,
      metadata: List(#(String, String)),
      event_type: String,
      event_version: String,
      aggregate_type: String,
    )
pub opaque type EventSourcing(eventstore, entity, command, event, error, transaction_handle)
pub type EventSourcingError(domainerror) {
  DomainError(domainerror)
  EventStoreError(String)
  NonPositiveArgument
  EntityNotFound
  TransactionFailed
  TransactionRolledBack
  ActorTimeout(operation: String, timeout_ms: Int)
}

Constructors

  • DomainError(domainerror)
  • EventStoreError(String)
  • NonPositiveArgument
  • EntityNotFound
  • TransactionFailed
  • TransactionRolledBack
  • ActorTimeout(operation: String, timeout_ms: Int)
pub type EventStore(eventstore, entity, command, event, error, transaction_handle) {
  EventStore(
    execute_transaction: fn(
      fn(transaction_handle) -> Result(
        Nil,
        EventSourcingError(error),
      ),
    ) -> Result(Nil, EventSourcingError(error)),
    load_aggregate_transaction: fn(
      fn(transaction_handle) -> Result(
        Aggregate(entity, command, event, error),
        EventSourcingError(error),
      ),
    ) -> Result(
      Aggregate(entity, command, event, error),
      EventSourcingError(error),
    ),
    load_events_transaction: fn(
      fn(transaction_handle) -> Result(
        List(EventEnvelop(event)),
        EventSourcingError(error),
      ),
    ) -> Result(
      List(EventEnvelop(event)),
      EventSourcingError(error),
    ),
    get_latest_snapshot_transaction: fn(
      fn(transaction_handle) -> Result(
        option.Option(Snapshot(entity)),
        EventSourcingError(error),
      ),
    ) -> Result(
      option.Option(Snapshot(entity)),
      EventSourcingError(error),
    ),
    commit_events: fn(
      transaction_handle,
      Aggregate(entity, command, event, error),
      List(event),
      List(#(String, String)),
    ) -> Result(
      #(List(EventEnvelop(event)), Int),
      EventSourcingError(error),
    ),
    load_events: fn(eventstore, transaction_handle, String, Int) -> Result(
      List(EventEnvelop(event)),
      EventSourcingError(error),
    ),
    load_snapshot: fn(transaction_handle, String) -> Result(
      option.Option(Snapshot(entity)),
      EventSourcingError(error),
    ),
    save_snapshot: fn(transaction_handle, Snapshot(entity)) -> Result(
      Nil,
      EventSourcingError(error),
    ),
    eventstore: eventstore,
  )
}

Constructors

pub opaque type Frequency
pub type ManagerMessage(entity, command, event, error) {
  QueryActorStarted(QueryActor(event))
  GetEventSourcingActor(
    reply_to: process.Subject(
      process.Subject(
        AggregateMessage(entity, command, event, error),
      ),
    ),
  )
}

Constructors

pub type ManagerState(eventstore, entity, command, event, error, transaction_handle) {
  ManagerState(
    eventsourcing_actor: option.Option(
      process.Subject(
        AggregateMessage(entity, command, event, error),
      ),
    ),
    expected_queries: Int,
    registered_queries: Int,
    eventstore: EventStore(
      eventstore,
      entity,
      command,
      event,
      error,
      transaction_handle,
    ),
    handle: fn(entity, command) -> Result(List(event), error),
    apply: fn(entity, event) -> entity,
    empty_state: entity,
  )
}

Constructors

  • ManagerState(
      eventsourcing_actor: option.Option(
        process.Subject(
          AggregateMessage(entity, command, event, error),
        ),
      ),
      expected_queries: Int,
      registered_queries: Int,
      eventstore: EventStore(
        eventstore,
        entity,
        command,
        event,
        error,
        transaction_handle,
      ),
      handle: fn(entity, command) -> Result(List(event), error),
      apply: fn(entity, event) -> entity,
      empty_state: entity,
    )
pub type QueryActor(event) {
  QueryActor(
    actor: actor.Started(process.Subject(QueryMessage(event))),
    query: fn(String, List(EventEnvelop(event))) -> Nil,
  )
}

Constructors

pub type QueryMessage(event) {
  ProcessEvents(
    aggregate_id: String,
    events: List(EventEnvelop(event)),
  )
}

Constructors

  • ProcessEvents(
      aggregate_id: String,
      events: List(EventEnvelop(event)),
    )
pub type Snapshot(entity) {
  Snapshot(
    aggregate_id: String,
    entity: entity,
    sequence: Int,
    timestamp: timestamp.Timestamp,
  )
}

Constructors

  • Snapshot(
      aggregate_id: String,
      entity: entity,
      sequence: Int,
      timestamp: timestamp.Timestamp,
    )
pub type SnapshotConfig {
  SnapshotConfig(snapshot_frequency: Frequency)
}

Constructors

  • SnapshotConfig(snapshot_frequency: Frequency)
pub type SystemStats {
  SystemStats(
    query_actors_count: Int,
    total_commands_processed: Int,
  )
}

Constructors

  • SystemStats(
      query_actors_count: Int,
      total_commands_processed: Int,
    )
pub opaque type Timeout

Values

pub fn aggregate_stats(
  eventsourcing_actor: actor.Started(
    process.Subject(
      AggregateMessage(entity, command, event, error),
    ),
  ),
  aggregate_id: String,
) -> process.Subject(
  Result(AggregateStats, EventSourcingError(error)),
)

Gets statistics for a specific aggregate including event count and snapshot status. Returns a subject that will receive the AggregateStats result. Use process.receive() to get the result. Useful for debugging and monitoring individual aggregate health.

Example

let result = eventsourcing.aggregate_stats(actor, "bank-account-123")
let assert Ok(stats) = process.receive(result, 1000)
io.println("Events: " <> int.to_string(stats.event_count))
pub fn execute(
  eventsourcing_actor: actor.Started(
    process.Subject(
      AggregateMessage(entity, command, event, error),
    ),
  ),
  aggregate_id: String,
  command: command,
) -> Nil

Executes a command against an aggregate in the event sourcing system. The command will be validated, events generated if successful, and the events will be persisted and sent to all registered query actors. Commands that violate business rules will be rejected without affecting system stability.

Example

eventsourcing.execute(actor, "bank-account-123", OpenAccount("123"))
// Command is processed asynchronously via message passing
pub fn execute_with_metadata(
  eventsourcing_actor: actor.Started(
    process.Subject(
      AggregateMessage(entity, command, event, error),
    ),
  ),
  aggregate_id: String,
  command: command,
  metadata: List(#(String, String)),
) -> Nil

Executes a command against an aggregate with additional metadata. The metadata will be stored with the generated events and can be used for tracking, auditing, or enriching events with contextual information.

Example

let metadata = [#("user_id", "alice"), #("session_id", "abc123")]
eventsourcing.execute_with_metadata(actor, "bank-123", DepositMoney(100.0), metadata)
pub fn frequency(
  n n: Int,
) -> Result(Frequency, EventSourcingError(a))

Creates a validated frequency value for snapshot configuration. Snapshots will be created every N events when this frequency is used. Ensures that frequency values are positive to prevent division by zero or infinite loops.

Example

let assert Ok(freq) = eventsourcing.frequency(5)
let config = eventsourcing.SnapshotConfig(freq)
pub fn latest_snapshot(
  eventsourcing_actor: actor.Started(
    process.Subject(
      AggregateMessage(entity, command, event, error),
    ),
  ),
  aggregate_id aggregate_id: String,
) -> process.Subject(
  Result(
    option.Option(Snapshot(entity)),
    EventSourcingError(error),
  ),
)

Asynchronously retrieves the most recent snapshot for an aggregate, if snapshots are enabled. Returns a subject that will receive a Result containing an Option: Some(snapshot) when a snapshot exists, None otherwise. Snapshots provide a point-in-time capture of aggregate state for faster reconstruction. Use process.receive() to get the result.

Example

let result = eventsourcing.latest_snapshot(actor, "bank-account-123")
let assert Ok(Some(snapshot)) = process.receive(result, 1000)
// snapshot.entity contains the saved state, snapshot.sequence shows version
pub fn load_aggregate(
  eventsourcing_actor: actor.Started(
    process.Subject(
      AggregateMessage(entity, command, event, error),
    ),
  ),
  aggregate_id: String,
) -> process.Subject(
  Result(
    Aggregate(entity, command, event, error),
    EventSourcingError(error),
  ),
)

Loads the current state of an aggregate asynchronously by replaying all its events. Returns a subject that will receive the aggregate with its current entity state and sequence number. Use process.receive() to get the result. Returns EntityNotFound error if aggregate doesn’t exist.

Example

let result = eventsourcing.load_aggregate(actor, "bank-account-123")
let assert Ok(aggregate) = process.receive(result, 1000)
// aggregate.entity contains current state, aggregate.sequence shows current version
pub fn load_events(
  eventsourcing_actor: actor.Started(
    process.Subject(
      AggregateMessage(entity, command, event, error),
    ),
  ),
  aggregate_id: String,
) -> process.Subject(
  Result(List(EventEnvelop(event)), EventSourcingError(error)),
)

Loads all events for a specific aggregate asynchronously from the beginning. Returns a subject that will receive a chronologically ordered list of all events that have occurred for the aggregate. Use process.receive() to get the result.

Example

let result = eventsourcing.load_events(actor, "bank-account-123")
let assert Ok(events) = process.receive(result, 1000)
// events contains all EventEnvelop items for this aggregate
pub fn load_events_from(
  eventsourcing_actor: actor.Started(
    process.Subject(
      AggregateMessage(entity, command, event, error),
    ),
  ),
  aggregate_id aggregate_id: String,
  start_from start_from: Int,
) -> process.Subject(
  Result(List(EventEnvelop(event)), EventSourcingError(error)),
)

Loads events for an aggregate asynchronously starting from a specific sequence number. Returns a subject that will receive the events list. Useful for pagination or continuing event processing from a known point. Use process.receive() to get the result.

Example

let result = eventsourcing.load_events_from(actor, "bank-account-123", start_from: 10)
let assert Ok(events) = process.receive(result, 1000)
// events contains EventEnvelop items starting from sequence 10
pub fn register_queries(
  eventsourcing_actor: actor.Started(
    process.Subject(
      AggregateMessage(entity, command, event, error),
    ),
  ),
  query_actors: List(QueryActor(event)),
) -> Nil

Registers query actors with the event sourcing system after supervisor startup. The corresponding query functions must first be passed to supervised() through its queries argument, so that their actors are started and supervised before they are registered here.

Example

let queries = [projection_query, analytics_query]
let assert Ok(query_actors) = list.try_map(queries, fn(_) { 
  process.receive(query_receiver, 1000) 
})
eventsourcing.register_queries(eventsourcing_actor, query_actors)
pub fn supervised(
  eventstore eventstore: EventStore(
    eventstore,
    entity,
    command,
    event,
    error,
    transaction_handle,
  ),
  handle handle: fn(entity, command) -> Result(List(event), error),
  apply apply: fn(entity, event) -> entity,
  empty_state empty_state: entity,
  queries queries: List(
    fn(String, List(EventEnvelop(event))) -> Nil,
  ),
  eventsourcing_actor_receiver eventsourcing_actor_receiver: process.Subject(
    actor.Started(
      process.Subject(
        AggregateMessage(entity, command, event, error),
      ),
    ),
  ),
  query_actors_receiver query_actors_receiver: process.Subject(
    QueryActor(event),
  ),
  snapshot_config snapshot_config: option.Option(SnapshotConfig),
) -> Result(
  supervision.ChildSpecification(static_supervisor.Supervisor),
  Nil,
)

Creates a supervised event sourcing architecture with fault tolerance. Sets up a supervision tree where the main event sourcing actor and query actors are managed by a supervisor that can restart them if they fail. This is the recommended approach for production applications.

Example

let assert Ok(eventstore) = memory_store.new()
let eventsourcing_actor_receiver = process.new_subject()
let query_actors_receiver = process.new_subject()
let assert Ok(spec) = eventsourcing.supervised(
  eventstore:,
  handle: my_handle,
  apply: my_apply,
  empty_state: MyState,
  queries: [],
  eventsourcing_actor_receiver:,
  query_actors_receiver:,
  snapshot_config: None
)
pub fn system_stats(
  eventsourcing_actor: actor.Started(
    process.Subject(
      AggregateMessage(entity, command, event, error),
    ),
  ),
) -> process.Subject(SystemStats)

Gets system statistics: the query actor count and the total number of commands processed. Returns a subject that will receive the SystemStats. Use process.receive() to get the result. Useful for monitoring system health and performance in production.

Example

let result = eventsourcing.system_stats(actor)
let assert Ok(stats) = process.receive(result, 1000)
io.println("Commands processed: " <> int.to_string(stats.total_commands_processed))
pub fn timeout(
  ms ms: Int,
) -> Result(Timeout, EventSourcingError(a))

Creates a validated timeout value for use with process operations. Ensures that timeout values are positive, preventing invalid configurations that could cause system operations to behave unexpectedly.

Example

let assert Ok(timeout) = eventsourcing.timeout(5000)
let result = process.receive(subject, timeout)
Search Document