View Source Flume (flume v1.0.0)
A convenient way to handle control flow in pipelines. This makes for easier reading and composability.
Link to this section Summary
Functions
Returns empty Flume struct.
Returns result of pipeline.
Executes passed in callback synchronously - and stores the returned result.
Executes passed in callback asynchronously - and stores the returned result. All asynchronous
operations are resolved when Flume.result/1 is called.
Link to this section Types
Link to this section Functions
Returns empty Flume struct.
Options:
:halt_on_errors: iffalse, the steps won't stop if aFlume.runstep returns an error:on_error: callback which is invoked every time an error occurs. If it is 1-arity, it's given the error reason, if 2-arity, it's given the tag and the reason
examples
Examples
iex> %Flume{} = Flume.new()
Returns result of pipeline.
examples
Examples
iex> Flume.new() |> Flume.run(:a, fn -> {:ok, 2} end) |> Flume.result()
{:ok, %{a: 2}}
iex> Flume.new() |> Flume.run(:a, fn -> {:error, :idk} end) |> Flume.result()
{:error, %{a: :idk}, %{}}
@spec run(t(), tag(), process_fun(), list()) :: t()
Executes passed in callback synchronously - and stores the returned result.
Callback has to be a 0- or 1-arity function, and if it accepts an argument it is passed the current accumulated results from previous steps.
It must return a {:ok, result} or a {:error, reason} tuple. This is so Flume
knows if the caller intends for the operation to be considered a success or failure.
In the first case, the result will be added to the accumulated results, and in the second case the error will be stored with other accumulated errors (if any).
A tag uniquely annotates the operation - duplicate tags will cause the second tag to overwrite the first.
Several options can be passed in:
on_success: 1 or 2 arity callback which is given the result of the operation if successful, or the tag and the result. The return value is stored in the resultson_error: 1 or 2 arity callback which is given the error reason of the operation if it failed, or the tag and the errorwait_for: by default async operations are resolved inFlume.result. If you want them resolved before so that they are accessible in earlier callbacks, specify the async operation tag here
examples
Examples
iex> Flume.new() |>
iex> Flume.run(:a, fn -> {:ok, 2} end) |>
iex> Flume.run(:b, fn data -> {:ok, 2 * data.a} end, on_success: & &1 * 100) |>
iex> Flume.run(:this_fails, fn -> {:error, :for_some_reason} end) |>
iex> Flume.run(:this_wont_run, fn -> raise "boom" end)
@spec run_async(t(), tag(), process_fun(), list()) :: t()
Executes passed in callback asynchronously - and stores the returned result. All asynchronous
operations are resolved when Flume.result/1 is called.
Apart from the asynchronous nature of this function, it behaves largely the same as Flume.run.
Obviously using this in combination with Flume.run is less safe (unless you use the wait_for option),
because it won't necessarily stop at the first error. Also the results of the asynchronous operations
will not be available until the end.
examples
Examples
iex> Flume.new() |>
iex> Flume.run(:a, fn -> {:ok, 2} end) |>
iex> Flume.run_async(:b, fn data -> {:ok, data.a * 2} end) |>
iex> Flume.run_async(:c, fn -> {:ok, 4} end, on_success: & &1 * 2) |>
iex> Flume.result()