signal

Types

An aggregate is an actor managed by signal that holds the state, processes commands and events.

You can send messasges to this aggregate and interact with it, but signal provides a number of pre-built functions to help with that.

pub type Aggregate(aggregate, command, event) =
  process.Subject(AggregateMessage(aggregate, command, event))

Configures the aggregate processed by the signal service.

pub type AggregateConfig(aggregate, command, event) {
  AggregateConfig(
    initial_state: aggregate,
    command_handler: CommandHandler(aggregate, command, event),
    event_handler: EventHandler(aggregate, event),
  )
}

Constructors

  • AggregateConfig(
      initial_state: aggregate,
      command_handler: CommandHandler(aggregate, command, event),
      event_handler: EventHandler(aggregate, event),
    )
pub type AggregateMessage(aggregate, command, event) {
  State(reply_with: process.Subject(aggregate))
  Identity(reply_with: process.Subject(String))
  HandleCommand(
    reply_with: process.Subject(Result(aggregate, String)),
    command,
  )
  ShutdownAggregate
}

Constructors

  • State(reply_with: process.Subject(aggregate))
  • Identity(reply_with: process.Subject(String))
  • HandleCommand(
      reply_with: process.Subject(Result(aggregate, String)),
      command,
    )
  • ShutdownAggregate

This is where you put your domain / business logic, a function that has access to the current state of your aggregate, and can decided what to do with a given command.

The command handler may return an Error(String), or a list of events. Most of the time, you will be producing one event but in some cases you will find a need for producing multiple.

Commands are only triggered once, so they can contain side-effects.

Basic example:

pub fn handle_post_commands(command: PostCommands, post: Post) {
  case command {
    UpdatePostContent(title, text) -> Ok([PostUpdated(title, text)])
    PublishPost -> {
      case state {
        s if s.title == "" -> Error("Cannot publish a post without a title!")
        _ -> Ok([PostPublished()])
      }
    }
  }
} 

⚠️ It is best practice to wrap the handler in a higher order function, this lets you inject dependencies and improves extensibility. Best practice:

pub fn handle_post_commands(notify: NotificationService) -> signal.CommandHandler(Post, PostCommands, PostEvents) {
  fn (command: PostCommands, post: Post) {
    case command {
      UpdatePostContent(title, text) -> Ok([PostUpdated(title, text)])
      PublishPost -> {
        case state {
          s if s.title == "" -> Error("Cannot publish a post without a title!")
          _ -> { 
            notification.send(notify, "Just published a new post - " <> post.title)
            Ok([PostPublished()]) }
        }
      }
    }
  }
} 
pub type CommandHandler(state, command, event) =
  fn(command, state) -> Result(List(event), String)

Consumers are actors that should receive and handle these messages.

  • Consume: is the only message triggered by signal, and it is triggered on all events processed by the service

Other messages are there for user convenience.

pub type ConsumerMessage(state, event) {
  Consume(Event(event))
  GetConsumerState(reply: process.Subject(state))
  ShutdownConsumer
}

Constructors

  • Consume(Event(event))
  • GetConsumerState(reply: process.Subject(state))
  • ShutdownConsumer

Configures the internals of an signal service.

pub opaque type EmitConfig(aggregate, state, command, event)

The base signal process, it handles the internal operation of signal for a given aggregate type.

pub opaque type EmitMessages(aggregate, command, event)

Represents a base event type that is used throughout signal, in event handlers you are able to use this information if needs be.

pub type Event(event) {
  Event(
    aggregate_version: Int,
    aggregate_id: String,
    event_name: String,
    data: event,
  )
}

Constructors

  • Event(
      aggregate_version: Int,
      aggregate_id: String,
      event_name: String,
      data: event,
    )

A function that describes how events translate into state of your aggregate. Events handlers are used to update the aggregate after processing commands, and to hydrate the aggregate from storage.

Most of the time these are simple data mapping functions.

Basic example:

