Lux.Beam (Lux v0.5.0)

View Source

Beams orchestrate workflows by combining multiple Prisms into sequential, parallel, or conditional execution paths with dependency management and execution logging.

Overview

A Beam is a workflow definition that:

  • Combines multiple Prisms into a cohesive workflow
  • Supports sequential, parallel, and conditional execution
  • Handles parameter passing between steps
  • Manages execution logging and error handling
  • Can be used by Agents for agent coordination

Creating a Beam

To create a beam, use the Lux.Beam module:

defmodule MyApp.Beams.TradingWorkflow do
  use Lux.Beam,
    name: "Trading Workflow",
    description: "Analyzes and executes trades",
    input_schema: %{
      type: :object,
      properties: %{
        symbol: %{type: :string},
        amount: %{type: :number}
      },
      required: ["symbol", "amount"]
    },
    output_schema: %{
      type: :object,
      properties: %{
        trade_id: %{type: :string}
      },
      required: ["trade_id"]
    },
    generate_execution_log: true

  sequence do
    step(:market_data, MyApp.Prisms.MarketData, %{symbol: :symbol})

    parallel do
      step(:technical, MyApp.Prisms.TechnicalAnalysis,
        %{data: {:ref, "market_data"}},
        retries: 3,
        store_io: true)

      step(:sentiment, MyApp.Prisms.SentimentAnalysis,
        %{symbol: :symbol},
        timeout: :timer.seconds(30))
    end

    branch {__MODULE__, :should_trade?} do
      true -> step(:execute, MyApp.Prisms.ExecuteTrade, %{
        symbol: :symbol,
        amount: :amount,
        signals: {:ref, "technical"}
      })
      false -> step(:skip, MyApp.Prisms.LogDecision, %{
        reason: "Unfavorable conditions"
      })
    end
  end

  def should_trade?(ctx) do
    ctx.technical.score > 0.7 && ctx.sentiment.confidence > 0.8
  end
end

Complex Example: Agent Management Beam

Here's an example of a more complex beam that manages other agents:

More examples: This beam:

  1. Evaluates current workload and team performance
  2. Decides whether to hire new agents or terminate underperforming ones
  3. Handles the hiring/firing process including resource allocation
defmodule HiringManagerBeam do
  use Lux.Beam, generate_execution_log: true

  sequence do
    # First evaluate current workforce metrics
    step(:workforce_metrics, WorkforceAnalysisPrism, %{
      team_size: {:ref, "current_team_size"},
      performance_data: {:ref, "agent_performance_metrics"},
      workload_stats: {:ref, "current_workload"}
    })

    # Check if we need to scale the team
    branch {__MODULE__, :needs_scaling?} do
      :scale_up ->
        sequence do
          # Find suitable candidates
          step(:candidate_search, AgentSearchPrism, %{
            required_skills: {:ref, "workforce_metrics.skill_gaps"},
            count: {:ref, "workforce_metrics.hiring_needs"}
          })

          # Interview and evaluate candidates
          step(:candidate_evaluation, AgentEvaluationPrism, %{
            candidates: {:ref, "candidate_search.results"},
            evaluation_criteria: {:ref, "workforce_metrics.requirements"}
          })

          # Onboard selected candidates
          step(:onboarding, AgentOnboardingPrism, %{
            selected_agents: {:ref, "candidate_evaluation.approved_candidates"},
            resource_allocation: {:ref, "workforce_metrics.available_resources"}
          })
        end

      :scale_down ->
        sequence do
          # Identify underperforming agents
          step(:performance_review, PerformanceReviewPrism, %{
            agents: {:ref, "workforce_metrics.underperforming_agents"},
            criteria: {:ref, "workforce_metrics.performance_thresholds"}
          })

          # Handle agent termination
          step(:termination, AgentTerminationPrism, %{
            agents: {:ref, "performance_review.agents_to_terminate"},
            reassign_tasks: true
          })
        end

      :maintain ->
        # Just update team metrics and resources
        step(:team_maintenance, TeamMaintenancePrism, %{
          current_team: {:ref, "workforce_metrics.active_agents"},
          resource_updates: {:ref, "workforce_metrics.resource_adjustments"}
        })
    end
  end

  def needs_scaling?(ctx) do
    metrics = ctx["workforce_metrics"]
    cond do
      metrics.workload_ratio > 0.8 and metrics.performance_score > 0.7 -> :scale_up
      metrics.efficiency_score < 0.4 or metrics.resource_strain > 0.9 -> :scale_down
      true -> :maintain
    end
  end
end

Step Configuration Options

Steps can be configured with various options:

  • timeout: Maximum execution time (default: 5 minutes)
  • retries: Number of retry attempts (default: 0)
  • retry_backoff: Delay between retries in ms (default: 1000)
  • track: Enable step tracking (default: false)
  • dependencies: List of dependent step IDs (default: [])
  • store_io: Store step I/O in execution log (default: false)
  • fallback: Module or function to handle step failures (default: nil)

Parameter References

Steps can reference outputs from previous steps using the {:ref, "step_id"} syntax:

step(:analysis, AnalysisPrism, %{
  data: {:ref, "data_collection"},
  config: {:ref, "settings.analysis_config"}
})

Execution Logging

When generate_execution_log: true is set, beams generate detailed execution logs including step timing, inputs, outputs, and errors.

See execution_log/0 for the full log structure.

Summary

Types

An optional type of a type t is a type that can be either the type t or nil.

t()

Functions

Creates a new beam from attributes

Validates a beam definition at compile time

Types

execution_log()

@type execution_log() :: %{
  beam_id: String.t(),
  started_by: String.t(),
  started_at: DateTime.t(),
  completed_at: DateTime.t() | nil,
  status: :running | :completed | :failed,
  input: map() | nil,
  output: map() | nil,
  steps: [
    %{
      id: String.t(),
      name: String.t(),
      started_at: DateTime.t(),
      completed_at: DateTime.t() | nil,
      input: map() | nil,
      output: map() | nil,
      error: term() | nil,
      status: :pending | :running | :completed | :failed
    }
  ]
}

nullable(t)

@type nullable(t) :: t | nil

An optional type of a type t is a type that can be either the type t or nil.

schema()

@type schema() :: map()

step()

@type step() :: %{
  id: String.t(),
  module: module(),
  params: map(),
  opts: %{
    timeout: pos_integer(),
    retries: non_neg_integer(),
    retry_backoff: pos_integer(),
    track: boolean(),
    dependencies: [String.t()],
    store_io: boolean(),
    fallback: module() | (map() -> {:continue | :stop, term()}) | nil
  }
}

t()

@type t() :: %Lux.Beam{
  definition: [step()],
  description: String.t(),
  generate_execution_log: boolean(),
  id: String.t(),
  input_schema: nullable(schema()),
  name: String.t(),
  output_schema: nullable(schema()),
  timeout: pos_integer()
}

Functions

branch(condition, list)

(macro)

new(attrs)

Creates a new beam from attributes

parallel(list)

(macro)

sequence(list)

(macro)

step(id, module, params, opts \\ [])

(macro)

validate!(beam)

Validates a beam definition at compile time