Planner Graph Refactor Proposal: Decomposing the plan Node
Problem Statement
The current plan node in src/venturescope/planner/agent.py (lines 846-1192, ~350 lines) is a monolithic function that handles:
- Early termination checks (aborted state, max iterations)
- Region/currency prerequisite questions
- Schema composition and dynamic decomposition generation
- Calculator status evaluation
- Acquisition task routing (blocked calculator, missing fields)
- Auto-finish logic (all inputs collected)
- LLM-based decision making
- Post-LLM decomposition generation
- Decision redirects (derived fields, premature asks)
- Search/ask-user cap enforcement
- Final decision adjustment and event emission
This violates the single-responsibility principle and hides the real control flow from the graph: the plan node is effectively a state machine nested inside the graph's state machine.
Proposed Architecture
Decompose plan into graph-level nodes with explicit routing:
```mermaid
flowchart TD
    planner_start([START]) --> check_termination
    check_termination -->|aborted or max_iters| finish
    check_termination -->|continue| check_region_currency
    check_region_currency -->|needs region| ask_user
    check_region_currency -->|needs currency| ask_user
    check_region_currency -->|ready| compose_schema
    observe_user --> check_region_currency
    compose_schema --> check_calculator
    check_calculator -->|calc cap reached| finish
    check_calculator -->|calc succeeded| finish
    check_calculator -->|calc blocked| acquisition_routing
    check_calculator -->|no calc or pending| acquisition_routing
    acquisition_routing -->|has task| enforce_caps
    acquisition_routing -->|no task| check_completion
    check_completion -->|all collected| finish
    check_completion -->|has task| enforce_caps
    check_completion -->|missing fields| llm_decide
    llm_decide -->|LLM failure| finish
    llm_decide -->|valid decision| post_process
    post_process --> enforce_caps
    enforce_caps -->|search cap| ask_user
    enforce_caps -->|ask cap| finish
    enforce_caps -->|ok| route_decision
    route_decision -->|search| search
    route_decision -->|ask_user| ask_user
    route_decision -->|reflect| reflect
    route_decision -->|calculate| calculate
    route_decision -->|finish| finish
    search -->|has observation| observe
    search -->|no hits| check_termination
    observe --> check_termination
    calculate --> check_termination
    ask_user --> observe_user
    reflect --> check_termination
    finish --> planner_end([END])
```
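The pattern the diagram relies on, where a node returns a partial state update and a separate router reads a hint from it, can be sketched without any graph library. The driver `run`, the `do_work` stand-in, and the `max_iters` and `log` state keys below are illustrative assumptions, not part of the planner:

```python
from typing import Any, Callable

State = dict[str, Any]
Node = Callable[[State], dict[str, Any]]
Router = Callable[[State], str]


def check_termination(state: State) -> dict[str, Any]:
    """Guard: bump the iteration counter and emit a routing hint."""
    iters = state["iterations"] + 1
    route = "finish" if iters > state["max_iters"] else "continue"
    return {"iterations": iters, "_route": route}


def do_work(state: State) -> dict[str, Any]:
    """Hypothetical stand-in for the rest of a planner pass."""
    return {"log": state["log"] + [f"pass {state['iterations']}"]}


def run(state: State) -> State:
    """Tiny driver: merge a node's partial update, then follow its router."""
    nodes: dict[str, Node] = {
        "check_termination": check_termination,
        "do_work": do_work,
    }
    routers: dict[str, Router] = {
        "check_termination": lambda s: "do_work" if s["_route"] == "continue" else "END",
        "do_work": lambda s: "check_termination",
    }
    current = "check_termination"
    while current != "END":
        state = {**state, **nodes[current](state)}
        current = routers[current](state)
    return state


final = run({"iterations": 0, "max_iters": 2, "log": []})
print(final["iterations"], final["log"])
```

Keeping the node free of routing decisions beyond writing the hint is what lets each guard be reasoned about, and tested, on its own.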
Node Decomposition
1. check_termination (Guard Node)
Responsibility: Early exit conditions
```python
def check_termination_node(state: State) -> dict[str, Any]:
    settings = get_planner_settings()
    iters = state["iterations"] + 1

    if state.get("status") == "aborted":
        return {
            "decision": PlannerDecision(action="finish", reasoning="Planner already aborted."),
            "iterations": iters,
            "status": "aborted",
            "_route": "finish"
        }

    if iters > settings.max_iters:
        return {
            "decision": PlannerDecision(
                action="finish",
                reasoning=f"Iteration cap {settings.max_iters} reached."
            ),
            "iterations": iters,
            "status": "running",
            "_route": "finish"
        }

    return {"iterations": iters, "_route": "continue"}


def route_after_termination(state: State) -> str:
    return state.get("_route", "continue")
```
2. check_region_currency (Guard Node)
Responsibility: Prerequisite field collection
```python
def check_region_currency_node(state: State) -> dict[str, Any]:
    if _needs_region_question(state):
        return {
            "decision": PlannerDecision(
                action="ask_user",
                target_field=_REGION_FIELD,
                user_question="Where is this business based? ...",
                reasoning="Confirm region before any value lookups.",
                suggested_answers=[]
            ),
            "_route": "ask_region"
        }

    if _needs_currency_question(state):
        return {
            "decision": PlannerDecision(
                action="ask_user",
                target_field=_CURRENCY_FIELD,
                user_question="What reporting currency should I use? ...",
                reasoning="Confirm reporting currency before any value lookups.",
                suggested_answers=_currency_suggestions_for(state["idea"].location)
            ),
            "_route": "ask_currency"
        }

    return {"_route": "ready"}


def route_after_region_currency(state: State) -> str:
    return state.get("_route", "ready")
```
3. compose_schema (Transformation Node)
Responsibility: Schema composition and decomposition management
```python
def compose_schema_node(state: State) -> dict[str, Any]:
    dynamic_decomps = dict(state.get("dynamic_decompositions") or {})

    # Proactive decomposition
    dynamic_decomps = _proactive_decompositions(
        state.get("static_field_acquisition") or {},
        state["schema"],
        dynamic_decomps,
        state["idea"]
    )
    all_recipes = build_dynamic_recipes(
        dynamic_decomps,
        state.get("static_field_acquisition") or {}
    )

    try:
        schema_dict = compose_ready_fields(
            state["schema"],
            all_recipes,
            expected_currency=state["idea"].currency
        )
    except ValueError as exc:
        log.warning(f"component composition failed: {exc}")
        schema_dict = state["schema"]

    schema_changed = schema_dict != state["schema"]
    out: dict[str, Any] = {}
    if schema_changed:
        out["schema"] = schema_dict
    if dynamic_decomps != (state.get("dynamic_decompositions") or {}):
        out["dynamic_decompositions"] = dynamic_decomps
    return out
```
4. check_calculator (Guard Node)
Responsibility: Calculator state evaluation
```python
def check_calculator_node(state: State) -> dict[str, Any]:
    settings = get_planner_settings()
    profile_name = state.get("profile_name")

    if not _profile_has_calculator(profile_name):
        return {"_route": "no_calc"}

    if (
        state.get("calculation_attempts", 0) >= settings.max_calculation_attempts
        and state.get("last_calculation_status") in {CalculatorRunStatus.BLOCKED.value, "ERROR"}
    ):
        return {
            "decision": PlannerDecision(
                action="finish",
                reasoning=f"Calculation cap {settings.max_calculation_attempts} reached."
            ),
            "status": "aborted",
            "_route": "finish"
        }

    if _successful_calculation_current(state):
        return {
            "decision": PlannerDecision(
                action="finish",
                reasoning="Deterministic calculator completed successfully."
            ),
            "_route": "finish"
        }

    if state.get("last_calculation_status") == CalculatorRunStatus.BLOCKED.value:
        return {"_route": "blocked"}

    return {"_route": "pending"}


def route_after_calculator(state: State) -> str:
    return state.get("_route", "pending")
```
5. acquisition_routing (Decision Node)
Responsibility: Handle blocked calculator and missing field acquisition
```python
def acquisition_routing_node(state: State) -> dict[str, Any]:
    settings = get_planner_settings()
    dynamic_decomps = dict(state.get("dynamic_decompositions") or {})
    all_recipes = build_dynamic_recipes(
        dynamic_decomps,
        state.get("static_field_acquisition") or {}
    )
    acquisition_task = None

    # Blocked calculator path
    if state.get("last_calculation_status") == CalculatorRunStatus.BLOCKED.value:
        acquisition_task = next_acquisition_task(
            recipes=all_recipes,
            schema_dict=state["schema"],
            attempts=state["attempts"],
            idea=state["idea"],
            last_calculation_errors=state.get("last_calculation_errors", []),
            max_attempts_per_field=settings.max_attempts_per_field
        )
        if acquisition_task is None:
            # Try dynamic decomposition for blocked field
            blocked_path = _first_blocked_field_without_recipe(
                state.get("last_calculation_errors", []),
                all_recipes
            )
            if blocked_path and blocked_path not in dynamic_decomps:
                specs = generate_decomposition(blocked_path, state["idea"], _llm())
                dynamic_decomps = {**dynamic_decomps, blocked_path: specs or []}
                if specs:
                    all_recipes = build_dynamic_recipes(
                        dynamic_decomps,
                        state.get("static_field_acquisition") or {}
                    )
            if blocked_path and blocked_path in dynamic_decomps:
                acquisition_task = next_acquisition_task(
                    recipes=all_recipes,
                    schema_dict=state["schema"],
                    attempts=state["attempts"],
                    idea=state["idea"],
                    last_calculation_errors=state.get("last_calculation_errors", []),
                    max_attempts_per_field=settings.max_attempts_per_field,
                    only_field=blocked_path
                )

    out: dict[str, Any] = {}
    # Persist newly generated decompositions on both paths, including empty
    # results, so a failed decomposition is not regenerated on the next pass.
    if dynamic_decomps != (state.get("dynamic_decompositions") or {}):
        out["dynamic_decompositions"] = dynamic_decomps

    if acquisition_task is not None:
        decision = _decision_from_acquisition_task(acquisition_task)
        decision = _adjust_calculation_decision(state, decision)
        out["decision"] = decision
        out["_route"] = "has_task"
        return out

    out["_route"] = "no_task"
    return out


def route_after_acquisition(state: State) -> str:
    return state.get("_route", "no_task")
```
6. check_completion (Guard Node)
Responsibility: Auto-finish when all inputs collected
```python
def check_completion_node(state: State) -> dict[str, Any]:
    recipes = build_dynamic_recipes(
        state.get("dynamic_decompositions") or {},
        state.get("static_field_acquisition") or {}
    )

    auto_finish_ok = True
    for path, leaf in iter_schema_leaves(state["schema"]):
        if is_derived_field(path, recipes):
            continue
        src = field_source(path)
        leaf_value = leaf.get("value") if isinstance(leaf, dict) else None
        if src in ("web", "either") and leaf_value is None:
            auto_finish_ok = False
            break
        if src == "user" and leaf_value is None and not state["attempts"].get(path):
            auto_finish_ok = False
            break

    missing = missing_leaves(state["schema"])
    actionable_missing = actionable_missing_fields(missing, recipes)
    open_acquisition_tasks = acquisition_task_summary(recipes, state["schema"])

    if not actionable_missing and open_acquisition_tasks:
        # Try to get next acquisition task
        settings = get_planner_settings()
        acquisition_task = next_acquisition_task(
            recipes=recipes,
            schema_dict=state["schema"],
            attempts=state["attempts"],
            idea=state["idea"],
            last_calculation_errors=state.get("last_calculation_errors", []),
            max_attempts_per_field=settings.max_attempts_per_field
        )
        if acquisition_task is not None:
            decision = _decision_from_acquisition_task(acquisition_task)
            decision = _adjust_calculation_decision(state, decision)
            return {"decision": decision, "_route": "has_task"}

    if auto_finish_ok and not actionable_missing and not open_acquisition_tasks:
        decision = PlannerDecision(
            action="finish",
            reasoning="All raw inputs collected."
        )
        decision = _adjust_calculation_decision(state, decision)
        return {"decision": decision, "_route": "finish"}

    return {"_route": "continue"}


def route_after_completion(state: State) -> str:
    return state.get("_route", "continue")
```
7. llm_decide (LLM Node)
Responsibility: LLM-based decision making
```python
def llm_decide_node(state: State) -> dict[str, Any]:
    settings = get_planner_settings()
    recipes = build_dynamic_recipes(
        state.get("dynamic_decompositions") or {},
        state.get("static_field_acquisition") or {}
    )
    missing = missing_leaves(state["schema"])
    actionable_missing = actionable_missing_fields(missing, recipes)
    open_acquisition_tasks = acquisition_task_summary(recipes, state["schema"])

    system, user = planner_prompt(
        idea=state["idea"],
        schema_dict=state["schema"],
        attempts=state["attempts"],
        notes=state["notes"],
        iterations=state["iterations"],
        max_iters=settings.max_iters,
        field_sources=_field_sources(state["schema"], recipes),
        missing=actionable_missing,
        acquisition_tasks=open_acquisition_tasks,
        profile_name=state.get("profile_name", ""),
        terminology=state.get("terminology", {}),
        critical_parameters=state.get("critical_parameters", []),
        last_calculation_status=state.get("last_calculation_status"),
        last_calculation_errors=state.get("last_calculation_errors", []),
        composite_fields=list(
            _requires_components_paths(state.get("static_field_acquisition") or {})
        )
    )

    try:
        decision = _llm().structured(system, user, schema=PlannerDecision, temperature=0.0)
        return {"decision": decision, "_llm_failed": False}
    except (LLMInvalidOutputError, LLMTimeoutError, LLMError, ValidationError, ValueError) as e:
        log.warning(f"planner structured-output failure: {e}; finishing")
        return {
            "decision": PlannerDecision(
                action="finish",
                reasoning=f"Planner output invalid: {e}"
            ),
            "_llm_failed": True,
            "_route": "finish"
        }


def route_after_llm(state: State) -> str:
    if state.get("_llm_failed"):
        return "finish"
    return "post_process"
```
8. post_process (Transformation Node)
Responsibility: Post-LLM decomposition and redirects
```python
def post_process_node(state: State) -> dict[str, Any]:
    decision = state["decision"]
    dynamic_decomps = dict(state.get("dynamic_decompositions") or {})
    recipes = build_dynamic_recipes(
        dynamic_decomps,
        state.get("static_field_acquisition") or {}
    )

    # Generate decomposition for requires-components fields
    if (
        decision.action in {"search", "ask_user"}
        and decision.target_field
        and decision.target_component is None
        and decision.target_field in _requires_components_paths(
            state.get("static_field_acquisition") or {}
        )
        and decision.target_field not in dynamic_decomps
    ):
        specs = generate_decomposition(decision.target_field, state["idea"], _llm())
        dynamic_decomps = {**dynamic_decomps, decision.target_field: specs or []}
        if specs:
            recipes = build_dynamic_recipes(
                dynamic_decomps,
                state.get("static_field_acquisition") or {}
            )

    # Apply redirects
    decision = _redirect_derived_direct_decision(state, recipes, decision)
    decision = _redirect_premature_ask_for_web_field(state, recipes, decision)

    out = {"decision": decision}
    if dynamic_decomps != (state.get("dynamic_decompositions") or {}):
        out["dynamic_decompositions"] = dynamic_decomps
    return out
```
9. enforce_caps (Guard Node)
Responsibility: Search and ask-user cap enforcement
```python
def enforce_caps_node(state: State) -> dict[str, Any]:
    settings = get_planner_settings()
    decision = state["decision"]
    recipes = build_dynamic_recipes(
        state.get("dynamic_decompositions") or {},
        state.get("static_field_acquisition") or {}
    )

    # Search cap
    if decision.action == "search" and decision.target_field:
        same = [
            a for a in _attempts_for(state, decision.target_field, decision.target_component)
            if a.action == "search"
        ]
        prior_queries = {a.query_or_question for a in same if a.query_or_question}
        duplicate = decision.search_query in prior_queries
        if len(same) >= settings.max_attempts_per_field or duplicate:
            reason = "duplicate query" if duplicate else f"cap {settings.max_attempts_per_field}"
            log.info(f"field {decision.target_field} search loop ({reason}) → forcing ask_user")
            if decision.target_component:
                task = next_acquisition_task(
                    recipes=recipes,
                    schema_dict=state["schema"],
                    attempts=state["attempts"],
                    idea=state["idea"],
                    last_calculation_errors=state.get("last_calculation_errors", []),
                    max_attempts_per_field=settings.max_attempts_per_field,
                    only_field=decision.target_field
                )
                decision = (
                    _decision_from_acquisition_task(task)
                    if task is not None
                    else PlannerDecision(action="reflect", reasoning="Search cap reached.")
                )
            else:
                decision = PlannerDecision(
                    action="ask_user",
                    target_field=decision.target_field,
                    user_question=f"I couldn't find reliable data for {decision.target_field}. ...",
                    reasoning="Per-field search cap reached."
                )
            # The replacement decision is not always ask_user (it may be a
            # reflect or another acquisition action), so route on its action.
            route = "ask_user" if decision.action == "ask_user" else "ok"
            return {"decision": decision, "_route": route}

    # Ask-user cap
    if decision.action == "ask_user" and decision.target_field:
        same = [
            a for a in _attempts_for(state, decision.target_field, decision.target_component)
            if a.action == "ask_user"
        ]
        if len(same) >= settings.max_attempts_per_field:
            log.info(f"field {decision.target_field} ask_user cap reached → finishing")
            decision = PlannerDecision(
                action="finish",
                reasoning=f"Ask-user cap {settings.max_attempts_per_field} reached for {decision.target_field}."
            )
            decision = _adjust_calculation_decision(state, decision)
            return {"decision": decision, "status": "aborted", "_route": "finish"}

    # Final adjustment
    if not state.get("_llm_failed"):
        decision = _adjust_calculation_decision(state, decision)
    return {"decision": decision, "_route": "ok"}


def route_after_enforce_caps(state: State) -> str:
    return state.get("_route", "ok")
```
10. route_decision (Router Node)
Responsibility: Final routing based on decision action
```python
def route_decision(state: State) -> str:
    decision = state["decision"]
    assert decision is not None
    _emit_decision_event(decision)
    return decision.action
```
Graph Construction
```python
def _build_state_graph() -> Any:
    builder = StateGraph(State)

    # Guard nodes
    builder.add_node("check_termination", check_termination_node)
    builder.add_node("check_region_currency", check_region_currency_node)
    builder.add_node("check_calculator", check_calculator_node)
    builder.add_node("check_completion", check_completion_node)
    builder.add_node("enforce_caps", enforce_caps_node)

    # Transformation nodes
    builder.add_node("compose_schema", compose_schema_node)
    builder.add_node("acquisition_routing", acquisition_routing_node)
    builder.add_node("post_process", post_process_node)

    # LLM node
    builder.add_node("llm_decide", llm_decide_node)

    # Router node: a no-op pass-through so that "route_decision" exists as a
    # source for the conditional edges below (the routing itself happens in
    # the route_decision function).
    builder.add_node("route_decision", lambda state: {})

    # Action nodes
    builder.add_node("search", search_node)
    builder.add_node("observe", observe_node)
    builder.add_node("calculate", calculate_node)
    builder.add_node("ask_user", ask_user_node)
    builder.add_node("observe_user", observe_user_node)
    builder.add_node("reflect", reflect_node)
    builder.add_node("finish", finish_node)

    # Entry
    builder.add_edge(START, "check_termination")

    # Termination routing
    builder.add_conditional_edges(
        "check_termination",
        route_after_termination,
        {"finish": "finish", "continue": "check_region_currency"}
    )

    # Region/currency routing
    builder.add_conditional_edges(
        "check_region_currency",
        route_after_region_currency,
        {
            "ask_region": "ask_user",
            "ask_currency": "ask_user",
            "ready": "compose_schema"
        }
    )

    # Schema composition → calculator check
    builder.add_edge("compose_schema", "check_calculator")

    # Calculator routing
    builder.add_conditional_edges(
        "check_calculator",
        route_after_calculator,
        {
            "finish": "finish",
            "blocked": "acquisition_routing",
            "pending": "acquisition_routing",
            "no_calc": "acquisition_routing"
        }
    )

    # Acquisition routing
    builder.add_conditional_edges(
        "acquisition_routing",
        route_after_acquisition,
        {"has_task": "enforce_caps", "no_task": "check_completion"}
    )

    # Completion check
    builder.add_conditional_edges(
        "check_completion",
        route_after_completion,
        {"finish": "finish", "has_task": "enforce_caps", "continue": "llm_decide"}
    )

    # LLM decision
    builder.add_conditional_edges(
        "llm_decide",
        route_after_llm,
        {"finish": "finish", "post_process": "post_process"}
    )

    # Post-processing → cap enforcement
    builder.add_edge("post_process", "enforce_caps")

    # Cap enforcement routing
    builder.add_conditional_edges(
        "enforce_caps",
        route_after_enforce_caps,
        {"finish": "finish", "ask_user": "ask_user", "ok": "route_decision"}
    )

    # Final routing to action nodes
    builder.add_conditional_edges(
        "route_decision",
        route_decision,
        {
            "search": "search",
            "reflect": "reflect",
            "ask_user": "ask_user",
            "calculate": "calculate",
            "finish": "finish"
        }
    )

    # Action node returns
    builder.add_conditional_edges(
        "search",
        route_after_search,
        {"observe": "observe", "plan": "check_termination"}
    )
    builder.add_edge("observe", "check_termination")
    builder.add_edge("calculate", "check_termination")
    builder.add_edge("ask_user", "observe_user")
    builder.add_edge("observe_user", "check_region_currency")
    builder.add_edge("reflect", "check_termination")
    builder.add_edge("finish", END)

    return builder
```
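Because routing targets are plain strings, a misspelled or unregistered node name may only surface when the graph is compiled or run. A small pre-flight check along the following lines could catch that earlier. The helper name `unknown_targets` and the trimmed-down node set are illustrative assumptions, not part of the planner or of the graph library:

```python
def unknown_targets(nodes: set[str], edge_maps: dict[str, dict[str, str]]) -> list[str]:
    """List conditional-edge sources and targets that are not registered nodes."""
    problems: set[str] = set()
    for source, mapping in edge_maps.items():
        if source not in nodes:
            problems.add(f"{source} (source)")
        for target in mapping.values():
            if target not in nodes and target != "END":
                problems.add(f"{source} -> {target}")
    return sorted(problems)


# Hypothetical excerpt: "route_decision" is used in edges but never registered.
nodes = {"enforce_caps", "ask_user", "finish", "search"}
edge_maps = {
    "enforce_caps": {"finish": "finish", "ask_user": "ask_user", "ok": "route_decision"},
    "route_decision": {"search": "search", "finish": "finish"},
}

print(unknown_targets(nodes, edge_maps))
print(unknown_targets(nodes | {"route_decision"}, edge_maps))
```

Keeping the node names and route maps in plain data structures, then feeding the same structures to the builder, is one way to make such a check cheap to run in a unit test.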
Benefits
- Single Responsibility: Each node has one clear purpose
- Testability: Guard nodes can be tested in isolation
- Observability: Graph visualization shows decision flow clearly
- Maintainability: Changes to one concern don't affect others
- Debugging: Easier to trace which guard/condition triggered a path
- Extensibility: New guards or transformations can be inserted without modifying existing nodes
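To make the testability point concrete, here is a sketch of how a guard could be exercised in isolation with plain dictionaries and no graph, LLM, or settings singleton. `ask_cap_guard` is a deliberately simplified stand-in for the ask-user branch of `enforce_caps`; the `Attempt` dataclass, the `make_state` helper, and the `revenue` field name are hypothetical:

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Attempt:
    field: str
    action: str


def ask_cap_guard(state: dict[str, Any], max_attempts: int) -> dict[str, Any]:
    """Simplified guard: finish once a field has been asked about too often."""
    decision = state["decision"]
    if decision["action"] != "ask_user":
        return {"_route": "ok"}
    prior = [
        a for a in state["attempts"]
        if a.field == decision["target_field"] and a.action == "ask_user"
    ]
    if len(prior) >= max_attempts:
        return {
            "decision": {"action": "finish", "target_field": None},
            "status": "aborted",
            "_route": "finish",
        }
    return {"_route": "ok"}


def make_state(action: str, field: str, attempts: list[Attempt]) -> dict[str, Any]:
    return {"decision": {"action": action, "target_field": field}, "attempts": attempts}


def test_under_cap_passes_through() -> None:
    state = make_state("ask_user", "revenue", [Attempt("revenue", "ask_user")])
    assert ask_cap_guard(state, max_attempts=2) == {"_route": "ok"}


def test_at_cap_forces_finish() -> None:
    asked = [Attempt("revenue", "ask_user")] * 2
    out = ask_cap_guard(make_state("ask_user", "revenue", asked), max_attempts=2)
    assert out["_route"] == "finish" and out["status"] == "aborted"


def test_searches_do_not_count_toward_ask_cap() -> None:
    searched = [Attempt("revenue", "search")] * 5
    out = ask_cap_guard(make_state("ask_user", "revenue", searched), max_attempts=2)
    assert out == {"_route": "ok"}
```

Passing the cap in as a parameter, rather than reading it from `get_planner_settings()` inside the node, is a design choice made here only to keep the example self-contained.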
Migration Strategy
- Phase 1: Extract guard nodes (check_termination, check_region_currency, check_calculator, check_completion, enforce_caps)
- Phase 2: Extract transformation nodes (compose_schema, acquisition_routing, post_process)
- Phase 3: Extract LLM node (llm_decide)
- Phase 4: Update graph construction and routing
- Phase 5: Remove monolithic plan_node

Each phase should include:
- Unit tests for new nodes
- Integration tests for graph flow
- Regression tests against existing behavior
State Schema Changes
Add routing hints to state (transient, not persisted):
```python
class State(TypedDict):
    # ... existing fields ...
    _route: NotRequired[str]  # Transient routing hint
    _llm_failed: NotRequired[bool]  # Transient LLM status
```
These fields should be excluded from checkpointing and cleared after use.
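One reason clearing matters: every router falls back to a default via `state.get("_route", default)`, and that default never applies while an older hint is still in the state. The sketch below shows the hazard and a minimal way to strip the transient keys before a state is persisted or resumed. `strip_transient` and `TRANSIENT_KEYS` are hypothetical names, and how this hooks into the checkpointer is left open:

```python
from typing import Any

# Assumption: these are the only transient keys added by the refactor.
TRANSIENT_KEYS = frozenset({"_route", "_llm_failed"})


def strip_transient(state: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of the state without transient routing hints."""
    return {key: value for key, value in state.items() if key not in TRANSIENT_KEYS}


state = {"iterations": 3, "_route": "finish", "_llm_failed": True}

# With the stale hint present, the router default is ignored.
stale_route = state.get("_route", "continue")

# After stripping, the default applies again and only durable fields remain.
persisted = strip_transient(state)
fresh_route = persisted.get("_route", "continue")

print(stale_route, fresh_route, persisted)
```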
Risks and Mitigations
| Risk | Mitigation |
|---|---|
| Increased graph complexity | Clear node naming and documentation |
| State mutation ordering | Explicit dependencies in graph edges |
| Checkpoint serialization | Exclude the transient _route and _llm_failed fields |
| Performance overhead | Minimal: same logic, different structure, although build_dynamic_recipes is now called in several nodes per pass |
| Test coverage gaps | Phase-by-phase migration with regression tests |
Conclusion
This refactor transforms the monolithic plan node into a composable graph of single-responsibility nodes. The resulting architecture is more maintainable, testable, and observable while preserving all existing behavior.