signal

Types

An aggregate is an actor managed by signal that holds state and processes commands and events.

You can send messages to this aggregate and interact with it directly, but signal provides a number of pre-built functions to help with that.

pub type Aggregate(aggregate, command, event) =
  process.Subject(AggregateMessage(aggregate, command, event))

Configures the aggregate processed by the signal service.

pub type AggregateConfig(aggregate, command, event) {
  AggregateConfig(
    initial_state: aggregate,
    command_handler: CommandHandler(aggregate, command, event),
    event_handler: EventHandler(aggregate, event),
  )
}

Constructors

  • AggregateConfig(
      initial_state: aggregate,
      command_handler: CommandHandler(aggregate, command, event),
      event_handler: EventHandler(aggregate, event),
    )
pub type AggregateMessage(aggregate, command, event) {
  State(reply_with: process.Subject(aggregate))
  Identity(reply_with: process.Subject(String))
  HandleCommand(
    reply_with: process.Subject(Result(aggregate, String)),
    command,
  )
  ShutdownAggregate
}

Constructors

  • State(reply_with: process.Subject(aggregate))
  • Identity(reply_with: process.Subject(String))
  • HandleCommand(
      reply_with: process.Subject(Result(aggregate, String)),
      command,
    )
  • ShutdownAggregate

This is where you put your domain / business logic: a function that has access to the current state of your aggregate and can decide what to do with a given command.

The command handler may return an Error(String) or a list of events. Most of the time you will produce a single event, but in some cases you will need to produce several.

Commands are only triggered once, so they can contain side effects.

Basic example:

pub fn handle_post_commands(command: PostCommands, post: Post) {
  case command {
    UpdatePostContent(title, text) -> Ok([PostUpdated(title, text)])
    PublishPost -> {
      case post.title {
        "" -> Error("Cannot publish a post without a title!")
        _ -> Ok([PostPublished])
      }
    }
  }
}

⚠️ It is best practice to wrap the handler in a higher-order function; this lets you inject dependencies and improves extensibility:

pub fn handle_post_commands(
  notify: NotificationService,
) -> signal.CommandHandler(Post, PostCommands, PostEvents) {
  fn(command: PostCommands, post: Post) {
    case command {
      UpdatePostContent(title, text) -> Ok([PostUpdated(title, text)])
      PublishPost -> {
        case post.title {
          "" -> Error("Cannot publish a post without a title!")
          _ -> {
            notification.send(notify, "Just published a new post - " <> post.title)
            Ok([PostPublished])
          }
        }
      }
    }
  }
}
pub type CommandHandler(state, command, event) =
  fn(command, state) -> Result(List(event), String)

Consumers are actors that should receive and handle these messages.

  • Consume: the only message triggered by signal; it is sent for every event processed by the service

Other messages are there for user convenience.

pub type ConsumerMessage(state, event) {
  Consume(Event(event))
  GetConsumerState(reply: process.Subject(state))
  ShutdownConsumer
}

Constructors

  • Consume(Event(event))
  • GetConsumerState(reply: process.Subject(state))
  • ShutdownConsumer

Represents the base event type used throughout signal. In event handlers you can use this metadata if need be.

pub type Event(event) {
  Event(
    aggregate_version: Int,
    aggregate_id: String,
    event_name: String,
    data: event,
  )
}

Constructors

  • Event(
      aggregate_version: Int,
      aggregate_id: String,
      event_name: String,
      data: event,
    )

A function that describes how events translate into the state of your aggregate. Event handlers are used to update the aggregate after processing commands, and to hydrate the aggregate from storage.

Most of the time these are simple data mapping functions.

Basic example:

pub fn handle_post_events(post: Post, event: signal.Event(PostEvents)) {
  case event.data {
    PostUpdated(title, text) -> Post(..post, title: title, text: text)
    PostPublished -> Post(..post, published: True)
  }
}
pub type EventHandler(state, event) =
  fn(state, Event(event)) -> state
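
As noted above, the handler receives the full Event record, so the aggregate metadata is available when you need it. Here is a minimal sketch of a variant of the handler above that logs the metadata before applying the payload (assuming the gleam/io and gleam/int stdlib modules are imported):

pub fn handle_post_events(post: Post, event: signal.Event(PostEvents)) {
  // Log which event is being applied, using the event metadata.
  io.println(
    "Applying "
    <> event.event_name
    <> " v"
    <> int.to_string(event.aggregate_version)
    <> " to "
    <> event.aggregate_id,
  )
  case event.data {
    PostUpdated(title, text) -> Post(..post, title: title, text: text)
    PostPublished -> Post(..post, published: True)
  }
}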

Just a basic log severity model.

pub type LogLevel {
  LogDebug
  LogInfo
  LogWarning
  LogError
}

Constructors

  • LogDebug
  • LogInfo
  • LogWarning
  • LogError

A base signal process which supervises the behind-the-scenes machinery and exposes some functionality.

pub type Signal(aggregate, command, event) =
  process.Subject(SignalMessage(aggregate, command, event))

Configures the internals of a signal service.

pub opaque type SignalConfig(aggregate, state, command, event)

The messages processed by the base signal process, which handles the internal operation of signal for a given aggregate type.

pub opaque type SignalMessage(aggregate, command, event)

When implementing a custom persistence layer, signal expects an actor that handles these messages:

  • GetStoredEvents: used for hydrating aggregates from storage
  • IsIdentityAvailable: used to ensure duplicate ids cannot be created
  • StoreEvent: used to persist a new event
  • ShutdownPersistanceLayer: helper to let you shut down your actor; signal will not trigger this message

⚠️ The persistence actor has to report the result of the StoreEvent message in the form of a PersistanceState event. This allows signal to maintain a write-ahead log and batch event storage operations. A minimal sketch of such a handler follows the type definition below.

pub type StoreMessage(event) {
  GetStoredEvents(
    process.Subject(Result(List(Event(event)), String)),
    String,
  )
  IsIdentityAvailable(
    process.Subject(Result(Bool, String)),
    String,
  )
  StoreEvent(Event(event))
  ShutdownPersistanceLayer
}

Constructors

  • GetStoredEvents(
      process.Subject(Result(List(Event(event)), String)),
      String,
    )
  • IsIdentityAvailable(
      process.Subject(Result(Bool, String)),
      String,
    )
  • StoreEvent(Event(event))
  • ShutdownPersistanceLayer
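
To illustrate, here is a minimal sketch of a persistence handler in the spirit of the built-in in-memory default. It assumes the gleam/list, gleam/otp/actor, and gleam/erlang/process modules and the PostEvents type from the earlier examples; the PersistanceState reporting step mentioned above is not shown, since its shape is not documented on this page:

pub fn my_persistance_handler(
  message: signal.StoreMessage(PostEvents),
  state: List(signal.Event(PostEvents)),
) {
  case message {
    signal.GetStoredEvents(reply, aggregate_id) -> {
      // Reply with this aggregate's events, oldest first, so the
      // aggregate can be hydrated by replaying them in order.
      process.send(
        reply,
        Ok(
          state
          |> list.filter(fn(e) { e.aggregate_id == aggregate_id })
          |> list.reverse,
        ),
      )
      actor.continue(state)
    }
    signal.IsIdentityAvailable(reply, aggregate_id) -> {
      // The id is available if no stored event references it yet.
      process.send(
        reply,
        Ok(!list.any(state, fn(e) { e.aggregate_id == aggregate_id })),
      )
      actor.continue(state)
    }
    signal.StoreEvent(event) -> actor.continue([event, ..state])
    signal.ShutdownPersistanceLayer -> actor.stop(process.Normal)
  }
}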

Subscribers are triggered on all events produced by all aggregates, and serve as a great way to extend your system.

Subscribers cannot modify or produce events.

These are generally great for creating different read models of your data, reporting, and reacting to certain events.

  • Consumer: an actor that consumes events and can do whatever it wants with them, giving you full control of its state, lifecycle, and everything else.
  • Policy: a one-off task that runs on an event. At the moment there are no retries, and signal ignores the return values of these tasks.

🛑 Policies are early in development, not well tested, and might result in performance bottlenecks.

pub type Subscriber(state, event) {
  Consumer(process.Subject(ConsumerMessage(state, event)))
  Policy(task.Task(Event(event)))
}

Constructors

  • Consumer(process.Subject(ConsumerMessage(state, event)))
  • Policy(task.Task(Event(event)))

Telemetry events produced by signal. These can be used for logging, metric collection, and tracing.

pub type TelemetryEvent {
  PoolCreatingAggregate(aggregate_id: String)
  PoolCreatedAggregate(aggregate_id: String)
  PoolCannotCreateAggregateWithId(aggregate_id: String)
  PoolHydratingAggregate(aggregate_id: String)
  PoolHydratedAggregate(aggregate_id: String)
  PoolAggregateNotFound(aggregate_id: String)
  PoolRebalancingStarted(size: Int)
  PoolEvictedAggregate(aggregate_id: String)
  PoolRebalancingCompleted(new_size: Int)
  AggregateProcessingCommand(
    command_name: String,
    aggregate_id: String,
  )
  AggregateProcessedCommand(
    command_name: String,
    aggregate_id: String,
  )
  AggregateCommandProcessingFailed(
    command_name: String,
    aggregate_id: String,
  )
  AggregateEventsProduced(
    event_name: String,
    aggregate_id: String,
  )
  BusTriggeringSubscribers(event_name: String, subscribers: Int)
  BusSubscribersInformed(event_name: String, subscribers: Int)
  StorePushedEventToWriteAheadLog(
    event_name: String,
    pool_size: Int,
  )
  StoreWriteAheadLogSizeWarning(pool_size: Int)
  StoreSubmittedBatchForPersistance(batch_size: Int)
  StorePersistanceCompleted(processed: Int, wal: Int)
}

Constructors

  • PoolCreatingAggregate(aggregate_id: String)
  • PoolCreatedAggregate(aggregate_id: String)
  • PoolCannotCreateAggregateWithId(aggregate_id: String)
  • PoolHydratingAggregate(aggregate_id: String)
  • PoolHydratedAggregate(aggregate_id: String)
  • PoolAggregateNotFound(aggregate_id: String)
  • PoolRebalancingStarted(size: Int)
  • PoolEvictedAggregate(aggregate_id: String)
  • PoolRebalancingCompleted(new_size: Int)
  • AggregateProcessingCommand(
      command_name: String,
      aggregate_id: String,
    )
  • AggregateProcessedCommand(
      command_name: String,
      aggregate_id: String,
    )
  • AggregateCommandProcessingFailed(
      command_name: String,
      aggregate_id: String,
    )
  • AggregateEventsProduced(event_name: String, aggregate_id: String)
  • BusTriggeringSubscribers(event_name: String, subscribers: Int)
  • BusSubscribersInformed(event_name: String, subscribers: Int)
  • StorePushedEventToWriteAheadLog(
      event_name: String,
      pool_size: Int,
    )
  • StoreWriteAheadLogSizeWarning(pool_size: Int)
  • StoreSubmittedBatchForPersistance(batch_size: Int)
  • StorePersistanceCompleted(processed: Int, wal: Int)

Simple message interface for logging events.

The template string is separated with | and is used to format the message. It can be populated with the telemetry event data using the helper function format_telemetry_message.

pub type TelemetryMessage {
  Report(event: TelemetryEvent, template: String)
  ShutdownTelemetry
}

Constructors

  • Report(event: TelemetryEvent, template: String)
  • ShutdownTelemetry

Functions

pub fn aggregate(
  signal: Subject(SignalMessage(a, b, c)),
  id: String,
) -> Result(Subject(AggregateMessage(a, b, c)), String)

Use this function to retrieve a particular aggregate from the signal event store. It returns a subject which can then be used to interact with the state of your aggregate, or to process further commands.

Command handling example:

let assert Ok(aggregate) = signal.aggregate(em, "how-to-gleam")
let result = signal.handle_command(aggregate, CommentOnPost("how-to-gleam"))

Getting state example:

let assert Ok(aggregate) = signal.aggregate(em, "how-to-gleam")
let post = signal.get_state(aggregate)
pub fn configure(
  agg: AggregateConfig(a, b, c),
) -> SignalConfig(a, d, b, c)

This creates a configuration object that lets you set up your signal instance.

You should put the configuration somewhere in your app’s startup code.

let aggregate_config = AggregateConfig(
  initial_state: my_default_aggregate,
  command_handler: my_command_handler,
  event_handler: my_event_handler
)

let store = signal.configure(aggregate_config)
|> signal.with_persistance_layer(my_storage)
|> signal.with_subscriber(my_notification_client)
|> signal.with_subscriber(my_metrics_counter)
|> signal.start()
pub fn console_logger(
  log_info: Bool,
  log_debug: Bool,
) -> fn(TelemetryMessage, Nil) -> Next(a, Nil)

A simple console logger that logs telemetry events to the console.

pub fn create(
  signal: Subject(SignalMessage(a, b, c)),
  id: String,
) -> Result(Subject(AggregateMessage(a, b, c)), String)

Creates a new aggregate with a given ID.

The ID needs to be unique, otherwise creation will fail.
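
For example (reusing the em subject and the CreatePost command from the other examples on this page):

let assert Ok(post) = signal.create(em, "how-to-gleam")
let result = signal.handle_command(post, CreatePost("how-to-gleam"))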

pub fn format_telemetry_message(
  data: TelemetryEvent,
  template: String,
) -> String

Formats a telemetry message using a template string and telemetry event data.

pub fn get_current_pool_size(
  signal: Subject(SignalMessage(a, b, c)),
) -> Int

Gets the current size of the aggregate pool in memory, mainly for testing.

pub fn get_id(agg: Subject(AggregateMessage(a, b, c))) -> String

Gets the id of the aggregate actor.

pub fn get_state(agg: Subject(AggregateMessage(a, b, c))) -> a

Use this function to get the current state of your aggregate.

let assert Ok(aggregate) = signal.aggregate(em, "how-to-gleam")
let post = signal.get_state(aggregate)
pub fn handle_command(
  agg: Subject(AggregateMessage(a, b, c)),
  command: b,
) -> Result(a, String)

Use this function to have your aggregate process a command.

let assert Ok(aggregate) = signal.aggregate(em, "how-to-gleam")
let result = signal.handle_command(aggregate, CreatePost("how-to-gleam"))
pub fn in_memory_persistance_handler(
  message: StoreMessage(a),
  state: List(Event(a)),
) -> Next(b, List(Event(a)))

Only public for testing purposes. You do not need to use this; it is a signal default.

pub fn start(
  config: SignalConfig(a, b, c, d),
) -> Result(Subject(SignalMessage(a, c, d)), StartError)

Starts the signal services and returns a subject used to interact with the event store.

pub fn telemetry_log_level(ev: TelemetryEvent) -> LogLevel

Default logging level for telemetry events. When implementing a custom logger, you can use this to filter out events.

pub fn with_custom_logger(
  config: SignalConfig(a, b, c, d),
  logger: Subject(TelemetryMessage),
) -> SignalConfig(a, b, c, d)

Allows for custom logging of telemetry events.
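
A minimal sketch of a custom logger actor, assuming gleam/io and gleam/otp/actor. It matches the handler shape used by console_logger and filters events with telemetry_log_level:

fn quiet_logger(message: signal.TelemetryMessage, state: Nil) {
  case message {
    signal.Report(event, template) -> {
      case signal.telemetry_log_level(event) {
        // Skip debug noise, print everything else.
        signal.LogDebug -> Nil
        _ -> io.println(signal.format_telemetry_message(event, template))
      }
      actor.continue(state)
    }
    signal.ShutdownTelemetry -> actor.stop(process.Normal)
  }
}

let assert Ok(logger) = actor.start(Nil, quiet_logger)

let config =
  signal.configure(aggregate_config)
  |> signal.with_custom_logger(logger)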

pub fn with_persistance_layer(
  config: SignalConfig(a, b, c, d),
  persist: Subject(StoreMessage(d)),
) -> SignalConfig(a, b, c, d)

Configures signal to store events using a particular persistence layer.

Signal will default to an in-memory store, which is recommended for development.

WIP - I am working on some persistence layers, but for now you can bring your own, or play around with in-memory persistence.
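
For example, you can wire the built-in in-memory handler in explicitly (a sketch; substitute your own actor for a real database):

let assert Ok(store) = actor.start([], signal.in_memory_persistance_handler)

let config =
  signal.configure(aggregate_config)
  |> signal.with_persistance_layer(store)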

pub fn with_pool_size_limit(
  config: SignalConfig(a, b, c, d),
  aggregates_in_memory: Int,
) -> SignalConfig(a, b, c, d)

Defines the maximum number of aggregates kept in memory. Defaults to 100; lower it for lower memory consumption, increase it for higher performance.

When an aggregate which is not in the pool is requested, signal has to rebuild it from events in the database.

⚠️ Large aggregates that contain a lot of data are an anti-pattern in event sourcing. Instead of lowering the pool size, you might want to consider breaking up and redesigning your aggregate, or storing some data using a different persistence method.
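
For example, to keep up to 250 aggregates in memory:

let config =
  signal.configure(aggregate_config)
  |> signal.with_pool_size_limit(250)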

pub fn with_subscriber(
  config: SignalConfig(a, b, c, d),
  sub: Subscriber(b, d),
) -> SignalConfig(a, b, c, d)

Subscribers can be one-off tasks (policies) or actors (consumers) that consume events generated by the aggregate. They are called for every event produced by all aggregates.

This is a great way of projecting state in a very specific way. Think of it as letting you create different read models of your data, or trigger some other specifics when an event is generated.

You can even use this method to trigger commands on your other aggregates, but be careful: that can make it difficult to track the state of your application!

Example consumer:

fn event_counter(
  message: signal.ConsumerMessage(Int, MyBlogEvent),
  event_count: Int,
) {
  case message {
    ShutdownConsumer -> actor.stop(process.Normal)
    Consume(_) -> actor.continue(event_count + 1)
    GetConsumerState(s) -> {
      process.send(s, event_count)
      actor.continue(event_count)
    }
  }
}

There are a few things to note about Consumers:

  • Signal will not start or stop your consumers; their lifetime is in your control (see the sketch below).
  • Signal will ignore any returned data.
  • Your actor must accept ConsumerMessage(state, event) values, where the wrapped events are the ones you defined at configuration time.
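
A sketch of starting the event_counter consumer from above and registering it, assuming gleam/otp/actor:

let assert Ok(counter) = actor.start(0, event_counter)

let config =
  signal.configure(aggregate_config)
  |> signal.with_subscriber(signal.Consumer(counter))
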
pub fn without_debug_logging(
  config: SignalConfig(a, b, c, d),
) -> SignalConfig(a, b, c, d)

Disables debug level logging for the default logger.

⚠️ This setting does not affect custom loggers.

pub fn without_info_logging(
  config: SignalConfig(a, b, c, d),
) -> SignalConfig(a, b, c, d)

Disables info level logging for the default logger.

⚠️ This setting does not affect custom loggers.
