actorx/combine

Combining operators for ActorX

These operators combine multiple observable sequences into a single observable sequence.

Values

pub fn amb(
  sources: List(types.Observable(a)),
) -> types.Observable(a)

Returns the observable that emits first.

Once any source emits a value, that source “wins” and all other sources are ignored. Also known as race.
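
The winner-selection rule above can be sketched on plain lists. This is a hypothetical model, not ActorX's implementation: each source is assumed to be a list of #(time_ms, value) pairs in emission order, amb_model is an illustrative name, and ties are assumed to go to the earlier source in the list.

```gleam
import gleam/list
import gleam/pair

/// Hypothetical list-based model of amb: returns the values of the source
/// whose first emission has the smallest timestamp.
pub fn amb_model(sources: List(List(#(Int, a)))) -> List(a) {
  let winner =
    list.fold(sources, [], fn(best, candidate) {
      case best, candidate {
        // A source that never emits cannot win.
        _, [] -> best
        // The first source that emits anything is the provisional winner.
        [], _ -> candidate
        // Otherwise the strictly earlier first emission wins.
        [#(best_time, _), ..], [#(time, _), ..] ->
          case time < best_time {
            True -> candidate
            False -> best
          }
      }
    })
  // Drop the timestamps and keep only the winner's values.
  list.map(winner, pair.second)
}

pub fn main() {
  let assert ["fast"] = amb_model([[#(100, "slow")], [#(50, "fast")]])
}
```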

Example

amb([
  timer(100) |> map(fn(_) { "slow" }),
  timer(50) |> map(fn(_) { "fast" }),
])
// Emits: "fast", then completes

pub fn combine_latest(
  source1: types.Observable(a),
  source2: types.Observable(b),
  combiner: fn(a, b) -> c,
) -> types.Observable(c)

Combines the latest values from two observable sequences using a combiner function. Emits whenever either source emits, after both have emitted at least once. Completes when both sources complete.
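
The emission rule above can be sketched as a fold over an interleaved event log. This is a hypothetical model, not ActorX's implementation: the Event type and combine_latest_model are illustrative names, and timing and completion are not modelled.

```gleam
import gleam/list
import gleam/option.{None, Some}

/// Hypothetical event type for this sketch: which source emitted, and what.
pub type Event(a, b) {
  FromFirst(a)
  FromSecond(b)
}

/// Replays the events in order, remembering the latest value per source.
pub fn combine_latest_model(
  events: List(Event(a, b)),
  combiner: fn(a, b) -> c,
) -> List(c) {
  let #(_, _, emitted) =
    list.fold(events, #(None, None, []), fn(state, event) {
      let #(latest1, latest2, out) = state
      // Record the new value as the latest one for its source.
      let #(latest1, latest2) = case event {
        FromFirst(value) -> #(Some(value), latest2)
        FromSecond(value) -> #(latest1, Some(value))
      }
      // Emit only once both sources have produced at least one value.
      case latest1, latest2 {
        Some(x), Some(y) -> #(latest1, latest2, [combiner(x, y), ..out])
        _, _ -> #(latest1, latest2, out)
      }
    })
  // Results were prepended, so restore emission order.
  list.reverse(emitted)
}

pub fn main() {
  let events = [FromFirst(1), FromFirst(2), FromSecond("a"), FromFirst(3)]
  let assert [#(2, "a"), #(3, "a")] =
    combine_latest_model(events, fn(n, s) { #(n, s) })
}
```

In this model the value 1 never appears in the output, because the second source had not emitted yet when it arrived.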

Example

let obs1 = interval(100)
let obs2 = interval(150)
combine_latest(obs1, obs2, fn(a, b) { #(a, b) })

pub fn concat(
  sources: List(types.Observable(a)),
) -> types.Observable(a)

Concatenates multiple observable sequences sequentially.

Subscribes to each source only after the previous one completes. Values from each source are emitted in order.

Example

let obs1 = from_list([1, 2])
let obs2 = from_list([3, 4])
concat([obs1, obs2])
// Emits: 1, 2, 3, 4

pub fn concat2(
  source1: types.Observable(a),
  source2: types.Observable(a),
) -> types.Observable(a)

Concatenates two observable sequences: values from source1 are emitted first, followed by values from source2.

pub fn fork_join(
  sources: List(types.Observable(a)),
) -> types.Observable(List(a))

Waits for all observables to complete, then emits a list of their last values.

If any source errors, the error is propagated immediately. If any source completes without emitting, an error is raised.
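
The last-value and empty-source rules above can be sketched on plain lists. This is a hypothetical model, not ActorX's implementation: each source is assumed to be the list of values it emitted before completing, and fork_join_model and EmptySource are illustrative names. Errors raised by the sources themselves are not modelled.

```gleam
import gleam/list

/// Hypothetical error for this sketch; ActorX's real error type is not
/// shown in this section.
pub type ForkJoinError {
  EmptySource
}

/// Collects the last value of every source, or fails if any source
/// completed without emitting.
pub fn fork_join_model(
  sources: List(List(a)),
) -> Result(List(a), ForkJoinError) {
  list.try_map(sources, fn(source) {
    case list.last(source) {
      Ok(value) -> Ok(value)
      // An empty list stands for a source that completed without emitting.
      Error(_) -> Error(EmptySource)
    }
  })
}

pub fn main() {
  let assert Ok([3, 5, 6]) = fork_join_model([[1, 2, 3], [4, 5], [6]])
  let assert Error(EmptySource) = fork_join_model([[1], []])
}
```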

Example

fork_join([
  from_list([1, 2, 3]),    // last: 3
  from_list([4, 5]),       // last: 5
  single(6),               // last: 6
])
// Emits: [3, 5, 6], then completes

pub fn merge(
  sources: List(types.Observable(a)),
) -> types.Observable(a)

Merges multiple observable sequences into a single observable sequence. Values are emitted as they arrive from any source. Completes when all sources complete.
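
The "as they arrive" ordering can be sketched by interleaving timestamped events. This is a hypothetical model, not ActorX's implementation: each source is assumed to be a list of #(time_ms, value) pairs, merge_model is an illustrative name, and the timestamps in the usage are made up for the example.

```gleam
import gleam/int
import gleam/list
import gleam/pair

/// Hypothetical list-based model of merge: all events from all sources,
/// ordered by arrival time.
pub fn merge_model(sources: List(List(#(Int, a)))) -> List(a) {
  sources
  |> list.flatten
  // Order events by their timestamp, regardless of which source sent them.
  |> list.sort(fn(left, right) {
    int.compare(pair.first(left), pair.first(right))
  })
  // Drop the timestamps and keep only the values.
  |> list.map(pair.second)
}

pub fn main() {
  let first = [#(100, "a1"), #(200, "a2"), #(300, "a3")]
  let second = [#(150, "b1"), #(250, "b2")]
  let assert ["a1", "b1", "a2", "b2", "a3"] = merge_model([first, second])
}
```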

Example

let obs1 = interval(100) |> take(3)
let obs2 = interval(150) |> take(2)
merge([obs1, obs2])
// Emits values from both as they arrive

pub fn merge2(
  source1: types.Observable(a),
  source2: types.Observable(a),
) -> types.Observable(a)

Merges two observable sequences into a single observable sequence.

pub fn race(
  sources: List(types.Observable(a)),
) -> types.Observable(a)

Alias for amb: returns the observable that emits first.

pub fn with_latest_from(
  source: types.Observable(a),
  sampler: types.Observable(b),
  combiner: fn(a, b) -> c,
) -> types.Observable(c)

Combines the source observable with the latest value from another observable. Emits only when the source emits, pairing with the latest value from sampler. Values from sampler before the first source emission are ignored. Completes when source completes.

Example

let clicks = from_clicks()
let mouse_pos = from_mouse_moves()
with_latest_from(clicks, mouse_pos, fn(click, pos) { #(click, pos) })
// Emits position at each click

pub fn zip(
  source1: types.Observable(a),
  source2: types.Observable(b),
  combiner: fn(a, b) -> c,
) -> types.Observable(c)

Combines two observable sequences by pairing their elements by index. Emits when both sources have emitted a value at the same index. Completes when either source completes.
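
The index-pairing rule above behaves much like zipping two lists. The sketch below is a hypothetical list-based model, not ActorX's implementation: zip_model is an illustrative name, and each list stands for the values a source emitted before completing.

```gleam
import gleam/list

/// Hypothetical list-based model of zip: pairs elements by index and stops
/// at the end of the shorter input.
pub fn zip_model(xs: List(a), ys: List(b), combiner: fn(a, b) -> c) -> List(c) {
  list.map2(xs, ys, combiner)
}

pub fn main() {
  // The unmatched 3 is dropped because the second list ended first.
  let assert [#(1, "a"), #(2, "b")] =
    zip_model([1, 2, 3], ["a", "b"], fn(n, s) { #(n, s) })
}
```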

Example

let obs1 = from_list([1, 2, 3])
let obs2 = from_list(["a", "b", "c"])
zip(obs1, obs2, fn(n, s) { #(n, s) })
// Emits: #(1, "a"), #(2, "b"), #(3, "c")