actorx/interop
Actor interop for ActorX
Bridges BEAM actors (OTP Subjects) with reactive streams, enabling actor orchestration via Rx operators.
- from_subject: Create a Subject/Observable pair for bridging actors
- to_subject: Send emissions to Subject (passthrough operator)
- call_actor: Request-response pattern as Observable (cold)
Values
pub fn call_actor(
  actor_subject: process.Subject(msg),
  timeout_ms: Int,
  make_request: fn(process.Subject(response)) -> msg,
) -> types.Observable(response)
Creates a cold Observable that performs a request-response call to an actor.
Each subscription triggers a new request. The make_request function
receives a reply Subject and should return the message to send to the actor.
Example
type CounterMsg {
  Increment(Int)
  GetValue(Subject(Int))
}

// Single call
actorx.call_actor(counter, 1000, GetValue)
|> actorx.subscribe(observer)

// Periodic polling
actorx.interval(1000)
|> actorx.flat_map(fn(_) {
  actorx.call_actor(counter, 1000, GetValue)
})
|> actorx.subscribe(value_observer)
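The actor on the receiving end is not shown above. As a minimal sketch, assuming the counter is a plain process that owns its inbox and uses gleam/erlang/process directly, it could answer GetValue by sending its state to the reply Subject it receives. The name counter_loop and the 5000 ms idle timeout are hypothetical and not part of ActorX.

```gleam
import gleam/erlang/process.{type Subject}

pub type CounterMsg {
  Increment(Int)
  GetValue(Subject(Int))
}

// Hypothetical actor loop (not part of ActorX). It must run in the process
// that created `inbox`, because only a Subject's owner can receive from it.
pub fn counter_loop(inbox: Subject(CounterMsg), total: Int) -> Nil {
  case process.receive(inbox, 5000) {
    // Fold the increment into the running total.
    Ok(Increment(n)) -> counter_loop(inbox, total + n)
    // Reply on the Subject supplied by the caller, then keep serving.
    Ok(GetValue(reply_to)) -> {
      process.send(reply_to, total)
      counter_loop(inbox, total)
    }
    // Nothing arrived within the timeout (assumed value); keep waiting.
    Error(Nil) -> counter_loop(inbox, total)
  }
}
```

Passing the GetValue constructor as make_request works because a one-field constructor is itself a function of type fn(Subject(Int)) -> CounterMsg; writing fn(reply_to) { GetValue(reply_to) } would be the spelled-out equivalent.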
pub fn from_subject() -> #(
  process.Subject(a),
  types.Observable(a),
)
Creates a Subject and Observable pair for bridging actors.
Returns a tuple of (Subject, Observable) where:
- Values sent to the Subject are emitted to all Observable subscribers
- The Observable never completes on its own; use take_until to complete it
This is useful for having an actor publish events that can be consumed reactively via the Observable.
Example
let #(events_subject, events_observable) = actorx.from_subject()

// Pass subject to a producer actor
let _producer = start_event_producer(events_subject)

// Subscribe to the observable
events_observable
|> actorx.filter(is_high_priority)
|> actorx.take_until(shutdown_signal)
|> actorx.subscribe(alert_observer)
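The example leaves the producer side and is_high_priority undefined. A rough sketch of what they might look like follows, assuming a hypothetical Event record with an integer priority and a threshold of 8 chosen purely for illustration. The only library call involved is process.send from gleam/erlang/process; publish_all stands in for whatever start_event_producer does internally.

```gleam
import gleam/erlang/process.{type Subject}
import gleam/list

// Hypothetical event shape; the real type is up to the application.
pub type Event {
  Event(priority: Int, message: String)
}

// Publishes each event by sending it to the Subject half of the pair.
// Per the description above, every value sent this way is emitted to
// the subscribers of the paired Observable.
pub fn publish_all(events_subject: Subject(Event), events: List(Event)) -> Nil {
  list.each(events, fn(event) { process.send(events_subject, event) })
}

// Predicate suitable for actorx.filter; the threshold is an assumption.
pub fn is_high_priority(event: Event) -> Bool {
  event.priority >= 8
}
```

Because the producer only needs the Subject, it stays unaware of the reactive pipeline and can be any process that knows how to send messages.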
pub fn to_subject(
  source: types.Observable(a),
  target: process.Subject(a),
) -> types.Observable(a)
Sends each emitted value to a Subject while passing through to downstream.
Only OnNext values are sent to the Subject. OnError and OnCompleted are forwarded downstream but not sent to the Subject.
Example
actorx.interval(100)
|> actorx.take(10)
|> actorx.map(fn(n) { Increment(n) })
|> actorx.to_subject(counter_inbox)
|> actorx.subscribe(log_observer)
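Since OnCompleted is never sent to the Subject, the process that owns the target Subject cannot learn from its mailbox alone that the stream has ended. One possible way to handle this, sketched here with a hypothetical drain helper that is not part of ActorX, is to treat a quiet period as the end of input:

```gleam
import gleam/erlang/process.{type Subject}
import gleam/list

// Hypothetical helper: collects values from `inbox` until no message
// arrives within `timeout_ms`, then returns them in arrival order.
// Must be called from the process that created `inbox`.
pub fn drain(inbox: Subject(a), timeout_ms: Int, acc: List(a)) -> List(a) {
  case process.receive(inbox, timeout_ms) {
    // Prepend for speed; the order is restored when the loop finishes.
    Ok(value) -> drain(inbox, timeout_ms, [value, ..acc])
    // Timed out: assume the upstream has gone quiet and stop.
    Error(Nil) -> list.reverse(acc)
  }
}
```

A call such as drain(counter_inbox, 500, []) would return whatever was forwarded before the first 500 ms gap. If a timeout is too loose a signal, an explicit "done" variant in the message type, sent by the application itself, is an alternative.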