Slither.Dispatch (Slither v0.1.0)

Batched fan-out to Python pools with real backpressure.

Dispatch takes an enumerable of items, batches them via a pluggable strategy, executes each batch through an Executor (Snakepit or SnakeBridge), and returns results either preserving input order or as-completed.

Backpressure is enforced via max_in_flight: at most that many batches execute concurrently, and upstream enumeration only advances when a worker slot becomes available.
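The bounded fan-out described above can be illustrated with the Elixir standard library alone. The sketch below is not Slither's implementation; `BackpressureSketch` and its arguments are hypothetical names, and the executor is replaced by a plain function. It assumes that `Stream.chunk_every/2` plus `Task.async_stream/3` with `max_concurrency` is a reasonable stand-in for "batch, then run at most N batches at once".

```elixir
defmodule BackpressureSketch do
  # Hypothetical helper, not part of Slither.
  # Splits `items` into batches of `batch_size` and applies `fun` to each
  # batch, with at most `max_in_flight` batches running concurrently.
  def run(items, fun, batch_size, max_in_flight) do
    items
    # Lazy batching: the input is only pulled as batches are requested.
    |> Stream.chunk_every(batch_size)
    # Bounded concurrency; `ordered: true` keeps results in input order.
    |> Task.async_stream(fun, max_concurrency: max_in_flight, ordered: true)
    # Each finished task yields {:ok, batch_result}; flatten back to items.
    |> Enum.flat_map(fn {:ok, batch_result} -> batch_result end)
  end
end

# Stand-in "executor": doubles every item in a batch.
doubled = BackpressureSketch.run(1..10, fn batch -> Enum.map(batch, &(&1 * 2)) end, 3, 2)
```

Here `batch_size` is 3 and `max_in_flight` is 2, so no more than two of the four batches are in progress at any moment.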

Examples

# Run with SnakeBridge executor
items = Slither.Item.wrap_many(data)
ctx = Slither.Context.new(session_id: "my_session")

{:ok, results} = Slither.Dispatch.run(items,
  executor: Slither.Dispatch.Executors.SnakeBridge,
  context: ctx,
  module: "my_model",
  function: "predict_batch",
  pool: :gpu_pool,
  batch_size: 32,
  max_in_flight: 4
)

# Stream results lazily
stream = Slither.Dispatch.stream(items,
  executor: Slither.Dispatch.Executors.Snakepit,
  pool: :compute,
  command: "process_batch",
  batch_size: 64
)

Summary

Functions

call_batch(batch, opts)

Dispatch a single pre-assembled batch directly.

run(items, opts)

Dispatch items to an executor in batches.

stream(items, opts)

Dispatch items returning a lazy stream of results.

Types

on_error()

@type on_error() ::
  :skip
  | :halt
  | {:retry, keyword()}
  | {:route, (Slither.Item.t(), term() -> Slither.Item.t())}
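To make the four shapes of on_error() concrete, here is a small sketch that checks whether a value matches the typespec above. `OnErrorSketch` is a hypothetical module, not part of Slither, and it only checks shapes: what each policy does at runtime, and which keys `{:retry, keyword()}` accepts, are not specified here.

```elixir
defmodule OnErrorSketch do
  # Hypothetical shape check mirroring the on_error() typespec.
  def valid?(:skip), do: true
  def valid?(:halt), do: true
  # {:retry, keyword()}: any keyword list passes; the accepted keys are unknown.
  def valid?({:retry, opts}) when is_list(opts), do: Keyword.keyword?(opts)
  # {:route, fun}: a two-argument function taking (item, reason).
  def valid?({:route, fun}), do: is_function(fun, 2)
  def valid?(_other), do: false
end

# A route function receives the item and the error term and returns an item.
route_policy = {:route, fn item, _reason -> item end}
route_ok = OnErrorSketch.valid?(route_policy)
```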

opts()

@type opts() :: [
  {:executor, module()}
  | {:strategy, module()}
  | {:batch_size, pos_integer()}
  | {:max_in_flight, pos_integer()}
  | {:ordering, ordering()}
  | {:on_error, on_error()}
  | {:timeout, pos_integer()}
  | {:context, Slither.Context.t()}
  | {atom(), term()}
]

ordering()

@type ordering() :: :preserve | :as_completed

Functions

call_batch(batch, opts)

@spec call_batch([Slither.Item.t()], opts()) ::
  {:ok, [Slither.Item.t()]} | {:error, term()}

Dispatch a single pre-assembled batch directly.

No batching strategy is applied; use this when you already have a batch assembled.

run(items, opts)

@spec run(Enumerable.t(Slither.Item.t()), opts()) ::
  {:ok, [Slither.Item.t()]} | {:error, term(), [Slither.Item.t()]}

Dispatch items to an executor in batches.

Returns {:ok, results} on success, or {:error, reason, partial_results} on failure. With ordering: :preserve, results are in the same order as the input.
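Because the error tuple has three elements rather than the usual two, callers need to match both shapes explicitly. The following sketch shows one way to do that; `RunResultSketch` is a hypothetical helper, and it is exercised with plain tuples rather than a real call to run/2.

```elixir
defmodule RunResultSketch do
  # Hypothetical helper: collapses either return shape of run/2
  # into a short summary tuple.
  def summarize({:ok, results}), do: {:complete, length(results)}

  # The error shape carries the reason and the partial results.
  def summarize({:error, reason, partial}), do: {:partial, reason, length(partial)}
end

ok_summary = RunResultSketch.summarize({:ok, [:a, :b, :c]})
error_summary = RunResultSketch.summarize({:error, :timeout, [:a]})
```

Note that a two-element `{:error, reason}` pattern would not match the failure case of run/2; that shape is what call_batch/2 returns.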

stream(items, opts)

Dispatch items returning a lazy stream of results.

Results are emitted as batches complete. Useful for large inputs where you don't want all results in memory at once.
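The memory benefit comes from consuming the stream incrementally instead of collecting it into a list. The sketch below uses a stand-in stream built from the standard library, since the element type and spec of stream/2 are not shown here; `LazyStreamSketch` and both of its functions are hypothetical.

```elixir
defmodule LazyStreamSketch do
  # Stand-in for a lazy result stream: an infinite sequence 10, 20, 30, ...
  # Nothing is computed until the stream is consumed.
  def fake_results do
    Stream.iterate(1, &(&1 + 1)) |> Stream.map(&(&1 * 10))
  end

  # Consumes only the first `n` results, keeping just a running total
  # in memory rather than the full list of results.
  def total_of_first(stream, n) do
    stream |> Stream.take(n) |> Enum.reduce(0, &Kernel.+/2)
  end
end

total = LazyStreamSketch.total_of_first(LazyStreamSketch.fake_results(), 3)
```

The same pattern (`Stream.take/2`, `Enum.reduce/3`, `Stream.each/2`) should apply to any lazy enumerable, which is how the result of stream/2 is described above.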