Eventsourcing

Event Sourcing Library for Gleam
A Gleam library for building event-sourced systems with supervision trees.

Introduction

Eventsourcing is a Gleam library for building robust, concurrent event-sourced systems using OTP supervision trees. Event sourcing stores changes to application state as a sequence of immutable events, providing excellent auditability, debugging capabilities, and system resilience.

Version 9.0 simplifies the API with cleaner function signatures and more direct message passing, while keeping all the production-ready features of v8.0.

Architecture

The library is built on a supervised actor architecture.

Features

🏗️ Supervision & Fault Tolerance

Concurrent Processing

📊 Event Sourcing Core

🔧 Event Store Support

🛡️ Type Safety

Quick Start

import eventsourcing
import eventsourcing/memory_store
import gleam/erlang/process
import gleam/option.{None}
import gleam/otp/static_supervisor

// 1. Create memory store with supervision
let events_actor_name = process.new_name("events_actor")
let snapshot_actor_name = process.new_name("snapshot_actor")
let #(eventstore, memory_store_spec) = memory_store.supervised(
  events_actor_name,
  snapshot_actor_name, 
  static_supervisor.OneForOne
)

// 2. Start memory store supervisor
let assert Ok(_) = static_supervisor.new(static_supervisor.OneForOne)
  |> static_supervisor.add(memory_store_spec)
  |> static_supervisor.start()

// 3. Create event sourcing system
let name = process.new_name("eventsourcing_actor")
let queries = [#(process.new_name("my_query"), my_query)]
let assert Ok(eventsourcing_spec) = eventsourcing.supervised(
  name: name,
  eventstore: eventstore,
  handle: my_handle,
  apply: my_apply,
  empty_state: MyEmptyState,
  queries: queries,
  snapshot_config: None
)

// 4. Start event sourcing supervisor
let assert Ok(_supervisor) = static_supervisor.new(static_supervisor.OneForOne)
  |> static_supervisor.add(eventsourcing_spec)
  |> static_supervisor.start()

// 5. Get actor from name
let eventsourcing_actor = process.named_subject(name)

// 6. Execute commands
eventsourcing.execute(eventsourcing_actor, "aggregate-123", MyCommand)

// 7. Load events and monitor system
let events_subject = eventsourcing.load_events(eventsourcing_actor, "aggregate-123")
let stats_subject = eventsourcing.system_stats(eventsourcing_actor)

Example

Define Your Domain

pub type BankAccount {
  BankAccount(balance: Float)
  UnopenedBankAccount
}

pub type BankAccountCommand {
  OpenAccount(account_id: String)
  DepositMoney(amount: Float)
  WithDrawMoney(amount: Float)
}

pub type BankAccountEvent {
  AccountOpened(account_id: String)
  CustomerDepositedCash(amount: Float, balance: Float)
  CustomerWithdrewCash(amount: Float, balance: Float)
}

pub type BankAccountError {
  CantDepositNegativeAmount
  CantOperateOnUnopenedAccount
  CantWithdrawMoreThanCurrentBalance
}

Command Handling

pub fn handle(
  bank_account: BankAccount,
  command: BankAccountCommand,
) -> Result(List(BankAccountEvent), BankAccountError) {
  case bank_account, command {
    UnopenedBankAccount, OpenAccount(account_id) ->
      Ok([AccountOpened(account_id)])
    BankAccount(balance), DepositMoney(amount) -> {
      case amount >. 0.0 {
        True -> {
          let new_balance = balance +. amount
          Ok([CustomerDepositedCash(amount, new_balance)])
        }
        False -> Error(CantDepositNegativeAmount)
      }
    }
    // A non-positive amount and an overdraft are both rejected with the same error.
    BankAccount(balance), WithDrawMoney(amount) -> {
      case amount >. 0.0 && balance >=. amount {
        True -> {
          let new_balance = balance -. amount
          Ok([CustomerWithdrewCash(amount, new_balance)])
        }
        False -> Error(CantWithdrawMoreThanCurrentBalance)
      }
    }
    _, _ -> Error(CantOperateOnUnopenedAccount)
  }
}

Event Application

pub fn apply(bank_account: BankAccount, event: BankAccountEvent) -> BankAccount {
  case event {
    AccountOpened(_) -> BankAccount(0.0)
    CustomerDepositedCash(_, balance) -> BankAccount(balance)
    CustomerWithdrewCash(_, balance) -> BankAccount(balance)
  }
}
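
The handle and apply functions above are pure, so the command cycle can be exercised without any actors. The following is a minimal sketch of that cycle, not the library's internals: step is a hypothetical helper, and the tiny Account domain is a simplified stand-in for the bank account types above.

```gleam
import gleam/list
import gleam/result

// Hypothetical, simplified domain used only for this sketch.
pub type Account {
  Unopened
  Open(balance: Int)
}

pub type Command {
  OpenAccount
  Deposit(amount: Int)
}

pub type Event {
  Opened
  Deposited(amount: Int)
}

pub type AccountError {
  InvalidCommand
}

// Decide which events a command produces for the current state.
fn handle(account: Account, command: Command) -> Result(List(Event), AccountError) {
  case account, command {
    Unopened, OpenAccount -> Ok([Opened])
    Open(_), Deposit(amount) -> Ok([Deposited(amount)])
    _, _ -> Error(InvalidCommand)
  }
}

// Fold a single event into the state.
fn apply(account: Account, event: Event) -> Account {
  case account, event {
    _, Opened -> Open(balance: 0)
    Open(balance), Deposited(amount) -> Open(balance: balance + amount)
    Unopened, Deposited(_) -> Unopened
  }
}

// Hypothetical helper, not part of the eventsourcing API: run one command
// and return the new events together with the state they produce.
pub fn step(
  state: state,
  command: command,
  handle: fn(state, command) -> Result(List(event), error),
  apply: fn(state, event) -> state,
) -> Result(#(List(event), state), error) {
  use events <- result.map(handle(state, command))
  #(events, list.fold(events, state, apply))
}

pub fn main() {
  let assert Ok(#([Opened], opened)) = step(Unopened, OpenAccount, handle, apply)
  let assert Ok(#(_, Open(balance: 50))) = step(opened, Deposit(50), handle, apply)
  // A rejected command produces no events and leaves the state untouched.
  let assert Error(InvalidCommand) = step(Unopened, Deposit(50), handle, apply)
}
```

Keeping both functions free of side effects is what makes this kind of check possible in a plain unit test.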

Supervised Usage (Recommended)

import eventsourcing
import eventsourcing/memory_store
import gleam/erlang/process
import gleam/int
import gleam/io
import gleam/list
import gleam/option.{None}
import gleam/otp/static_supervisor

pub fn main() {
  // Define query for read model updates
  let balance_query = fn(aggregate_id, events) {
    io.println(
      "Account " <> aggregate_id <> " processed " 
      <> int.to_string(list.length(events)) <> " events"
    )
  }

  // 1. Create memory store with supervision
  let events_actor_name = process.new_name("events_actor")
  let snapshot_actor_name = process.new_name("snapshot_actor")
  let #(eventstore, memory_store_spec) = memory_store.supervised(
    events_actor_name,
    snapshot_actor_name,
    static_supervisor.OneForOne
  )

  // 2. Start memory store supervisor
  let assert Ok(_) = static_supervisor.new(static_supervisor.OneForOne)
    |> static_supervisor.add(memory_store_spec)
    |> static_supervisor.start()

  // 3. Create supervised event sourcing system
  let name = process.new_name("eventsourcing_actor")
  let queries = [#(process.new_name("balance_query"), balance_query)]
  let assert Ok(eventsourcing_spec) = eventsourcing.supervised(
    name: name,
    eventstore: eventstore,
    handle: handle,
    apply: apply,
    empty_state: UnopenedBankAccount,
    queries: queries,
    snapshot_config: None
  )

  // 4. Start event sourcing supervision tree
  let assert Ok(_supervisor) = static_supervisor.new(static_supervisor.OneForOne)
    |> static_supervisor.add(eventsourcing_spec)
    |> static_supervisor.start()

  // 5. Get actor from name
  let eventsourcing_actor = process.named_subject(name)

  // Give time for actors to start
  process.sleep(100)

  // 6. Execute commands - they will be processed asynchronously
  eventsourcing.execute(
    eventsourcing_actor,
    "account-123",
    OpenAccount("account-123")
  )
  
  eventsourcing.execute(
    eventsourcing_actor,
    "account-123",
    DepositMoney(100.0)
  )
}

Async API Usage

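All data operations use asynchronous message passing: each call returns a subject immediately, and the result arrives on that subject as a message, so callers wait with process.receive and a timeout: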

pub fn async_example() {
  // Set up supervised system (memory store setup omitted for brevity)
  let name = process.new_name("eventsourcing_actor")
  let assert Ok(eventsourcing_spec) = eventsourcing.supervised(
    name: name,
    eventstore: eventstore, // from memory_store.supervised()
    handle: handle,
    apply: apply,
    empty_state: UnopenedBankAccount,
    queries: [],
    snapshot_config: None
  )

  let assert Ok(_supervisor) = static_supervisor.new(static_supervisor.OneForOne)
    |> static_supervisor.add(eventsourcing_spec)
    |> static_supervisor.start()

  let eventsourcing_actor = process.named_subject(name)
  process.sleep(100) // Give time for actors to start

  // Load aggregate state asynchronously
  let load_subject = eventsourcing.load_aggregate(eventsourcing_actor, "account-123")
  case process.receive(load_subject, 1000) {
    Ok(Ok(aggregate)) -> io.println("Account loaded: " <> aggregate.aggregate_id)
    Ok(Error(eventsourcing.EntityNotFound)) -> io.println("Account not found")
    Ok(Error(other)) -> io.println("Error: " <> string.inspect(other))
    Error(_) -> io.println("Timeout waiting for response")
  }
  
  // Load events asynchronously
  let events_subject = eventsourcing.load_events_from(eventsourcing_actor, "account-123", 0)
  case process.receive(events_subject, 1000) {
    Ok(Ok(events)) -> io.println("Loaded " <> int.to_string(list.length(events)) <> " events")
    Ok(Error(error)) -> io.println("Error loading events: " <> string.inspect(error))
    Error(_) -> io.println("Timeout waiting for events")
  }
}
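
The receive calls above all match on the same nested shape: an outer Result for the timeout and an inner Result for the operation. If that becomes repetitive, a small wrapper can flatten it. This is a sketch only: await and AwaitError are hypothetical names, not part of the eventsourcing API, and the example sends to its own subject to simulate a reply.

```gleam
import gleam/erlang/process.{type Subject}

// Hypothetical error type for this sketch; not part of the eventsourcing API.
pub type AwaitError(e) {
  TimedOut
  Failed(e)
}

// Wait for a reply on `subject` and flatten the nested result.
pub fn await(
  subject: Subject(Result(a, e)),
  timeout_ms: Int,
) -> Result(a, AwaitError(e)) {
  case process.receive(subject, timeout_ms) {
    Ok(Ok(value)) -> Ok(value)
    Ok(Error(reason)) -> Error(Failed(reason))
    Error(Nil) -> Error(TimedOut)
  }
}

pub fn main() {
  // Simulate a reply by sending to a subject owned by this process.
  let replied: Subject(Result(Int, String)) = process.new_subject()
  process.send(replied, Ok(42))
  let assert Ok(42) = await(replied, 100)

  // Nothing is ever sent here, so the wait times out.
  let silent: Subject(Result(Int, String)) = process.new_subject()
  let assert Error(TimedOut) = await(silent, 10)
}
```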

Snapshot Configuration

// Create validated frequency (snapshots every 100 events)
let assert Ok(frequency) = eventsourcing.frequency(100)
let snapshot_config = eventsourcing.SnapshotConfig(frequency)

// Enable snapshots during supervised system setup
let name = process.new_name("eventsourcing_actor")
let assert Ok(eventsourcing_spec) = eventsourcing.supervised(
  name: name,
  eventstore: eventstore, // from memory_store.supervised()
  handle: handle,
  apply: apply,
  empty_state: UnopenedBankAccount,
  queries: [],
  snapshot_config: Some(snapshot_config) // Enable snapshots
)

// Get actor and load latest snapshot asynchronously
let eventsourcing_actor = process.named_subject(name)
let snapshot_subject = eventsourcing.latest_snapshot(eventsourcing_actor, "account-123")
case process.receive(snapshot_subject, 1000) {
  Ok(Ok(Some(snapshot))) -> {
    io.println("Using snapshot from sequence " <> int.to_string(snapshot.sequence))
  }
  Ok(Ok(None)) -> io.println("No snapshot available, loading from events")
  Ok(Error(error)) -> io.println("Error loading snapshot: " <> string.inspect(error))
  Error(_) -> io.println("Timeout waiting for snapshot")
}
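
A snapshot shortens loading because only the events recorded after it need to be replayed. The sketch below illustrates that idea under stated assumptions: Snapshot, Recorded and restore are hypothetical shapes written for this example, and the library's own snapshot and event envelope types may differ.

```gleam
import gleam/list
import gleam/option.{type Option, None, Some}

// Hypothetical shapes for illustration only.
pub type Snapshot(state) {
  Snapshot(sequence: Int, state: state)
}

pub type Recorded(event) {
  Recorded(sequence: Int, payload: event)
}

// Rebuild state from an optional snapshot plus the events recorded after it.
pub fn restore(
  snapshot: Option(Snapshot(state)),
  events: List(Recorded(event)),
  empty_state: state,
  apply: fn(state, event) -> state,
) -> state {
  let #(from_sequence, start) = case snapshot {
    Some(Snapshot(sequence, state)) -> #(sequence, state)
    None -> #(0, empty_state)
  }
  events
  |> list.filter(fn(recorded) { recorded.sequence > from_sequence })
  |> list.fold(start, fn(state, recorded) { apply(state, recorded.payload) })
}

pub fn main() {
  let events = [Recorded(1, 5), Recorded(2, 7), Recorded(3, 10)]
  let add = fn(total, n) { total + n }

  // Without a snapshot every event is replayed: 0 + 5 + 7 + 10 = 22.
  let assert 22 = restore(None, events, 0, add)

  // With a snapshot at sequence 2 (state 12) only event 3 is replayed.
  let assert 22 = restore(Some(Snapshot(2, 12)), events, 0, add)
}
```

Both paths must produce the same state; the snapshot only changes how much work is needed to get there.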

Enhanced API Features

Execute Commands with Metadata

// Execute with additional tracking information
eventsourcing.execute_with_metadata(
  eventsourcing_actor,
  "account-123",
  DepositMoney(100.0),
  [#("user_id", "user-456"), #("source", "mobile_app"), #("trace_id", "abc-123")]
)
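
The metadata passed above is a list of string key/value pairs. Assuming a query or audit tool receives it in the same List(#(String, String)) shape, an entry can be read back with the standard library. metadata_value is a hypothetical helper, not part of the eventsourcing API.

```gleam
import gleam/list
import gleam/result

// Hypothetical helper: look up one metadata entry, falling back to a default.
pub fn metadata_value(
  metadata: List(#(String, String)),
  key: String,
  default: String,
) -> String {
  metadata
  |> list.key_find(key)
  |> result.unwrap(default)
}

pub fn main() {
  let metadata = [#("user_id", "user-456"), #("source", "mobile_app")]
  let assert "user-456" = metadata_value(metadata, "user_id", "unknown")
  let assert "unknown" = metadata_value(metadata, "trace_id", "unknown")
}
```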

System Monitoring and Stats

// Get system health statistics
let stats_subject = eventsourcing.system_stats(eventsourcing_actor)
case process.receive(stats_subject, 1000) {
  Ok(stats) -> {
    io.println("Query actors: " <> int.to_string(stats.query_actors_count))
    io.println("Commands processed: " <> int.to_string(stats.total_commands_processed))
  }
  Error(_) -> io.println("Timeout getting stats")
}

// Get individual aggregate statistics  
let agg_stats_subject = eventsourcing.aggregate_stats(eventsourcing_actor, "account-123")
case process.receive(agg_stats_subject, 1000) {
  Ok(Ok(stats)) -> {
    io.println("Aggregate: " <> stats.aggregate_id)
    io.println("Has snapshot: " <> string.inspect(stats.has_snapshot))
  }
  Ok(Error(error)) -> io.println("Error: " <> string.inspect(error))
  Error(_) -> io.println("Timeout")
}

Migration Guide

Migrating from v8 to v9

Version 9.0 introduces API simplifications to improve the developer experience:

Key Changes

Migration Steps

1. Update Function Calls - Remove .data accessor

// v8.0
eventsourcing.execute(eventsourcing_actor.data, "agg-123", command)
eventsourcing.load_aggregate(eventsourcing_actor.data, "agg-123")

// v9.0 - Direct subject passing
eventsourcing.execute(eventsourcing_actor, "agg-123", command)
eventsourcing.load_aggregate(eventsourcing_actor, "agg-123")

2. Update Statistics Function Names

// v8.0
let stats = eventsourcing.get_system_stats(eventsourcing_actor.data)
let agg_stats = eventsourcing.get_aggregate_stats(eventsourcing_actor.data, "agg-123")
let snapshot = eventsourcing.get_latest_snapshot(eventsourcing_actor.data, "agg-123")

// v9.0 - Cleaner names
let stats = eventsourcing.system_stats(eventsourcing_actor)
let agg_stats = eventsourcing.aggregate_stats(eventsourcing_actor, "agg-123") 
let snapshot = eventsourcing.latest_snapshot(eventsourcing_actor, "agg-123")

3. Add Named Actor Support

// v8.0
let assert Ok(spec) = eventsourcing.supervised(
  eventstore, handle: handle, apply: apply,
  empty_state: state, queries: [balance_query],
  eventsourcing_actor_receiver: receiver1,
  query_actors_receiver: receiver2,
  snapshot_config: None
)

// v9.0 - Named actors required
let assert Ok(spec) = eventsourcing.supervised(
  name: process.new_name("eventsourcing_actor"),
  eventstore: eventstore, handle: handle, apply: apply,
  empty_state: state, 
  queries: [#(process.new_name("balance_query"), balance_query)],
  snapshot_config: None
)

Migrating from v7 to v9

Key Changes

Migration Steps

1. Update Initialization

// v7.x
let eventsourcing = eventsourcing.new(store, queries, handle, apply, empty_state)

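// v9.0 - Supervised (Recommended)
// Each query is now paired with a name: #(process.new_name("my_query"), my_query)
let assert Ok(eventsourcing_spec) = eventsourcing.supervised(
  name: process.new_name("eventsourcing_actor"),
  eventstore: store,
  handle: handle,
  apply: apply,
  empty_state: empty_state,
  queries: queries,
  snapshot_config: None
)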

2. Query Registration (No Longer Needed)

// v8.0 - Required after supervisor start
eventsourcing.register_queries(eventsourcing_actor, query_actors)

// v9.0 - Not needed! Queries are automatically registered through supervised() function

3. Update Command Execution

// v7.x
eventsourcing.execute(eventsourcing, "agg-123", command)

// v9.0
eventsourcing.execute(eventsourcing_actor, "agg-123", command)

4. Update Data Loading (Now Async)

// v7.x - Synchronous
let assert Ok(aggregate) = eventsourcing.load_aggregate(eventsourcing, "agg-123")

// v9.0 - Asynchronous message passing
let load_subject = eventsourcing.load_aggregate(eventsourcing_actor, "agg-123")
let assert Ok(Ok(aggregate)) = process.receive(load_subject, 1000)

5. Update Snapshot Configuration

// v7.x
let config = eventsourcing.SnapshotConfig(5)

// v9.0
let assert Ok(frequency) = eventsourcing.frequency(5)
let config = eventsourcing.SnapshotConfig(frequency)

Philosophy

Eventsourcing embraces OTP supervision principles to build production-ready, fault-tolerant event-sourced systems.

This design makes your event-sourced systems naturally concurrent, fault-tolerant, and maintainable while preserving Gleam’s excellent type safety.

Installation

Eventsourcing is published on Hex! You can add it to your Gleam projects from the command line:

gleam add eventsourcing

Support

Eventsourcing is built by Renatillas. Contributions are very welcome! If you’ve spotted a bug, or would like to suggest a feature, please open an issue or a pull request.

Contributing

Contributions are welcome! Please follow these steps:

  1. Fork the repository.
  2. Create a new branch (git checkout -b my-feature-branch).
  3. Make your changes and commit them (git commit -m 'Add new feature').
  4. Push to the branch (git push origin my-feature-branch).
  5. Open a pull request.

Please ensure your code adheres to the project’s coding standards and includes appropriate tests.

License

This project is licensed under the MIT License. See the LICENSE file for details.
