Multitask v0.1.0
Defines a multitask.
Multitasks are processes meant to execute several functions asynchronously and collect either all of their return values, once every function has finished, or the error of the first failure.
iex> multitask = Multitask.async([
...> fn -> {:ok, "a"} end,
...> fn -> {:ok, "b"} end
...> ])
iex> Multitask.await(multitask)
{:ok, ["a", "b"]}
As shown in the example above, multitasks spawned with async can be awaited on by their caller process (and only their caller). They are implemented by spawning a process that sends a message to the caller once the given computation is performed. Internally, the multitask process spawns a new process per function (linked to and monitored by the multitask process) and waits for their replies.
Functions must return {:ok, result} or {:error, error} tuples to express success or failure. Multitasks implement fail-fast behavior: any failure or exit is reported immediately and all the processes are shut down.
iex> multitask = Multitask.async([
...> fn -> {:ok, "a"} end,
...> fn -> {:ok, "b"} end,
...> fn -> {:error, "c"} end
...> ])
iex> Multitask.await(multitask)
{:error, "c"}
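The fail-fast collection described above can be sketched with plain processes. This is a minimal sketch, not Multitask's implementation: the module FailFastSketch and its run/2 function are hypothetical, each function runs in a process monitored (not linked) by the caller, and a crash or a reply slower than timeout milliseconds is returned as {:exit, reason} instead of exiting the caller.

```elixir
defmodule FailFastSketch do
  # Hypothetical module, not Multitask's implementation: one monitored
  # process per zero-arity function, results collected in submission order,
  # and the first {:error, _} reply or abnormal exit ends the whole run.

  def run(funs, timeout \\ 5_000) when is_list(funs) do
    parent = self()
    # A unique tag keeps stray replies from earlier runs out of this one.
    tag = make_ref()

    # Map each monitor reference to the worker's {index, pid}.
    workers =
      funs
      |> Enum.with_index()
      |> Map.new(fn {fun, index} ->
        {pid, ref} = spawn_monitor(fn -> send(parent, {tag, index, fun.()}) end)
        {ref, {index, pid}}
      end)

    collect(tag, workers, %{}, length(funs), timeout)
  end

  # Every function replied with {:ok, value}: return the values in order.
  defp collect(_tag, workers, acc, total, _timeout) when map_size(acc) == total do
    cleanup(workers)
    {:ok, acc |> Enum.sort() |> Enum.map(fn {_index, value} -> value end)}
  end

  defp collect(tag, workers, acc, total, timeout) do
    receive do
      {^tag, index, {:ok, value}} ->
        collect(tag, workers, Map.put(acc, index, value), total, timeout)

      # Fail fast: the first error stops the remaining workers.
      {^tag, _index, {:error, _reason} = error} ->
        cleanup(workers)
        error

      # A worker crashed before replying.
      {:DOWN, ref, :process, _pid, reason}
      when is_map_key(workers, ref) and reason != :normal ->
        cleanup(workers)
        {:exit, reason}
    after
      # Waits at most `timeout` ms for each next reply.
      timeout ->
        cleanup(workers)
        {:exit, :timeout}
    end
  end

  # Kill every worker and drop its monitor, flushing any pending :DOWN message.
  defp cleanup(workers) do
    Enum.each(workers, fn {ref, {_index, pid}} ->
      Process.demonitor(ref, [:flush])
      Process.exit(pid, :kill)
    end)
  end
end
```

Called with the functions from the two iex examples above, run/2 returns {:ok, ["a", "b"]} for the successful pair and {:error, "c"} for the list whose third function fails.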
Multitasks can also be started as part of a supervision tree, dynamically supervised, and spawned on remote nodes. We will explore all three scenarios next.
Supervised multitasks
It is also possible to spawn a multitask under a supervisor. This is often done by defining the multitask in its own module:
defmodule MyMultitask do
  use Multitask

  def start_link(arg) do
    Multitask.start_link([
      {__MODULE__, :do_something, [arg]},
      {__MODULE__, :do_something_else, [arg]}
    ])
  end

  def do_something(arg) do
    # ...
  end

  def do_something_else(arg) do
    # ...
  end
end
And then passing it to the supervisor:
Supervisor.start_link([MyMultitask], strategy: :one_for_one)
Since these multitasks are supervised and not directly linked to the caller, they cannot be awaited on. Note that start_link/1, unlike async/1, returns {:ok, pid} (which is the result expected by supervisors).
Note that use Multitask defines a child_spec/1 function, allowing the defined module to be put under a supervision tree. The generated child_spec/1 can be customized with the following options:
- :id - the child specification id, defaults to the current module
- :start - how to start the child process (defaults to calling __MODULE__.start_link/1)
- :restart - when the child should be restarted, defaults to :temporary
- :shutdown - how to shut down the child
Unlike GenServer, Agent and Supervisor, a Multitask has a default :restart of :temporary. This means the multitask will not be restarted even if it crashes. If you want the multitask to be restarted for non-successful exits, do:
use Multitask, restart: :transient
If you want the multitask to always be restarted:
use Multitask, restart: :permanent
See the Supervisor docs for more information.
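The restart option can also be overridden where the child is listed, using Supervisor.child_spec/2 from Elixir's standard library, which works with any module that defines child_spec/1. In the sketch below, StandInMultitask is a hypothetical module whose child_spec/1 is written by hand to follow the defaults listed above, so the snippet runs without the Multitask library.

```elixir
defmodule StandInMultitask do
  # Hypothetical stand-in for a module that calls `use Multitask`: its
  # child_spec/1 is written by hand with the defaults listed above
  # (id = module, start = start_link/1, restart = :temporary).
  def child_spec(arg) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [arg]},
      restart: :temporary
    }
  end

  # Placeholder: a real module would start its multitask here.
  def start_link(_arg), do: :ignore
end

# Override :restart at the call site; the module's own default is untouched.
spec = Supervisor.child_spec({StandInMultitask, :my_arg}, restart: :transient)

# The supervisor accepts the child; :ignore means no process is started.
{:ok, _sup} = Supervisor.start_link([spec], strategy: :one_for_one)
```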
Dynamically supervised multitasks
The Multitask.Supervisor module allows developers to dynamically create multiple supervised multitasks.
A short example is:
{:ok, pid} = Multitask.Supervisor.start_link()

multitask = Multitask.Supervisor.async(pid, [
  fn ->
    # Do something
  end,
  fn ->
    # Do something else
  end
])

Multitask.await(multitask)
However, in the majority of cases, you want to add the multitask supervisor to your supervision tree:
Supervisor.start_link(
  [
    {Multitask.Supervisor, name: MyApp.MultitaskSupervisor}
  ],
  strategy: :one_for_one
)
Now you can dynamically start supervised multitasks:
Multitask.Supervisor.start_child(MyApp.MultitaskSupervisor, [
  fn ->
    # Do something
  end,
  fn ->
    # Do something else
  end
])
Or even use the async/await pattern:
Multitask.Supervisor.async(MyApp.MultitaskSupervisor, [
  fn ->
    # Do something
  end,
  fn ->
    # Do something else
  end
]) |> Multitask.await()
Finally, check Multitask.Supervisor for other supported operations.
Distributed multitasks
Since Multitask provides a supervisor, it is easy to use one to dynamically spawn multitasks across nodes:
# On the remote node
Multitask.Supervisor.start_link(name: MyApp.DistSupervisor)

# On the client
Multitask.Supervisor.async({MyApp.DistSupervisor, :remote@local},
  MyMod, :my_fun, [arg1, arg2, arg3])
Note that, when working with distributed multitasks, one should express the list of functions using explicit module, function and arguments, instead of anonymous functions. That’s because anonymous functions expect the same module version to exist on all involved nodes. Check the Agent module documentation for more information on distributed processes, as the limitations described there apply to the whole ecosystem.
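The difference between the two forms can be seen in a small hypothetical helper, MFASketch, which is not part of Multitask. A {module, function, args} tuple is plain data (two atoms and a list) that whichever node runs it resolves with apply/3 against its own loaded module, whereas an anonymous function refers to the specific version of the module that defined it.

```elixir
defmodule MFASketch do
  # Hypothetical helper, not part of Multitask: runs either form locally.

  # An MFA tuple is looked up by name at call time via apply/3.
  def run({module, function, args})
      when is_atom(module) and is_atom(function) and is_list(args) do
    apply(module, function, args)
  end

  # An anonymous function is called directly; it is tied to the module
  # version that created it.
  def run(fun) when is_function(fun, 0), do: fun.()
end
```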
Summary
Functions
The Multitask struct
Starts a multitask that must be awaited on
Awaits a multitask reply and returns it
Unlinks and shuts down the multitask, and then checks for a reply
Starts a multitask
Starts a process linked to the current process
Temporarily blocks the current process waiting for a multitask reply
Types
Functions
The Multitask struct.
It contains these fields:
- :pid - the PID of the multitask process; nil if the multitask does not use a multitask process
- :ref - the multitask monitor reference
- :owner - the PID of the process that started the multitask
Starts a multitask that must be awaited on.
A Multitask struct is returned containing the relevant information. Developers must eventually call Multitask.await/2, or Multitask.yield/2 followed by Multitask.shutdown/2, on the returned multitask.
Linking
As with Task.async/1 or Task.async/3, this function spawns a process that is linked to and monitored by the caller process. The linking part is important because it aborts the multitask if the parent process dies. It also guarantees the code before async/await has the same properties after you add the async call.
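The effect of the link can be observed with plain processes. The hypothetical LinkSketch module below uses spawn_link/1 rather than Multitask: a parent starts a long-running linked child, then exits with reason :boom, and the child is taken down by the same exit signal.

```elixir
defmodule LinkSketch do
  # Hypothetical demo using plain spawn_link/1, not Multitask: a linked
  # child is terminated when its parent exits abnormally.
  def child_exit_reason_after_parent_crash do
    caller = self()

    {parent, parent_ref} =
      spawn_monitor(fn ->
        # Stand-in for a long-running multitask linked to its parent.
        child = spawn_link(fn -> Process.sleep(:infinity) end)
        send(caller, {:child, self(), child})

        # Crash only once the caller is monitoring the child.
        receive do
          :crash -> exit(:boom)
        end
      end)

    child =
      receive do
        {:child, ^parent, pid} -> pid
      end

    child_ref = Process.monitor(child)
    send(parent, :crash)

    receive do
      {:DOWN, ^parent_ref, :process, ^parent, :boom} -> :ok
    end

    # The child received the parent's :boom exit signal through the link.
    receive do
      {:DOWN, ^child_ref, :process, ^child, reason} -> reason
    end
  end
end
```

The function returns :boom, the parent's exit reason, showing that the child did not outlive its parent.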
If you don’t want to link the caller to the multitask, then you must use a supervised multitask with Multitask.Supervisor and call Multitask.Supervisor.async_nolink/2.
In any case, avoid any of the following:
- Setting :trap_exit to true - trapping exits should be used only in special circumstances, as it would make your process immune not only to exits from the multitask but to exits from any other process. Moreover, even when trapping exits, calling await will still exit if the multitask has terminated without sending its results back.
- Unlinking the multitask process started with async/await. If you unlink the processes and the multitask does not belong to any supervisor, you may leave dangling multitasks in case the parent dies.
Message format
The reply sent by the multitask will be in the format {ref, result}, where ref is the monitor reference held by the multitask struct and result is the return value of the multitask function ({:ok, results} or {:error, error}).
await(t, timeout) :: {:ok, term} | {:error, term} | no_return
Awaits a multitask reply and returns it.
Returns {:ok, results} in case of success, or {:error, error} otherwise.
A timeout, in milliseconds, can be given, with a default value of 5000. In case the multitask process dies, this function will exit with the same reason as the multitask. If the timeout is exceeded, await will exit; however, the multitask will continue to run.
When the calling process exits, its exit signal will terminate the multitask if it is not trapping exits.
This function assumes the multitask’s monitor is still active or the monitor’s :DOWN message is in the message queue. If it has been demonitored, or the message has already been received, this function will wait for the duration of the timeout awaiting the message.
This function can only be called once for any given multitask. If you want to be able to check multiple times whether a long-running multitask has finished its computation, use yield/2 instead.
Compatibility with OTP behaviours
It is not recommended to await a long-running multitask inside an OTP behaviour such as GenServer. Instead, you should match on the message coming from a multitask inside your GenServer.handle_info/2 callback.
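A minimal sketch of that pattern, assuming the multitask reply has the {ref, result} shape described under async. It uses Elixir's Task.async/1, which replies in the same shape, so it runs without the library; ReplyCollector and its functions are hypothetical.

```elixir
defmodule ReplyCollector do
  # Hypothetical GenServer: starts work in init/1 and matches the
  # {ref, result} reply in handle_info/2 instead of blocking in await.
  use GenServer

  def start_link(fun), do: GenServer.start_link(__MODULE__, fun)
  def result(server), do: GenServer.call(server, :result)

  @impl true
  def init(fun) do
    task = Task.async(fun)
    {:ok, %{ref: task.ref, result: :pending}}
  end

  @impl true
  def handle_call(:result, _from, state), do: {:reply, state.result, state}

  # The reply: drop the monitor and its :DOWN message, then store the result.
  @impl true
  def handle_info({ref, result}, %{ref: ref} = state) do
    Process.demonitor(ref, [:flush])
    {:noreply, %{state | ref: nil, result: result}}
  end

  # The task died without replying. Because the task is linked to this
  # server, this clause is only reached if the server traps exits.
  def handle_info({:DOWN, ref, :process, _pid, reason}, %{ref: ref} = state) do
    {:noreply, %{state | ref: nil, result: {:exit, reason}}}
  end
end
```

Calling ReplyCollector.result/1 right after start_link/1 typically returns :pending; once the task has replied, it returns whatever the function returned, for example {:ok, [2, 4]}.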
Examples
iex> multitask = Multitask.async([fn -> {:ok, 1 + 1} end, fn -> {:ok, 2 + 2} end])
iex> Multitask.await(multitask)
{:ok, [2, 4]}
shutdown(t, timeout | :brutal_kill) :: {:ok, term} | {:error, term} | {:exit, term} | nil
Unlinks and shuts down the multitask, and then checks for a reply.
Returns {:ok, results} or {:error, error} if the reply is received while shutting down the multitask, {:exit, reason} if the multitask died, otherwise nil.
The shutdown method is either a timeout or :brutal_kill. In case of a timeout, a :shutdown exit signal is sent to the multitask process and, if it does not exit within the timeout, it is killed. With :brutal_kill the multitask is killed straight away. In case the multitask terminates abnormally (possibly killed by another process), this function will exit with the same reason.
It is not required to call this function when terminating the caller, unless exiting with reason :normal or if the multitask is trapping exits. If the caller is exiting with a reason other than :normal and the multitask is not trapping exits, the caller’s exit signal will stop the multitask. The caller can exit with reason :shutdown to shut down all of its linked processes, including multitasks, that are not trapping exits without generating any log messages.
If a multitask’s monitor has already been demonitored or its :DOWN message already received, and there is no response waiting in the message queue, this function will return {:exit, :noproc}, as the result or exit reason cannot be determined.
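The shutdown sequence described above can be sketched with plain processes. ShutdownSketch is hypothetical, not Multitask.shutdown/2: it takes the worker's pid and monitor reference directly, assumes replies arrive as {ref, result}, returns nil when the worker stopped because of the sketch's own :shutdown or :kill signal, and returns {:exit, reason} for any other exit instead of exiting the caller.

```elixir
defmodule ShutdownSketch do
  # Hypothetical helper, not Multitask.shutdown/2. `pid` is a worker the
  # caller monitors through `ref`; the worker is assumed to reply with
  # {ref, result}, as in the message format described under async.

  def shutdown(pid, ref, :brutal_kill) do
    Process.exit(pid, :kill)
    {:done, value} = await_down(pid, ref, :infinity)
    value
  end

  def shutdown(pid, ref, timeout) when is_integer(timeout) and timeout >= 0 do
    # Ask first: a process that is not trapping exits stops right away.
    Process.exit(pid, :shutdown)

    case await_down(pid, ref, timeout) do
      {:done, value} ->
        value

      :timeout ->
        # Still alive after `timeout` ms (e.g. it traps exits), so kill it.
        Process.exit(pid, :kill)
        {:done, value} = await_down(pid, ref, :infinity)
        value
    end
  end

  defp await_down(pid, ref, timeout) do
    receive do
      # A reply that arrived before the worker stopped still wins.
      {^ref, reply} ->
        Process.demonitor(ref, [:flush])
        {:done, reply}

      # Stopped by our own :shutdown or :kill signal, without a reply.
      {:DOWN, ^ref, :process, ^pid, reason} when reason in [:shutdown, :killed] ->
        {:done, nil}

      # Any other exit reason is reported to the caller.
      {:DOWN, ^ref, :process, ^pid, reason} ->
        {:done, {:exit, reason}}
    after
      timeout -> :timeout
    end
  end
end
```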
Starts a multitask.
This should only be used when the multitask is run for side effects (i.e. there is no interest in the returned results) and should not be linked to the current process.
Starts a process linked to the current process.
This is often used to start the process as part of a supervision tree.
yield(t, timeout) :: {:ok, term} | {:error, term} | {:exit, term} | nil
Temporarily blocks the current process waiting for a multitask reply.
Returns {:ok, results} or {:error, error} if the reply is received, nil if no reply has arrived, or {:exit, reason} if the multitask has already exited. Keep in mind that normally a multitask failure also causes the process owning the multitask to exit. Therefore this function can return {:exit, reason} only if at least one of the following conditions applies:
- the multitask process exited with the reason :normal
- it isn’t linked to the caller
- the caller is trapping exits
A timeout, in milliseconds, can be given, with a default value of 5000. If the time runs out before a message from the multitask is received, this function will return nil and the monitor will remain active. Therefore yield/2 can be called multiple times on the same multitask.
This function assumes the multitask’s monitor is still active or the monitor’s :DOWN message is in the message queue. If it has been demonitored, or the message has already been received, this function will wait for the duration of the timeout awaiting the message.
If you intend to shut the multitask down if it has not responded within timeout milliseconds, you should chain this together with shutdown/1, like so:
require Logger

case Multitask.yield(multitask, timeout) || Multitask.shutdown(multitask) do
  {:ok, results} ->
    results

  {:error, error} ->
    Logger.error("Multitask failed with #{inspect(error)}")
    error

  {:exit, reason} ->
    Logger.error("Multitask exited with #{inspect(reason)}")
    nil

  nil ->
    Logger.warning("Failed to get the results in #{timeout}ms")
    nil
end
That ensures that if the multitask completes after the timeout but before shutdown/1 has been called, you will still get the result, since shutdown/1 is designed to handle this case and return the result.
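The same yield-then-shutdown pattern can be tried with Elixir's Task module, which these functions mirror. YieldSketch.poll/3 is a hypothetical helper: it calls Task.yield/2 several times (possible because the monitor stays active after a nil result) and then falls back to Task.shutdown/1. Note that Task.yield/2 wraps whatever the function returned as {:ok, result}, so the return shapes differ from those of Multitask.yield/2.

```elixir
defmodule YieldSketch do
  # Hypothetical helper built on Elixir's Task, not Multitask: yield up to
  # `attempts` times, `interval` ms each, then shut the task down.
  def poll(task, interval, attempts) when attempts > 0 do
    case Task.yield(task, interval) do
      nil when attempts > 1 ->
        # No reply yet; the monitor is still active, so try again.
        poll(task, interval, attempts - 1)

      nil ->
        # Out of attempts: shutdown/1 still returns a reply that raced in.
        Task.shutdown(task)

      reply ->
        reply
    end
  end
end
```

For a task that finishes within the polling window, poll/3 returns {:ok, result}; for one that never replies, it returns nil after shutting the task down.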
Examples
iex> multitask = Multitask.async([fn -> {:ok, 1 + 1} end, fn -> {:ok, 2 + 2} end])
iex> Multitask.yield(multitask)
{:ok, [2, 4]}
iex> multitask = Multitask.async([fn -> {:ok, 1 + 1} end, fn -> {:error, :invalid} end])
iex> Multitask.yield(multitask)
{:error, :invalid}