pub fn handle_post_events(post: Post, event: PostEvents) {
  case command {
    PostUpdated(title, text) -> Post(..post, title: title, text: text)
    PostPublished -> Post(..post, published: True) 
  }
} 
pub type EventHandler(state, event) =
  fn(state, Event(event)) -> state

When implementing a custom persistance layer, signal expects an actor that handles these messages

  • GetStoredEvents: used for hydrating aggregates from storage
  • IsIdentityAvailable: used to ensure duplicate ids cannot be created
  • StoreEvents: used to persist a list of new events
  • ShutdownPersistanceLayer: helper to let you shut down your actor, signal will not trigger this message

⚠️ Persistance actor has to report the result of the StoreEvents message in form of a PersistanceState event. This allows signal to handle a write ahead log and batch event storage operations.

pub type PersistanceInterface(event) {
  GetStoredEvents(
    process.Subject(Result(List(Event(event)), String)),
    String,
  )
  IsIdentityAvailable(
    process.Subject(Result(Bool, String)),
    String,
  )
  StoreEvents(List(Event(event)))
  ShutdownPersistanceLayer
}

Constructors

  • GetStoredEvents(
      process.Subject(Result(List(Event(event)), String)),
      String,
    )
  • IsIdentityAvailable(
      process.Subject(Result(Bool, String)),
      String,
    )
  • StoreEvents(List(Event(event)))
  • ShutdownPersistanceLayer

A base signal process which supervises the behind the scenes stuff and exposes some functionality.

pub type Signal(aggregate, command, event) =
  process.Subject(EmitMessages(aggregate, command, event))

An internal actor used to manage storage of events.

pub type Store(event) =
  process.Subject(StoreMessages(event))

Messages handled by the Store actor, when creating custom persistance layers, you should report persistance state to PersistanceState.

pub type StoreMessages(event) {
  StoreEvent(event: Event(event))
  GetEvents(
    reply_with: process.Subject(
      Result(List(Event(event)), String),
    ),
    aggregate_id: String,
  )
  IdExists(
    reply_with: process.Subject(Result(Bool, String)),
    aggregate_id: String,
  )
  PersistanceState(ids: List(Event(event)), completed: Bool)
  ShutdownStore
}

Constructors

  • StoreEvent(event: Event(event))
  • GetEvents(
      reply_with: process.Subject(Result(List(Event(event)), String)),
      aggregate_id: String,
    )
  • IdExists(
      reply_with: process.Subject(Result(Bool, String)),
      aggregate_id: String,
    )
  • PersistanceState(ids: List(Event(event)), completed: Bool)
  • ShutdownStore

Subscribers are triggered on all events produced by all aggregates, and serve as a great way to extend your system.

Subscribers cannot modify or produce events.

These are generally great for creating different read models of your data, reporting, and reacting to certain events.

  • Consumer: is an actor that consumes events, and can do whatever it wants with them, and give the user full control of the state, lifecycle and everything elese.
  • Policy: is a one-of task that should run on an event, at the moment there is no retries and signal will ignore the return values of these tasks.

🛑 Policies are early in development, not well tested and might result in performance bottlenecks.

pub type Subscriber(state, event) {
  Consumer(process.Subject(ConsumerMessage(state, event)))
  Policy(task.Task(Event(event)))
}

Constructors

  • Consumer(process.Subject(ConsumerMessage(state, event)))
  • Policy(task.Task(Event(event)))

Functions

pub fn aggregate(
  signal: Subject(EmitMessages(a, b, c)),
  id: String,
) -> Result(Subject(AggregateMessage(a, b, c)), String)

Use this function to retrieve a particular aggregate from the signal event store. This will return a subject which can then be used to interact with state of you aggregate, or process further commands.

Command handling example:

let result = signal.get_aggregate(em, "how-to-gleam")
|> signal.handle_command(CommentOnPost("how-to-gleam"))

Getting state example:

let post = signal.get_aggregate(em, "how-to-gleam")
|> signal.get_state()
pub fn configure(
  agg: AggregateConfig(a, b, c),
) -> EmitConfig(a, d, b, c)

