Batched fan-out to Python pools with real backpressure.
Dispatch takes an enumerable of items, batches them via a pluggable strategy, executes each batch through an Executor (Snakepit or SnakeBridge), and returns results either preserving input order or as-completed.
Backpressure is enforced via max_in_flight: at most that many batches execute concurrently, and upstream enumeration only advances when a worker slot becomes available.
Examples
    # Run with the SnakeBridge executor
    items = Slither.Item.wrap_many(data)
    ctx = Slither.Context.new(session_id: "my_session")

    {:ok, results} =
      Slither.Dispatch.run(items,
        executor: Slither.Dispatch.Executors.SnakeBridge,
        context: ctx,
        module: "my_model",
        function: "predict_batch",
        pool: :gpu_pool,
        batch_size: 32,
        max_in_flight: 4
      )
    # Stream results lazily
    stream =
      Slither.Dispatch.stream(items,
        executor: Slither.Dispatch.Executors.Snakepit,
        pool: :compute,
        command: "process_batch",
        batch_size: 64
      )
Summary
Functions
Dispatch a single pre-assembled batch directly.
Dispatch items to an executor in batches.
Dispatch items returning a lazy stream of results.
Types
@type on_error() ::
        :skip
        | :halt
        | {:retry, keyword()}
        | {:route, (Slither.Item.t(), term() -> Slither.Item.t())}

@type opts() :: [
        {:executor, module()}
        | {:strategy, module()}
        | {:batch_size, pos_integer()}
        | {:max_in_flight, pos_integer()}
        | {:ordering, ordering()}
        | {:on_error, on_error()}
        | {:timeout, pos_integer()}
        | {:context, Slither.Context.t()}
        | {atom(), term()}
      ]
@type ordering() :: :preserve | :as_completed
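The `{:route, fun}` variant of on_error() supplies a callback instead of skipping or halting: per the type above, it receives the failed item and the error term and returns the item to emit in its place. A sketch, assuming routed items flow into the result set alongside successful ones (the option values are illustrative):

```elixir
require Logger

# Route failures through a callback rather than halting the run.
route_failed = fn item, reason ->
  # Log and pass the item through unchanged; a real pipeline might
  # tag the item or hand it to a dead-letter queue here instead.
  Logger.warning("batch item failed: #{inspect(reason)}")
  item
end

{:ok, results} =
  Slither.Dispatch.run(items,
    executor: Slither.Dispatch.Executors.Snakepit,
    pool: :compute,
    command: "process_batch",
    on_error: {:route, route_failed}
  )
```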
Functions
@spec call_batch([Slither.Item.t()], opts()) :: {:ok, [Slither.Item.t()]} | {:error, term()}
Dispatch a single pre-assembled batch directly.
No batching strategy is applied; use this when you already have a batch assembled.
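A sketch against the spec above: the list passed to call_batch/2 is treated as one batch, so batch_size and the strategy are irrelevant (the pool and command values are illustrative):

```elixir
# A hand-assembled batch; no batching strategy or batch_size applies.
batch = Slither.Item.wrap_many(["a", "b", "c"])

{:ok, processed} =
  Slither.Dispatch.call_batch(batch,
    executor: Slither.Dispatch.Executors.Snakepit,
    pool: :compute,
    command: "process_batch"
  )
```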
@spec run(Enumerable.t(Slither.Item.t()), opts()) :: {:ok, [Slither.Item.t()]} | {:error, term(), [Slither.Item.t()]}
Dispatch items to an executor in batches.
Returns {:ok, results} with results in the same order as input
(when ordering: :preserve), or {:error, reason, partial_results}.
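Both return shapes from the spec can be handled in one case expression. A sketch with illustrative option values:

```elixir
require Logger

case Slither.Dispatch.run(items,
       executor: Slither.Dispatch.Executors.Snakepit,
       pool: :compute,
       command: "process_batch",
       ordering: :as_completed,
       max_in_flight: 8
     ) do
  {:ok, results} ->
    # With ordering: :as_completed, results arrive in completion
    # order rather than input order.
    results

  {:error, reason, partial_results} ->
    # Batches that completed before the failure are still returned.
    Logger.error("dispatch failed: #{inspect(reason)}")
    partial_results
end
```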
@spec stream(Enumerable.t(Slither.Item.t()), opts()) :: Enumerable.t(Slither.Item.t())
Dispatch items returning a lazy stream of results.
Results emit as batches complete. Useful for large inputs where you don't want all results in memory at once.
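Because stream/2 returns a lazy enumerable, results can be consumed incrementally without holding the full result set in memory. A sketch (the sink is illustrative; substitute any consumer):

```elixir
items
|> Slither.Dispatch.stream(
  executor: Slither.Dispatch.Executors.Snakepit,
  pool: :compute,
  command: "process_batch",
  batch_size: 64
)
# Results emit as each batch completes; nothing runs until the
# stream is forced.
|> Stream.each(&IO.inspect/1)
|> Stream.run()
```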