LlmCore.Agent.Pipeline.ToolDispatch (llm_core v0.3.0)


ALF pipeline for orchestrated tool dispatch.

When DispatchTools encounters a tool call that has a registered dispatch recipe, it invokes this pipeline instead of calling the resolver directly. The pipeline orchestrates sub-tool execution using ALF's native components:

  • Switch routes between passthrough (simple tools) and recipe dispatch
  • Composer fans out one dispatch into N parallel sub-tool calls
  • Stage + count executes sub-tools concurrently via GenStage workers
  • Composer fans in N results into one composed output
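The fan-out, concurrent execution, and fan-in steps listed above can be pictured with a minimal sketch in plain Elixir. It uses `Task.async_stream/3` from the standard library rather than ALF, so it illustrates the shape of the data flow only and is not how this pipeline is implemented. The module name `FanOutSketch`, the `{tool, arg}` tuple shape, and the composed map are all assumptions made for illustration.

```elixir
defmodule FanOutSketch do
  # Hypothetical sub-tool call: a tool name plus the argument passed to it.
  @type sub_call :: {atom(), term()}

  # Fan out: run every sub-call concurrently, at most `workers` at a time.
  # Fan in: collect the results in input order and compose a single output.
  @spec dispatch([sub_call()], (sub_call() -> term()), pos_integer()) :: map()
  def dispatch(sub_calls, run_one, workers \\ 3) do
    results =
      sub_calls
      |> Task.async_stream(run_one, max_concurrency: workers, ordered: true)
      |> Enum.map(fn {:ok, result} -> result end)

    # Hypothetical composed output; the real pipeline's shape is not documented here.
    %{count: length(results), results: results}
  end
end

# Usage: a stand-in executor that upcases its argument.
run_one = fn {tool, arg} -> {tool, String.upcase(arg)} end
composed = FanOutSketch.dispatch([{:search, "a"}, {:fetch, "b"}], run_one)
IO.inspect(composed)
```

The default of three workers mirrors the `count` of 3 shown in the pipeline structure; in ALF that concurrency comes from GenStage workers rather than tasks.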

Architecture

Bespoke dispatchers typically hardcode tool orchestration as a sequence of function calls. ToolDispatch generalises that pattern by declaring the orchestration with ALF components.

Pipeline Structure

ResolveStrategy → Switch(:dispatch_route)
   :passthrough → [DirectResolve]
   :recipe → [BuildPlan, ExecuteSerial, FanOutParallel,
              ExecuteOneCall(count: 3), CollectResults, ComposeOutput]

Summary

Functions

alf_components()

call(event, opts \\ [debug: false])

@spec call(any(), Keyword.t()) :: any() | [any()] | nil

cast(event, opts \\ [debug: false, send_result: false])

@spec cast(any(), Keyword.t()) :: reference()

components()

@spec components() :: [map()]

dispatch_route(event, opts)

@spec dispatch_route(
  LlmCore.Agent.ToolDispatch.Event.t(),
  keyword()
) :: :recipe | :passthrough

Switch routing function — called at runtime for each event.

Returns the branch key based on the event's strategy field, which was set by ResolveStrategy.
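A routing function of this kind can be sketched with pattern matching on the event's strategy field. The `RouteSketch.Event` struct below is a hypothetical stand-in for `LlmCore.Agent.ToolDispatch.Event`, whose real fields are not shown on this page, and falling back to `:passthrough` for any non-recipe strategy is an assumption.

```elixir
defmodule RouteSketch do
  defmodule Event do
    # Hypothetical stand-in for LlmCore.Agent.ToolDispatch.Event.
    # Only :strategy is taken from the documentation; :tool is illustrative.
    defstruct [:tool, strategy: :passthrough]
  end

  # Returns the Switch branch key for an event.
  @spec dispatch_route(%Event{}, keyword()) :: :recipe | :passthrough
  def dispatch_route(%Event{strategy: :recipe}, _opts), do: :recipe
  # Assumption: anything that is not a recipe takes the passthrough branch.
  def dispatch_route(%Event{}, _opts), do: :passthrough
end

event = %RouteSketch.Event{tool: "search", strategy: :recipe}
IO.inspect(RouteSketch.dispatch_route(event, []))
```

Because the function only reads a field set by an earlier stage, routing stays cheap and has no side effects.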

ensure_started(opts \\ [])

@spec ensure_started(keyword()) :: :ok | {:error, term()}

Starts the pipeline if not already running.

Options

  • :sync, when true, runs sequentially (tests); when false, runs async in parallel (production). Default: false.

Returns

  • :ok — pipeline is running
  • {:error, reason} — pipeline failed to start
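The "start if not already running" contract can be illustrated with a small self-contained sketch. It registers a named `Agent` in place of a real ALF pipeline, so the module `EnsureStartedSketch` and its internals are hypothetical and say nothing about how ToolDispatch starts itself. Only the option name, its default, and the return values come from the documentation above.

```elixir
defmodule EnsureStartedSketch do
  @name __MODULE__

  # Idempotent start: a second call while the process is alive still returns :ok.
  @spec ensure_started(keyword()) :: :ok | {:error, term()}
  def ensure_started(opts \\ []) do
    # :sync defaults to false, as documented for the real function.
    sync = Keyword.get(opts, :sync, false)

    case Agent.start(fn -> %{sync: sync} end, name: @name) do
      {:ok, _pid} -> :ok
      {:error, {:already_started, _pid}} -> :ok
      {:error, reason} -> {:error, reason}
    end
  end

  @spec started?() :: boolean()
  def started?, do: Process.whereis(@name) != nil
end

# Usage: calling twice is safe; the second call is a no-op.
:ok = EnsureStartedSketch.ensure_started(sync: true)
:ok = EnsureStartedSketch.ensure_started()
IO.inspect(EnsureStartedSketch.started?())
```

Treating an already-running process as success lets callers invoke the function defensively before every dispatch without tracking pipeline state themselves.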

flow(flow, names, opts \\ [debug: false])

@spec flow(map(), list(), Keyword.t()) :: Enumerable.t()

start()

@spec start() :: :ok

start(opts)

@spec start(list()) :: :ok

started?()

@spec started?() :: true | false

stop()

@spec stop() :: :ok | {:exit, {atom(), any()}}

stream(stream, opts \\ [debug: false])

@spec stream(Enumerable.t(), Keyword.t()) :: Enumerable.t()