actorx/filter

Filter operators for ActorX

These operators filter elements from an observable sequence. All stateful operators use actors for proper state management across async boundaries.

Values

pub fn choose(
  source: types.Observable(a),
  chooser: fn(a) -> option.Option(b),
) -> types.Observable(b)

Applies a function that returns an Option to each element. Emits the values wrapped in Some and skips None.
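The Some/None behaviour can be sketched with plain lists. This is a minimal list-based model that uses only the Gleam standard library, not ActorX itself; `choose_model` and `parse_int` are hypothetical names introduced for illustration.

```gleam
import gleam/int
import gleam/list
import gleam/option.{type Option}

// Hypothetical chooser: Some(n) when the text is an integer, None otherwise.
pub fn parse_int(text: String) -> Option(Int) {
  int.parse(text) |> option.from_result
}

// List-based model of choose: keep the Some payloads, drop the Nones.
pub fn choose_model(items: List(a), chooser: fn(a) -> Option(b)) -> List(b) {
  list.filter_map(items, fn(item) { chooser(item) |> option.to_result(Nil) })
}

pub fn main() {
  // Evaluates to [1, 3]: "x" maps to None and is skipped
  choose_model(["1", "x", "3"], parse_int)
}
```

The model shows that choose combines a map and a filter in one step: the chooser both transforms an element and decides whether it is emitted.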

pub fn default_if_empty(
  source: types.Observable(a),
  default: a,
) -> types.Observable(a)

Emits a default value if the source completes without emitting.

Example

empty()
|> default_if_empty(42)
// Emits: 42, then completes

pub fn distinct(
  source: types.Observable(a),
) -> types.Observable(a)

Filters out all duplicate values, not just consecutive ones.

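Uses a list to track seen values, so memory grows with the number of distinct values seen. For large streams where only consecutive duplicates need removing, consider distinct_until_changed, which tracks only the previous value.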

Example

from_list([1, 2, 1, 3, 2, 4, 1])
|> distinct()
// Emits: 1, 2, 3, 4

pub fn distinct_until_changed(
  source: types.Observable(a),
) -> types.Observable(a)

Emits an element only if it differs from the previous element, so consecutive duplicates are dropped.
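To contrast this with distinct, here is a minimal list-based model written in plain Gleam. It is a sketch of the described behaviour, not the ActorX implementation; `distinct_until_changed_model` is a hypothetical name, and the model assumes the first element is always kept.

```gleam
// List-based model: drop an element when it equals the one that follows it,
// which leaves exactly one element per run of consecutive duplicates.
pub fn distinct_until_changed_model(items: List(a)) -> List(a) {
  case items {
    [first, second, ..rest] if first == second ->
      distinct_until_changed_model([second, ..rest])
    [first, ..rest] -> [first, ..distinct_until_changed_model(rest)]
    [] -> []
  }
}

pub fn main() {
  // Evaluates to [1, 2, 1, 3]: the second 1 survives because it is not
  // adjacent to the first run of 1s
  distinct_until_changed_model([1, 1, 2, 2, 1, 3, 3])
}
```

Unlike distinct, a value can reappear later in the output as long as it is not adjacent to an equal value.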

pub fn filter(
  source: types.Observable(a),
  predicate: fn(a) -> Bool,
) -> types.Observable(a)

Filters elements based on a predicate. Only elements for which predicate returns True are emitted.
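The predicate contract matches `list.filter` from the Gleam standard library, so the expected output can be previewed on a list. This is an analogy using the standard library only, not a call into ActorX.

```gleam
import gleam/int
import gleam/list

pub fn main() {
  // Evaluates to [2, 4]: only elements for which the predicate is True remain
  list.filter([1, 2, 3, 4, 5], int.is_even)
}
```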

pub fn first(source: types.Observable(a)) -> types.Observable(a)

Takes only the first element from the source.

Errors if the source completes without emitting.

Example

from_list([1, 2, 3])
|> first()
// Emits: 1, then completes

pub fn last(source: types.Observable(a)) -> types.Observable(a)

Takes only the last element from the source.

Errors if the source completes without emitting.

Example

from_list([1, 2, 3])
|> last()
// Emits: 3, then completes

pub fn sample(
  source: types.Observable(a),
  sampler: types.Observable(b),
) -> types.Observable(a)

Samples the source observable when the sampler observable emits.

Each time the sampler emits, the most recent value from the source is emitted (if any new value has arrived since the last sample).

Example

// Emit mouse position every 100ms
mouse_moves
|> sample(interval(100))

pub fn skip(
  source: types.Observable(a),
  count: Int,
) -> types.Observable(a)

Skips the first N elements from the source.

pub fn skip_while(
  source: types.Observable(a),
  predicate: fn(a) -> Bool,
) -> types.Observable(a)

Skips elements while the predicate returns True, then emits all remaining elements.
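The difference between skip and skip_while can be seen on a list. The sketch below uses `list.drop` and `list.drop_while` from the Gleam standard library as stand-ins; it assumes the observable operators follow the same conventions and does not call ActorX.

```gleam
import gleam/list

pub fn main() {
  let items = [1, 2, 3, 1, 2]
  // Analogue of skip(1): drops exactly one element, giving [2, 3, 1, 2]
  let skipped = list.drop(items, 1)
  // Analogue of skip_while: drops only the leading run below 3, giving
  // [3, 1, 2]; the trailing 1 and 2 pass even though they are below 3
  let skipped_while = list.drop_while(items, fn(x) { x < 3 })
  #(skipped, skipped_while)
}
```

In this model the predicate is consulted only until it first returns False; later elements are never tested again.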

pub fn take(
  source: types.Observable(a),
  count: Int,
) -> types.Observable(a)

Returns the first N elements from the source.

pub fn take_last(
  source: types.Observable(a),
  count: Int,
) -> types.Observable(a)

Returns the last N elements from the source.
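As a rough illustration of take and take_last, here is a list-based sketch using only the Gleam standard library. `take_last_model` is a hypothetical helper, not part of ActorX.

```gleam
import gleam/list

// List-based model of take_last: the final `count` elements, in original order.
pub fn take_last_model(items: List(a), count: Int) -> List(a) {
  items |> list.reverse |> list.take(count) |> list.reverse
}

pub fn main() {
  let items = [1, 2, 3, 4, 5]
  // list.take(items, 2) gives [1, 2]; take_last_model(items, 2) gives [4, 5]
  #(list.take(items, 2), take_last_model(items, 2))
}
```

One consequence worth noting: the last N elements are only known once the source has completed, so a take_last on an observable cannot emit anything before completion, whereas take can emit as elements arrive.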

pub fn take_until(
  source: types.Observable(a),
  other: types.Observable(b),
) -> types.Observable(a)

Returns elements until the other observable emits.

pub fn take_while(
  source: types.Observable(a),
  predicate: fn(a) -> Bool,
) -> types.Observable(a)

Takes elements while the predicate returns True and stops at the first element for which it returns False.
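take_while is easy to confuse with filter, so the sketch below compares the two on a list. It uses `list.take_while` and `list.filter` from the Gleam standard library as analogues and assumes the observable operators behave the same way; it does not call ActorX.

```gleam
import gleam/list

pub fn main() {
  let items = [1, 2, 5, 1, 2]
  let small = fn(x) { x < 3 }
  // take_while analogue stops at the first failure (5): [1, 2]
  // filter analogue keeps every match: [1, 2, 1, 2]
  #(list.take_while(items, small), list.filter(items, small))
}
```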
