Relay Plugin
🌟 This plugin is available through Oban.Pro
The Relay plugin lets you insert and await the results of jobs locally or
remotely, across any number of nodes, i.e. persistent distributed tasks. Once
the plugin is running, you can seamlessly distribute jobs and await the results
synchronously.
Installation
The Relay plugin must be started to coordinate events and messages for
async/await functions.
Start by adding it to your plugins:
plugins: [Oban.Pro.Plugins.Relay]
There isn't any extra configuration necessary (or possible).
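For context, here is a minimal sketch of where that plugins entry might live in an application's configuration. The `:my_app` application name, `MyApp.Repo`, and the `default` queue are assumptions for illustration; only the `Oban.Pro.Plugins.Relay` entry comes from the text above.

```elixir
# config/config.exs
import Config

# :my_app, MyApp.Repo and the queue settings are hypothetical placeholders.
config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [default: 10],
  plugins: [Oban.Pro.Plugins.Relay]
```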
Usage
Typespecs
📚 In order to bridge the gap between module level docs and a guide, each section includes a typespec for the corresponding function. The snippet below defines the types listed in each section.
@type t :: %{job: Job.t(), pid: pid(), ref: UUID.t()}

@type await_result ::
        :ok
        | {:ok, term()}
        | {:error, :discarded}
        | {:error, :result_too_large}
        | {:error, :snoozed}
        | {:error, :timeout}
        | {:error, Exception.t()}
Async
@spec async(Job.changeset()) :: t() | {:error, Job.changeset()}
@spec async(Oban.name(), Job.changeset()) :: t() | {:error, Job.changeset()}
Use Relay.async/1,2 to insert a job for asynchronous execution. The single
arity version takes a job changeset and inserts it:
relay =
  %{id: 123}
  |> MyApp.Worker.new()
  |> Relay.async()
When the Oban instance has a custom name, or an app has multiple Oban instances, you can use the two arity version to select an instance:
changeset = MyApp.Worker.new(%{id: 123})
relay = Relay.async(MyOban, changeset)
The returned map contains the caller's pid and a unique ref that is used to await the results.
Await
@spec await(relay :: t()) :: await_result()
@spec await(relay :: t(), timeout()) :: await_result()
After inserting a job and constructing a relay, use Relay.await/1,2 to await
the job's execution and return the result:
{:ok, result} =
  %{id: 123}
  |> MyApp.Worker.new()
  |> Relay.async()
  |> Relay.await()
By default, await times out after 5 seconds and returns an {:error, :timeout}
tuple. The job itself may continue to run; only the local process
stops waiting on it. The second argument to await/2 is a timeout value:
Relay.await(relay, :timer.seconds(30))
When the executed job fails, crashes, snoozes, or is discarded, the result comes
back as an error tuple with :discarded, :snoozed, or the exception. Note
that results are encoded using term_to_binary/2 with compression and then
decoded locally using binary_to_term/2 with safety enabled to prevent leaking
atoms.
Any value smaller than 8kb when compressed can be returned. Values larger
than 8kb will return an {:error, :result_too_large} tuple.
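Since await can return several distinct shapes, callers usually want to branch on all of them. The following is a minimal sketch of one way to do that; `RelayResultSketch` and its `:done`, `:pending`, and `:failed` tags are hypothetical names chosen for illustration and are not part of Oban.Pro. It takes a value shaped like `await_result()` as input rather than calling Relay itself.

```elixir
defmodule RelayResultSketch do
  # Hypothetical helper, not part of Oban.Pro: maps each documented
  # await_result() shape to a tagged tuple the caller can act on.

  # The job finished, with or without a returned value.
  def classify(:ok), do: {:done, nil}
  def classify({:ok, value}), do: {:done, value}

  # The local process stopped waiting, or the job snoozed; the job
  # itself may still run later.
  def classify({:error, :timeout}), do: {:pending, :timeout}
  def classify({:error, :snoozed}), do: {:pending, :snoozed}

  # The job will not produce a usable result.
  def classify({:error, :discarded}), do: {:failed, :discarded}
  def classify({:error, :result_too_large}), do: {:failed, :result_too_large}

  # Any exception struct is reduced to its message.
  def classify({:error, %{__exception__: true} = exception}) do
    {:failed, Exception.message(exception)}
  end
end
```

A call such as `RelayResultSketch.classify(Relay.await(relay))` would then yield a single tagged tuple to match on, whichever of the documented results came back.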
Await Many
@spec await_many(relays :: [t()]) :: [await_result() | nil]
@spec await_many(relays :: [t()], timeout()) :: [await_result() | nil]
Use Relay.await_many/1,2 to await replies from multiple relays and return the
results. It returns a list of the results in the same order as the relays
supplied as the first argument.
relayed =
  1..3
  |> Enum.map(&DoubleWorker.new(%{int: &1}))
  |> Enum.map(&Relay.async(&1))
  |> Relay.await_many()

# [{:ok, 2}, {:ok, 4}, {:ok, 6}]
Unlike Task.await_many or Task.yield_many, Relay.await_many may return
partial results when the timeout is reached. When a job hasn't finished
executing, its value will be a timeout error tuple.
relayed =
  [1, 2, 300_000_000]
  |> Enum.map(&SlowWorker.new(%{int: &1}))
  |> Enum.map(&Relay.async(&1))
  |> Relay.await_many(100)

# [{:ok, 2}, {:ok, 4}, {:error, :timeout}]
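Because results come back in the same order as the relays, partial results can be matched to their inputs by position. The sketch below shows one possible way to separate finished values from timed-out inputs; `AwaitManySketch` and its bucket names are hypothetical and not part of Oban.Pro. It operates on a plain results list like the one above.

```elixir
defmodule AwaitManySketch do
  # Hypothetical helper, not part of Oban.Pro. Pairs each input with the
  # result at the same position and groups the pairs into three buckets.
  def partition(inputs, results) do
    inputs
    |> Enum.zip(results)
    |> Enum.reduce(%{ok: [], timeout: [], other: []}, fn
      {input, :ok}, acc ->
        Map.update!(acc, :ok, &[{input, nil} | &1])

      {input, {:ok, value}}, acc ->
        Map.update!(acc, :ok, &[{input, value} | &1])

      {input, {:error, :timeout}}, acc ->
        Map.update!(acc, :timeout, &[input | &1])

      # Everything else (discarded, snoozed, exceptions, nil) lands here.
      {input, other}, acc ->
        Map.update!(acc, :other, &[{input, other} | &1])
    end)
    # Entries were prepended, so restore the original ordering.
    |> Map.new(fn {key, list} -> {key, Enum.reverse(list)} end)
  end
end

partitioned =
  AwaitManySketch.partition(
    [1, 2, 300_000_000],
    [{:ok, 2}, {:ok, 4}, {:error, :timeout}]
  )

# partitioned.timeout holds the inputs whose jobs had not finished in time
```

Keeping the timed-out inputs separate makes it straightforward to decide later whether to wait longer, re-enqueue, or give up on them.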
Implementation Notes
Some additional notes and requirements to be aware of:
Relay uses PubSub to transmit results. That means it will work without Erlang distribution or clustering, but it does require functional PostgreSQL based PubSub.
It doesn't matter where a job is executed; the result will still be broadcast back, provided that the Oban instance which processed it is running the Relay plugin as well.
Results are encoded using :erlang.term_to_binary/2 and decoded using :erlang.binary_to_term/2 with the :safe option to prevent the creation of new atoms or function references. If you're returning results with atoms, you must be sure that those atoms are defined locally, where the await/2 or await_many/2 function is called.
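The encoding round trip described in the last note can be tried with the standard library alone. This is a sketch of the two `:erlang` calls named above, not Relay's internal code; any additional framing Relay applies before broadcasting is not shown, and the atom name in the second half is an assumption chosen to be one that does not already exist on the node.

```elixir
# Encode a result with compression, then decode it in safe mode.
result = {:ok, %{total: 42, status: "done"}}
encoded = :erlang.term_to_binary(result, [:compressed])
decoded = :erlang.binary_to_term(encoded, [:safe])
# Decoding succeeds because :ok, :total and :status already exist locally.

# Size in bytes of the encoded term, useful when a result may approach
# the size limit mentioned in the Await section.
size = byte_size(encoded)

# Hand-build an external term format binary that names an atom which
# (by assumption) was never created on this node: 131 is the format
# version tag, 119 is SMALL_ATOM_UTF8_EXT, then a length byte and the name.
name = "relay_sketch_atom_that_should_not_exist"
unknown = <<131, 119, byte_size(name), name::binary>>

safe_decode =
  try do
    {:ok, :erlang.binary_to_term(unknown, [:safe])}
  rescue
    # Safe mode refuses to create the missing atom and raises instead.
    ArgumentError -> {:error, :unknown_atom}
  end
```

The second half illustrates why a worker should only return atoms that the awaiting node already knows: with `:safe`, decoding fails rather than creating the atom.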