Planner Graph Refactor Proposal: Decomposing the plan Node

Problem Statement

The current plan node in src/venturescope/planner/agent.py (lines 846-1192, ~350 lines) is a monolithic function that handles:

  1. Early termination checks (aborted state, max iterations)
  2. Region/currency prerequisite questions
  3. Schema composition and dynamic decomposition generation
  4. Calculator status evaluation
  5. Acquisition task routing (blocked calculator, missing fields)
  6. Auto-finish logic (all inputs collected)
  7. LLM-based decision making
  8. Post-LLM decomposition generation
  9. Decision redirects (derived fields, premature asks)
  10. Search/ask-user cap enforcement
  11. Final decision adjustment and event emission

This violates the single-responsibility principle and hides the control flow from the graph: the plan node is effectively a state machine nested inside the graph's own state machine.

Proposed Architecture

Decompose plan into graph-level nodes with explicit routing:

flowchart TD
    planner_start([START]) --> check_termination

    check_termination -->|aborted or max_iters| finish
    check_termination -->|continue| check_region_currency

    check_region_currency -->|needs region| ask_region[ask_region]
    check_region_currency -->|needs currency| ask_currency[ask_currency]
    check_region_currency -->|ready| compose_schema

    ask_region --> observe_user
    ask_currency --> observe_user
    observe_user --> check_region_currency

    compose_schema --> check_calculator

    check_calculator -->|calc cap reached| finish
    check_calculator -->|calc succeeded| finish
    check_calculator -->|calc blocked| acquisition_routing
    check_calculator -->|no calc or pending| acquisition_routing

    acquisition_routing -->|has task| enforce_caps
    acquisition_routing -->|no task| check_completion

    check_completion -->|all collected| finish
    check_completion -->|has task| enforce_caps
    check_completion -->|missing fields| llm_decide

    llm_decide -->|LLM failure| finish
    llm_decide -->|valid decision| post_process

    post_process --> enforce_caps

    enforce_caps -->|search cap| ask_user
    enforce_caps -->|ask cap| finish
    enforce_caps -->|ok| route_decision

    route_decision -->|search| search
    route_decision -->|ask_user| ask_user
    route_decision -->|reflect| reflect
    route_decision -->|calculate| calculate
    route_decision -->|finish| finish

    search -->|has observation| observe
    search -->|no hits| check_termination

    observe --> check_termination
    calculate --> check_termination
    ask_user --> observe_user
    reflect --> check_termination
    finish --> planner_end([END])

Node Decomposition

1. check_termination (Guard Node)

Responsibility: Early exit conditions

def check_termination_node(state: State) -> dict[str, Any]:
    settings = get_planner_settings()
    iters = state["iterations"] + 1

    if state.get("status") == "aborted":
        return {
            "decision": PlannerDecision(action="finish", reasoning="Planner already aborted."),
            "iterations": iters,
            "status": "aborted",
            "_route": "finish"
        }

    if iters > settings.max_iters:
        return {
            "decision": PlannerDecision(
                action="finish",
                reasoning=f"Iteration cap {settings.max_iters} reached."
            ),
            "iterations": iters,
            "status": "running",
            "_route": "finish"
        }

    return {"iterations": iters, "_route": "continue"}

def route_after_termination(state: State) -> str:
    return state.get("_route", "continue")
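
The guard/router split can be tried out without the graph library. The sketch below is a simplified stand-in rather than the project's code: `GuardState`, `MAX_ITERS`, `apply_update`, and `run_guard` are hypothetical names, and the last-write-wins merge is an assumption about how a node's partial update is applied to the state.

```python
from typing import Any, TypedDict

MAX_ITERS = 3  # hypothetical stand-in for settings.max_iters


class GuardState(TypedDict, total=False):
    iterations: int
    status: str
    _route: str


def check_termination(state: GuardState) -> dict[str, Any]:
    """Guard node: returns a partial update that includes the routing hint."""
    iters = state.get("iterations", 0) + 1
    if state.get("status") == "aborted" or iters > MAX_ITERS:
        return {"iterations": iters, "_route": "finish"}
    return {"iterations": iters, "_route": "continue"}


def route_after_termination(state: GuardState) -> str:
    """Router: reads the hint the guard has just written."""
    return state.get("_route", "continue")


def apply_update(state: GuardState, update: dict[str, Any]) -> GuardState:
    """Last-write-wins merge (an assumption, see the lead-in)."""
    return {**state, **update}  # type: ignore[return-value]


def run_guard(state: GuardState) -> tuple[GuardState, str]:
    """Run the guard, merge its update, then ask the router where to go."""
    new_state = apply_update(state, check_termination(state))
    return new_state, route_after_termination(new_state)
```

Because the router only reads what the guard wrote in the same step, the pair can be asserted on with plain dictionaries.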

2. check_region_currency (Guard Node)

Responsibility: Prerequisite field collection

def check_region_currency_node(state: State) -> dict[str, Any]:
    if _needs_region_question(state):
        return {
            "decision": PlannerDecision(
                action="ask_user",
                target_field=_REGION_FIELD,
                user_question="Where is this business based? ...",
                reasoning="Confirm region before any value lookups.",
                suggested_answers=[]
            ),
            "_route": "ask_region"
        }

    if _needs_currency_question(state):
        return {
            "decision": PlannerDecision(
                action="ask_user",
                target_field=_CURRENCY_FIELD,
                user_question="What reporting currency should I use? ...",
                reasoning="Confirm reporting currency before any value lookups.",
                suggested_answers=_currency_suggestions_for(state["idea"].location)
            ),
            "_route": "ask_currency"
        }

    return {"_route": "ready"}

def route_after_region_currency(state: State) -> str:
    return state.get("_route", "ready")

3. compose_schema (Transformation Node)

Responsibility: Schema composition and decomposition management

def compose_schema_node(state: State) -> dict[str, Any]:
    dynamic_decomps = dict(state.get("dynamic_decompositions") or {})

    # Proactive decomposition
    dynamic_decomps = _proactive_decompositions(
        state.get("static_field_acquisition") or {},
        state["schema"],
        dynamic_decomps,
        state["idea"]
    )

    all_recipes = build_dynamic_recipes(
        dynamic_decomps,
        state.get("static_field_acquisition") or {}
    )

    try:
        schema_dict = compose_ready_fields(
            state["schema"],
            all_recipes,
            expected_currency=state["idea"].currency
        )
    except ValueError as exc:
        log.warning("component composition failed: %s", exc)
        schema_dict = state["schema"]

    schema_changed = schema_dict != state["schema"]

    out: dict[str, Any] = {}
    if schema_changed:
        out["schema"] = schema_dict
    if dynamic_decomps != (state.get("dynamic_decompositions") or {}):
        out["dynamic_decompositions"] = dynamic_decomps

    return out

4. check_calculator (Guard Node)

Responsibility: Calculator state evaluation

def check_calculator_node(state: State) -> dict[str, Any]:
    settings = get_planner_settings()
    profile_name = state.get("profile_name")

    if not _profile_has_calculator(profile_name):
        return {"_route": "no_calc"}

    if (
        state.get("calculation_attempts", 0) >= settings.max_calculation_attempts
        and state.get("last_calculation_status") in {CalculatorRunStatus.BLOCKED.value, "ERROR"}
    ):
        return {
            "decision": PlannerDecision(
                action="finish",
                reasoning=f"Calculation cap {settings.max_calculation_attempts} reached."
            ),
            "status": "aborted",
            "_route": "finish"
        }

    if _successful_calculation_current(state):
        return {
            "decision": PlannerDecision(
                action="finish",
                reasoning="Deterministic calculator completed successfully."
            ),
            "_route": "finish"
        }

    if state.get("last_calculation_status") == CalculatorRunStatus.BLOCKED.value:
        return {"_route": "blocked"}

    return {"_route": "pending"}

def route_after_calculator(state: State) -> str:
    return state.get("_route", "pending")

5. acquisition_routing (Decision Node)

Responsibility: Handle blocked calculator and missing field acquisition

def acquisition_routing_node(state: State) -> dict[str, Any]:
    settings = get_planner_settings()
    dynamic_decomps = dict(state.get("dynamic_decompositions") or {})
    all_recipes = build_dynamic_recipes(
        dynamic_decomps,
        state.get("static_field_acquisition") or {}
    )

    acquisition_task = None

    # Blocked calculator path
    if state.get("last_calculation_status") == CalculatorRunStatus.BLOCKED.value:
        acquisition_task = next_acquisition_task(
            recipes=all_recipes,
            schema_dict=state["schema"],
            attempts=state["attempts"],
            idea=state["idea"],
            last_calculation_errors=state.get("last_calculation_errors", []),
            max_attempts_per_field=settings.max_attempts_per_field
        )

        if acquisition_task is None:
            # Try dynamic decomposition for blocked field
            blocked_path = _first_blocked_field_without_recipe(
                state.get("last_calculation_errors", []),
                all_recipes
            )
            if blocked_path and blocked_path not in dynamic_decomps:
                specs = generate_decomposition(blocked_path, state["idea"], _llm())
                dynamic_decomps = {**dynamic_decomps, blocked_path: specs or []}
                if specs:
                    all_recipes = build_dynamic_recipes(
                        dynamic_decomps,
                        state.get("static_field_acquisition") or {}
                    )

            if blocked_path and blocked_path in dynamic_decomps:
                acquisition_task = next_acquisition_task(
                    recipes=all_recipes,
                    schema_dict=state["schema"],
                    attempts=state["attempts"],
                    idea=state["idea"],
                    last_calculation_errors=state.get("last_calculation_errors", []),
                    max_attempts_per_field=settings.max_attempts_per_field,
                    only_field=blocked_path
                )

    if acquisition_task is not None:
        decision = _decision_from_acquisition_task(acquisition_task)
        decision = _adjust_calculation_decision(state, decision)
        out = {
            "decision": decision,
            "_route": "has_task"
        }
        if dynamic_decomps != (state.get("dynamic_decompositions") or {}):
            out["dynamic_decompositions"] = dynamic_decomps
        return out

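    out: dict[str, Any] = {"_route": "no_task"}
    if dynamic_decomps != (state.get("dynamic_decompositions") or {}):
        # Persist the attempt (even an empty one) so it is not regenerated on the next pass
        out["dynamic_decompositions"] = dynamic_decomps
    return out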

def route_after_acquisition(state: State) -> str:
    return state.get("_route", "no_task")

6. check_completion (Guard Node)

Responsibility: Auto-finish when all inputs collected

def check_completion_node(state: State) -> dict[str, Any]:
    recipes = build_dynamic_recipes(
        state.get("dynamic_decompositions") or {},
        state.get("static_field_acquisition") or {}
    )

    auto_finish_ok = True
    for path, leaf in iter_schema_leaves(state["schema"]):
        if is_derived_field(path, recipes):
            continue
        src = field_source(path)
        leaf_value = leaf.get("value") if isinstance(leaf, dict) else None
        if src in ("web", "either") and leaf_value is None:
            auto_finish_ok = False
            break
        if src == "user" and leaf_value is None and not state["attempts"].get(path):
            auto_finish_ok = False
            break

    missing = missing_leaves(state["schema"])
    actionable_missing = actionable_missing_fields(missing, recipes)
    open_acquisition_tasks = acquisition_task_summary(recipes, state["schema"])

    if not actionable_missing and open_acquisition_tasks:
        # Try to get next acquisition task
        settings = get_planner_settings()
        acquisition_task = next_acquisition_task(
            recipes=recipes,
            schema_dict=state["schema"],
            attempts=state["attempts"],
            idea=state["idea"],
            last_calculation_errors=state.get("last_calculation_errors", []),
            max_attempts_per_field=settings.max_attempts_per_field
        )
        if acquisition_task is not None:
            decision = _decision_from_acquisition_task(acquisition_task)
            decision = _adjust_calculation_decision(state, decision)
            return {"decision": decision, "_route": "has_task"}

    if auto_finish_ok and not actionable_missing and not open_acquisition_tasks:
        decision = PlannerDecision(
            action="finish",
            reasoning="All raw inputs collected."
        )
        decision = _adjust_calculation_decision(state, decision)
        return {"decision": decision, "_route": "finish"}

    return {"_route": "continue"}

def route_after_completion(state: State) -> str:
    return state.get("_route", "continue")
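
The auto-finish rule inside check_completion can be read as a pure predicate over the schema leaves. The following is a minimal sketch under stated assumptions: `can_auto_finish` and its parameters are hypothetical, and plain mappings and sets stand in for what the real code gets from `iter_schema_leaves`, `field_source`, `is_derived_field`, and `state["attempts"]`.

```python
from typing import Mapping

# Hypothetical stand-in: each leaf is a mapping that may carry a "value" key.
Leaf = Mapping[str, object]


def can_auto_finish(
    leaves: Mapping[str, Leaf],
    sources: Mapping[str, str],
    derived: set[str],
    attempted: set[str],
) -> bool:
    """Return True when no raw input still needs to be searched or asked for."""
    for path, leaf in leaves.items():
        if path in derived:
            continue  # derived fields are computed, never collected
        value = leaf.get("value")
        source = sources.get(path, "either")
        if source in ("web", "either") and value is None:
            return False  # a web-sourced value is still missing
        if source == "user" and value is None and path not in attempted:
            return False  # the user has not been asked yet
    return True
```

Note the asymmetry it makes visible: a missing user-sourced field stops blocking auto-finish once it has been attempted, while a missing web-sourced field always blocks.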

7. llm_decide (LLM Node)

Responsibility: LLM-based decision making

def llm_decide_node(state: State) -> dict[str, Any]:
    settings = get_planner_settings()
    recipes = build_dynamic_recipes(
        state.get("dynamic_decompositions") or {},
        state.get("static_field_acquisition") or {}
    )

    missing = missing_leaves(state["schema"])
    actionable_missing = actionable_missing_fields(missing, recipes)
    open_acquisition_tasks = acquisition_task_summary(recipes, state["schema"])

    system, user = planner_prompt(
        idea=state["idea"],
        schema_dict=state["schema"],
        attempts=state["attempts"],
        notes=state["notes"],
        iterations=state["iterations"],
        max_iters=settings.max_iters,
        field_sources=_field_sources(state["schema"], recipes),
        missing=actionable_missing,
        acquisition_tasks=open_acquisition_tasks,
        profile_name=state.get("profile_name", ""),
        terminology=state.get("terminology", {}),
        critical_parameters=state.get("critical_parameters", []),
        last_calculation_status=state.get("last_calculation_status"),
        last_calculation_errors=state.get("last_calculation_errors", []),
        composite_fields=list(
            _requires_components_paths(state.get("static_field_acquisition") or {})
        )
    )

    try:
        decision = _llm().structured(system, user, schema=PlannerDecision, temperature=0.0)
        return {"decision": decision, "_llm_failed": False}
    except (LLMInvalidOutputError, LLMTimeoutError, LLMError, ValidationError, ValueError) as e:
        log.warning("planner structured-output failure: %s; finishing", e)
        return {
            "decision": PlannerDecision(
                action="finish",
                reasoning=f"Planner output invalid: {e}"
            ),
            "_llm_failed": True,
            "_route": "finish"
        }

def route_after_llm(state: State) -> str:
    if state.get("_llm_failed"):
        return "finish"
    return "post_process"

8. post_process (Transformation Node)

Responsibility: Post-LLM decomposition and redirects

def post_process_node(state: State) -> dict[str, Any]:
    decision = state["decision"]
    dynamic_decomps = dict(state.get("dynamic_decompositions") or {})
    recipes = build_dynamic_recipes(
        dynamic_decomps,
        state.get("static_field_acquisition") or {}
    )

    # Generate decomposition for requires-components fields
    if (
        decision.action in {"search", "ask_user"}
        and decision.target_field
        and decision.target_component is None
        and decision.target_field in _requires_components_paths(
            state.get("static_field_acquisition") or {}
        )
        and decision.target_field not in dynamic_decomps
    ):
        specs = generate_decomposition(decision.target_field, state["idea"], _llm())
        dynamic_decomps = {**dynamic_decomps, decision.target_field: specs or []}
        if specs:
            recipes = build_dynamic_recipes(
                dynamic_decomps,
                state.get("static_field_acquisition") or {}
            )

    # Apply redirects
    decision = _redirect_derived_direct_decision(state, recipes, decision)
    decision = _redirect_premature_ask_for_web_field(state, recipes, decision)

    out = {"decision": decision}
    if dynamic_decomps != (state.get("dynamic_decompositions") or {}):
        out["dynamic_decompositions"] = dynamic_decomps

    return out

9. enforce_caps (Guard Node)

Responsibility: Search and ask-user cap enforcement

def enforce_caps_node(state: State) -> dict[str, Any]:
    settings = get_planner_settings()
    decision = state["decision"]
    recipes = build_dynamic_recipes(
        state.get("dynamic_decompositions") or {},
        state.get("static_field_acquisition") or {}
    )

    # Search cap
    if decision.action == "search" and decision.target_field:
        same = [
            a for a in _attempts_for(state, decision.target_field, decision.target_component)
            if a.action == "search"
        ]
        prior_queries = {a.query_or_question for a in same if a.query_or_question}
        duplicate = decision.search_query in prior_queries

        if len(same) >= settings.max_attempts_per_field or duplicate:
            reason = "duplicate query" if duplicate else f"cap {settings.max_attempts_per_field}"
            log.info("field %s search loop (%s) → forcing ask_user", decision.target_field, reason)

            if decision.target_component:
                task = next_acquisition_task(
                    recipes=recipes,
                    schema_dict=state["schema"],
                    attempts=state["attempts"],
                    idea=state["idea"],
                    last_calculation_errors=state.get("last_calculation_errors", []),
                    max_attempts_per_field=settings.max_attempts_per_field,
                    only_field=decision.target_field
                )
                decision = (
                    _decision_from_acquisition_task(task)
                    if task is not None
                    else PlannerDecision(action="reflect", reasoning="Search cap reached.")
                )
            else:
                decision = PlannerDecision(
                    action="ask_user",
                    target_field=decision.target_field,
                    user_question=f"I couldn't find reliable data for {decision.target_field}. ...",
                    reasoning="Per-field search cap reached."
                )
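            # The replacement may be a reflect or another acquisition action,
            # so short-circuit to ask_user only when that is what was chosen
            route = "ask_user" if decision.action == "ask_user" else "ok"
            return {"decision": decision, "_route": route}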

    # Ask-user cap
    if decision.action == "ask_user" and decision.target_field:
        same = [
            a for a in _attempts_for(state, decision.target_field, decision.target_component)
            if a.action == "ask_user"
        ]
        if len(same) >= settings.max_attempts_per_field:
            log.info("field %s ask_user cap reached → finishing", decision.target_field)
            decision = PlannerDecision(
                action="finish",
                reasoning=f"Ask-user cap {settings.max_attempts_per_field} reached for {decision.target_field}."
            )
            decision = _adjust_calculation_decision(state, decision)
            return {"decision": decision, "status": "aborted", "_route": "finish"}

    # Final adjustment
    if not state.get("_llm_failed"):
        decision = _adjust_calculation_decision(state, decision)

    return {"decision": decision, "_route": "ok"}

def route_after_enforce_caps(state: State) -> str:
    return state.get("_route", "ok")
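
The search-cap test is the part of enforce_caps that is easiest to get subtly wrong, so it may help to isolate it. This sketch assumes a simplified `Attempt` record and a hypothetical helper `search_cap_reason`; neither exists in the codebase as far as this proposal shows.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Attempt:
    """Hypothetical stand-in for the planner's attempt record."""

    action: str
    query_or_question: Optional[str] = None


def search_cap_reason(
    attempts: list[Attempt], new_query: Optional[str], max_attempts: int
) -> Optional[str]:
    """Return why a search must be blocked, or None if it may proceed."""
    searches = [a for a in attempts if a.action == "search"]
    prior = {a.query_or_question for a in searches if a.query_or_question}
    if new_query in prior:
        return "duplicate query"  # checked first, as in enforce_caps_node
    if len(searches) >= max_attempts:
        return f"cap {max_attempts}"
    return None
```

Only attempts with action "search" count toward the cap, so earlier ask_user attempts on the same field do not use up search budget.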

10. route_decision (Router Node)

Responsibility: Final routing based on decision action

def route_decision(state: State) -> str:
    decision = state["decision"]
    assert decision is not None
    _emit_decision_event(decision)
    return decision.action

Graph Construction

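from langgraph.graph import END, START, StateGraph  # assumes LangGraph, which the StateGraph API suggests

def _build_state_graph() -> Any: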
    builder = StateGraph(State)

    # Guard nodes
    builder.add_node("check_termination", check_termination_node)
    builder.add_node("check_region_currency", check_region_currency_node)
    builder.add_node("check_calculator", check_calculator_node)
    builder.add_node("check_completion", check_completion_node)
    builder.add_node("enforce_caps", enforce_caps_node)

    # Transformation nodes
    builder.add_node("compose_schema", compose_schema_node)
    builder.add_node("acquisition_routing", acquisition_routing_node)
    builder.add_node("post_process", post_process_node)

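    # LLM node
    builder.add_node("llm_decide", llm_decide_node)

    # Router node: a pass-through, registered so the conditional edges keyed
    # on "route_decision" below have a source node to attach to
    builder.add_node("route_decision", lambda state: {})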

    # Action nodes
    builder.add_node("search", search_node)
    builder.add_node("observe", observe_node)
    builder.add_node("calculate", calculate_node)
    builder.add_node("ask_user", ask_user_node)
    builder.add_node("observe_user", observe_user_node)
    builder.add_node("reflect", reflect_node)
    builder.add_node("finish", finish_node)

    # Entry
    builder.add_edge(START, "check_termination")

    # Termination routing
    builder.add_conditional_edges(
        "check_termination",
        route_after_termination,
        {"finish": "finish", "continue": "check_region_currency"}
    )

    # Region/currency routing
    builder.add_conditional_edges(
        "check_region_currency",
        route_after_region_currency,
        {
            "ask_region": "ask_user",
            "ask_currency": "ask_user",
            "ready": "compose_schema"
        }
    )

    # Schema composition → calculator check
    builder.add_edge("compose_schema", "check_calculator")

    # Calculator routing
    builder.add_conditional_edges(
        "check_calculator",
        route_after_calculator,
        {
            "finish": "finish",
            "blocked": "acquisition_routing",
            "pending": "acquisition_routing",
            "no_calc": "acquisition_routing"
        }
    )

    # Acquisition routing
    builder.add_conditional_edges(
        "acquisition_routing",
        route_after_acquisition,
        {"has_task": "enforce_caps", "no_task": "check_completion"}
    )

    # Completion check
    builder.add_conditional_edges(
        "check_completion",
        route_after_completion,
        {"finish": "finish", "has_task": "enforce_caps", "continue": "llm_decide"}
    )

    # LLM decision
    builder.add_conditional_edges(
        "llm_decide",
        route_after_llm,
        {"finish": "finish", "post_process": "post_process"}
    )

    # Post-processing → cap enforcement
    builder.add_edge("post_process", "enforce_caps")

    # Cap enforcement routing
    builder.add_conditional_edges(
        "enforce_caps",
        route_after_enforce_caps,
        {"finish": "finish", "ask_user": "ask_user", "ok": "route_decision"}
    )

    # Final routing to action nodes
    builder.add_conditional_edges(
        "route_decision",
        route_decision,
        {
            "search": "search",
            "reflect": "reflect",
            "ask_user": "ask_user",
            "calculate": "calculate",
            "finish": "finish"
        }
    )

    # Action node returns
    builder.add_conditional_edges(
        "search",
        route_after_search,
        {"observe": "observe", "plan": "check_termination"}
    )
    builder.add_edge("observe", "check_termination")
    builder.add_edge("calculate", "check_termination")
    builder.add_edge("ask_user", "observe_user")
    builder.add_edge("observe_user", "check_region_currency")
    builder.add_edge("reflect", "check_termination")
    builder.add_edge("finish", END)

    return builder
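
With this many edges, a name used in an edge map but never registered as a node is an easy mistake to make. Below is a small sketch of a consistency check that could run in a unit test. The function `unregistered_nodes`, the plain-dict edge representation, and the `__start__`/`__end__` placeholder names are all assumptions made for illustration; the check does not inspect a real graph builder.

```python
def unregistered_nodes(
    nodes: set[str],
    edges: dict[str, set[str]],
    terminals: frozenset[str] = frozenset({"__start__", "__end__"}),
) -> set[str]:
    """Return every edge source or target that is not a registered node."""
    known = nodes | terminals
    referenced = set(edges)  # edge sources
    for targets in edges.values():
        referenced |= targets  # edge targets
    return referenced - known


# Toy wiring: "route_decision" is used as a source and a target
# but was never registered as a node.
example_nodes = {"enforce_caps", "search", "finish"}
example_edges = {
    "enforce_caps": {"finish", "route_decision"},
    "route_decision": {"search", "finish"},
    "finish": {"__end__"},
}
missing = unregistered_nodes(example_nodes, example_edges)
```

An empty result means every source and target in the edge maps refers to a registered node or a terminal.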

Benefits

  1. Single Responsibility: Each node has one clear purpose
  2. Testability: Guard nodes can be tested in isolation
  3. Observability: Graph visualization shows decision flow clearly
  4. Maintainability: Changes to one concern don't affect others
  5. Debugging: Easier to trace which guard/condition triggered a path
  6. Extensibility: New guards or transformations can be inserted without modifying existing nodes

Migration Strategy

  1. Phase 1: Extract guard nodes (check_termination, check_region_currency, check_calculator, check_completion, enforce_caps)
  2. Phase 2: Extract transformation nodes (compose_schema, acquisition_routing, post_process)
  3. Phase 3: Extract LLM node (llm_decide)
  4. Phase 4: Update graph construction and routing
  5. Phase 5: Remove monolithic plan_node

Each phase should include:

  - Unit tests for new nodes
  - Integration tests for graph flow
  - Regression tests against existing behavior
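
One way to approach the regression tests is a parity harness that replays recorded states through both implementations and reports disagreements. This is a sketch only: `find_regressions`, the `Planner` callable type, and the idea of reducing each run to its chosen action name are assumptions, not existing project code.

```python
from typing import Any, Callable, Iterable

StateDict = dict[str, Any]
Planner = Callable[[StateDict], str]  # returns the chosen action name


def find_regressions(
    old_plan: Planner, new_plan: Planner, recorded_states: Iterable[StateDict]
) -> list[tuple[StateDict, str, str]]:
    """Collect every recorded state on which the two planners disagree."""
    mismatches = []
    for state in recorded_states:
        # Shallow copies, so top-level key changes by one planner do not leak into the other
        expected = old_plan(dict(state))
        actual = new_plan(dict(state))
        if expected != actual:
            mismatches.append((state, expected, actual))
    return mismatches
```

In a real migration phase, `old_plan` would wrap the monolithic plan node and `new_plan` the chain of extracted nodes up to the point where a decision is produced. An empty result on a representative set of states is the signal that the phase preserved behavior.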

State Schema Changes

Add routing hints to state (transient, not persisted):

from typing import NotRequired, TypedDict  # Python 3.11+; typing_extensions on older versions

class State(TypedDict):
    # ... existing fields ...
    _route: NotRequired[str]  # Transient routing hint
    _llm_failed: NotRequired[bool]  # Transient LLM status

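These fields should be excluded from checkpointing and cleared after use. Until that is in place, every node that feeds a router must set _route on each return path, so that a router never reads a stale hint left by an earlier node.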
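
Clearing the transient fields could be as simple as filtering them out before the state is persisted or handed back to a caller. The helper below is a hypothetical sketch; where it would be called from depends on how checkpointing is wired up, which this proposal does not specify.

```python
from typing import Any

# The two transient keys introduced by this proposal
TRANSIENT_KEYS = frozenset({"_route", "_llm_failed"})


def strip_transient(state: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of the state without the transient routing hints."""
    return {k: v for k, v in state.items() if k not in TRANSIENT_KEYS}
```

The function returns a new dictionary and leaves its input untouched, so it is safe to call on a state that is still being routed.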

Risks and Mitigations

| Risk | Mitigation |
| --- | --- |
| Increased graph complexity | Clear node naming and documentation |
| State mutation ordering | Explicit dependencies in graph edges |
| Checkpoint serialization | Exclude transient _route fields |
| Performance overhead | Minimal: same logic, different structure |
| Test coverage gaps | Phase-by-phase migration with regression tests |

Conclusion

This refactor transforms the monolithic plan node into a composable graph of single-responsibility nodes. The resulting architecture is more maintainable, testable, and observable while preserving all existing behavior.