Lux.Beam (Lux v0.5.0)
Beams orchestrate workflows by combining multiple Prisms into sequential, parallel, or conditional execution paths with dependency management and execution logging.
Overview
A Beam is a workflow definition that:
- Combines multiple Prisms into a cohesive workflow
- Supports sequential, parallel, and conditional execution
- Handles parameter passing between steps
- Manages execution logging and error handling
- Can be used by Agents for agent coordination
Creating a Beam
To create a beam, use the Lux.Beam module:
defmodule MyApp.Beams.TradingWorkflow do
  use Lux.Beam,
    name: "Trading Workflow",
    description: "Analyzes and executes trades",
    input_schema: %{
      type: :object,
      properties: %{
        symbol: %{type: :string},
        amount: %{type: :number}
      },
      required: ["symbol", "amount"]
    },
    output_schema: %{
      type: :object,
      properties: %{
        trade_id: %{type: :string}
      },
      required: ["trade_id"]
    },
    generate_execution_log: true

  sequence do
    step(:market_data, MyApp.Prisms.MarketData, %{symbol: :symbol})

    parallel do
      step(:technical, MyApp.Prisms.TechnicalAnalysis,
        %{data: {:ref, "market_data"}},
        retries: 3,
        store_io: true)

      step(:sentiment, MyApp.Prisms.SentimentAnalysis,
        %{symbol: :symbol},
        timeout: :timer.seconds(30))
    end

    branch {__MODULE__, :should_trade?} do
      true -> step(:execute, MyApp.Prisms.ExecuteTrade, %{
        symbol: :symbol,
        amount: :amount,
        signals: {:ref, "technical"}
      })
      false -> step(:skip, MyApp.Prisms.LogDecision, %{
        reason: "Unfavorable conditions"
      })
    end
  end

  def should_trade?(ctx) do
    ctx.technical.score > 0.7 && ctx.sentiment.confidence > 0.8
  end
end
Complex Example: Agent Management Beam
Here's an example of a more complex beam that manages other agents:
This beam:
- Evaluates current workload and team performance
- Decides whether to hire new agents or terminate underperforming ones
- Handles the hiring/firing process including resource allocation
defmodule HiringManagerBeam do
  use Lux.Beam, generate_execution_log: true

  sequence do
    # First evaluate current workforce metrics
    step(:workforce_metrics, WorkforceAnalysisPrism, %{
      team_size: {:ref, "current_team_size"},
      performance_data: {:ref, "agent_performance_metrics"},
      workload_stats: {:ref, "current_workload"}
    })

    # Check if we need to scale the team
    branch {__MODULE__, :needs_scaling?} do
      :scale_up ->
        sequence do
          # Find suitable candidates
          step(:candidate_search, AgentSearchPrism, %{
            required_skills: {:ref, "workforce_metrics.skill_gaps"},
            count: {:ref, "workforce_metrics.hiring_needs"}
          })

          # Interview and evaluate candidates
          step(:candidate_evaluation, AgentEvaluationPrism, %{
            candidates: {:ref, "candidate_search.results"},
            evaluation_criteria: {:ref, "workforce_metrics.requirements"}
          })

          # Onboard selected candidates
          step(:onboarding, AgentOnboardingPrism, %{
            selected_agents: {:ref, "candidate_evaluation.approved_candidates"},
            resource_allocation: {:ref, "workforce_metrics.available_resources"}
          })
        end

      :scale_down ->
        sequence do
          # Identify underperforming agents
          step(:performance_review, PerformanceReviewPrism, %{
            agents: {:ref, "workforce_metrics.underperforming_agents"},
            criteria: {:ref, "workforce_metrics.performance_thresholds"}
          })

          # Handle agent termination
          step(:termination, AgentTerminationPrism, %{
            agents: {:ref, "performance_review.agents_to_terminate"},
            reassign_tasks: true
          })
        end

      :maintain ->
        # Just update team metrics and resources
        step(:team_maintenance, TeamMaintenancePrism, %{
          current_team: {:ref, "workforce_metrics.active_agents"},
          resource_updates: {:ref, "workforce_metrics.resource_adjustments"}
        })
    end
  end

  def needs_scaling?(ctx) do
    metrics = ctx["workforce_metrics"]

    cond do
      metrics.workload_ratio > 0.8 and metrics.performance_score > 0.7 -> :scale_up
      metrics.efficiency_score < 0.4 or metrics.resource_strain > 0.9 -> :scale_down
      true -> :maintain
    end
  end
end
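The branch decision above is an ordinary function, so it can be exercised without running a beam. The sketch below copies the rule into a standalone module (ScalingSketch is a hypothetical name, not part of Lux) and feeds it hand-built contexts. The context shape, a string step ID mapping to a map with atom keys, is an assumption based on how needs_scaling?/1 reads it.

```elixir
defmodule ScalingSketch do
  # Same decision rule as HiringManagerBeam.needs_scaling?/1,
  # copied here so it can run without Lux.
  def needs_scaling?(ctx) do
    metrics = ctx["workforce_metrics"]

    cond do
      metrics.workload_ratio > 0.8 and metrics.performance_score > 0.7 -> :scale_up
      metrics.efficiency_score < 0.4 or metrics.resource_strain > 0.9 -> :scale_down
      true -> :maintain
    end
  end
end

# Hypothetical metric values, chosen only to hit each branch.
base = %{workload_ratio: 0.5, performance_score: 0.8, efficiency_score: 0.6, resource_strain: 0.5}

busy = %{"workforce_metrics" => %{base | workload_ratio: 0.9}}
inefficient = %{"workforce_metrics" => %{base | efficiency_score: 0.3}}
steady = %{"workforce_metrics" => base}

# High workload AND high resource strain: the first cond clause wins.
busy_and_strained = %{"workforce_metrics" => %{base | workload_ratio: 0.9, resource_strain: 0.95}}

up = ScalingSketch.needs_scaling?(busy)
down = ScalingSketch.needs_scaling?(inefficient)
keep = ScalingSketch.needs_scaling?(steady)
both = ScalingSketch.needs_scaling?(busy_and_strained)
```

Because cond takes the first matching clause, a team that is both overloaded and resource-strained is scaled up rather than down. If that ordering is not intended, the clauses would need to be reordered.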
Step Configuration Options
Steps can be configured with various options:
- timeout: Maximum execution time (default: 5 minutes)
- retries: Number of retry attempts (default: 0)
- retry_backoff: Delay between retries in ms (default: 1000)
- track: Enable step tracking (default: false)
- dependencies: List of dependent step IDs (default: [])
- store_io: Store step I/O in execution log (default: false)
- fallback: Module or function to handle step failures (default: nil)
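To make the interaction of retries and retry_backoff concrete, here is a minimal sketch of a retry loop in plain Elixir. It is not Lux's implementation: RetrySketch is a hypothetical module, the {:ok, _} / {:error, _} return convention is assumed, and a constant delay between attempts is assumed (the option description does not say whether the delay grows).

```elixir
defmodule RetrySketch do
  # Hypothetical helper, not part of Lux.
  # Runs `fun` once, then up to `retries` more times, sleeping
  # `retry_backoff` milliseconds between attempts.
  def run(fun, opts \\ []) when is_function(fun, 0) do
    retries = Keyword.get(opts, :retries, 0)
    backoff_ms = Keyword.get(opts, :retry_backoff, 1000)
    attempt(fun, retries, backoff_ms)
  end

  defp attempt(fun, retries_left, backoff_ms) do
    case fun.() do
      {:ok, _result} = ok ->
        ok

      {:error, _reason} when retries_left > 0 ->
        Process.sleep(backoff_ms)
        attempt(fun, retries_left - 1, backoff_ms)

      {:error, _reason} = error ->
        error
    end
  end
end

# A function that fails on its first two calls and succeeds on the third.
{:ok, counter} = Agent.start_link(fn -> 0 end)

flaky = fn ->
  call = Agent.get_and_update(counter, fn n -> {n + 1, n + 1} end)
  if call < 3, do: {:error, :unavailable}, else: {:ok, call}
end

result = RetrySketch.run(flaky, retries: 3, retry_backoff: 10)
no_retry = RetrySketch.run(fn -> {:error, :down} end)
```

With retries: 3 the function may run up to four times in total. With the default of 0, a single failure is returned immediately.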
Parameter References
Steps can reference outputs from previous steps using the {:ref, "step_id"} syntax:
step(:analysis, AnalysisPrism, %{
  data: {:ref, "data_collection"},
  config: {:ref, "settings.analysis_config"}
})
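One way to picture reference resolution is as a lookup into a map of earlier step outputs, with dotted paths such as "settings.analysis_config" walked one key at a time. The sketch below illustrates that idea only. RefSketch is a hypothetical module, and both the string-keyed context and the dot-splitting rule are assumptions rather than Lux's documented behavior.

```elixir
defmodule RefSketch do
  # Hypothetical helper, not part of Lux.
  # Replaces every {:ref, path} value in `params` with the value found in `ctx`.
  def resolve_params(params, ctx) when is_map(params) do
    Map.new(params, fn {key, value} -> {key, resolve(value, ctx)} end)
  end

  # "step_id.field" is split on dots and walked one key at a time.
  # A missing key anywhere along the path yields nil.
  defp resolve({:ref, path}, ctx) when is_binary(path) do
    path
    |> String.split(".")
    |> Enum.reduce(ctx, fn
      key, acc when is_map(acc) -> Map.get(acc, key)
      _key, _acc -> nil
    end)
  end

  # Anything that is not a reference is passed through unchanged.
  defp resolve(value, _ctx), do: value
end

# Assumed context: outputs of earlier steps keyed by step ID.
ctx = %{
  "data_collection" => %{"rows" => [1, 2, 3]},
  "settings" => %{"analysis_config" => %{"window" => 5}}
}

params = %{
  data: {:ref, "data_collection"},
  config: {:ref, "settings.analysis_config"},
  mode: :fast,
  missing: {:ref, "settings.unknown.key"}
}

resolved = RefSketch.resolve_params(params, ctx)
```

Returning nil for a missing path is a choice made for this sketch. A real resolver might prefer to fail the step instead.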
Execution Logging
When generate_execution_log: true is set, beams generate detailed execution logs including step timing, inputs, outputs, and errors.
See execution_log/0 for the full log structure.
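As a sketch of what can be done with such a log, the snippet below builds a map by hand that follows the execution_log() type and derives per-step durations and a list of failed steps from it. All values are made up for illustration, and how a beam actually returns its log is not shown here.

```elixir
# Hand-written log shaped like the execution_log() type; values are illustrative.
log = %{
  beam_id: "beam-1",
  started_by: "example",
  started_at: ~U[2024-01-01 12:00:00Z],
  completed_at: ~U[2024-01-01 12:00:05Z],
  status: :failed,
  input: %{symbol: "ETH", amount: 1},
  output: nil,
  steps: [
    %{
      id: "market_data",
      name: "market_data",
      started_at: ~U[2024-01-01 12:00:00Z],
      completed_at: ~U[2024-01-01 12:00:02Z],
      input: nil,
      output: nil,
      error: nil,
      status: :completed
    },
    %{
      id: "execute",
      name: "execute",
      started_at: ~U[2024-01-01 12:00:02Z],
      completed_at: nil,
      input: nil,
      output: nil,
      error: :timeout,
      status: :failed
    }
  ]
}

# Duration in milliseconds for every step that has a completion time.
durations =
  for step <- log.steps, step.completed_at != nil do
    {step.id, DateTime.diff(step.completed_at, step.started_at, :millisecond)}
  end

# IDs and errors of the steps that failed.
failed = for %{status: :failed} = step <- log.steps, do: {step.id, step.error}
```

Steps with completed_at set to nil are skipped when computing durations, since the type allows a step to have no completion time.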
Types
@type execution_log() :: %{
  beam_id: String.t(),
  started_by: String.t(),
  started_at: DateTime.t(),
  completed_at: DateTime.t() | nil,
  status: :running | :completed | :failed,
  input: map() | nil,
  output: map() | nil,
  steps: [
    %{
      id: String.t(),
      name: String.t(),
      started_at: DateTime.t(),
      completed_at: DateTime.t() | nil,
      input: map() | nil,
      output: map() | nil,
      error: term() | nil,
      status: :pending | :running | :completed | :failed
    }
  ]
}
@type nullable(t) :: t | nil
An optional type of a type t is a type that can be either the type t or nil.
@type schema() :: map()
@type step() :: %{
  id: String.t(),
  module: module(),
  params: map(),
  opts: %{
    timeout: pos_integer(),
    retries: non_neg_integer(),
    retry_backoff: pos_integer(),
    track: boolean(),
    dependencies: [String.t()],
    store_io: boolean(),
    fallback: module() | (map() -> {:continue | :stop, term()}) | nil
  }
}
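To help read the step() type, the following sketch writes out by hand a map with that shape, including a function fallback that returns {:continue, value} or {:stop, reason}. It reuses the module and parameter names from the trading example. The keys of the map passed to the fallback are not specified by the type, so the :error key matched below is an assumption, as is the meaning attached to each return value.

```elixir
# Function fallback matching (map() -> {:continue | :stop, term()}).
# The :error key in the argument is assumed for illustration.
fallback = fn failure ->
  case failure do
    %{error: :timeout} -> {:continue, %{score: 0.0}}
    _other -> {:stop, :unrecoverable}
  end
end

# A hand-written value of the step() shape, using the documented defaults
# except where the trading example overrides them.
step = %{
  id: "technical",
  module: MyApp.Prisms.TechnicalAnalysis,
  params: %{data: {:ref, "market_data"}},
  opts: %{
    timeout: :timer.minutes(5),
    retries: 3,
    retry_backoff: 1000,
    track: false,
    dependencies: [],
    store_io: true,
    fallback: fallback
  }
}

on_timeout = step.opts.fallback.(%{error: :timeout})
on_other = step.opts.fallback.(%{error: :boom})
```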