Proposal: Refactor plan_node: Move Routing Logic to Graph Level
Problem
plan_node (agent.py:846-1192) is a ~350-line monolith that handles:
- Iteration bookkeeping (increment, max_iters check)
- Early abort/status checks (pre-existing aborted state)
- Region/currency gate (special-case ask_user for core fields)
- Decomposition management (proactive LLM decomposition generation)
- Schema composition (component-ready field merging)
- Calculator status routing (blocked/successful/limit checks)
- Acquisition task resolution (next task from recipe graph)
- LLM decision call (structured output → PlannerDecision)
- Decision post-processing (3 redirectors: derived fields, web-preferred fields, calculation adjustments)
- Loop/cap detection (duplicate search queries, per-field attempt caps)
This violates the single-responsibility principle: plan_node is simultaneously the router, the guard, the LLM caller, and the policy enforcer. Debugging requires tracing through 15+ early-return branches.
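Proposed Architecture
Split plan_node into three graph-level nodes (guards, acquire, decide) and two new routing functions (route_after_guards, route_after_acquire); the existing route_after_plan is reused: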
Current: plan_node (350 lines, 10 responsibilities)

                ↓ refactor ↓

Proposed graph:

START → guards
          │
          ├─ [gate] ──→ ask_user (region/currency)
          │
          ├─ [early_finish] ──→ finish (abort/calc-cap/done)
          │
          └─ [proceed] ──→ acquire
                              │
                              ├─ [has_task] ──→ decide (LLM call)
                              │
                              └─ [no_task] ──→ finish

decide → post_process (redirectors)
              │
              └─ route_after_plan ──→ {search, ask_user, reflect,
                                       calculate, finish}
New Nodes
1. guards (pure function, ~40 lines)
Input: raw state
Output: state updates + routing signal
Responsibilities:
- Increment iterations
- Check status == "aborted" → route to finish
- Check iterations > max_iters → route to finish
- Check region/currency gates → route to ask_user with injected decision
- Check calculator status (blocked cap, success) → route to finish
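
The checks above could be sketched roughly as follows. This is an illustration under stated assumptions, not the extracted code: the state keys (`iterations`, `max_iters`, `status`, `region`, `currency`) and the `guard_route` signal key are hypothetical, the fallback cap of 10 is a placeholder, and the calculator-status check is left out for brevity.

```python
from typing import Any, Optional, TypedDict


class State(TypedDict, total=False):
    # Hypothetical subset of the real State; all key names are assumptions.
    iterations: int
    max_iters: int
    status: str
    region: Optional[str]
    currency: Optional[str]
    guard_route: str


def guards_node(state: State) -> dict[str, Any]:
    """Increment the iteration counter and record an early-exit signal."""
    iterations = state.get("iterations", 0) + 1
    updates: dict[str, Any] = {"iterations": iterations}

    if state.get("status") == "aborted":
        updates["guard_route"] = "finish"
    elif iterations > state.get("max_iters", 10):  # placeholder default cap
        updates["guard_route"] = "finish"
    elif not state.get("region") or not state.get("currency"):
        # Core fields missing: hand over to ask_user.
        updates["guard_route"] = "ask_user"
    else:
        updates["guard_route"] = "acquire"
    return updates
```

Because the function only reads the state and returns a dict of updates, each branch can be exercised with a plain dict and no graph runtime.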
Signature:
def guards_node(state: State) -> dict[str, Any]:
    """Increment iteration counter and apply early-exit guards.

    Returns state updates. Routing is handled by route_after_guards().
    """
2. acquire (pure function, ~60 lines)
Input: state (post-guards)
Output: state updates (dynamic_decompositions, schema composition)
Responsibilities:
- Run _proactive_decompositions() for composite fields
- Run compose_ready_fields() for component merging
- Call next_acquisition_task() to find pending work
- Set next_acquisition_task in state (or None)
Signature:
def acquire_node(state: State) -> dict[str, Any]:
    """Resolve next acquisition task from recipe graph.

    Populates dynamic_decompositions and selects the next task.
    Returns state updates including 'next_acquisition_task'.
    """
3. decide (LLM caller, ~80 lines)
Input: state with next_acquisition_task
Output: decision field
Responsibilities:
- If next_acquisition_task exists → convert to PlannerDecision
- Otherwise → build planner prompt, call LLM structured output
- Apply decision redirectors:
  - _redirect_derived_direct_decision()
  - _redirect_premature_ask_for_web_field()
  - _adjust_calculation_decision()
- Apply loop/cap detection (duplicate queries, per-field caps)
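
One way to keep the redirector step readable is to treat each redirector as a function from decision to decision and fold them in order. The sketch below only illustrates that shape: the two-field `PlannerDecision`, the function names, and the prefix-based rules (`derived_`, `web_`) are placeholders invented for the example, not the behavior of the real `_redirect_*` helpers.

```python
from dataclasses import dataclass, replace
from typing import Callable, Iterable


@dataclass(frozen=True)
class PlannerDecision:
    # Hypothetical minimal shape; the real PlannerDecision has its own fields.
    action: str
    field: str = ""


Redirector = Callable[[PlannerDecision], PlannerDecision]


def redirect_derived(decision: PlannerDecision) -> PlannerDecision:
    # Placeholder rule: derived fields are calculated rather than searched.
    if decision.action == "search" and decision.field.startswith("derived_"):
        return replace(decision, action="calculate")
    return decision


def redirect_premature_ask(decision: PlannerDecision) -> PlannerDecision:
    # Placeholder rule: try the web before asking the user.
    if decision.action == "ask_user" and decision.field.startswith("web_"):
        return replace(decision, action="search")
    return decision


def apply_redirectors(
    decision: PlannerDecision, redirectors: Iterable[Redirector]
) -> PlannerDecision:
    """Run each redirector in order; later ones see earlier rewrites."""
    for redirect in redirectors:
        decision = redirect(decision)
    return decision
```

Since order matters (a later redirector sees the output of an earlier one), keeping the list explicit in one place makes that ordering visible and testable.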
Signature:
def decide_node(state: State) -> dict[str, Any]:
    """Produce a PlannerDecision via acquisition task or LLM call.

    Applies post-decision redirectors and loop detection.
    """
New Routing Functions
route_after_guards(state) -> str
Routes to one of:
- "ask_user" — region/currency gate triggered
- "finish" — early termination (abort, calc-cap, max_iters)
- "acquire" — normal flow continues
route_after_acquire(state) -> str
Routes to one of:
- "decide" — acquisition task exists (from recipe or LLM)
- "finish" — no actionable work remains
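
Both routing functions can stay free of side effects and simply read what the preceding node wrote. A minimal sketch, assuming guards_node records its verdict under a hypothetical `guard_route` key and acquire_node writes `next_acquisition_task`:

```python
from typing import Any, Mapping

GUARD_ROUTES = frozenset({"ask_user", "finish", "acquire"})


def route_after_guards(state: Mapping[str, Any]) -> str:
    """Return the branch recorded by guards_node (hypothetical key)."""
    route = state.get("guard_route", "acquire")
    if route not in GUARD_ROUTES:
        # Fail loudly instead of silently taking an unmapped branch.
        raise ValueError(f"unexpected guard_route: {route!r}")
    return route


def route_after_acquire(state: Mapping[str, Any]) -> str:
    """Continue to decide only when acquire_node selected a task."""
    if state.get("next_acquisition_task") is not None:
        return "decide"
    return "finish"
```

The returned strings match the keys of the mapping passed to `add_conditional_edges`, so a typo in either place surfaces as an error rather than a silent misroute.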
Updated Graph Topology
from typing import Any

from langgraph.graph import END, START, StateGraph


def _build_state_graph() -> Any:
    builder = StateGraph(State)

    # New granular nodes
    builder.add_node("guards", guards_node)
    builder.add_node("acquire", acquire_node)
    builder.add_node("decide", decide_node)

    # Existing nodes (unchanged)
    builder.add_node("search", search_node)
    builder.add_node("observe", observe_node)
    builder.add_node("calculate", calculate_node)
    builder.add_node("ask_user", ask_user_node)
    builder.add_node("observe_user", observe_user_node)
    builder.add_node("reflect", reflect_node)
    builder.add_node("finish", finish_node)

    # New edges
    builder.add_edge(START, "guards")
    builder.add_conditional_edges("guards", route_after_guards, {
        "ask_user": "ask_user",
        "finish": "finish",
        "acquire": "acquire",
    })
    builder.add_conditional_edges("acquire", route_after_acquire, {
        "decide": "decide",
        "finish": "finish",
    })

    # route_after_plan is unchanged but now hangs off "decide" instead of
    # "plan". No separate "plan_router" node is registered: routing stays a
    # plain function (see Open Questions).
    builder.add_conditional_edges("decide", route_after_plan, {
        "search": "search",
        "reflect": "reflect",
        "ask_user": "ask_user",
        "calculate": "calculate",
        "finish": "finish",
    })

    # Existing edges; every loop-back that targeted "plan" now targets "guards"
    builder.add_conditional_edges("search", route_after_search, {
        "observe": "observe",
        "guards": "guards",  # was "plan"
    })
    builder.add_edge("observe", "guards")
    builder.add_edge("calculate", "guards")
    builder.add_edge("ask_user", "observe_user")
    builder.add_edge("observe_user", "guards")
    builder.add_edge("reflect", "guards")
    builder.add_edge("finish", END)
    return builder
Migration Path
Phase 1: Extract without changing topology (low risk)
- Extract guards_node from lines 848-887 (iteration + region/currency gates)
- Extract acquire_node from lines 889-988 (decomposition + acquisition task)
- Extract decide_node from lines 990-1192 (LLM call + redirectors)
- Keep plan_node as a thin orchestrator that calls all three
- Verify: all existing tests pass, no behavior change
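
During Phase 1 the thin orchestrator might look like the sketch below. It is written as a factory so the three stages can be stubbed in tests. The `guard_route` key used for the early exit is an assumption, and merging updates with a plain dict overwrite ignores any reducers the real State may define.

```python
from typing import Any, Callable

State = dict[str, Any]
Node = Callable[[State], dict[str, Any]]


def make_plan_node(guards: Node, acquire: Node, decide: Node) -> Node:
    """Build a plan_node that delegates to the three extracted functions."""

    def plan_node(state: State) -> dict[str, Any]:
        updates: dict[str, Any] = {}
        for stage in (guards, acquire, decide):
            # Each stage sees the incoming state plus all earlier updates.
            updates.update(stage({**state, **updates}))
            # Assumption: guards reports an early exit via "guard_route".
            if updates.get("guard_route", "acquire") != "acquire":
                break
        return updates

    return plan_node
```

With this shape the graph still contains a single plan node, so the topology and checkpointed state stay as they are while the extracted functions gain their own unit tests.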
Phase 2: Wire into graph (medium risk)
- Add new nodes to _build_state_graph()
- Add route_after_guards and route_after_acquire
- Replace plan node with guards → acquire → decide chain
- Update all loop-back edges (plan → guards)
- Verify: planner integration tests pass
Phase 3: Cleanup (low risk)
- Remove old plan_node function
- Update AGENTS.md and README.md
- Update current-graph.md reference
Expected Benefits
| Metric | Before | After |
|---|---|---|
| plan_node LOC | ~350 | 0 (split into 3) |
| guards_node LOC | n/a | ~40 |
| acquire_node LOC | n/a | ~60 |
| decide_node LOC | n/a | ~80 |
| Branching depth | 4-5 levels | 2 levels max |
| Testability | Integration only | Unit-testable per node |
| Debuggability | Trace 15 branches | Inspect state at each boundary |
Risks
| Risk | Mitigation |
|---|---|
| State key changes break checkpointer | Keep all state keys identical; new nodes read/write same fields |
| Graph topology change breaks interrupt/resume | guards node is first after every loop-back; interrupt state unchanged |
| Hidden coupling between extracted functions | Phase 1 keeps plan_node as orchestrator to surface coupling before full split |
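
The interrupt/resume mitigation relies on every loop-back re-entering at guards. That invariant can be checked without running the graph. The sketch below is one hypothetical way to do it: the edge list is transcribed by hand from the proposed topology, with conditional targets flattened and route_after_plan assumed to be attached to decide.

```python
Edge = tuple[str, str]

# Edges transcribed from the proposed topology (an assumption of this sketch;
# a real test would derive them from the builder instead).
PROPOSED_EDGES: list[Edge] = [
    ("guards", "ask_user"), ("guards", "finish"), ("guards", "acquire"),
    ("acquire", "decide"), ("acquire", "finish"),
    ("decide", "search"), ("decide", "reflect"), ("decide", "ask_user"),
    ("decide", "calculate"), ("decide", "finish"),
    ("search", "observe"), ("search", "guards"),
    ("observe", "guards"), ("calculate", "guards"),
    ("ask_user", "observe_user"), ("observe_user", "guards"),
    ("reflect", "guards"),
]

# The only legitimate ways to reach acquire or decide.
CHAIN_EDGES = {("guards", "acquire"), ("acquire", "decide")}


def edges_bypassing_guards(edges: list[Edge]) -> list[Edge]:
    """Return edges that enter acquire/decide without passing through guards."""
    return [
        edge for edge in edges
        if edge[1] in {"acquire", "decide"} and edge not in CHAIN_EDGES
    ]
```

An empty result means no node can skip the iteration and abort checks; any entry in the list names the offending edge directly.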
Open Questions
- Should the decide node include the loop/cap detection, or should that be a separate validate_decision node?
  - Current proposal: inline in decide (simpler graph)
  - Alternative: separate node (more testable, but adds graph complexity)
- Should plan_router be a node or just a routing function?
  - Current proposal: routing function (no state mutation needed)
  - Alternative: thin node that sets decision key (consistency with other nodes)
- Naming: guards/acquire/decide vs pre_flight/resolve_task/choose_action?
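
To make the first question concrete, the separate validate_decision alternative might look like the sketch below. Everything in it is an assumption made for illustration: the dict-shaped decision, the `search_queries` and `field_attempts` state keys, the cap of 3, and the choice to fall back to ask_user when a loop is detected.

```python
from typing import Any

MAX_ATTEMPTS_PER_FIELD = 3  # hypothetical cap


def validate_decision_node(state: dict[str, Any]) -> dict[str, Any]:
    """Reject duplicate search queries and over-cap fields."""
    decision = state["decision"]
    if decision.get("action") != "search":
        return {}  # nothing to validate for non-search decisions

    query = decision.get("query", "").strip().lower()
    field = decision.get("field", "")
    seen = state.get("search_queries", [])
    attempts = state.get("field_attempts", {})

    if query in seen or attempts.get(field, 0) >= MAX_ATTEMPTS_PER_FIELD:
        # Assumed fallback: stop searching and ask the user instead.
        return {"decision": {**decision, "action": "ask_user"}}

    # Record the attempt so the next iteration can detect a repeat.
    return {
        "search_queries": [*seen, query],
        "field_attempts": {**attempts, field: attempts.get(field, 0) + 1},
    }
```

As a standalone node this is easy to test with plain dicts, which is the testability argument for the alternative. The cost is one more node and edge between decide and the router.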