actorx/transform
Transform operators for ActorX
These operators transform the elements of an observable sequence:
- map/mapi: Apply a function to each element (with optional index)
- flat_map/flat_mapi: Map to observables and flatten (= map/mapi + merge_inner)
- concat_map/concat_mapi: Map to observables and concatenate (= map/mapi + concat_inner)
- merge_inner: Flatten Observable(Observable(a)) by merging
- concat_inner: Flatten Observable(Observable(a)) by concatenating
- scan: Running accumulation
- reduce: Final accumulation on completion
- group_by: Group elements into sub-observables by key
- switch_map/switch_mapi: Map to observables and switch to the latest (= map/mapi + switch_inner)
- switch_inner: Flatten Observable(Observable(a)) by switching to the latest
- pairwise: Emit consecutive pairs of values
- start_with: Prepend values before the source emissions
- tap: Perform a side effect for each emission
Values
pub fn concat_inner(
source: types.Observable(types.Observable(a)),
) -> types.Observable(a)
Flattens an Observable of Observables by concatenating in order.
Subscribes to one inner observable at a time. Inner observables that arrive while another is still active are queued, and each is subscribed to only after the previous one completes.
Example
// Flatten a stream of streams in order
source_of_sources
|> concat_inner()
pub fn concat_map(
source: types.Observable(a),
mapper: fn(a) -> types.Observable(b),
) -> types.Observable(b)
Projects each element into an observable and concatenates them in order.
This is composed from map and concat_inner:
concat_map(source, f) = source |> map(f) |> concat_inner()
Unlike flat_map, this preserves the order of inner observables.
Each inner observable is fully processed before the next one starts.
Example
from_list([1, 2, 3])
|> concat_map(fn(x) { from_list([x, x * 10]) })
// Emits: 1, 10, 2, 20, 3, 30
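The composition above can be checked against plain lists. This is a minimal sketch, assuming lists as a stand-in for fully synchronous observables; concat_map_model is a hypothetical helper and not part of ActorX, and it says nothing about timing, queuing, or disposal.

```gleam
import gleam/list

// Hypothetical list-based model of concat_map (not part of ActorX).
// Step 1 maps each element to an inner list, step 2 concatenates the
// inner lists in source order, mirroring map followed by concat_inner.
pub fn concat_map_model(source: List(a), mapper: fn(a) -> List(b)) -> List(b) {
  source
  |> list.map(mapper)
  |> list.fold([], list.append)
}

pub fn main() {
  // Same input as the example above; the assertion fails if the order differs.
  let assert [1, 10, 2, 20, 3, 30] =
    concat_map_model([1, 2, 3], fn(x) { [x, x * 10] })
}
```

Because each inner list is appended whole before the next one is considered, the values of different inner lists never interleave in this model.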
pub fn concat_mapi(
source: types.Observable(a),
mapper: fn(a, Int) -> types.Observable(b),
) -> types.Observable(b)
Projects each element and its index into an observable and concatenates in order.
This is composed from mapi and concat_inner:
concat_mapi(source, f) = source |> mapi(f) |> concat_inner()
Example
from_list(["a", "b"])
|> concat_mapi(fn(x, i) { from_list([#(i, x), #(i, x <> "!")]) })
// Emits: #(0, "a"), #(0, "a!"), #(1, "b"), #(1, "b!")
pub fn flat_map(
source: types.Observable(a),
mapper: fn(a) -> types.Observable(b),
) -> types.Observable(b)
Projects each element into an observable and merges the results. Emissions from different inner observables may interleave.
This is composed from map and merge_inner:
flat_map(source, f) = source |> map(f) |> merge_inner()
Example
interval(100)
|> take(3)
|> flat_map(fn(i) { timer(50) |> map(fn(_) { i }) })
pub fn flat_mapi(
source: types.Observable(a),
mapper: fn(a, Int) -> types.Observable(b),
) -> types.Observable(b)
Projects each element and its index into an observable and merges the results.
This is composed from mapi and merge_inner:
flat_mapi(source, f) = source |> mapi(f) |> merge_inner()
Example
from_list(["a", "b", "c"])
|> flat_mapi(fn(x, i) { from_list([#(i, x), #(i, x <> "!")]) })
pub fn group_by(
source: types.Observable(a),
key_selector: fn(a) -> k,
) -> types.Observable(#(k, types.Observable(a)))
Groups elements of an observable by key, returning an observable of grouped observables.
Each time a new key is encountered, emits a tuple #(key, Observable). All elements with that key, including the first, are forwarded to the corresponding group’s observable.
Example
// Group numbers by even/odd
from_list([1, 2, 3, 4, 5, 6])
|> group_by(fn(x) { x % 2 })
|> flat_map(fn(group) {
let #(key, values) = group
values |> map(fn(v) { #(key, v) })
})
// Emits: #(1, 1), #(0, 2), #(1, 3), #(0, 4), #(1, 5), #(0, 6)
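The output listed above follows source order because every value is forwarded to its group as soon as it arrives. A rough sketch of that behaviour on plain lists, assuming a fully synchronous source; tag_with_key and group_keys are hypothetical helpers and not part of ActorX.

```gleam
import gleam/list

// Hypothetical model: pair every value with its key, in source order.
// For a synchronous source this matches re-tagging each group's values.
pub fn tag_with_key(source: List(a), key_selector: fn(a) -> k) -> List(#(k, a)) {
  list.map(source, fn(x) { #(key_selector(x), x) })
}

// Hypothetical model: the keys in the order their groups are first opened.
pub fn group_keys(source: List(a), key_selector: fn(a) -> k) -> List(k) {
  source
  |> list.map(key_selector)
  |> list.unique
}

pub fn main() {
  let parity = fn(x) { x % 2 }
  let assert [#(1, 1), #(0, 2), #(1, 3), #(0, 4), #(1, 5), #(0, 6)] =
    tag_with_key([1, 2, 3, 4, 5, 6], parity)
  // The odd group (key 1) opens first because 1 is the first element.
  let assert [1, 0] = group_keys([1, 2, 3, 4, 5, 6], parity)
}
```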
pub fn map(
source: types.Observable(a),
mapper: fn(a) -> b,
) -> types.Observable(b)
Returns an observable whose elements are the result of invoking the mapper function on each element of the source.
pub fn mapi(
source: types.Observable(a),
mapper: fn(a, Int) -> b,
) -> types.Observable(b)
Returns an observable whose elements are the result of invoking the mapper function on each element and its index.
Example
from_list(["a", "b", "c"])
|> mapi(fn(x, i) { #(i, x) })
// Emits: #(0, "a"), #(1, "b"), #(2, "c")
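The indexing can be sketched on plain lists. This is an illustrative model only, assuming a zero-based index that increases by one per element, as the example above shows; mapi_model and do_mapi are hypothetical names and not part of ActorX.

```gleam
// Hypothetical list-based model of mapi (not part of ActorX).
pub fn mapi_model(source: List(a), mapper: fn(a, Int) -> b) -> List(b) {
  do_mapi(source, mapper, 0)
}

// Walks the list, passing each element and its zero-based position
// to the mapper.
fn do_mapi(rest: List(a), mapper: fn(a, Int) -> b, index: Int) -> List(b) {
  case rest {
    [] -> []
    [first, ..tail] -> [
      mapper(first, index),
      ..do_mapi(tail, mapper, index + 1)
    ]
  }
}

pub fn main() {
  let assert [#(0, "a"), #(1, "b"), #(2, "c")] =
    mapi_model(["a", "b", "c"], fn(x, i) { #(i, x) })
}
```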
pub fn merge_inner(
source: types.Observable(types.Observable(a)),
) -> types.Observable(a)
Flattens an Observable of Observables by merging inner emissions.
Subscribes to each inner observable as it arrives and forwards all emissions. Completes only when the outer source AND all inner observables have completed.
Example
// Flatten a stream of streams
source_of_sources
|> merge_inner()
pub fn pairwise(
source: types.Observable(a),
) -> types.Observable(#(a, a))
Emits consecutive pairs of values as #(previous, current). Nothing is emitted until the source has produced at least two values.
Example
from_list([1, 2, 3, 4])
|> pairwise()
// Emits: #(1, 2), #(2, 3), #(3, 4)
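The same pairing exists for plain lists as list.window_by_2 in the Gleam standard library, which makes a convenient reference when reasoning about the output. This sketch models a synchronous source only; pairwise_model is a hypothetical name.

```gleam
import gleam/list

// Hypothetical list-based model of pairwise (not part of ActorX).
pub fn pairwise_model(source: List(a)) -> List(#(a, a)) {
  list.window_by_2(source)
}

pub fn main() {
  let assert [#(1, 2), #(2, 3), #(3, 4)] = pairwise_model([1, 2, 3, 4])
  // A single value has no predecessor, so no pair is produced.
  let assert [] = pairwise_model([1])
}
```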
pub fn reduce(
source: types.Observable(a),
initial: b,
accumulator: fn(b, a) -> b,
) -> types.Observable(b)
Applies an accumulator function over the source, emitting only the final accumulated value when the source completes.
Example
from_list([1, 2, 3, 4, 5])
|> reduce(0, fn(acc, x) { acc + x })
// Emits: 15, then completes
pub fn scan(
source: types.Observable(a),
initial: b,
accumulator: fn(b, a) -> b,
) -> types.Observable(b)
Applies an accumulator function over the source, emitting each intermediate result as it is computed. The initial value itself is not emitted.
Example
from_list([1, 2, 3, 4, 5])
|> scan(0, fn(acc, x) { acc + x })
// Emits: 1, 3, 6, 10, 15
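scan and reduce differ only in what they emit: scan emits every intermediate accumulation, reduce only the last one. A small sketch of that relationship using the list functions list.scan and list.fold from the Gleam standard library as stand-ins for a synchronous source; running_sums and total are hypothetical names.

```gleam
import gleam/list

// Hypothetical list-based model of scan: every intermediate sum.
pub fn running_sums(source: List(Int)) -> List(Int) {
  list.scan(source, 0, fn(acc, x) { acc + x })
}

// Hypothetical list-based model of reduce: only the final sum.
pub fn total(source: List(Int)) -> Int {
  list.fold(source, 0, fn(acc, x) { acc + x })
}

pub fn main() {
  let assert [1, 3, 6, 10, 15] = running_sums([1, 2, 3, 4, 5])
  let assert 15 = total([1, 2, 3, 4, 5])
  // The last value produced by the scan model equals the reduce model.
  let assert Ok(15) = list.last(running_sums([1, 2, 3, 4, 5]))
}
```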
pub fn start_with(
source: types.Observable(a),
values: List(a),
) -> types.Observable(a)
Emits the given values, in order, before any emissions from the source.
Example
from_list([3, 4, 5])
|> start_with([1, 2])
// Emits: 1, 2, 3, 4, 5
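Note the argument order: the source is the first argument, yet the prepended values come first in the output. On plain lists this corresponds to list.append with the arguments swapped. The sketch below assumes a synchronous source; start_with_model is a hypothetical name and not part of ActorX.

```gleam
import gleam/list

// Hypothetical list-based model of start_with (not part of ActorX).
// The values list is placed in front of the source elements.
pub fn start_with_model(source: List(a), values: List(a)) -> List(a) {
  list.append(values, source)
}

pub fn main() {
  let assert [1, 2, 3, 4, 5] = start_with_model([3, 4, 5], [1, 2])
}
```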
pub fn switch_inner(
source: types.Observable(types.Observable(a)),
) -> types.Observable(a)
Flattens an Observable of Observables by switching to the latest.
When a new inner observable arrives, the previous inner is disposed and the new one becomes active. Only emissions from the current inner are forwarded.
This is useful for scenarios like search-as-you-type where you only care about the latest request.
Example
// Cancel previous search when new query arrives
search_queries
|> switch_inner()
pub fn switch_map(
source: types.Observable(a),
mapper: fn(a) -> types.Observable(b),
) -> types.Observable(b)
Projects each element into an observable and switches to the latest.
This is composed from map and switch_inner:
switch_map(source, f) = source |> map(f) |> switch_inner()
When a new element arrives, the previous inner observable is cancelled and the mapper is called to create a new one.
Example
// Search as you type - cancel previous request on new keystroke
keystrokes
|> debounce(300)
|> switch_map(fn(query) { search_api(query) })
pub fn switch_mapi(
source: types.Observable(a),
mapper: fn(a, Int) -> types.Observable(b),
) -> types.Observable(b)
Projects each element and its index into an observable and switches to the latest.
This is composed from mapi and switch_inner:
switch_mapi(source, f) = source |> mapi(f) |> switch_inner()
pub fn tap(
source: types.Observable(a),
effect: fn(a) -> Nil,
) -> types.Observable(a)
Performs a side effect for each emission and passes the value through unchanged.
Useful for debugging, logging, or triggering external actions.
Example
source
|> tap(fn(x) { io.println("Got: " <> int.to_string(x)) })
|> map(fn(x) { x * 2 })