actorx/subject

Subject types for ActorX

Subjects are both Observers and Observables - they can receive notifications and forward them to subscribers.

Values

pub fn publish(
  source: types.Observable(a),
) -> #(types.Observable(a), fn() -> types.Disposable)

Converts a cold observable into a connectable hot observable.

Returns a tuple of (Observable, connect_fn) where:

  • The Observable can be subscribed to by multiple observers
  • The connect function starts the source subscription

Values are only emitted after connect() is called. Multiple subscribers share the same source subscription.

Example

let #(hot, connect) = publish(cold_source)

// Subscribe multiple observers (source not started yet)
let _d1 = hot |> actorx.subscribe(observer1)
let _d2 = hot |> actorx.subscribe(observer2)

// Now connect - source starts, both observers receive values
let connection = connect()

// Disconnect when done
let Disposable(dispose) = connection
dispose()
pub fn share(source: types.Observable(a)) -> types.Observable(a)

Shares a single subscription to the source among multiple subscribers.

Automatically connects to the source when the first subscriber subscribes, and disconnects when the last subscriber unsubscribes.

This is equivalent to publish(source) with automatic reference counting.

Example

let shared =
  interval(100)
  |> share()

// First subscriber - source starts
let d1 = shared |> actorx.subscribe(observer1)

// Second subscriber - shares same source
let d2 = shared |> actorx.subscribe(observer2)

// Unsubscribe first
let Disposable(dispose1) = d1
dispose1()

// Unsubscribe last - source stops
let Disposable(dispose2) = d2
dispose2()
pub fn single_subject() -> #(
  types.Observer(a),
  types.Observable(a),
)

Creates a single-subscriber subject.

Returns a tuple of (Observer, Observable) where:

  • The Observer side is used to push notifications
  • The Observable side can be subscribed to (once only!)

Notifications sent before subscription are buffered and delivered when a subscriber connects.

Example

let #(observer, observable) = single_subject()

// Push values through the observer side
types.on_next(observer, 1)
types.on_next(observer, 2)

// Subscribe to receive them
observable |> actorx.subscribe(my_observer)

// More values flow through
types.on_next(observer, 3)
types.on_completed(observer)
pub fn subject() -> #(types.Observer(a), types.Observable(a))

Creates a multicast subject that allows multiple subscribers.

Returns a tuple of (Observer, Observable) where:

  • The Observer side is used to push notifications
  • The Observable side can be subscribed to by multiple observers

Unlike single_subject, notifications are NOT buffered - they are only delivered to currently subscribed observers.

Example

let #(input, output) = subject()

// Multiple subscribers
let _disp1 = output |> actorx.subscribe(observer1)
let _disp2 = output |> actorx.subscribe(observer2)

// Both receive this value
actorx.on_next(input, 42)
Search Document