eventsourcing
Types
Represents the current state of an aggregate in the event sourcing system.
Fields
aggregate_id
: Unique identifier for the aggregateentity
: The current state of the entitysequence
: The current sequence number of the aggregate
pub type Aggregate(entity, command, event, error) {
Aggregate(
aggregate_id: AggregateId,
entity: entity,
sequence: Int,
)
}
Constructors
-
Aggregate( aggregate_id: AggregateId, entity: entity, sequence: Int, )
Type representing an aggregate’s unique identifier. This is used to identify different aggregates in the event sourcing system.
pub type AggregateId =
String
Wrapper around domain events that includes metadata and sequencing information. Used by Event Stores to persist and retrieve events.
Variants
MemoryStoreEventEnvelop
: Used for in-memory event storageSerializedEventEnvelop
: Used for persistent storage with serialization support
pub type EventEnvelop(event) {
MemoryStoreEventEnvelop(
aggregate_id: AggregateId,
sequence: Int,
payload: event,
metadata: List(#(String, String)),
)
SerializedEventEnvelop(
aggregate_id: AggregateId,
sequence: Int,
payload: event,
metadata: List(#(String, String)),
event_type: String,
event_version: String,
aggregate_type: String,
)
}
Constructors
-
MemoryStoreEventEnvelop( aggregate_id: AggregateId, sequence: Int, payload: event, metadata: List(#(String, String)), )
-
SerializedEventEnvelop( aggregate_id: AggregateId, sequence: Int, payload: event, metadata: List(#(String, String)), event_type: String, event_version: String, aggregate_type: String, )
The main record of the library. It holds everything together and serves as a reference point for other functions such as execute, load_aggregate_entity, and load_events
pub opaque type EventSourcing(
eventstore,
entity,
command,
event,
error,
transaction_handle,
)
Represents errors that can occur in the event sourcing system.
Variants
DomainError
: Domain-specific errors from command handlingEventStoreError
: Errors related to event storage operationsEntityNotFound
: When attempting to load a non-existent aggregate
pub type EventSourcingError(domainerror) {
DomainError(domainerror)
EventStoreError(String)
EntityNotFound
TransactionFailed
TransactionRolledBack
}
Constructors
-
DomainError(domainerror)
-
EventStoreError(String)
-
EntityNotFound
-
TransactionFailed
-
TransactionRolledBack
The main type of the event sourcing system that coordinates all operations.
Fields
event_store
: The storage implementation for events and snapshotsqueries
: List of query handlers to process eventshandle
: Command handler functionapply
: Event application functionempty_state
: Initial state for new aggregatessnapshot_config
: Optional configuration for snapshot creation
pub type EventStore(
eventstore,
entity,
command,
event,
error,
transaction_handle,
) {
EventStore(
execute_transaction: fn(
fn(transaction_handle) ->
Result(Nil, EventSourcingError(error)),
) ->
Result(Nil, EventSourcingError(error)),
load_aggregate_transaction: fn(
fn(transaction_handle) ->
Result(
Aggregate(entity, command, event, error),
EventSourcingError(error),
),
) ->
Result(
Aggregate(entity, command, event, error),
EventSourcingError(error),
),
load_events_transaction: fn(
fn(transaction_handle) ->
Result(
List(EventEnvelop(event)),
EventSourcingError(error),
),
) ->
Result(
List(EventEnvelop(event)),
EventSourcingError(error),
),
get_latest_snapshot_transaction: fn(
fn(transaction_handle) ->
Result(
Option(Snapshot(entity)),
EventSourcingError(error),
),
) ->
Result(Option(Snapshot(entity)), EventSourcingError(error)),
commit_events: fn(
transaction_handle,
Aggregate(entity, command, event, error),
List(event),
List(#(String, String)),
) ->
Result(
#(List(EventEnvelop(event)), Int),
EventSourcingError(error),
),
load_events: fn(
eventstore,
transaction_handle,
AggregateId,
Int,
) ->
Result(
List(EventEnvelop(event)),
EventSourcingError(error),
),
load_snapshot: fn(transaction_handle, AggregateId) ->
Result(Option(Snapshot(entity)), EventSourcingError(error)),
save_snapshot: fn(transaction_handle, Snapshot(entity)) ->
Result(Nil, EventSourcingError(error)),
eventstore: eventstore,
)
}
Constructors
-
EventStore( execute_transaction: fn( fn(transaction_handle) -> Result(Nil, EventSourcingError(error)), ) -> Result(Nil, EventSourcingError(error)), load_aggregate_transaction: fn( fn(transaction_handle) -> Result( Aggregate(entity, command, event, error), EventSourcingError(error), ), ) -> Result( Aggregate(entity, command, event, error), EventSourcingError(error), ), load_events_transaction: fn( fn(transaction_handle) -> Result( List(EventEnvelop(event)), EventSourcingError(error), ), ) -> Result(List(EventEnvelop(event)), EventSourcingError(error)), get_latest_snapshot_transaction: fn( fn(transaction_handle) -> Result(Option(Snapshot(entity)), EventSourcingError(error)), ) -> Result(Option(Snapshot(entity)), EventSourcingError(error)), commit_events: fn( transaction_handle, Aggregate(entity, command, event, error), List(event), List(#(String, String)), ) -> Result( #(List(EventEnvelop(event)), Int), EventSourcingError(error), ), load_events: fn( eventstore, transaction_handle, AggregateId, Int, ) -> Result(List(EventEnvelop(event)), EventSourcingError(error)), load_snapshot: fn(transaction_handle, AggregateId) -> Result(Option(Snapshot(entity)), EventSourcingError(error)), save_snapshot: fn(transaction_handle, Snapshot(entity)) -> Result(Nil, EventSourcingError(error)), eventstore: eventstore, )
Represents a snapshot of an aggregate’s state at a specific point in time. Snapshots are used to optimize aggregate rebuilding by providing a starting point.
Fields
aggregate_id
: The aggregate this snapshot belongs toentity
: The state of the entity at the time of snapshotsequence
: The sequence number at which this snapshot was takentimestamp
: Unix timestamp when the snapshot was created
pub type Snapshot(entity) {
Snapshot(
aggregate_id: AggregateId,
entity: entity,
sequence: Int,
timestamp: Int,
)
}
Constructors
-
Snapshot( aggregate_id: AggregateId, entity: entity, sequence: Int, timestamp: Int, )
Configuration for snapshot creation behavior.
Fields
snapshot_frequency
: Number of events after which a new snapshot should be created
pub type SnapshotConfig {
SnapshotConfig(snapshot_frequency: Int)
}
Constructors
-
SnapshotConfig(snapshot_frequency: Int)
Functions
pub fn add_query(
eventsourcing eventsourcing: EventSourcing(a, b, c, d, e, f),
query query: fn(String, List(EventEnvelop(d))) -> Nil,
) -> EventSourcing(a, b, c, d, e, f)
Add a query to the EventSourcing instance.
Queries are functions that run when events are committed. They can be used for things like updating read models or sending notifications.
pub fn execute(
event_sourcing event_sourcing: EventSourcing(a, b, c, d, e, f),
aggregate_id aggregate_id: String,
command command: c,
) -> Result(Nil, EventSourcingError(e))
Executes a command against an aggregate.
Arguments
event_sourcing
: The EventSourcing instanceaggregate_id
: ID of the aggregate to execute command againstcommand
: The command to execute
Returns
Ok(Nil) if successful, or an error if command handling fails
pub fn execute_with_metadata(
event_sourcing event_sourcing: EventSourcing(a, b, c, d, e, f),
aggregate_id aggregate_id: String,
command command: c,
metadata metadata: List(#(String, String)),
) -> Result(Nil, EventSourcingError(e))
Executes a command with additional metadata.
Arguments
event_sourcing
: The EventSourcing instanceaggregate_id
: ID of the aggregate to execute command againstcommand
: The command to executemetadata
: Additional metadata to store with generated events
Returns
Ok(Nil) if successful, or an error if command handling fails
pub fn get_latest_snapshot(
event_sourcing event_sourcing: EventSourcing(a, b, c, d, e, f),
aggregate_id aggregate_id: String,
) -> Result(Option(Snapshot(b)), EventSourcingError(e))
Retrieves the most recent snapshot for an aggregate if it exists.
This function attempts to load the latest snapshot for an aggregate, which can be used as a starting point for rebuilding aggregate state without replaying all events from the beginning.
Arguments
event_sourcing
: The EventSourcing instanceaggregate_id
: ID of the aggregate to get the snapshot for
Returns
A Result containing:
- Ok(Some(Snapshot)): The latest snapshot if one exists
- Ok(None): If no snapshot exists for the aggregate
- Error(EventSourcingError): If snapshot retrieval fails
Example
let assert Ok(maybe_snapshot) = get_latest_snapshot(event_sourcing, "account-123")
case maybe_snapshot {
Some(snapshot) -> // Use snapshot as starting point
None -> // No snapshot exists, start from initial state
}
pub fn load_aggregate(
event_sourcing event_sourcing: EventSourcing(a, b, c, d, e, f),
aggregate_id aggregate_id: String,
) -> Result(Aggregate(b, c, d, e), EventSourcingError(e))
Loads the current state of an aggregate.
Arguments
event_sourcing
: The EventSourcing instanceaggregate_id
: ID of the aggregate to load
Returns
The current state of the aggregate, or an error if loading fails
pub fn load_events(
event_sourcing event_sourcing: EventSourcing(a, b, c, d, e, f),
aggregate_id aggregate_id: String,
) -> Result(List(EventEnvelop(d)), EventSourcingError(e))
Loads all events for an aggregate from a specified sequence number.
This function retrieves all events for an aggregate starting from a given sequence number, allowing for partial event stream loading and event replay from a specific point in time.
Arguments
event_sourcing
: The EventSourcing instanceaggregate_id
: ID of the aggregate whose events should be loadedstart_from
: The sequence number to start loading events from
Returns
A Result containing:
- Ok(List(EventEnvelop(event))): List of events if successful
- Error(EventSourcingError): If loading fails
Example
let assert Ok(events) = load_events(event_sourcing, "account-123", 5)
// events will contain all events for account-123 starting from sequence 5
pub fn new(
event_store event_store: EventStore(a, b, c, d, e, f),
queries queries: List(fn(String, List(EventEnvelop(d))) -> Nil),
handle handle: fn(b, c) -> Result(List(d), e),
apply apply: fn(b, d) -> b,
empty_state empty_state: b,
) -> EventSourcing(a, b, c, d, e, f)
Creates a new EventSourcing instance with the provided configuration.
Arguments
event_store
: The storage implementation to usequeries
: List of query handlers to process eventshandle
: Function to handle commandsapply
: Function to apply eventsempty_state
: Initial state for new aggregates
Returns
A new EventSourcing instance without snapshot support
pub fn with_snapshots(
event_sourcing: EventSourcing(a, b, c, d, e, f),
config: SnapshotConfig,
) -> EventSourcing(a, b, c, d, e, f)
Arguments
event_sourcing
: The EventSourcing instance to modifyconfig
: Snapshot configuration specifying creation frequency
Returns
A new EventSourcing instance with snapshot support enabled