Multitask v0.1.0

Defines a multitask.

Multitasks are processes that execute several functions asynchronously and collect their return values once all of them have finished, or return the error of the first failure.

iex> multitask = Multitask.async([
...>   fn -> {:ok, "a"} end,
...>   fn -> {:ok, "b"} end
...> ])
iex> Multitask.await(multitask)
{:ok, ["a", "b"]}

As shown in the example above, multitasks spawned with async can be awaited on by their caller process (and only by their caller). They are implemented by spawning a process that sends a message to the caller once the given computation is performed. Internally, the multitask process spawns a new process per function (linked to and monitored by the multitask process) and awaits their replies.
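The reply mechanism described above can be sketched with plain processes. This is a minimal illustration under stated assumptions, not Multitask's actual implementation: the ReplySketch module and its run/1 and wait/2 functions are hypothetical names, and the sketch handles a single function rather than a list.

```elixir
defmodule ReplySketch do
  # Hypothetical helper for illustration only; not part of Multitask.

  # Spawns a linked process that runs `fun` and sends its return value
  # back to the caller, tagged with a unique reference.
  def run(fun) when is_function(fun, 0) do
    caller = self()
    ref = make_ref()
    spawn_link(fn -> send(caller, {ref, fun.()}) end)
    ref
  end

  # Blocks until the tagged reply arrives, or exits after `timeout` ms.
  def wait(ref, timeout \\ 5000) do
    receive do
      {^ref, reply} -> reply
    after
      timeout -> exit(:timeout)
    end
  end
end

ref = ReplySketch.run(fn -> {:ok, "a"} end)
ReplySketch.wait(ref)
# => {:ok, "a"}
```

Because the reply is sent to the process that called run/1, only that process can receive it, which mirrors the restriction that a multitask can be awaited only by its caller.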

Functions must return {:ok, result} or {:error, error} tuples to express success or failure. Multitasks implement a fail-fast behavior, reporting any failure or exit immediately and shutting down all the processes.

iex> multitask = Multitask.async([
...>   fn -> {:ok, "a"} end,
...>   fn -> {:ok, "b"} end,
...>   fn -> {:error, "c"} end
...> ])
iex> Multitask.await(multitask)
{:error, "c"}
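The shape of the combined reply can be sketched as a fold over individual replies. This is only an approximation: CollectSketch and collect/1 are hypothetical names, and the sketch inspects already-completed replies in list order, whereas a running multitask reports whichever failure arrives first.

```elixir
defmodule CollectSketch do
  # Hypothetical helper for illustration only; not part of Multitask.

  # Folds a list of {:ok, value} / {:error, reason} replies into a single
  # reply, halting at the first error found in list order.
  def collect(replies) do
    replies
    |> Enum.reduce_while({:ok, []}, fn
      {:ok, value}, {:ok, acc} -> {:cont, {:ok, [value | acc]}}
      {:error, reason}, _acc -> {:halt, {:error, reason}}
    end)
    |> case do
      {:ok, values} -> {:ok, Enum.reverse(values)}
      error -> error
    end
  end
end

CollectSketch.collect([{:ok, "a"}, {:ok, "b"}])
# => {:ok, ["a", "b"]}
CollectSketch.collect([{:ok, "a"}, {:ok, "b"}, {:error, "c"}])
# => {:error, "c"}
```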

Multitasks can also be started as part of a supervision tree and dynamically spawned on remote nodes. We will explore all three scenarios next.

Supervised multitasks

It is also possible to spawn a multitask under a supervisor. It is often done by defining the multitask in its own module:

defmodule MyMultitask do
  use Multitask

  def start_link(arg) do
    Multitask.start_link([
      {__MODULE__, :do_something, [arg]},
      {__MODULE__, :do_something_else, [arg]},
    ])
  end

  def do_something(arg) do
    # ...
  end

  def do_something_else(arg) do
    # ...
  end
end

And then passing it to the supervisor:

Supervisor.start_link([MyMultitask], strategy: :one_for_one)

Since these multitasks are supervised and not directly linked to the caller, they cannot be awaited on. Note start_link/1, unlike async/1, returns {:ok, pid} (which is the result expected by supervisors).

Note use Multitask defines a child_spec/1 function, allowing the defined module to be put under a supervision tree. The generated child_spec/1 can be customized with the following options:

  • :id - the child specification id, defaults to the current module
  • :start - how to start the child process (defaults to calling __MODULE__.start_link/1)
  • :restart - when the child should be restarted, defaults to :temporary
  • :shutdown - how to shut down the child

Unlike GenServer, Agent and Supervisor, a Multitask has a default :restart of :temporary. This means the multitask will not be restarted even if it crashes. If you want the multitask to be restarted on non-successful exits, do:

use Multitask, restart: :transient

If you want the multitask to always be restarted:

use Multitask, restart: :permanent

See the Supervisor docs for more information.

Dynamically supervised multitasks

The Multitask.Supervisor module allows developers to dynamically create multiple supervised multitasks.

A short example is:

{:ok, pid} = Multitask.Supervisor.start_link()
multitask = Multitask.Supervisor.async(pid, [
  fn ->
    # Do something
  end,
  fn ->
    # Do something else
  end
])
Multitask.await(multitask)

However, in the majority of cases, you want to add the multitask supervisor to your supervision tree:

Supervisor.start_link([
  {Multitask.Supervisor, name: MyApp.MultitaskSupervisor}
], strategy: :one_for_one)

Now you can dynamically start supervised multitasks:

Multitask.Supervisor.start_child(MyApp.MultitaskSupervisor, [
  fn ->
    # Do something
  end,
  fn ->
    # Do something else
  end
])

Or even use the async/await pattern:

Multitask.Supervisor.async(MyApp.MultitaskSupervisor, [
  fn ->
    # Do something
  end,
  fn ->
    # Do something else
  end
]) |> Multitask.await()

Finally, check Multitask.Supervisor for other supported operations.

Distributed multitasks

Since Multitask provides a supervisor, it is easy to use one to dynamically spawn multitasks across nodes:

# On the remote node
Multitask.Supervisor.start_link(name: MyApp.DistSupervisor)

# On the client
Multitask.Supervisor.async({MyApp.DistSupervisor, :remote@local}, [
  {MyMod, :my_fun, [arg1, arg2, arg3]}
])

Note that, when working with distributed multitasks, one should express the list of functions using explicit module, function and arguments, instead of anonymous functions. That’s because anonymous functions expect the same module version to exist on all involved nodes. Check the Agent module documentation for more information on distributed processes as the limitations described there apply to the whole ecosystem.

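Summary

Functions

  • %Multitask{} - The Multitask struct
  • async(functions) - Starts a multitask that must be awaited on
  • await(multitask, timeout \\ 5000) - Awaits a multitask reply and returns it
  • shutdown(multitask, shutdown \\ 5000) - Unlinks and shuts down the multitask, and then checks for a reply
  • start(functions) - Starts a multitask
  • start_link(functions) - Starts a process linked to the current process
  • yield(multitask, timeout \\ 5000) - Temporarily blocks the current process waiting for a multitask reply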

Types

t()
t() :: %Multitask{owner: term, pid: term, ref: term}

Functions

%Multitask{}

The Multitask struct.

It contains these fields:

  • :pid - the PID of the multitask process; nil if the multitask does not use a multitask process

  • :ref - the multitask monitor reference

  • :owner - the PID of the process that started the multitask

async(functions)
async([function | mfa]) :: t

Starts a multitask that must be awaited on.

A Multitask struct is returned containing the relevant information. Developers must eventually call Multitask.await/2 or Multitask.yield/2 followed by Multitask.shutdown/2 on the returned multitask.

Linking

As with Task.async/1 or Task.async/3, this function spawns a process that is linked to and monitored by the caller process. The linking part is important because it aborts the multitask if the parent process dies. It also guarantees the code before async/await has the same properties after you add the async call.

If you don’t want to link the caller to the multitask, then you must use a supervised multitask with Multitask.Supervisor and call Multitask.Supervisor.async_nolink/2.

In any case, avoid any of the following:

  • Setting :trap_exit to true - trapping exits should be used only in special circumstances, as it makes your process immune not only to exits from the multitask but also to exits from any other process.

    Moreover, even when trapping exits, calling await will still exit if the multitask has terminated without sending its results back.

  • Unlinking the multitask process started with async/await. If you unlink the processes and the multitask does not belong to any supervisor, you may leave dangling multitasks in case the parent dies.

Message format

The reply sent by the multitask will be in the format {ref, result}, where ref is the monitor reference held by the multitask struct and result is the return value of the multitask function ({:ok, results} or {:error, error}).

await(multitask, timeout \\ 5000)
await(t, timeout) ::
  {:ok, term} |
  {:error, term} |
  no_return

Awaits a multitask reply and returns it.

Returns {:ok, results} in case of success, or {:error, error} otherwise.

A timeout, in milliseconds, can be given; it defaults to 5000. If the multitask process dies, this function will exit with the same reason as the multitask.

If the timeout is exceeded, await will exit; however, the multitask will continue to run. When the calling process exits, its exit signal will terminate the multitask if it is not trapping exits.

This function assumes the multitask’s monitor is still active or the monitor’s :DOWN message is in the message queue. If it has been demonitored, or the message already received, this function will wait for the duration of the timeout awaiting the message.

This function can only be called once for any given multitask. If you want to be able to check multiple times if a long-running multitask has finished its computation, use yield/2 instead.

Compatibility with OTP behaviours

It is not recommended to await a long-running multitask inside an OTP behaviour such as GenServer. Instead, you should match on the message coming from a multitask inside your GenServer.handle_info/2 callback.
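A GenServer that matches on the multitask reply might look like the sketch below. It assumes the reply and monitor messages follow the formats described in this documentation ({ref, result} and the standard :DOWN tuple). MyApp.Worker, its state layout, and the {:run, functions} cast are hypothetical names chosen for illustration.

```elixir
defmodule MyApp.Worker do
  # Hypothetical GenServer for illustration; only Multitask.async/1 and the
  # :ref field of the Multitask struct come from this documentation.
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  @impl true
  def init(:ok), do: {:ok, %{ref: nil, reply: nil}}

  @impl true
  def handle_cast({:run, functions}, state) do
    # The server process is the caller, so the reply is delivered to it.
    multitask = Multitask.async(functions)
    {:noreply, %{state | ref: multitask.ref}}
  end

  @impl true
  def handle_info({ref, reply}, %{ref: ref} = state) when is_reference(ref) do
    # The reply arrived: drop the monitor and flush any pending :DOWN message.
    Process.demonitor(ref, [:flush])
    {:noreply, %{state | ref: nil, reply: reply}}
  end

  def handle_info({:DOWN, ref, :process, _pid, reason}, %{ref: ref} = state) do
    # The multitask process went down before a reply was handled.
    {:noreply, %{state | ref: nil, reply: {:exit, reason}}}
  end

  def handle_info(_message, state), do: {:noreply, state}
end
```

Handling the reply in handle_info/2 keeps the server free to process other messages while the multitask runs, instead of blocking inside a callback for up to the await timeout.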

Examples

iex> multitask = Multitask.async([fn -> {:ok, 1 + 1} end, fn -> {:ok, 2 + 2} end])
iex> Multitask.await(multitask)
{:ok, [2, 4]}

shutdown(multitask, shutdown \\ 5000)
shutdown(t, timeout | :brutal_kill) ::
  {:ok, term} |
  {:error, term} |
  {:exit, term} |
  nil

Unlinks and shuts down the multitask, and then checks for a reply.

Returns {:ok, results} or {:error, error} if the reply is received while shutting down the multitask, {:exit, reason} if the multitask died, otherwise nil.

The shutdown method is either a timeout or :brutal_kill. In case of a timeout, a :shutdown exit signal is sent to the multitask process and if it does not exit within the timeout, it is killed. With :brutal_kill the multitask is killed straight away. In case the multitask terminates abnormally (possibly killed by another process), this function will exit with the same reason.

It is not required to call this function when terminating the caller, unless the caller is exiting with reason :normal or the multitask is trapping exits. If the caller is exiting with a reason other than :normal and the multitask is not trapping exits, the caller’s exit signal will stop the multitask. The caller can exit with reason :shutdown to shut down all of its linked processes that are not trapping exits, including multitasks, without generating any log messages.

If the multitask’s monitor has already been demonitored, or its :DOWN message has already been received, and there is no response waiting in the message queue, this function will return {:exit, :noproc}, as the result or exit reason cannot be determined.

start(functions)
start([function | mfa]) :: {:ok, pid}

Starts a multitask.

This is only used when the multitask is used for side-effects (i.e. no interest in the returned result) and it should not be linked to the current process.

start_link(functions)
start_link([function | mfa]) :: {:ok, pid}

Starts a process linked to the current process.

This is often used to start the process as part of a supervision tree.

yield(multitask, timeout \\ 5000)
yield(t, timeout) ::
  {:ok, term} |
  {:error, term} |
  {:exit, term} |
  nil

Temporarily blocks the current process waiting for a multitask reply.

Returns {:ok, results} or {:error, error} if the reply is received, nil if no reply has arrived, or {:exit, reason} if the multitask has already exited. Keep in mind that normally a multitask failure also causes the process owning the multitask to exit. Therefore this function can return {:exit, reason} only if at least one of the following holds:

  • the multitask process exited with the reason :normal
  • it isn’t linked to the caller
  • the caller is trapping exits

A timeout, in milliseconds, can be given; it defaults to 5000. If the time runs out before a message from the multitask is received, this function will return nil and the monitor will remain active. Therefore yield/2 can be called multiple times on the same multitask.
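Because the monitor stays active after a timed-out yield/2, a caller can poll a long-running multitask in short intervals. The sketch below is one way to do that, assuming yield/2 and shutdown/1 behave as documented on this page; PollSketch and poll/3 are hypothetical names.

```elixir
defmodule PollSketch do
  # Hypothetical helper for illustration; only Multitask.yield/2 and
  # Multitask.shutdown/1 come from this documentation.

  # Checks for a reply every `interval` ms, at most `attempts` times,
  # then shuts the multitask down and returns whatever shutdown/1 reports.
  def poll(multitask, interval, attempts) when attempts > 0 do
    case Multitask.yield(multitask, interval) do
      nil ->
        IO.puts("still waiting...")
        poll(multitask, interval, attempts - 1)

      reply ->
        reply
    end
  end

  def poll(multitask, _interval, 0), do: Multitask.shutdown(multitask)
end
```

A call such as PollSketch.poll(multitask, 1_000, 5) would wait up to roughly five seconds in total while leaving room to report progress between checks.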

This function assumes the multitask’s monitor is still active or the monitor’s :DOWN message is in the message queue. If it has been demonitored or the message already received, this function will wait for the duration of the timeout awaiting the message.

If you intend to shut the multitask down if it has not responded within timeout milliseconds, you should chain this together with shutdown/1, like so:

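require Logger

case Multitask.yield(multitask, timeout) || Multitask.shutdown(multitask) do
  {:ok, results} ->
    results
  {:error, error} ->
    Logger.error("Multitask failed with #{inspect(error)}")
    error
  {:exit, reason} ->
    # The multitask process died before sending a reply
    Logger.error("Multitask exited with #{inspect(reason)}")
    nil
  nil ->
    Logger.warning("Failed to get the results in #{timeout}ms")
    nil
end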

That ensures that if the multitask completes after the timeout but before shutdown/1 has been called, you will still get the result, since shutdown/1 is designed to handle this case and return the result.

Examples

iex> multitask = Multitask.async([fn -> {:ok, 1 + 1} end, fn -> {:ok, 2 + 2} end])
iex> Multitask.yield(multitask)
{:ok, [2, 4]}

iex> multitask = Multitask.async([fn -> {:ok, 1 + 1} end, fn -> {:error, :invalid} end])
iex> Multitask.yield(multitask)
{:error, :invalid}