Skip to content

Proposal: Refactor plan_node — Move Routing Logic to Graph Level

Problem

plan_node (agent.py:846-1192) is a ~350-line monolith that handles:

  1. Iteration bookkeeping (increment, max_iters check)
  2. Early abort/status checks (pre-existing aborted state)
  3. Region/currency gate (special-case ask_user for core fields)
  4. Decomposition management (proactive LLM decomposition generation)
  5. Schema composition (component-ready field merging)
  6. Calculator status routing (blocked/successful/limit checks)
  7. Acquisition task resolution (next task from recipe graph)
  8. LLM decision call (structured output → PlannerDecision)
  9. Decision post-processing (3 redirectors: derived fields, web-preferred fields, calculation adjustments)
  10. Loop/cap detection (duplicate search queries, per-field attempt caps)

This violates single-responsibility: plan_node is simultaneously the router, the guard, the LLM caller, and the policy enforcer. Debugging requires tracing through 15+ early-return branches.

Proposed Architecture

Split plan_node into three graph-level nodes and one routing function:

┌─────────────────────────────────────────────────────────────────────┐
│  Current: plan_node (350 lines, 10 responsibilities)                │
└─────────────────────────────────────────────────────────────────────┘

                              ↓ refactor ↓

┌─────────────────────────────────────────────────────────────────────┐
│  Proposed Graph                                                       │
│                                                                       │
│  START → guards                                                     │
│            │                                                         │
│            ├─ [gate] ──→ ask_user (region/currency)                  │
│            │                                                         │
│            ├─ [early_finish] ──→ finish (abort/calc-cap/done)       │
│            │                                                         │
│            └─ [proceed] ──→ acquire                                 │
│                              │                                       │
│                              ├─ [has_task] ──→ decide (LLM call)    │
│                              │                                        │
│                              └─ [no_task] ──→ finish                │
│                                              │                       │
│  decide → post_process (redirectors)         │                       │
│            │                                 │                       │
│            └─ route_after_plan ──→ {search, ask_user, reflect,      │
│                                     calculate, finish}               │
└─────────────────────────────────────────────────────────────────────┘

New Nodes

1. guards (pure function, ~40 lines)

Input: raw state Output: state updates + routing signal

Responsibilities: - Increment iterations - Check status == "aborted" → route to finish - Check iterations > max_iters → route to finish - Check region/currency gates → route to ask_user with injected decision - Check calculator status (blocked cap, success) → route to finish

Signature:

def guards_node(state: State) -> dict[str, Any]:
    """Increment iteration counter and apply early-exit guards.

    Returns state updates. Routing is handled by route_after_guards().
    """

2. acquire (pure function, ~60 lines)

Input: state (post-guards) Output: state updates (dynamic_decompositions, schema composition)

Responsibilities: - Run _proactive_decompositions() for composite fields - Run compose_ready_fields() for component merging - Call next_acquisition_task() to find pending work - Set acquisition_task in state (or None)

Signature:

def acquire_node(state: State) -> dict[str, Any]:
    """Resolve next acquisition task from recipe graph.

    Populates dynamic_decompositions and selects the next task.
    Returns state updates including 'next_acquisition_task'.
    """

3. decide (LLM caller, ~80 lines)

Input: state with next_acquisition_task Output: decision field

Responsibilities: - If next_acquisition_task exists → convert to PlannerDecision - Otherwise → build planner prompt, call LLM structured output - Apply decision redirectors: - _redirect_derived_direct_decision() - _redirect_premature_ask_for_web_field() - _adjust_calculation_decision() - Apply loop/cap detection (duplicate queries, per-field caps)

Signature:

def decide_node(state: State) -> dict[str, Any]:
    """Produce a PlannerDecision via acquisition task or LLM call.

    Applies post-decision redirectors and loop detection.
    """

New Routing Functions

route_after_guards(state) -> str

