actorx

ActorX - Reactive Extensions for Gleam using BEAM actors

A reactive programming library that composes BEAM actors for building asynchronous, event-driven applications.

Example

import actorx
import gleam/io
import gleam/int

pub fn main() {
  let observable =
    actorx.from_list([1, 2, 3, 4, 5])
    |> actorx.map(fn(x) { x * 2 })
    |> actorx.filter(fn(x) { x > 4 })

  let observer = actorx.make_observer(
    on_next: fn(x) { io.println(int.to_string(x)) },
    on_error: fn(err) { io.println("Error: " <> err) },
    on_completed: fn() { io.println("Done!") },
  )

  actorx.subscribe(observable, observer)
}

Types

pub type Disposable =
  types.Disposable
pub type Notification(a) =
  types.Notification(a)
pub type Observable(a) =
  types.Observable(a)
pub type Observer(a) =
  types.Observer(a)

Values

pub fn choose(
  source: types.Observable(a),
  chooser: fn(a) -> option.Option(b),
) -> types.Observable(b)

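Filter and map in one operation: the chooser is applied to each element, and only the values it returns wrapped in Some are emitted; elements for which it returns None are dropped.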

pub fn combine_latest(
  source1: types.Observable(a),
  source2: types.Observable(b),
  combiner: fn(a, b) -> c,
) -> types.Observable(c)

Combines the latest values from two observable sequences. Emits whenever either source emits, after both have emitted at least once. Completes when both sources complete.

pub fn concat_map(
  source: types.Observable(a),
  mapper: fn(a) -> types.Observable(b),
) -> types.Observable(b)

Project each element to an observable and concatenate in order.

pub fn create(
  subscribe_fn: fn(types.Observer(a)) -> types.Disposable,
) -> types.Observable(a)

Create an observable from a subscribe function.

pub fn debounce(
  source: types.Observable(a),
  ms: Int,
) -> types.Observable(a)

Emits a value only after the specified time has passed without another value being emitted.

pub fn defer(
  factory: fn() -> types.Observable(a),
) -> types.Observable(a)

Create an observable that calls a factory function on each subscription.

pub fn delay(
  source: types.Observable(a),
  ms: Int,
) -> types.Observable(a)

Delays each emission from the source observable by the specified time.

pub fn distinct_until_changed(
  source: types.Observable(a),
) -> types.Observable(a)

Emit a value only when it differs from the previously emitted value.

pub fn empty() -> types.Observable(a)

Create an observable that completes immediately without emitting.

pub fn empty_disposable() -> types.Disposable

Create an empty disposable.

pub fn fail(error: String) -> types.Observable(a)

Create an observable that errors immediately.

pub fn filter(
  source: types.Observable(a),
  predicate: fn(a) -> Bool,
) -> types.Observable(a)

Filter elements based on a predicate.

pub fn flat_map(
  source: types.Observable(a),
  mapper: fn(a) -> types.Observable(b),
) -> types.Observable(b)

Project each element to an observable and flatten.

Uses an actor to coordinate inner subscriptions, making it safe for both sync and async sources. Properly tracks all inner subscriptions and only completes when the source AND all inner observables have completed.

pub fn from_list(items: List(a)) -> types.Observable(a)

Create an observable from a list of values.

pub fn interval(period_ms: Int) -> types.Observable(Int)

Creates an observable that emits incrementing integers (0, 1, 2, …) at regular intervals.

pub fn make_next_observer(
  on_next: fn(a) -> Nil,
) -> types.Observer(a)

Create an observer that only handles OnNext events.

pub fn make_observer(
  on_next on_next: fn(a) -> Nil,
  on_error on_error: fn(String) -> Nil,
  on_completed on_completed: fn() -> Nil,
) -> types.Observer(a)

Create an observer from three callback functions.

pub fn map(
  source: types.Observable(a),
  mapper: fn(a) -> b,
) -> types.Observable(b)

Transform each element using a mapper function.

pub fn merge(
  sources: List(types.Observable(a)),
) -> types.Observable(a)

Merges multiple observable sequences into a single observable sequence. Values are emitted as they arrive from any source. Completes when all sources complete.

pub fn merge2(
  source1: types.Observable(a),
  source2: types.Observable(a),
) -> types.Observable(a)

Merge two observables.

pub fn never() -> types.Observable(a)

Create an observable that never emits and never completes.

pub fn notify(
  observer: types.Observer(a),
  notification: types.Notification(a),
) -> Nil

Forward a notification to an observer.

pub fn on_completed(observer: types.Observer(a)) -> Nil

Send an OnCompleted notification to an observer.

pub fn on_error(
  observer: types.Observer(a),
  error: String,
) -> Nil

Send an OnError notification to an observer.

pub fn on_next(observer: types.Observer(a), value: a) -> Nil

Send an OnNext notification to an observer.

pub fn single(value: a) -> types.Observable(a)

Create an observable that emits a single value then completes.

pub fn single_subject() -> #(
  types.Observer(a),
  types.Observable(a),
)

Creates a single-subscriber subject.

Returns a tuple of (Observer, Observable) where:

  • The Observer side is used to push notifications
  • The Observable side can be subscribed to (once only!)

pub fn skip(
  source: types.Observable(a),
  count: Int,
) -> types.Observable(a)

Skip the first N elements.

pub fn skip_while(
  source: types.Observable(a),
  predicate: fn(a) -> Bool,
) -> types.Observable(a)

Skip elements while predicate returns True.

pub fn subject() -> #(types.Observer(a), types.Observable(a))

Creates a multicast subject that allows multiple subscribers.

Returns a tuple of (Observer, Observable) where:

  • The Observer side is used to push notifications
  • The Observable side can be subscribed to by multiple observers

Unlike single_subject, notifications are NOT buffered.

pub fn subscribe(
  observable: types.Observable(a),
  observer: types.Observer(a),
) -> types.Disposable

Subscribe an observer to an observable.

pub fn take(
  source: types.Observable(a),
  count: Int,
) -> types.Observable(a)

Take the first N elements.

pub fn take_last(
  source: types.Observable(a),
  count: Int,
) -> types.Observable(a)

Take the last N elements (emitted on completion).

pub fn take_until(
  source: types.Observable(a),
  other: types.Observable(b),
) -> types.Observable(a)

Take elements until another observable emits.

pub fn take_while(
  source: types.Observable(a),
  predicate: fn(a) -> Bool,
) -> types.Observable(a)

Take elements while predicate returns True.

pub fn throttle(
  source: types.Observable(a),
  ms: Int,
) -> types.Observable(a)

Rate limits emissions to at most one per specified period.

pub fn timer(delay_ms: Int) -> types.Observable(Int)

Creates an observable that emits 0 after the specified delay, then completes.

pub fn with_latest_from(
  source: types.Observable(a),
  sampler: types.Observable(b),
  combiner: fn(a, b) -> c,
) -> types.Observable(c)

Combines the source observable with the latest value from another observable. Emits only when the source emits, pairing with the latest value from sampler. Completes when source completes.

pub fn zip(
  source1: types.Observable(a),
  source2: types.Observable(b),
  combiner: fn(a, b) -> c,
) -> types.Observable(c)

Combines two observable sequences by pairing their elements by index. Emits when both sources have emitted a value at the same index. Completes when either source completes.

Search Document