actorx/timeshift
Timeshift operators for ActorX
These operators work with time-based asynchronous streams:
- timer: Emit a single value after a delay
- interval: Emit incrementing values at regular intervals
- delay: Delay each emission by a specified time
- debounce: Emit only after silence (no new values) for a period
- throttle: Rate limit emissions to at most one per period
- timeout: Error if no emission occurs within a period
IMPORTANT: These operators are asynchronous. After subscribing, use
process.sleep or a proper async wait mechanism to receive values.
Values
pub fn debounce(
  source: types.Observable(a),
  ms: Int,
) -> types.Observable(a)
Emits a value only after the specified time has passed without another value being emitted.
Example
search_input
|> debounce(300) // Wait 300ms after last keystroke
|> actorx.subscribe(observer)
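To make the timing rule concrete, here is a minimal sketch that replays a list of timestamped values and computes what a debounce of `ms` would emit. It is a pure simulation, not ActorX's implementation: `Event` and `debounce_sim` are hypothetical names introduced for illustration. It assumes that a value arriving exactly `ms` after the previous one does not cancel it, and that the last value still fires once `ms` has elapsed.

```gleam
/// Hypothetical helper type for this sketch: #(time_ms, value).
pub type Event(a) =
  #(Int, a)

/// Given events sorted by time, return the emissions a debounce of `ms`
/// would produce, each stamped with the time at which it would fire.
pub fn debounce_sim(events: List(Event(a)), ms: Int) -> List(Event(a)) {
  case events {
    [] -> []
    [#(t, v), ..rest] -> {
      let later = debounce_sim(rest, ms)
      // A value is dropped when the next one arrives before `ms` has passed.
      let superseded = case rest {
        [#(next_t, _), ..] -> next_t - t < ms
        [] -> False
      }
      case superseded {
        True -> later
        False -> [#(t + ms, v), ..later]
      }
    }
  }
}

pub fn main() -> Nil {
  let keystrokes = [#(0, "g"), #(100, "gl"), #(200, "gle"), #(900, "glea")]
  // Only "gle" and "glea" are followed by at least 300ms of silence.
  let assert [#(500, "gle"), #(1200, "glea")] = debounce_sim(keystrokes, 300)
  Nil
}
```

Each surviving value is stamped with its arrival time plus `ms`, which is why debounced output always lags the input by the quiet period.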
pub fn delay(
  source: types.Observable(a),
  ms: Int,
) -> types.Observable(a)
Delays each emission from the source observable by the specified time.
Example
source
|> delay(100) // Delay each value by 100ms
|> actorx.subscribe(observer)
pub fn interval(period_ms: Int) -> types.Observable(Int)
Creates an observable that emits incrementing integers (0, 1, 2, …) at regular intervals.
Example
// Emit every 100ms
interval(100)
|> actorx.take(5) // Take first 5 values
|> actorx.subscribe(observer)
pub fn throttle(
  source: types.Observable(a),
  ms: Int,
) -> types.Observable(a)
Rate limits emissions to at most one per specified period. Emits the first value immediately, then samples the latest value at the end of each window.
Example
mouse_moves
|> throttle(100) // At most one emission per 100ms
|> actorx.subscribe(observer)
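The "first value immediately, then latest value per window" rule can be illustrated with a small pure simulation over timestamped values. This is a sketch of one plausible reading of the description, not ActorX's implementation: `State`, `flush`, `step`, and `throttle_sim` are hypothetical names. It assumes a window opens on each immediate emission, that a sampled emission at the end of a window opens a follow-up window, and that a window which closes with nothing pending leaves the next value free to emit immediately.

```gleam
import gleam/list
import gleam/option.{type Option, None, Some}

/// Hypothetical state for the sketch: when the open window closes (if any),
/// the latest value seen inside it, and the emissions so far (newest first).
type State(a) {
  State(window_end: Option(Int), pending: Option(a), out: List(#(Int, a)))
}

/// Close every window that ends at or before `now`. A pending value is
/// emitted at the window end and opens a follow-up window.
fn flush(state: State(a), now: Int, ms: Int) -> State(a) {
  case state.window_end, state.pending {
    Some(closes_at), Some(value) if closes_at <= now ->
      flush(
        State(Some(closes_at + ms), None, [#(closes_at, value), ..state.out]),
        now,
        ms,
      )
    Some(closes_at), None if closes_at <= now ->
      State(..state, window_end: None)
    _, _ -> state
  }
}

fn step(state: State(a), event: #(Int, a), ms: Int) -> State(a) {
  let #(now, value) = event
  let state = flush(state, now, ms)
  case state.window_end {
    // No open window: emit immediately and open one.
    None -> State(Some(now + ms), None, [#(now, value), ..state.out])
    // Inside a window: remember only the latest value.
    Some(_) -> State(..state, pending: Some(value))
  }
}

/// Given events sorted by time, return the emissions with their fire times.
pub fn throttle_sim(events: List(#(Int, a)), ms: Int) -> List(#(Int, a)) {
  let state =
    list.fold(events, State(None, None, []), fn(state, event) {
      step(state, event, ms)
    })
  // Emit the value still pending in the final window, if there is one.
  let out = case state.window_end, state.pending {
    Some(closes_at), Some(value) -> [#(closes_at, value), ..state.out]
    _, _ -> state.out
  }
  list.reverse(out)
}

pub fn main() -> Nil {
  let moves = [#(0, "a"), #(30, "b"), #(60, "c"), #(250, "d")]
  // "a" fires at once, "c" is sampled at the 100ms window end, "b" is dropped.
  let assert [#(0, "a"), #(100, "c"), #(250, "d")] = throttle_sim(moves, 100)
  Nil
}
```

Unlike debounce, the first value is never delayed; only the values that arrive inside an open window are collapsed to the latest one.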
pub fn timeout(
  source: types.Observable(a),
  ms: Int,
) -> types.Observable(a)
Errors if the source does not emit within the specified timeout period.
The timeout resets after each emission, so the limit applies to the gap between consecutive emissions rather than to the stream as a whole.
Example
slow_source
|> timeout(5000) // Error if no emission within 5 seconds
|> actorx.subscribe(observer)
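The reset-on-emission behaviour can be sketched as a pure simulation over timestamped values. This is an illustration only, not ActorX's implementation: `timeout_sim` and `run` are hypothetical names. It assumes the timer starts at time 0 (subscription), that the source never completes, and that a value arriving exactly at the deadline is too late.

```gleam
import gleam/list

/// Returns the values forwarded before the timeout, plus the time (in ms)
/// at which the error would be raised.
pub fn timeout_sim(
  events: List(#(Int, a)),
  ms: Int,
) -> #(List(#(Int, a)), Int) {
  run(events, ms, ms, [])
}

fn run(
  events: List(#(Int, a)),
  deadline: Int,
  ms: Int,
  forwarded: List(#(Int, a)),
) -> #(List(#(Int, a)), Int) {
  case events {
    [#(t, v), ..rest] ->
      case t < deadline {
        // In time: forward the value and restart the timer from `t`.
        True -> run(rest, t + ms, ms, [#(t, v), ..forwarded])
        // Too late: the error was already raised at `deadline`.
        False -> #(list.reverse(forwarded), deadline)
      }
    // No more values: the running timer eventually expires.
    [] -> #(list.reverse(forwarded), deadline)
  }
}

pub fn main() -> Nil {
  let readings = [#(1000, "a"), #(4000, "b"), #(9500, "c")]
  // "b" resets the deadline to 9000ms, so "c" at 9500ms arrives too late.
  let assert #([#(1000, "a"), #(4000, "b")], 9000) =
    timeout_sim(readings, 5000)
  Nil
}
```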
pub fn timer(delay_ms: Int) -> types.Observable(Int)
Creates an observable that emits 0 after the specified delay, then completes.
Example
// Emit 0 after 100ms
timer(100)
|> actorx.subscribe(observer)
// Wait for async completion
process.sleep(150)