actorx/timeshift

Timeshift operators for ActorX

These operators work with time-based asynchronous streams:

IMPORTANT: These operators are asynchronous. After subscribing, use process.sleep or a proper async wait mechanism to receive values.

Values

pub fn debounce(
  source: types.Observable(a),
  ms: Int,
) -> types.Observable(a)

Emits a value only after the specified time has passed without another value being emitted.

Example

search_input
|> debounce(300)  // Wait 300ms after last keystroke
|> actorx.subscribe(observer)
pub fn delay(
  source: types.Observable(a),
  ms: Int,
) -> types.Observable(a)

Delays each emission from the source observable by the specified time.

Example

source
|> delay(100)  // Delay each value by 100ms
|> actorx.subscribe(observer)
pub fn interval(period_ms: Int) -> types.Observable(Int)

Creates an observable that emits incrementing integers (0, 1, 2, …) at regular intervals.

Example

// Emit every 100ms
interval(100)
|> actorx.take(5)  // Take first 5 values
|> actorx.subscribe(observer)
pub fn throttle(
  source: types.Observable(a),
  ms: Int,
) -> types.Observable(a)

Rate limits emissions to at most one per specified period. Emits the first value immediately, then samples the latest value at the end of each window.

Example

mouse_moves
|> throttle(100)  // At most one emission per 100ms
|> actorx.subscribe(observer)
pub fn timeout(
  source: types.Observable(a),
  ms: Int,
) -> types.Observable(a)

Errors if no emission occurs within the specified timeout period.

The timeout resets after each emission. If the source doesn’t emit within the timeout, an error is raised.

Example

slow_source
|> timeout(5000)  // Error if no emission within 5 seconds
pub fn timer(delay_ms: Int) -> types.Observable(Int)

Creates an observable that emits 0 after the specified delay, then completes.

Example

// Emit 0 after 100ms
timer(100)
|> actorx.subscribe(observer)

// Wait for async completion
process.sleep(150)
Search Document