PtcRunner.PlanExecutor (PtcRunner v0.7.0)


Execute plans with automatic replanning support.

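PlanExecutor provides two levels of API: the high-level run/2, which generates a plan from a mission and executes it, and the low-level execute/3, which executes a plan you already have.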

Telemetry Events

PlanExecutor emits telemetry events for observability:

  • [:ptc_runner, :plan_executor, :plan, :generated] - Plan generated
    • metadata: %{plan: plan, mission: mission}
  • [:ptc_runner, :plan_executor, :execution, :start] - Execution starting
    • metadata: %{plan: plan, mission: mission, attempt: n}
  • [:ptc_runner, :plan_executor, :execution, :stop] - Execution finished
    • measurements: %{duration: native_time}
    • metadata: %{status: :ok | :error, results: map}

  • [:ptc_runner, :plan_executor, :task, :start] - Task starting
    • metadata: %{task_id: id, task: task, attempt: n}
  • [:ptc_runner, :plan_executor, :task, :stop] - Task finished
    • measurements: %{duration: native_time}
    • metadata: %{task_id: id, status: :ok | :error | :skipped, result: term}

  • [:ptc_runner, :plan_executor, :replan, :start] - Replan starting
    • metadata: %{task_id: id, diagnosis: string, attempt: n}
  • [:ptc_runner, :plan_executor, :replan, :stop] - Replan finished
    • metadata: %{new_task_count: n}
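As an illustration of consuming these events, here is a minimal sketch of a handler attached to the two :stop events that carry a duration. It assumes the :telemetry package is available; the module name MyApp.PlanTelemetry and the handler id are hypothetical, and only the event names, the duration measurement, and the status metadata key are taken from the list above.

```elixir
defmodule MyApp.PlanTelemetry do
  # Hypothetical module; the event names come from the list above.
  @events [
    [:ptc_runner, :plan_executor, :execution, :stop],
    [:ptc_runner, :plan_executor, :task, :stop]
  ]

  # Attaches one handler to both :stop events. Requires the :telemetry package.
  def attach do
    :telemetry.attach_many("my-app-plan-telemetry", @events, &__MODULE__.handle_event/4, nil)
  end

  # Standard :telemetry handler signature: event, measurements, metadata, config.
  def handle_event(event, measurements, metadata, _config) do
    IO.puts(format(event, measurements, metadata))
  end

  # Builds a log line; the duration is converted from native units to milliseconds.
  def format(event, %{duration: duration}, metadata) do
    ms = System.convert_time_unit(duration, :native, :millisecond)
    name = event |> Enum.drop(2) |> Enum.join(".")
    "#{name} status=#{metadata[:status]} duration_ms=#{ms}"
  end
end
```

Calling MyApp.PlanTelemetry.attach() once at application start would be enough for the handler to receive both events.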

High-Level API: run/2

The "one-stop-shop" for autonomous execution. Generates a plan from a mission and executes it with automatic replanning:

{:ok, results, metadata} = PlanExecutor.run("Research AAPL stock price",
  llm: my_llm,
  available_tools: %{
    "search" => "Search the web for information",
    "fetch_price" => "Get stock price for a symbol"
  },
  base_tools: %{
    "search" => &MyApp.search/1,
    "fetch_price" => &MyApp.fetch_price/1
  }
)

Low-Level API: execute/3

For when you already have a plan and want fine-grained control:

{:ok, plan} = MetaPlanner.plan(mission, llm: my_llm)
{:ok, metadata} = PlanExecutor.execute(plan, mission, llm: my_llm)

Execution Lifecycle

  1. Generate plan via MetaPlanner (for run/2)
  2. Validate plan structure
  3. Execute via PlanRunner
  4. If verification fails with :replan, generate repair plan
  5. Re-execute with repair plan (preserving completed results)
  6. Repeat until success, max replans reached, or unrecoverable error
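The loop in steps 3 to 6 can be sketched as follows. This is not PlanExecutor's actual implementation: LifecycleSketch, the injected execute and repair functions, their return shapes, and the :max_replans_exceeded reason are all assumptions made for illustration.

```elixir
defmodule LifecycleSketch do
  # Hypothetical stand-in for the executor loop; not PlanExecutor's real code.
  # `execute` runs a plan given the results so far and returns
  #   {:ok, results} | {:replan, results, diagnosis} | {:error, reason, results}.
  # `repair` turns a diagnosis into a new plan.
  def run(plan, execute, repair, max_total_replans \\ 5) do
    loop(plan, %{}, 0, execute, repair, max_total_replans)
  end

  defp loop(plan, completed, replans, execute, repair, max) do
    case execute.(plan, completed) do
      {:ok, results} ->
        {:ok, results, %{replan_count: replans}}

      {:replan, _results, _diagnosis} when replans >= max ->
        # Budget exhausted: stop instead of generating another repair plan.
        {:error, :max_replans_exceeded, %{replan_count: replans}}

      {:replan, results, diagnosis} ->
        # Keep completed results so finished tasks are not re-run.
        loop(repair.(diagnosis), results, replans + 1, execute, repair, max)

      {:error, reason, _results} ->
        {:error, reason, %{replan_count: replans}}
    end
  end
end
```

Passing the completed results into each new attempt is what lets a repair plan skip tasks that already succeeded.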

Loop Prevention

The executor enforces limits to prevent runaway costs:

  • max_replan_attempts - Max replans for the same task (default: 3)
  • max_total_replans - Max total replans per execution (default: 5)
  • replan_cooldown_ms - Delay between replan attempts (default: 1000)
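To make the difference between the per-task and total limits concrete, here is a small sketch of how such a check could work. ReplanBudget and its error atoms are hypothetical and say nothing about how PlanExecutor tracks its counters internally; only the option names and defaults are taken from the list above. The cooldown delay is left out.

```elixir
defmodule ReplanBudget do
  # Hypothetical helper; defaults mirror the documented option defaults.
  @defaults [max_replan_attempts: 3, max_total_replans: 5]

  # `history` is a list of task ids, one entry per replan already performed.
  def check(history, task_id, opts \\ []) do
    opts = Keyword.merge(@defaults, opts)
    per_task = Enum.count(history, &(&1 == task_id))

    cond do
      # Total limit: counts replans across all tasks.
      length(history) >= opts[:max_total_replans] -> {:error, :max_total_replans}
      # Per-task limit: counts replans triggered by this task only.
      per_task >= opts[:max_replan_attempts] -> {:error, :max_replan_attempts}
      true -> :ok
    end
  end
end
```

With the defaults, a task that has already triggered three replans is refused a fourth, while a different task may still replan until five replans have been spent in total.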

Observability

Use the on_event callback for real-time execution visibility:

PlanExecutor.run(mission,
  llm: my_llm,
  on_event: &PlanTracer.log_event/1
)
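A custom callback can be any one-argument function. The sketch below is one possible shape, assuming you only care about a few events; MyApp.EventLog is a hypothetical module, and the tuples it matches follow the event() type documented on this page.

```elixir
defmodule MyApp.EventLog do
  # Hypothetical callback module; event shapes follow the event() type.
  def log_event(event) do
    case describe(event) do
      nil -> :ok
      line -> IO.puts(line)
    end
  end

  def describe({:task_started, %{task_id: id, attempt: n}}),
    do: "task #{id} started (attempt #{n})"

  def describe({:task_succeeded, %{task_id: id, duration_ms: ms}}),
    do: "task #{id} ok in #{ms}ms"

  def describe({:task_failed, %{task_id: id, reason: reason}}),
    do: "task #{id} failed: #{inspect(reason)}"

  def describe({:replan_started, %{task_id: id, diagnosis: diagnosis}}),
    do: "replanning #{id}: #{diagnosis}"

  # Ignore every other event so unhandled event types do not crash the callback.
  def describe(_other), do: nil
end
```

It would be passed as on_event: &MyApp.EventLog.log_event/1. The catch-all clause matters because the callback receives every event type, not only the ones matched here.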

Summary

Functions

execute(plan, mission, opts) - Execute a plan with automatic replanning on verification failures.

run(mission, opts) - Generate and execute a plan from a natural language mission.

Types

event()

@type event() ::
  {:planning_started, %{mission: String.t()}}
  | {:planning_finished, %{task_count: non_neg_integer(), plan: map()}}
  | {:planning_failed, %{reason: term()}}
  | {:planning_retry, %{validation_errors: non_neg_integer()}}
  | {:execution_started, %{mission: String.t(), task_count: non_neg_integer()}}
  | {:execution_finished,
     %{status: :ok | :error | :waiting, duration_ms: non_neg_integer()}}
  | {:task_started, %{task_id: String.t(), attempt: pos_integer()}}
  | {:task_succeeded, %{task_id: String.t(), duration_ms: non_neg_integer()}}
  | {:task_failed, %{task_id: String.t(), reason: term()}}
  | {:task_skipped, %{task_id: String.t(), reason: :already_completed}}
  | {:verification_failed, %{task_id: String.t(), diagnosis: String.t()}}
  | {:quality_gate_started, %{task_id: String.t()}}
  | {:quality_gate_passed, %{task_id: String.t()}}
  | {:quality_gate_failed, %{task_id: String.t(), missing: [String.t()]}}
  | {:quality_gate_error, %{task_id: String.t(), reason: term()}}
  | {:replan_started,
     %{
       task_id: String.t(),
       diagnosis: String.t(),
       total_replans: non_neg_integer()
     }}
  | {:replan_finished, %{new_tasks: non_neg_integer()}}
  | {:task_step, %{task_id: String.t(), step: PtcRunner.Step.t()}}

event_callback()

@type event_callback() :: (event() -> any()) | nil

execute_result()

@type execute_result() ::
  {:ok, execution_metadata()}
  | {:error, term(), execution_metadata()}
  | {:waiting, [PtcRunner.PlanRunner.pending_review()], execution_metadata()}

execution_metadata()

@type execution_metadata() :: %{
  results: %{required(String.t()) => term()},
  replan_count: non_neg_integer(),
  execution_attempts: non_neg_integer(),
  total_duration_ms: non_neg_integer(),
  replan_history: [replan_record()]
}
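As an example of reading this map, the following sketch condenses it into a one-line summary. MetadataSummary is a hypothetical helper; the field names are the ones in the type above.

```elixir
defmodule MetadataSummary do
  # Hypothetical helper; field names come from execution_metadata().
  def summarize(%{
        results: results,
        replan_count: replans,
        execution_attempts: attempts,
        total_duration_ms: ms
      }) do
    # Report the duration in seconds, rounded to one decimal place.
    seconds = Float.round(ms / 1000, 1)

    "#{map_size(results)} task result(s), #{attempts} attempt(s), " <>
      "#{replans} replan(s), #{seconds}s"
  end
end
```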

replan_record()

@type replan_record() :: %{
  attempt: pos_integer(),
  task_id: String.t(),
  timestamp: DateTime.t(),
  input: String.t(),
  approach: String.t(),
  output: String.t(),
  diagnosis: String.t(),
  new_task_count: pos_integer()
}
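The replan_history list in the metadata can be inspected after a run to see which tasks caused repairs. A minimal sketch, assuming records shaped like the type above; ReplanReport is a hypothetical helper.

```elixir
defmodule ReplanReport do
  # Hypothetical helper; reads only fields defined in replan_record().

  # Number of replans triggered by each task id.
  def per_task(history), do: Enum.frequencies_by(history, & &1.task_id)

  # One human-readable line per replan, ordered by attempt number.
  def lines(history) do
    history
    |> Enum.sort_by(& &1.attempt)
    |> Enum.map(fn record ->
      "replan #{record.attempt} for #{record.task_id}: #{record.diagnosis} " <>
        "(#{record.new_task_count} new task(s))"
    end)
  end
end
```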

run_result()

@type run_result() ::
  {:ok, %{required(String.t()) => term()}, execution_metadata()}
  | {:error, term(), execution_metadata()}
  | {:waiting, [PtcRunner.PlanRunner.pending_review()], execution_metadata()}

Functions

execute(plan, mission, opts)

@spec execute(PtcRunner.Plan.t(), String.t(), keyword()) :: execute_result()

Execute a plan with automatic replanning on verification failures.

Parameters

  • plan - Parsed %Plan{} struct
  • mission - Original mission description (needed for replanning context)
  • opts - Execution options

Options

All PlanRunner options are supported, plus:

  • max_replan_attempts - Max replans per task (default: 3)
  • max_total_replans - Max total replans (default: 5)
  • replan_cooldown_ms - Delay between replans (default: 1000)
  • on_event - Optional callback for lifecycle events. See event/0 for event types.
  • quality_gate - Enable a pre-flight data sufficiency check before tasks with dependencies (default: false). A lightweight SubAgent validates that upstream results are sufficient.
  • quality_gate_llm - Optional separate LLM callback for quality gate checks.

Returns

  • {:ok, metadata} - Success with results and execution stats
  • {:error, reason, metadata} - Failure with partial results and stats
  • {:waiting, pending, metadata} - Paused at human review
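Note that the success tuple here has two elements, whereas run/2 returns three. If calling code handles both, a small adapter can give them one shape. The sketch below is a hypothetical helper, and it assumes the results map that run/2 returns is the same data as metadata.results, which the types suggest but this page does not state outright.

```elixir
defmodule ResultShape do
  # Hypothetical helper that gives execute/3 results the same shape as run/2 results.

  # Success: lift the results map out of the metadata.
  def normalize({:ok, %{results: results} = metadata}), do: {:ok, results, metadata}

  # Failure and human-review pauses already have three elements; pass them through.
  def normalize({:error, _reason, _metadata} = error), do: error
  def normalize({:waiting, _pending, _metadata} = waiting), do: waiting
end
```

A caller could then write PlanExecutor.execute(plan, mission, opts) |> ResultShape.normalize() and match on the same three tuples as for run/2.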

run(mission, opts)

@spec run(
  String.t(),
  keyword()
) :: run_result()

Generate and execute a plan from a natural language mission.

This is the "one-stop-shop" API that handles the full lifecycle:

  1. Generate plan via MetaPlanner
  2. Validate plan structure
  3. Execute with automatic replanning

Parameters

  • mission - Natural language description of what to accomplish
  • opts - Execution options

Options

All execute/3 options are supported, plus:

  • available_tools - Map of tool_name => description for planning
  • builtin_tools - List of builtin tool families (e.g., [:grep]). Descriptions are auto-injected into available_tools for the planner, and the tools are forwarded to PlanRunner so agents can use them.
  • constraints - Optional planning constraints/guidelines
  • quality_gate - Enable pre-flight data sufficiency check (default: false)
  • quality_gate_llm - Optional separate LLM for quality gate checks

Returns

  • {:ok, results, metadata} - Success with task results and execution stats
  • {:error, reason, metadata} - Failure with partial results and stats
  • {:waiting, pending, metadata} - Paused at human review

Example

{:ok, results, metadata} = PlanExecutor.run("Research AAPL stock",
  llm: my_llm,
  available_tools: %{
    "search" => "Search the web. Input: query. Output: list of results",
    "fetch_price" => "Get stock price. Input: symbol. Output: {price, change}"
  },
  base_tools: %{
    "search" => &MyApp.search/1,
    "fetch_price" => &MyApp.fetch_price/1
  },
  on_event: &PlanTracer.log_event/1
)