Routes to one of: - "ask_user" — region/currency gate triggered - "finish" — early termination (abort, calc-cap, max_iters) - "acquire" — normal flow continues

route_after_acquire(state) -> str

Routes to one of: - "decide" — acquisition task exists (from recipe or LLM) - "finish" — no actionable work remains

Updated Graph Topology

def _build_state_graph() -> Any:
    builder = StateGraph(State)

    # New granular nodes
    builder.add_node("guards", guards_node)
    builder.add_node("acquire", acquire_node)
    builder.add_node("decide", decide_node)

    # Existing nodes (unchanged)
    builder.add_node("search", search_node)
    builder.add_node("observe", observe_node)
    builder.add_node("calculate", calculate_node)
    builder.add_node("ask_user", ask_user_node)
    builder.add_node("observe_user", observe_user_node)
    builder.add_node("reflect", reflect_node)
    builder.add_node("finish", finish_node)

    # New edges
    builder.add_edge(START, "guards")
    builder.add_conditional_edges("guards", route_after_guards, {
        "ask_user": "ask_user",
        "finish": "finish",
        "acquire": "acquire",
    })
    builder.add_conditional_edges("acquire", route_after_acquire, {
        "decide": "decide",
        "finish": "finish",
    })
    builder.add_edge("decide", "plan_router")  # or inline routing

    # Existing edges (unchanged)
    builder.add_conditional_edges("plan_router", route_after_plan, {
        "search": "search",
        "reflect": "reflect",
        "ask_user": "ask_user",
        "calculate": "calculate",
        "finish": "finish",
    })
    builder.add_conditional_edges("search", route_after_search, {
        "observe": "observe",
        "guards": "guards",  # was "plan" — now loops back to guards
    })
    builder.add_edge("observe", "guards")
    builder.add_edge("calculate", "guards")
    builder.add_edge("ask_user", "observe_user")
    builder.add_edge("observe_user", "guards")
    builder.add_edge("reflect", "guards")
    builder.add_edge("finish", END)

    return builder

Migration Path

Phase 1: Extract without changing topology (low risk)

  1. Extract guards_node from lines 848-887 (iteration + region/currency gates)
  2. Extract acquire_node from lines 889-988 (decomposition + acquisition task)
  3. Extract decide_node from lines 990-1192 (LLM call + redirectors)
  4. Keep plan_node as a thin orchestrator that calls all three
  5. Verify: all existing tests pass, no behavior change

Phase 2: Wire into graph (medium risk)

  1. Add new nodes to _build_state_graph()
  2. Add route_after_guards and route_after_acquire
  3. Replace plan node with guards → acquire → decide chain
  4. Update all loop-back edges (planguards)
  5. Verify: planner integration tests pass

Phase 3: Cleanup (low risk)

  1. Remove old plan_node function
  2. Update AGENTS.md and README.md
  3. Update current-graph.md reference

Expected Benefits

Metric Before After
plan_node LOC ~350 0 (split into 3)
guards_node LOC ~40
acquire_node LOC ~60
decide_node LOC ~80
Branching depth 4-5 levels 2 levels max
Testability Integration only Unit-testable per node
Debuggability Trace 15 branches Inspect state at each boundary

Risks

Risk Mitigation
State key changes break checkpointer Keep all state keys identical; new nodes read/write same fields
Graph topology change breaks interrupt/resume guards node is first after every loop-back; interrupt state unchanged
Hidden coupling between extracted functions Phase 1 keeps plan_node as orchestrator to surface coupling before full split

Open Questions

  1. Should decide node include the loop/cap detection, or should that be a separate validate_decision node?
  2. Current proposal: inline in decide (simpler graph)
  3. Alternative: separate node (more testable, but adds graph complexity)

  4. Should plan_router be a node or just a routing function?

  5. Current proposal: routing function (no state mutation needed)
  6. Alternative: thin node that sets decision key (consistency with other nodes)

  7. Naming: guards / acquire / decide vs pre_flight / resolve_task / choose_action?