eventsourcing
Types
pub type Aggregate(entity, command, event, error) {
Aggregate(aggregate_id: String, entity: entity, sequence: Int)
}
Constructors
-
Aggregate(aggregate_id: String, entity: entity, sequence: Int)
pub type AggregateActorState(eventstore, entity, command, event, error, transaction_handle) {
AggregateActorState(
aggregate: Aggregate(entity, command, event, error),
eventsourcing: EventSourcing(
eventstore,
entity,
command,
event,
error,
transaction_handle,
),
)
}
Constructors
-
AggregateActorState( aggregate: Aggregate(entity, command, event, error), eventsourcing: EventSourcing( eventstore, entity, command, event, error, transaction_handle, ), )
pub type AggregateId =
String
pub type AggregateMessage(entity, command, event, error) {
ExecuteCommand(
aggregate_id: String,
command: command,
metadata: List(#(String, String)),
)
LoadAggregate(
aggregate_id: String,
reply_to: process.Subject(
Result(
Aggregate(entity, command, event, error),
EventSourcingError(error),
),
),
)
LoadAllEvents(
aggregate_id: String,
reply_to: process.Subject(
Result(List(EventEnvelop(event)), EventSourcingError(error)),
),
)
LoadEvents(
aggregate_id: String,
start_from: Int,
reply_to: process.Subject(
Result(List(EventEnvelop(event)), EventSourcingError(error)),
),
)
LoadLatestSnapshot(
aggregate_id: String,
reply_to: process.Subject(
Result(
option.Option(Snapshot(entity)),
EventSourcingError(error),
),
),
)
GetSystemStats(reply_to: process.Subject(SystemStats))
GetAggregateStats(
aggregate_id: String,
reply_to: process.Subject(
Result(AggregateStats, EventSourcingError(error)),
),
)
}
Constructors
-
ExecuteCommand( aggregate_id: String, command: command, metadata: List(#(String, String)), )
-
LoadAggregate( aggregate_id: String, reply_to: process.Subject( Result( Aggregate(entity, command, event, error), EventSourcingError(error), ), ), )
-
LoadAllEvents( aggregate_id: String, reply_to: process.Subject( Result(List(EventEnvelop(event)), EventSourcingError(error)), ), )
-
LoadEvents( aggregate_id: String, start_from: Int, reply_to: process.Subject( Result(List(EventEnvelop(event)), EventSourcingError(error)), ), )
-
LoadLatestSnapshot( aggregate_id: String, reply_to: process.Subject( Result( option.Option(Snapshot(entity)), EventSourcingError(error), ), ), )
-
GetSystemStats(reply_to: process.Subject(SystemStats))
-
GetAggregateStats( aggregate_id: String, reply_to: process.Subject( Result(AggregateStats, EventSourcingError(error)), ), )
pub type AggregateStats {
AggregateStats(
aggregate_id: String,
event_count: Int,
current_sequence: Int,
has_snapshot: Bool,
)
}
Constructors
-
AggregateStats( aggregate_id: String, event_count: Int, current_sequence: Int, has_snapshot: Bool, )
pub type EventEnvelop(event) {
MemoryStoreEventEnvelop(
aggregate_id: String,
sequence: Int,
payload: event,
metadata: List(#(String, String)),
)
SerializedEventEnvelop(
aggregate_id: String,
sequence: Int,
payload: event,
metadata: List(#(String, String)),
event_type: String,
event_version: String,
aggregate_type: String,
)
}
Constructors
-
MemoryStoreEventEnvelop( aggregate_id: String, sequence: Int, payload: event, metadata: List(#(String, String)), )
-
SerializedEventEnvelop( aggregate_id: String, sequence: Int, payload: event, metadata: List(#(String, String)), event_type: String, event_version: String, aggregate_type: String, )
pub opaque type EventSourcing(eventstore, entity, command, event, error, transaction_handle)
pub type EventSourcingError(domainerror) {
DomainError(domainerror)
EventStoreError(String)
NonPositiveArgument
EntityNotFound
TransactionFailed
TransactionRolledBack
ActorTimeout(operation: String, timeout_ms: Int)
}
Constructors
-
DomainError(domainerror)
-
EventStoreError(String)
-
NonPositiveArgument
-
EntityNotFound
-
TransactionFailed
-
TransactionRolledBack
-
ActorTimeout(operation: String, timeout_ms: Int)
pub type EventStore(eventstore, entity, command, event, error, transaction_handle) {
EventStore(
execute_transaction: fn(
fn(transaction_handle) -> Result(
Nil,
EventSourcingError(error),
),
) -> Result(Nil, EventSourcingError(error)),
load_aggregate_transaction: fn(
fn(transaction_handle) -> Result(
Aggregate(entity, command, event, error),
EventSourcingError(error),
),
) -> Result(
Aggregate(entity, command, event, error),
EventSourcingError(error),
),
load_events_transaction: fn(
fn(transaction_handle) -> Result(
List(EventEnvelop(event)),
EventSourcingError(error),
),
) -> Result(
List(EventEnvelop(event)),
EventSourcingError(error),
),
get_latest_snapshot_transaction: fn(
fn(transaction_handle) -> Result(
option.Option(Snapshot(entity)),
EventSourcingError(error),
),
) -> Result(
option.Option(Snapshot(entity)),
EventSourcingError(error),
),
commit_events: fn(
transaction_handle,
Aggregate(entity, command, event, error),
List(event),
List(#(String, String)),
) -> Result(
#(List(EventEnvelop(event)), Int),
EventSourcingError(error),
),
load_events: fn(eventstore, transaction_handle, String, Int) -> Result(
List(EventEnvelop(event)),
EventSourcingError(error),
),
load_snapshot: fn(transaction_handle, String) -> Result(
option.Option(Snapshot(entity)),
EventSourcingError(error),
),
save_snapshot: fn(transaction_handle, Snapshot(entity)) -> Result(
Nil,
EventSourcingError(error),
),
eventstore: eventstore,
)
}
Constructors
-
EventStore( execute_transaction: fn( fn(transaction_handle) -> Result( Nil, EventSourcingError(error), ), ) -> Result(Nil, EventSourcingError(error)), load_aggregate_transaction: fn( fn(transaction_handle) -> Result( Aggregate(entity, command, event, error), EventSourcingError(error), ), ) -> Result( Aggregate(entity, command, event, error), EventSourcingError(error), ), load_events_transaction: fn( fn(transaction_handle) -> Result( List(EventEnvelop(event)), EventSourcingError(error), ), ) -> Result( List(EventEnvelop(event)), EventSourcingError(error), ), get_latest_snapshot_transaction: fn( fn(transaction_handle) -> Result( option.Option(Snapshot(entity)), EventSourcingError(error), ), ) -> Result( option.Option(Snapshot(entity)), EventSourcingError(error), ), commit_events: fn( transaction_handle, Aggregate(entity, command, event, error), List(event), List(#(String, String)), ) -> Result( #(List(EventEnvelop(event)), Int), EventSourcingError(error), ), load_events: fn(eventstore, transaction_handle, String, Int) -> Result( List(EventEnvelop(event)), EventSourcingError(error), ), load_snapshot: fn(transaction_handle, String) -> Result( option.Option(Snapshot(entity)), EventSourcingError(error), ), save_snapshot: fn(transaction_handle, Snapshot(entity)) -> Result( Nil, EventSourcingError(error), ), eventstore: eventstore, )
pub type ManagerMessage(entity, command, event, error) {
QueryActorStarted(QueryActor(event))
GetEventSourcingActor(
reply_to: process.Subject(
process.Subject(
AggregateMessage(entity, command, event, error),
),
),
)
}
Constructors
-
QueryActorStarted(QueryActor(event))
-
GetEventSourcingActor( reply_to: process.Subject( process.Subject( AggregateMessage(entity, command, event, error), ), ), )
pub type ManagerState(eventstore, entity, command, event, error, transaction_handle) {
ManagerState(
eventsourcing_actor: option.Option(
process.Subject(
AggregateMessage(entity, command, event, error),
),
),
expected_queries: Int,
registered_queries: Int,
eventstore: EventStore(
eventstore,
entity,
command,
event,
error,
transaction_handle,
),
handle: fn(entity, command) -> Result(List(event), error),
apply: fn(entity, event) -> entity,
empty_state: entity,
)
}
Constructors
-
ManagerState( eventsourcing_actor: option.Option( process.Subject( AggregateMessage(entity, command, event, error), ), ), expected_queries: Int, registered_queries: Int, eventstore: EventStore( eventstore, entity, command, event, error, transaction_handle, ), handle: fn(entity, command) -> Result(List(event), error), apply: fn(entity, event) -> entity, empty_state: entity, )
pub type QueryActor(event) {
QueryActor(actor: process.Subject(QueryMessage(event)))
}
Constructors
-
QueryActor(actor: process.Subject(QueryMessage(event)))
pub type QueryMessage(event) {
ProcessEvents(
aggregate_id: String,
events: List(EventEnvelop(event)),
)
}
Constructors
-
ProcessEvents( aggregate_id: String, events: List(EventEnvelop(event)), )
pub type Snapshot(entity) {
Snapshot(
aggregate_id: String,
entity: entity,
sequence: Int,
timestamp: timestamp.Timestamp,
)
}
Constructors
-
Snapshot( aggregate_id: String, entity: entity, sequence: Int, timestamp: timestamp.Timestamp, )
pub type SystemStats {
SystemStats(
query_actors_count: Int,
total_commands_processed: Int,
)
}
Constructors
-
SystemStats( query_actors_count: Int, total_commands_processed: Int, )
Values
pub fn aggregate_stats(
eventsourcing: process.Subject(
AggregateMessage(entity, command, event, error),
),
aggregate_id: String,
) -> process.Subject(
Result(AggregateStats, EventSourcingError(error)),
)
Gets statistics for a specific aggregate including event count and snapshot status. Returns a subject that will receive the AggregateStats result. Use process.receive() to get the result. Useful for debugging and monitoring individual aggregate health.
Example
let result = eventsourcing.aggregate_stats(eventsourcing, "bank-account-123")
let assert Ok(Ok(stats)) = process.receive(result, 1000)
io.println("Events: " <> int.to_string(stats.event_count))
pub fn execute(
eventsourcing_actor: process.Subject(
AggregateMessage(entity, command, event, error),
),
aggregate_id: String,
command: command,
) -> Nil
Executes a command against an aggregate in the event sourcing system. The command will be validated, events generated if successful, and the events will be persisted and sent to all registered query actors. Commands that violate business rules will be rejected without affecting system stability.
Example
eventsourcing.execute(actor, "bank-account-123", OpenAccount("123"))
// Command is processed asynchronously via message passing
pub fn execute_with_metadata(
eventsourcing_actor: process.Subject(
AggregateMessage(entity, command, event, error),
),
aggregate_id: String,
command: command,
metadata: List(#(String, String)),
) -> Nil
Executes a command against an aggregate with additional metadata. The metadata will be stored with the generated events and can be used for tracking, auditing, or enriching events with contextual information.
Example
let metadata = [#("user_id", "alice"), #("session_id", "abc123")]
eventsourcing.execute_with_metadata(actor, "bank-123", DepositMoney(100.0), metadata)
pub fn frequency(
n n: Int,
) -> Result(Frequency, EventSourcingError(a))
Creates a validated frequency value for snapshot configuration. Snapshots will be created every N events when this frequency is used. Ensures that frequency values are positive to prevent division by zero or infinite loops.
Example
let assert Ok(freq) = eventsourcing.frequency(5)
let config = eventsourcing.SnapshotConfig(freq)
pub fn latest_snapshot(
eventsourcing: process.Subject(
AggregateMessage(entity, command, event, error),
),
aggregate_id aggregate_id: String,
) -> process.Subject(
Result(
option.Option(Snapshot(entity)),
EventSourcingError(error),
),
)
Retrieves the most recent snapshot for an aggregate asynchronously if snapshots are enabled. Returns a subject that will receive the snapshot option. Snapshots provide a point-in-time capture of aggregate state for faster reconstruction. Use process.receive() to get the result.
Example
let result = eventsourcing.latest_snapshot(eventsourcing, "bank-account-123")
let assert Ok(Ok(Some(snapshot))) = process.receive(result, 1000)
// snapshot.entity contains the saved state, snapshot.sequence shows version
pub fn load_aggregate(
eventsourcing: process.Subject(
AggregateMessage(entity, command, event, error),
),
aggregate_id: String,
) -> process.Subject(
Result(
Aggregate(entity, command, event, error),
EventSourcingError(error),
),
)
Loads the current state of an aggregate asynchronously by replaying all its events. Returns a subject that will receive the aggregate with its current entity state and sequence number. Use process.receive() to get the result. Returns EntityNotFound error if aggregate doesn’t exist.
Example
let result = eventsourcing.load_aggregate(actor, "bank-account-123")
let assert Ok(Ok(aggregate)) = process.receive(result, 1000)
// aggregate.entity contains current state, aggregate.sequence shows current version
pub fn load_events(
eventsourcing: process.Subject(
AggregateMessage(entity, command, event, error),
),
aggregate_id: String,
) -> process.Subject(
Result(List(EventEnvelop(event)), EventSourcingError(error)),
)
Loads all events for a specific aggregate asynchronously from the beginning. Returns a subject that will receive a chronologically ordered list of all events that have occurred for the aggregate. Use process.receive() to get the result.
Example
let result = eventsourcing.load_events(eventsourcing, "bank-account-123")
let assert Ok(Ok(events)) = process.receive(result, 1000)
// events contains all EventEnvelop items for this aggregate
pub fn load_events_from(
eventsourcing: process.Subject(
AggregateMessage(entity, command, event, error),
),
aggregate_id aggregate_id: String,
start_from start_from: Int,
) -> process.Subject(
Result(List(EventEnvelop(event)), EventSourcingError(error)),
)
Loads events for an aggregate asynchronously starting from a specific sequence number. Returns a subject that will receive the events list. Useful for pagination or continuing event processing from a known point. Use process.receive() to get the result.
Example
let result = eventsourcing.load_events_from(eventsourcing, "bank-account-123", start_from: 10)
let assert Ok(Ok(events)) = process.receive(result, 1000)
// events contains EventEnvelop items starting from sequence 10
pub fn supervised(
name name: process.Name(
AggregateMessage(entity, command, event, error),
),
eventstore eventstore: EventStore(
eventstore,
entity,
command,
event,
error,
transaction_handle,
),
handle handle: fn(entity, command) -> Result(List(event), error),
apply apply: fn(entity, event) -> entity,
empty_state empty_state: entity,
queries queries: List(
#(
process.Name(QueryMessage(event)),
fn(String, List(EventEnvelop(event))) -> Nil,
),
),
snapshot_config snapshot_config: option.Option(SnapshotConfig),
) -> Result(
supervision.ChildSpecification(static_supervisor.Supervisor),
Nil,
)
Creates a supervised event sourcing architecture with fault tolerance. Sets up a supervision tree where the main event sourcing actor and query actors are managed by a supervisor that can restart them if they fail. This is the recommended approach for production applications. For the queries to work, you must register them with register_queries() after the supervisor has started.
Example
// First set up memory store
let events_name = process.new_name("events_actor")
let snapshot_name = process.new_name("snapshot_actor")
let #(store, _) = memory_store.supervised(events_name, snapshot_name, static_supervisor.OneForOne)
// Then create event sourcing system
let balance_query = #(
  process.new_name("balance_query"),
  fn(_aggregate_id, _events) {
    // update read model
    Nil
  },
)
let assert Ok(spec) = eventsourcing.supervised(
name: process.new_name("eventsourcing_actor"),
eventstore: store,
handle: my_handle,
apply: my_apply,
empty_state: MyState,
queries: [balance_query],
snapshot_config: None
)
pub fn system_stats(
eventsourcing: process.Subject(
AggregateMessage(entity, command, event, error),
),
) -> process.Subject(SystemStats)
Gets system statistics: the number of registered query actors and the total number of commands processed. Returns a subject that will receive the SystemStats. Use process.receive() to get the result. Useful for monitoring system health and performance in production.
Example
let stats_subject = eventsourcing.system_stats(eventsourcing_actor)
let assert Ok(stats) = process.receive(stats_subject, 1000)
io.println("Query actors: " <> int.to_string(stats.query_actors_count))
io.println("Commands processed: " <> int.to_string(stats.total_commands_processed))
pub fn timeout(
ms ms: Int,
) -> Result(Timeout, EventSourcingError(a))
Creates a validated timeout value for use with process operations. Ensures that timeout values are positive, preventing invalid configurations that could cause system operations to behave unexpectedly.
Example
let assert Ok(timeout) = eventsourcing.timeout(5000)
let result = process.receive(subject, timeout)