actorx
ActorX - Reactive Extensions for Gleam using BEAM actors
A reactive programming library that composes BEAM actors for building asynchronous, event-driven applications.
Example
import actorx
import gleam/io
import gleam/int
pub fn main() {
let observable =
actorx.from_list([1, 2, 3, 4, 5])
|> actorx.map(fn(x) { x * 2 })
|> actorx.filter(fn(x) { x > 4 })
let observer = actorx.make_observer(
on_next: fn(x) { io.println(int.to_string(x)) },
on_error: fn(err) { io.println("Error: " <> err) },
on_completed: fn() { io.println("Done!") },
)
actorx.subscribe(observable, observer)
}
Types
pub type Disposable =
types.Disposable
pub type Notification(a) =
types.Notification(a)
pub type Observable(a) =
types.Observable(a)
pub type Observer(a) =
types.Observer(a)
Values
pub fn amb(
sources: List(types.Observable(a)),
) -> types.Observable(a)
Returns the observable that emits first.
Also known as race. Other sources are ignored after one emits.
pub fn call_actor(
actor_subject: process.Subject(msg),
timeout_ms: Int,
make_request: fn(process.Subject(response)) -> msg,
) -> types.Observable(response)
Creates a cold Observable that performs a request-response call to an actor.
Each subscription triggers a new request. The make_request function
receives a reply Subject and should return the message to send to the actor.
Example
type CounterMsg {
Increment(Int)
GetValue(Subject(Int))
}
// Single call
actorx.call_actor(counter, 1000, GetValue)
|> actorx.subscribe(observer)
// Periodic polling
actorx.interval(1000)
|> actorx.flat_map(fn(_) {
actorx.call_actor(counter, 1000, GetValue)
})
|> actorx.subscribe(value_observer)
pub fn catch(
source: types.Observable(a),
handler: fn(String) -> types.Observable(a),
) -> types.Observable(a)
On error, switches to a fallback observable returned by the handler.
Also known as catch_error or on_error_resume_next.
Example
// On error, emit a default value
risky_observable
|> catch(fn(_error) { single(default_value) })
pub fn choose(
source: types.Observable(a),
chooser: fn(a) -> option.Option(b),
) -> types.Observable(b)
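Filter and map in one operation: values for which chooser returns Some are unwrapped and emitted; values for which it returns None are dropped.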
pub fn combine_latest(
source1: types.Observable(a),
source2: types.Observable(b),
combiner: fn(a, b) -> c,
) -> types.Observable(c)
Combines the latest values from two observable sequences. Emits whenever either source emits, after both have emitted at least once. Completes when both sources complete.
pub fn concat(
sources: List(types.Observable(a)),
) -> types.Observable(a)
Concatenates multiple observables sequentially. Subscribes to each source only after the previous completes.
pub fn concat2(
source1: types.Observable(a),
source2: types.Observable(a),
) -> types.Observable(a)
Concatenates two observables sequentially.
pub fn concat_inner(
source: types.Observable(types.Observable(a)),
) -> types.Observable(a)
Flattens an Observable of Observables by concatenating in order.
Subscribes to each inner observable only after the previous one completes. Queues inner observables and processes them sequentially.
pub fn concat_map(
source: types.Observable(a),
mapper: fn(a) -> types.Observable(b),
) -> types.Observable(b)
Project each element to an observable and concatenate in order.
Composed from map and concat_inner:
concat_map(source, f) = source |> map(f) |> concat_inner()
Unlike flat_map, this preserves the order of inner observables.
pub fn concat_mapi(
source: types.Observable(a),
mapper: fn(a, Int) -> types.Observable(b),
) -> types.Observable(b)
Project each element and its index to an observable and concatenate in order.
Composed from mapi and concat_inner:
concat_mapi(source, f) = source |> mapi(f) |> concat_inner()
pub fn create(
subscribe_fn: fn(types.Observer(a)) -> types.Disposable,
) -> types.Observable(a)
Create an observable from a subscribe function.
pub fn debounce(
source: types.Observable(a),
ms: Int,
) -> types.Observable(a)
Emits a value only after the specified time has passed without another value being emitted.
pub fn default_if_empty(
source: types.Observable(a),
default: a,
) -> types.Observable(a)
Emit the default value if the source completes without emitting any values.
pub fn defer(
factory: fn() -> types.Observable(a),
) -> types.Observable(a)
Create an observable that calls a factory function on each subscription.
pub fn delay(
source: types.Observable(a),
ms: Int,
) -> types.Observable(a)
Delays each emission from the source observable by the specified time.
pub fn distinct(
source: types.Observable(a),
) -> types.Observable(a)
Filter out all duplicate values (not just consecutive).
pub fn distinct_until_changed(
source: types.Observable(a),
) -> types.Observable(a)
Emit a value only when it differs from the previous one.
pub fn empty() -> types.Observable(a)
Create an observable that completes immediately without emitting.
pub fn fail(error: String) -> types.Observable(a)
Create an observable that errors immediately.
pub fn filter(
source: types.Observable(a),
predicate: fn(a) -> Bool,
) -> types.Observable(a)
Filter elements based on a predicate.
pub fn first(source: types.Observable(a)) -> types.Observable(a)
Take only the first element. Errors if source is empty.
pub fn flat_map(
source: types.Observable(a),
mapper: fn(a) -> types.Observable(b),
) -> types.Observable(b)
Project each element to an observable and flatten.
Composed from map and merge_inner:
flat_map(source, f) = source |> map(f) |> merge_inner()
pub fn flat_mapi(
source: types.Observable(a),
mapper: fn(a, Int) -> types.Observable(b),
) -> types.Observable(b)
Project each element and its index to an observable and flatten.
Composed from mapi and merge_inner:
flat_mapi(source, f) = source |> mapi(f) |> merge_inner()
pub fn fork_join(
sources: List(types.Observable(a)),
) -> types.Observable(List(a))
Waits for all observables to complete, then emits a list of their last values.
pub fn from_list(items: List(a)) -> types.Observable(a)
Create an observable from a list of values.
pub fn from_subject() -> #(
process.Subject(a),
types.Observable(a),
)
Creates a Subject and Observable pair for bridging actors.
Returns a tuple of (Subject, Observable) where:
- Values sent to the Subject are emitted to all Observable subscribers
- The Observable never completes on its own - use take_until for completion
This is useful for having an actor publish events that can be consumed reactively via the Observable.
Example
let #(events_subject, events_observable) = actorx.from_subject()
// Pass subject to a producer actor
let _producer = start_event_producer(events_subject)
// Subscribe to the observable
events_observable
|> actorx.filter(is_high_priority)
|> actorx.take_until(shutdown_signal)
|> actorx.subscribe(alert_observer)
pub fn group_by(
source: types.Observable(a),
key_selector: fn(a) -> k,
) -> types.Observable(#(k, types.Observable(a)))
Groups elements by key, returning an observable of grouped observables.
Each time a new key is encountered, emits a tuple of (key, Observable). All elements with that key are forwarded to the corresponding group’s observable.
Example
// Group numbers by even/odd
from_list([1, 2, 3, 4, 5, 6])
|> group_by(fn(x) { x % 2 })
|> flat_map(fn(group) {
let #(key, values) = group
values |> map(fn(v) { #(key, v) })
})
pub fn interval(period_ms: Int) -> types.Observable(Int)
Creates an observable that emits incrementing integers (0, 1, 2, …) at regular intervals.
pub fn last(source: types.Observable(a)) -> types.Observable(a)
Take only the last element. Errors if source is empty.
pub fn make_next_observer(
on_next: fn(a) -> Nil,
) -> types.Observer(a)
Create an observer that only handles OnNext events.
pub fn make_observer(
on_next on_next: fn(a) -> Nil,
on_error on_error: fn(String) -> Nil,
on_completed on_completed: fn() -> Nil,
) -> types.Observer(a)
Create an observer from three callback functions.
pub fn map(
source: types.Observable(a),
mapper: fn(a) -> b,
) -> types.Observable(b)
Transform each element using a mapper function.
pub fn mapi(
source: types.Observable(a),
mapper: fn(a, Int) -> b,
) -> types.Observable(b)
Transform each element using a mapper function that also receives the index.
Example
from_list(["a", "b", "c"])
|> mapi(fn(x, i) { #(i, x) })
// Emits: #(0, "a"), #(1, "b"), #(2, "c")
pub fn merge(
sources: List(types.Observable(a)),
) -> types.Observable(a)
Merges multiple observable sequences into a single observable sequence. Values are emitted as they arrive from any source. Completes when all sources complete.
pub fn merge2(
source1: types.Observable(a),
source2: types.Observable(a),
) -> types.Observable(a)
Merge two observables.
pub fn merge_inner(
source: types.Observable(types.Observable(a)),
) -> types.Observable(a)
Flattens an Observable of Observables by merging inner emissions.
Subscribes to each inner observable as it arrives and forwards all emissions. Completes when outer AND all inners complete.
pub fn never() -> types.Observable(a)
Create an observable that never emits and never completes.
pub fn notify(
observer: types.Observer(a),
notification: types.Notification(a),
) -> Nil
Forward a notification to an observer.
pub fn on_completed(observer: types.Observer(a)) -> Nil
Send an OnCompleted notification to an observer.
pub fn on_error(
observer: types.Observer(a),
error: String,
) -> Nil
Send an OnError notification to an observer.
pub fn on_next(observer: types.Observer(a), value: a) -> Nil
Send an OnNext notification to an observer.
pub fn pairwise(
source: types.Observable(a),
) -> types.Observable(#(a, a))
Emits consecutive pairs of values.
Example
from_list([1, 2, 3, 4])
|> pairwise()
// Emits: #(1, 2), #(2, 3), #(3, 4)
pub fn publish(
source: types.Observable(a),
) -> #(types.Observable(a), fn() -> types.Disposable)
Converts a cold observable into a connectable hot observable.
Returns a tuple of (Observable, connect_fn) where:
- The Observable can be subscribed to by multiple observers
- The connect function starts the source subscription
Values are only emitted after connect() is called. Multiple subscribers share the same source subscription.
Example
let #(hot, connect) = publish(cold_source)
// Subscribe multiple observers (source not started yet)
let _d1 = hot |> actorx.subscribe(observer1)
let _d2 = hot |> actorx.subscribe(observer2)
// Now connect - source starts, both observers receive values
let connection = connect()
pub fn race(
sources: List(types.Observable(a)),
) -> types.Observable(a)
Alias for amb - returns the observable that emits first.
pub fn reduce(
source: types.Observable(a),
initial: b,
accumulator: fn(b, a) -> b,
) -> types.Observable(b)
Applies an accumulator function over the source, emitting only the final accumulated value when the source completes.
Example
from_list([1, 2, 3, 4, 5])
|> reduce(0, fn(acc, x) { acc + x })
// Emits: 15, then completes
pub fn retry(
source: types.Observable(a),
max_retries: Int,
) -> types.Observable(a)
Resubscribes to the source observable when an error occurs, up to the specified number of retries.
Example
// Retry up to 3 times on error
flaky_observable
|> retry(3)
pub fn sample(
source: types.Observable(a),
sampler: types.Observable(b),
) -> types.Observable(a)
Sample source when sampler emits.
pub fn scan(
source: types.Observable(a),
initial: b,
accumulator: fn(b, a) -> b,
) -> types.Observable(b)
Applies an accumulator function over the source, emitting each intermediate result.
Example
from_list([1, 2, 3, 4, 5])
|> scan(0, fn(acc, x) { acc + x })
// Emits: 1, 3, 6, 10, 15
pub fn share(source: types.Observable(a)) -> types.Observable(a)
Shares a single subscription to the source among multiple subscribers.
Automatically connects when the first subscriber subscribes, and disconnects when the last subscriber unsubscribes.
This is equivalent to publish(source) with automatic reference counting.
Example
let shared =
interval(100)
|> share()
// First subscriber - source starts
let d1 = shared |> actorx.subscribe(observer1)
// Second subscriber - shares same source
let d2 = shared |> actorx.subscribe(observer2)
pub fn single(value: a) -> types.Observable(a)
Create an observable that emits a single value then completes.
pub fn single_subject() -> #(
types.Observer(a),
types.Observable(a),
)
Creates a single-subscriber subject.
Returns a tuple of (Observer, Observable) where:
- The Observer side is used to push notifications
- The Observable side can be subscribed to (once only!)
pub fn skip(
source: types.Observable(a),
count: Int,
) -> types.Observable(a)
Skip the first N elements.
pub fn skip_while(
source: types.Observable(a),
predicate: fn(a) -> Bool,
) -> types.Observable(a)
Skip elements while predicate returns True.
pub fn start_with(
source: types.Observable(a),
values: List(a),
) -> types.Observable(a)
Prepends values before the source emissions.
pub fn subject() -> #(types.Observer(a), types.Observable(a))
Creates a multicast subject that allows multiple subscribers.
Returns a tuple of (Observer, Observable) where:
- The Observer side is used to push notifications
- The Observable side can be subscribed to by multiple observers
Unlike single_subject, notifications are NOT buffered.
pub fn subscribe(
observable: types.Observable(a),
observer: types.Observer(a),
) -> types.Disposable
Subscribe an observer to an observable.
pub fn switch_inner(
source: types.Observable(types.Observable(a)),
) -> types.Observable(a)
Flattens an Observable of Observables by switching to the latest.
When a new inner arrives, the previous inner is cancelled. Useful for search-as-you-type patterns.
pub fn switch_map(
source: types.Observable(a),
mapper: fn(a) -> types.Observable(b),
) -> types.Observable(b)
Project each element to an observable and switch to the latest.
Composed from map and switch_inner:
switch_map(source, f) = source |> map(f) |> switch_inner()
pub fn switch_mapi(
source: types.Observable(a),
mapper: fn(a, Int) -> types.Observable(b),
) -> types.Observable(b)
Project each element and its index to an observable and switch to the latest.
pub fn take(
source: types.Observable(a),
count: Int,
) -> types.Observable(a)
Take the first N elements.
pub fn take_last(
source: types.Observable(a),
count: Int,
) -> types.Observable(a)
Take the last N elements (emitted on completion).
pub fn take_until(
source: types.Observable(a),
other: types.Observable(b),
) -> types.Observable(a)
Take elements until another observable emits.
pub fn take_while(
source: types.Observable(a),
predicate: fn(a) -> Bool,
) -> types.Observable(a)
Take elements while predicate returns True.
pub fn tap(
source: types.Observable(a),
effect: fn(a) -> Nil,
) -> types.Observable(a)
Performs a side effect for each emission without transforming.
pub fn throttle(
source: types.Observable(a),
ms: Int,
) -> types.Observable(a)
Rate limits emissions to at most one per specified period.
pub fn timeout(
source: types.Observable(a),
ms: Int,
) -> types.Observable(a)
Errors if no emission occurs within the specified timeout period. The timeout resets after each emission.
pub fn timer(delay_ms: Int) -> types.Observable(Int)
Creates an observable that emits 0 after the specified delay, then completes.
pub fn to_subject(
source: types.Observable(a),
target: process.Subject(a),
) -> types.Observable(a)
Sends each emitted value to a Subject while passing through to downstream.
Only OnNext values are sent to the Subject. OnError and OnCompleted are forwarded downstream but not sent to the Subject.
Example
actorx.interval(100)
|> actorx.take(10)
|> actorx.map(fn(n) { Increment(n) })
|> actorx.to_subject(counter_inbox)
|> actorx.subscribe(log_observer)
pub fn with_latest_from(
source: types.Observable(a),
sampler: types.Observable(b),
combiner: fn(a, b) -> c,
) -> types.Observable(c)
Combines the source observable with the latest value from another observable. Emits only when the source emits, pairing with the latest value from sampler. Completes when source completes.
pub fn zip(
source1: types.Observable(a),
source2: types.Observable(b),
combiner: fn(a, b) -> c,
) -> types.Observable(c)
Combines two observable sequences by pairing their elements by index. Emits when both sources have emitted a value at the same index. Completes when either source completes.