Resiliency.Map (Resiliency v0.6.0)

Copy Markdown View Source

Map over an enumerable with bounded concurrency, cancelling on first error.

When to use

  • Processing a collection with bounded parallelism and fail-fast semantics — e.g., uploading files to S3 with at most 10 concurrent streams, aborting on the first permission error.
  • Fetching data from multiple URLs in parallel with a concurrency cap.

How it works

Items are processed with a sliding window of at most :max_concurrency tasks. When a task completes, its result is stored and the next pending item is spawned. If any task crashes, all active tasks are killed and {:error, reason} is returned immediately — no further items are started. On success, {:ok, results} is returned in input order.

Algorithm Complexity

TimeSpace
O(n) total spawns where n = number of items, at most c concurrent where c = max_concurrencyO(n + c) — :array of n results + c active monitored processes

Examples

iex> Resiliency.Map.run([1, 2, 3], fn x -> x * 2 end)
{:ok, [2, 4, 6]}

iex> Resiliency.Map.run([], fn x -> x end)
{:ok, []}

With bounded concurrency:

Resiliency.Map.run(urls, &fetch_url/1, max_concurrency: 10)

On first failure, remaining work is cancelled:

{:error, reason} = Resiliency.Map.run(items, &process/1, max_concurrency: 5)

Telemetry

All events are emitted in the caller's process via :telemetry.span/3. See Resiliency.Telemetry for the complete event catalogue.

[:resiliency, :map, :run, :start]

Emitted before processing begins.

Measurements

KeyTypeDescription
system_timeintegerSystem.system_time() at emission time

Metadata

KeyTypeDescription
countintegerNumber of items to process
max_concurrencyintegerMaximum concurrent tasks

[:resiliency, :map, :run, :stop]

Emitted after all items complete or on the first failure.

Measurements

KeyTypeDescription
durationintegerElapsed native time units (System.monotonic_time/0 delta)

Metadata

KeyTypeDescription
countintegerNumber of items submitted
max_concurrencyintegerMaximum concurrent tasks

| result | :ok | :error | :ok if all items succeeded, :error on first failure |

[:resiliency, :map, :run, :exception]

Emitted if run/3 raises or exits unexpectedly.

Measurements

KeyTypeDescription
durationintegerElapsed native time units

Metadata

KeyTypeDescription
countintegerNumber of items submitted
max_concurrencyintegerMaximum concurrent tasks
kindatomException kind (:error, :exit, or :throw)
reasontermThe exception or exit reason
stacktracelistStack at the point of the exception

Summary

Functions

Map over an enumerable with bounded concurrency, cancelling on first error.

Functions

run(enumerable, fun, opts \\ [])

@spec run(Enumerable.t(), (any() -> any()), keyword()) ::
  {:ok, [any()]} | {:error, any()}

Map over an enumerable with bounded concurrency, cancelling on first error.

Like Task.async_stream/3 but returns {:ok, results} or {:error, reason} instead of a stream, and cancels all remaining work on the first failure. At most max_concurrency tasks run at once. Results are always in input order.

Returns {:ok, []} for an empty enumerable.

Parameters

  • enumerable -- any Enumerable of items to map over.
  • fun -- a one-arity function to apply to each item.
  • opts -- keyword list of options. Defaults to [].
    • :max_concurrency -- max tasks running at once. Defaults to System.schedulers_online().
    • :timeout -- milliseconds or :infinity. Defaults to :infinity.

Returns

{:ok, results} where results is a list of return values in input order, or {:error, reason} on the first task failure or timeout.

Examples

iex> Resiliency.Map.run([1, 2, 3], fn x -> x * 2 end)
{:ok, [2, 4, 6]}

iex> Resiliency.Map.run([], fn x -> x end)
{:ok, []}