This is a configuration object that lets you set up your signal instance.

You should put the configuration somewhere in your app’s startup code.

let aggregate_config = AggregateConfig(
  initial_state: my_default_aggregate,
  command_handler: my_command_handler,
  event_handler: my_event_handler
)

let store = signal.configure(aggregate_config)
|> with_persistance_layer(my_storage)
|> with_subscriber(my_notification_client)
|> with_subscriber(my_metrics_counter)
|> start()
pub fn create(
  signal: Subject(EmitMessages(a, b, c)),
  id: String,
) -> Result(Subject(AggregateMessage(a, b, c)), String)

Creates a new aggregate with a given ID.

The ID needs to be unique, otherwise creation will fail.

pub fn get_current_pool_size(
  signal: Subject(EmitMessages(a, b, c)),
) -> Int

Gets the current size of the aggregate pool in memory, mainly for testing.

pub fn get_id(agg: Subject(AggregateMessage(a, b, c))) -> String

Gets the id of the aggregate actor.

pub fn get_state(agg: Subject(AggregateMessage(a, b, c))) -> a

Use this function to get the current state of your aggregate.

let post = signal.get_aggregate(em, "how-to-gleam")
|> signal.get_state()
pub fn handle_command(
  agg: Subject(AggregateMessage(a, b, c)),
  command: b,
) -> Result(a, String)

Use this function to have your aggregate process a command.

let result = signal.get_aggregate(em, "how-to-gleam")
|> signal.handle_command(CreatePost("how-to-gleam"))
pub fn start(
  config: EmitConfig(a, b, c, d),
) -> Result(Subject(EmitMessages(a, c, d)), StartError)

Starts the signal services and returns a subject used to interact with the event store.

pub fn with_persistance_layer(
  config: EmitConfig(a, b, c, d),
  persist: Subject(PersistanceInterface(d)),
) -> EmitConfig(a, b, c, d)

Configures signal to store events using a particular persistance layer.

Signal will default to an in-memory store which is recommended for development.

WIP - I am working on some persistance layers, but for now, you can bring your own, or play around with in-memory persistance.

pub fn with_pool_size_limit(
  config: EmitConfig(a, b, c, d),
  aggregates_in_memory: Int,
) -> EmitConfig(a, b, c, d)

Defines the maximum number of aggregates kept in memory. Defaults to 100, lower it if you desire lower memory consumption, increase it if you desire higher performance.

When an aggregate which is not in the pool is requested, signal has to rebuild it from events in the database.

⚠️ Large aggregates that contain a lot of data are an anti-pattern in event sourcing, instead of lowering the pool size, you might want to consider breaking up your aggregate and redesigning it, or storing some data using a different persistance method.

pub fn with_subscriber(
  config: EmitConfig(a, b, c, d),
  sub: Subscriber(b, d),
) -> EmitConfig(a, b, c, d)

Subscribers can be one-of tasks (policies) or actors (consumers) that consume events generated by the aggregate. They are called for each event produced by all aggregates.

This is a great way of projecting state in a very specific way. Think of it as letting you create different read models of your data, or trigger some other specifics when an event is generated.

You can event use this method to trigger commands to your other aggregates, but be careful, that can make it difficult to track the state of you application!

Example policy:

fn email_readers(event: MyBlogEvent) {
  case event {
    PostPublished -> {
      email_everyone_interested(...)
      Ok(Nil)
    }
    _ -> Ok(Nil)
  }
}

Couple of notes on policies:

  • Signal will ignore return values of these functions
  • WIP Signal currently does not retry failed policies, it will do in the future

Example consumer:

fn event_counter( message: MyBlogEvent, event_count: Int ) {
  case messasge {
    Shutdown -> actor.stop(process.Normal)
    _ -> actor.continue(event_count + 1)
  }
} 

There are a few things to not about Consumers:

  • Signal will not start or stop your consumers, their lifetime is in your control.
  • Signal will ignore any returned data
  • Your actor should accept the messages which are actually the Events you defined at configuration time
Search Document