actorx/interop

Actor interop for ActorX

Bridges BEAM actors (OTP Subjects) with reactive streams, enabling actor orchestration via Rx operators.

Values

pub fn call_actor(
  actor_subject: process.Subject(msg),
  timeout_ms: Int,
  make_request: fn(process.Subject(response)) -> msg,
) -> types.Observable(response)

Creates a cold Observable that performs a request-response call to an actor.

Each subscription triggers a new request. The make_request function receives a reply Subject and should return the message to send to the actor.
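To make the role of make_request concrete, here is a minimal sketch of the request-response round trip written by hand with gleam/erlang/process alone. It is an illustration of the pattern, not ActorX's implementation. The counter_loop function and the ready Subject are hypothetical, and process.spawn assumes gleam_erlang 1.x (older releases used process.start). How call_actor itself reports a timeout is not covered here.

```gleam
import gleam/erlang/process.{type Subject}
import gleam/int
import gleam/io

pub type CounterMsg {
  Increment(Int)
  GetValue(Subject(Int))
}

// Hypothetical counter actor: keeps a running total and answers GetValue
// by sending the total to the reply Subject carried in the message.
fn counter_loop(inbox: Subject(CounterMsg), count: Int) -> Nil {
  case process.receive_forever(inbox) {
    Increment(n) -> counter_loop(inbox, count + n)
    GetValue(reply_to) -> {
      process.send(reply_to, count)
      counter_loop(inbox, count)
    }
  }
}

pub fn main() {
  // A Subject can only be received on by the process that created it,
  // so the counter creates its own inbox and hands it back to us.
  let ready = process.new_subject()
  let _pid =
    process.spawn(fn() {
      let inbox = process.new_subject()
      process.send(ready, inbox)
      counter_loop(inbox, 0)
    })
  let assert Ok(counter) = process.receive(ready, 1000)

  process.send(counter, Increment(5))

  // The manual equivalent of one request: create a reply Subject, wrap it
  // in a message (this is the job of make_request), send, then wait.
  let reply = process.new_subject()
  process.send(counter, GetValue(reply))
  case process.receive(reply, 1000) {
    Ok(value) -> io.println("value: " <> int.to_string(value))
    Error(Nil) -> io.println("timed out")
  }
}
```

Passing the GetValue constructor directly as make_request works because a constructor with a single Subject(Int) field is itself a function of type fn(Subject(Int)) -> CounterMsg.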

Example

type CounterMsg {
  Increment(Int)
  GetValue(Subject(Int))
}

// Single call
actorx.call_actor(counter, 1000, GetValue)
|> actorx.subscribe(observer)

// Periodic polling
actorx.interval(1000)
|> actorx.flat_map(fn(_) {
  actorx.call_actor(counter, 1000, GetValue)
})
|> actorx.subscribe(value_observer)

pub fn from_subject() -> #(
  process.Subject(a),
  types.Observable(a),
)

Creates a Subject and Observable pair for bridging actors.

Returns a tuple of (Subject, Observable) where:

  • Values sent to the Subject are emitted to all Observable subscribers
  • The Observable never completes on its own; apply take_until to complete it

This is useful for having an actor publish events that can be consumed reactively via the Observable.
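The producer side needs nothing from ActorX: any process holding the Subject can publish with process.send. A minimal sketch of what a producer might look like follows. The Event type, start_event_producer, and is_high_priority are hypothetical helpers rather than part of the library, and process.spawn assumes gleam_erlang 1.x.

```gleam
import gleam/erlang/process.{type Subject}

// Hypothetical event type for illustration only.
pub type Event {
  Event(priority: Int, text: String)
}

// Spawns a process that publishes two events to the given Subject and exits.
// A real producer would typically loop or react to its own messages.
pub fn start_event_producer(events: Subject(Event)) -> process.Pid {
  process.spawn(fn() {
    process.send(events, Event(priority: 1, text: "disk usage at 70%"))
    process.send(events, Event(priority: 9, text: "disk full"))
  })
}

// Hypothetical predicate suitable for use with a filter operator.
pub fn is_high_priority(event: Event) -> Bool {
  event.priority >= 5
}
```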

Example

let #(events_subject, events_observable) = actorx.from_subject()

// Pass subject to a producer actor
let _producer = start_event_producer(events_subject)

// Subscribe to the observable
events_observable
|> actorx.filter(is_high_priority)
|> actorx.take_until(shutdown_signal)
|> actorx.subscribe(alert_observer)

pub fn to_subject(
  source: types.Observable(a),
  target: process.Subject(a),
) -> types.Observable(a)

Sends each emitted value to a Subject while also passing it through to downstream subscribers.

Only OnNext values are sent to the Subject. OnError and OnCompleted are forwarded downstream but not sent to the Subject.

Example

actorx.interval(100)
|> actorx.take(10)
|> actorx.map(fn(n) { Increment(n) })
|> actorx.to_subject(counter_inbox)
|> actorx.subscribe(log_observer)