ActorX for Gleam

Package Version Hex Docs

ActorX - Reactive Extensions for Gleam using BEAM actors. A reactive programming library that composes BEAM actors for building asynchronous, event-driven applications.

This is a port of FSharp.Control.AsyncRx to Gleam, targeting the Erlang/BEAM runtime.

Installation

gleam add actorx

Example

import actorx
import gleam/io
import gleam/int

pub fn main() {
  // Create an observable pipeline
  let observable =
    actorx.from_list([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    |> actorx.filter(fn(x) { x % 2 == 0 })  // Keep even numbers
    |> actorx.map(fn(x) { x * 10 })          // Multiply by 10
    |> actorx.take(3)                        // Take first 3

  // Create an observer
  let observer = actorx.make_observer(
    on_next: fn(x) { io.println("Value: " <> int.to_string(x)) },
    on_error: fn(_err) { Nil },
    on_completed: fn() { io.println("Done!") },
  )

  // Subscribe
  let _disposable = actorx.subscribe(observable, observer)
  // Output:
  // Value: 20
  // Value: 40
  // Value: 60
  // Done!
}

Builder Pattern with use

ActorX supports Gleam’s use keyword for monadic composition, similar to F#’s computation expressions:

import actorx
import actorx/builder.{bind, return}

pub fn example() {
  use x <- bind(actorx.single(10))
  use y <- bind(actorx.single(20))
  use z <- bind(actorx.from_list([1, 2, 3]))
  return(x + y + z)
}
// Emits: 31, 32, 33 then completes

Async Operators

ActorX provides time-based operators for async scenarios:

import actorx
import actorx/types.{Disposable}
import gleam/io
import gleam/int

pub fn async_example() {
  // Emit 0, 1, 2, ... every 100ms, take first 5
  let observable = actorx.interval(100)
    |> actorx.take(5)
    |> actorx.map(fn(x) { x * 10 })

  let observer = actorx.make_observer(
    on_next: fn(x) { io.println(int.to_string(x)) },
    on_error: fn(_) { Nil },
    on_completed: fn() { io.println("Done!") },
  )

  let Disposable(dispose) = actorx.subscribe(observable, observer)
  // Output over 500ms: 0, 10, 20, 30, 40, Done!

  // Can dispose early to cancel
  // dispose()
}

Subject Example

Subjects are both Observers and Observables - push values in, subscribe to receive them:

import actorx

pub fn subject_example() {
  let #(input, output) = actorx.single_subject()

  // Subscribe to output
  let _disp = actorx.subscribe(output, my_observer)

  // Push values through input
  actorx.on_next(input, 1)
  actorx.on_next(input, 2)
  actorx.on_completed(input)
}

Core Concepts

Observable

An Observable(a) represents a push-based stream of values of type a. Observables are lazy - they don’t produce values until subscribed to.

Observer

An Observer(a) receives notifications from an Observable:

The Rx contract guarantees: OnNext* (OnError | OnCompleted)?

Disposable

A Disposable represents a subscription that can be cancelled. Call dispose() to unsubscribe and release resources.

Available Operators

Creation

OperatorDescription
single(value)Emit single value, then complete
empty()Complete immediately
never()Never emit, never complete
fail(error)Error immediately
from_list(items)Emit all items from list
defer(factory)Create observable lazily on subscribe

Transform

OperatorDescription
map(source, fn)Transform each element
flat_map(source, fn)Map to observables, merge results (actor-based)
concat_map(source, fn)Map to observables, concatenate in order

Filter

OperatorDescription
filter(source, predicate)Keep elements matching predicate
take(source, n)Take first N elements
skip(source, n)Skip first N elements
take_while(source, predicate)Take while predicate is true
skip_while(source, predicate)Skip while predicate is true
choose(source, fn)Filter + map via Option
distinct_until_changed(source)Skip consecutive duplicates
take_until(source, other)Take until other observable emits
take_last(source, n)Emit last N elements on completion

Timeshift (Async)

OperatorDescription
timer(delay_ms)Emit 0 after delay, then complete
interval(period_ms)Emit 0, 1, 2, … at regular intervals
delay(source, ms)Delay each emission by specified time
debounce(source, ms)Emit only after silence period
throttle(source, ms)Rate limit to at most one per period

Subject

OperatorDescription
subject()Multicast subject, allows multiple subscribers
single_subject()Single-subscriber subject, buffers until subscribed

Combine

OperatorDescription
merge(sources)Merge multiple observables into one
merge2(source1, source2)Merge two observables
combine_latest(s1, s2, fn)Combine latest values from two sources
with_latest_from(source, s2, fn)Sample source with latest from another
zip(source1, source2, fn)Pair elements by index

Builder (for use syntax)

FunctionDescription
bind(source, fn)FlatMap for use syntax
return(value)Lift value into observable
map_over(source, fn)Map for use syntax
filter_with(source, fn)Filter for use syntax
for_each(list, fn)Iterate list, concat results

Design

Why Gleam + BEAM?

The original F# AsyncRx uses MailboxProcessor (actors) to:

  1. Serialize notifications (no concurrent observer calls)
  2. Enforce Rx grammar
  3. Manage state safely

Gleam on BEAM is a natural fit because:

Architecture

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Observable │────▶│  Operator   │────▶│  Observer   │
│   (source)  │     │  (transform)│     │  (sink)     │
└─────────────┘     └─────────────┘     └─────────────┘
                           │
                    ┌──────┴──────┐
                    │ State Actor │
                    │ (for take,  │
                    │  skip, etc) │
                    └─────────────┘

Each stateful operator can use an actor to maintain state safely across async boundaries.

Current Implementation

ActorX uses two complementary approaches for state management:

Synchronous operators (like from_list, map, filter) use Erlang’s process dictionary for mutable state:

Asynchronous operators (like timer, interval, delay, debounce, throttle) use spawned processes:

Safe Observer

The safe_observer module provides Rx grammar enforcement:

Roadmap

Phase 1: Core Operators ✓

Phase 2: Actor-Based Async ✓

Phase 3: Combining Operators ✓

Phase 4: Advanced Features

Phase 5: Integration

Examples

Timeflies Demo

A classic Rx demo where letters trail behind your mouse cursor. Demonstrates subject, flat_map, delay, and WebSocket integration with Mist.

cd examples/timeflies
gleam run
# Open http://localhost:3000

Development

gleam build  # Build the project
gleam test   # Run the tests

License

MIT

Related Projects

Search Document