Map over an enumerable with bounded concurrency, cancelling on first error.
When to use
- Processing a collection with bounded parallelism and fail-fast semantics — e.g., uploading files to S3 with at most 10 concurrent streams, aborting on the first permission error.
- Fetching data from multiple URLs in parallel with a concurrency cap.
How it works
Items are processed with a sliding window of at most :max_concurrency
tasks. When a task completes, its result is stored and the next pending
item is spawned. If any task crashes, all active tasks are killed and
{:error, reason} is returned immediately — no further items are started.
On success, {:ok, results} is returned in input order.
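The sliding window described above can be sketched with nothing but `spawn_monitor/1`. This is an illustration of the technique, not the library's source: the module name `BoundedMapSketch` is hypothetical, results are kept in a map keyed by index rather than an :array, each worker hands its return value back through its exit reason (a simplification), and timeouts and telemetry are left out.

```elixir
defmodule BoundedMapSketch do
  # Hypothetical module, for illustration only. Not the Resiliency.Map source.

  # Returns {:ok, results} in input order, or {:error, reason} on the first crash.
  def run(items, fun, max_concurrency) when max_concurrency > 0 do
    loop(Enum.with_index(items), %{}, %{}, fun, max_concurrency)
  end

  # Nothing pending and nothing active: assemble results in input order.
  defp loop([], active, results, _fun, _max) when map_size(active) == 0 do
    ordered =
      results
      |> Enum.sort_by(fn {index, _value} -> index end)
      |> Enum.map(fn {_index, value} -> value end)

    {:ok, ordered}
  end

  # Window has room: spawn the next pending item.
  defp loop([{item, index} | rest], active, results, fun, max)
       when map_size(active) < max do
    # The worker reports its return value through its exit reason.
    {pid, ref} = spawn_monitor(fn -> exit({:done, fun.(item)}) end)
    loop(rest, Map.put(active, ref, {pid, index}), results, fun, max)
  end

  # Window is full, or only active tasks remain: wait for one to finish.
  defp loop(pending, active, results, fun, max) do
    receive do
      {:DOWN, ref, :process, _pid, {:done, value}} when is_map_key(active, ref) ->
        {{_worker, index}, active} = Map.pop(active, ref)
        loop(pending, active, Map.put(results, index, value), fun, max)

      {:DOWN, ref, :process, _pid, reason} when is_map_key(active, ref) ->
        # Fail fast: kill everything still running and start nothing new.
        active |> Map.delete(ref) |> kill_all()
        {:error, reason}
    end
  end

  defp kill_all(active) do
    Enum.each(active, fn {ref, {pid, _index}} ->
      Process.demonitor(ref, [:flush])
      Process.exit(pid, :kill)
    end)
  end
end
```

Because each result is stored under its input index, completion order does not affect the order of the returned list.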
Algorithm Complexity
| Time | Space |
|---|---|
| O(n) total spawns where n = number of items, at most c concurrent where c = max_concurrency | O(n + c): an :array of n results plus c active monitored processes |
Examples
iex> Resiliency.Map.run([1, 2, 3], fn x -> x * 2 end)
{:ok, [2, 4, 6]}
iex> Resiliency.Map.run([], fn x -> x end)
{:ok, []}
With bounded concurrency:
Resiliency.Map.run(urls, &fetch_url/1, max_concurrency: 10)
On first failure, remaining work is cancelled:
{:error, reason} = Resiliency.Map.run(items, &process/1, max_concurrency: 5)
Telemetry
All events are emitted in the caller's process via :telemetry.span/3. See
Resiliency.Telemetry for the complete event catalogue.
[:resiliency, :map, :run, :start]
Emitted before processing begins.
Measurements
| Key | Type | Description |
|---|---|---|
| system_time | integer | System.system_time() at emission time |
Metadata
| Key | Type | Description |
|---|---|---|
| count | integer | Number of items to process |
| max_concurrency | integer | Maximum concurrent tasks |
[:resiliency, :map, :run, :stop]
Emitted after all items complete or on the first failure.
Measurements
| Key | Type | Description |
|---|---|---|
| duration | integer | Elapsed native time units (System.monotonic_time/0 delta) |
Metadata
| Key | Type | Description |
|---|---|---|
| count | integer | Number of items submitted |
| max_concurrency | integer | Maximum concurrent tasks |
| result | :ok or :error | :ok if all items succeeded, :error on first failure |
[:resiliency, :map, :run, :exception]
Emitted if run/3 raises or exits unexpectedly.
Measurements
| Key | Type | Description |
|---|---|---|
| duration | integer | Elapsed native time units |
Metadata
| Key | Type | Description |
|---|---|---|
| count | integer | Number of items submitted |
| max_concurrency | integer | Maximum concurrent tasks |
| kind | atom | Exception kind (:error, :exit, or :throw) |
| reason | term | The exception or exit reason |
| stacktrace | list | Stack at the point of the exception |
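One way to consume the three events is a single handler attached with `:telemetry.attach_many/4`. The following is a minimal sketch that assumes the `:telemetry` dependency is available at runtime. The module name `MapRunTelemetrySketch`, the handler id, and the message formats are hypothetical; only the event names and the measurement and metadata keys come from the tables above.

```elixir
defmodule MapRunTelemetrySketch do
  # Hypothetical handler id; any unique term works.
  @handler_id "map-run-logger-sketch"

  @events [
    [:resiliency, :map, :run, :start],
    [:resiliency, :map, :run, :stop],
    [:resiliency, :map, :run, :exception]
  ]

  # Attaches one handler to all three events. Needs :telemetry at runtime.
  def attach do
    :telemetry.attach_many(@handler_id, @events, &__MODULE__.handle_event/4, nil)
  end

  # Telemetry handlers receive the event name, measurements, metadata, and config.
  def handle_event(event, measurements, metadata, _config) do
    IO.puts(describe(event, measurements, metadata))
  end

  # Pure formatting, kept separate so it can be exercised without :telemetry.
  def describe([:resiliency, :map, :run, :start], _measurements, meta) do
    "start: #{meta.count} items, max_concurrency=#{meta.max_concurrency}"
  end

  def describe([:resiliency, :map, :run, :stop], %{duration: duration}, meta) do
    "stop: result=#{meta.result} after #{to_ms(duration)} ms"
  end

  def describe([:resiliency, :map, :run, :exception], %{duration: duration}, meta) do
    "exception: kind=#{meta.kind} reason=#{inspect(meta.reason)} after #{to_ms(duration)} ms"
  end

  # Durations arrive in native time units; convert before displaying.
  defp to_ms(duration), do: System.convert_time_unit(duration, :native, :millisecond)
end
```

Since the events are emitted in the caller's process, the handler runs there as well, so it is best kept cheap.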
Functions
Map over an enumerable with bounded concurrency, cancelling on first error.
Like Task.async_stream/3 but returns {:ok, results} or {:error, reason}
instead of a stream, and cancels all remaining work on the first failure.
At most max_concurrency tasks run at once. Results are always in input order.
Returns {:ok, []} for an empty enumerable.
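For contrast, here is what the standard-library `Task.async_stream/3` hands back on its own: a lazy stream of tagged tuples that the caller has to enumerate and unwrap. This is a small sketch using only the standard library; the module name `AsyncStreamContrast` and its helpers are hypothetical.

```elixir
defmodule AsyncStreamContrast do
  # Hypothetical helpers, standard library only.

  # Forces the lazy stream; each element is tagged, e.g. {:ok, value}.
  def tagged(items, fun, max_concurrency) do
    items
    |> Task.async_stream(fun, max_concurrency: max_concurrency)
    |> Enum.to_list()
  end

  # Strips the tags to get the {:ok, results} shape described above.
  # Only handles the all-success case.
  def unwrap(tuples) do
    {:ok, Enum.map(tuples, fn {:ok, value} -> value end)}
  end
end

AsyncStreamContrast.tagged([1, 2, 3], fn x -> x * 2 end, 2)
# => [ok: 2, ok: 4, ok: 6]
```

Note that tasks started by `Task.async_stream/3` are linked to the caller, so a crashing task takes the caller down with it unless the caller traps exits.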
Parameters
- enumerable -- any Enumerable of items to map over.
- fun -- a one-arity function to apply to each item.
- opts -- keyword list of options. Defaults to [].
  - :max_concurrency -- max tasks running at once. Defaults to System.schedulers_online().
  - :timeout -- milliseconds or :infinity. Defaults to :infinity.
Returns
{:ok, results} where results is a list of return values in input order, or {:error, reason} on the first task failure or timeout.
Examples
iex> Resiliency.Map.run([1, 2, 3], fn x -> x * 2 end)
{:ok, [2, 4, 6]}
iex> Resiliency.Map.run([], fn x -> x end)
{:ok, []}