View Source Flume (flume v1.0.0)

A convenient way to handle control flow in pipelines. This makes for easier reading and composability.

Link to this section Summary

Functions

Returns empty Flume struct.

Returns result of pipeline.

Executes passed in callback synchronously - and stores the returned result.

Executes passed in callback asynchronously - and stores the returned result. All asynchronous operations are resolved when Flume.result/1 is called.

Link to this section Types

@type process_fun() :: (map() -> {:ok, tag()} | {:error, atom()})
@type t() :: %Flume{
  errors: term(),
  global_funs: term(),
  halt_on_errors: term(),
  halted: term(),
  results: term(),
  tasks: term()
}
@type tag() :: atom()

Link to this section Functions

@spec new(list()) :: t()

Returns empty Flume struct.

Options:

  • :halt_on_errors: if false, the steps won't stop if a Flume.run step returns an error
  • :on_error: callback which is invoked every time an error occurs. If it is 1-arity, it's given the error reason, if 2-arity, it's given the tag and the reason

examples

Examples

iex> %Flume{} = Flume.new()
@spec result(t()) :: {:ok, map()} | {:error, map(), map()}

Returns result of pipeline.

examples

Examples

iex> Flume.new() |> Flume.run(:a, fn -> {:ok, 2} end) |> Flume.result()
{:ok, %{a: 2}}

iex> Flume.new() |> Flume.run(:a, fn -> {:error, :idk} end) |> Flume.result()
{:error, %{a: :idk}, %{}}
Link to this function

run(flume, tag, process_fun, opts \\ [])

View Source
@spec run(t(), tag(), process_fun(), list()) :: t()

Executes passed in callback synchronously - and stores the returned result.

Callback has to be a 0- or 1-arity function, and if it accepts an argument it is passed the current accumulated results from previous steps.

It must return a {:ok, result} or a {:error, reason} tuple. This is so Flume knows if the caller intends for the operation to be considered a success or failure.

In the first case, the result will be added to the accumulated results, and in the second case the error will be stored with other accumulated errors (if any).

A tag uniquely annotates the operation - duplicate tags will cause the second tag to overwrite the first.

Several options can be passed in:

  • on_success: 1 or 2 arity callback which is given the result of the operation if successful, or the tag and the result. The return value is stored in the results
  • on_error: 1 or 2 arity callback which is given the error reason of the operation if it failed, or the tag and the error
  • wait_for: by default async operations are resolved in Flume.result. If you want them resolved before so that they are accessible in earlier callbacks, specify the async operation tag here

examples

Examples

iex> Flume.new() |>
iex> Flume.run(:a, fn -> {:ok, 2} end) |>
iex> Flume.run(:b, fn data -> {:ok, 2 * data.a} end, on_success: & &1 * 100) |>
iex> Flume.run(:this_fails, fn -> {:error, :for_some_reason} end) |>
iex> Flume.run(:this_wont_run, fn -> raise "boom" end)
Link to this function

run_async(flume, tag, process_fun, opts \\ [])

View Source
@spec run_async(t(), tag(), process_fun(), list()) :: t()

Executes passed in callback asynchronously - and stores the returned result. All asynchronous operations are resolved when Flume.result/1 is called.

Apart from the asynchronous nature of this function, it behaves largely the same as Flume.run.

Obviously using this in combination with Flume.run is less safe (unless you use the wait_for option), because it won't necessarily stop at the first error. Also the results of the asynchronous operations will not be available until the end.

examples

Examples

iex> Flume.new() |>
iex> Flume.run(:a, fn -> {:ok, 2} end) |>
iex> Flume.run_async(:b, fn data -> {:ok, data.a * 2} end) |>
iex> Flume.run_async(:c, fn -> {:ok, 4} end, on_success: & &1 * 2) |>
iex> Flume.result()