Python · 5808 bytes Raw Blame History
1 """Tests for the runtime-owned decomposition lane."""
2
3 from __future__ import annotations
4
5 import json
6 from pathlib import Path
7
8 import pytest
9
10 from loader.agent.loop import Agent, AgentConfig
11 from loader.llm.base import CompletionResponse
12 from loader.runtime.decomposition_lane import DecompositionTurnRunner
13 from loader.runtime.deliberation import DECOMPOSITION_PROMPT
14 from tests.helpers.runtime_harness import ScriptedBackend
15
16
def _decomposition_json(*, subtasks: list[dict[str, object]]) -> CompletionResponse:
    """Build a scripted completion whose content is ``{"subtasks": [...]}`` as JSON.

    Args:
        subtasks: Subtask dictionaries to place under the ``"subtasks"`` key.

    Returns:
        A ``CompletionResponse`` whose ``content`` is the JSON-encoded payload.
    """
    payload = {"subtasks": subtasks}
    encoded = json.dumps(payload)
    return CompletionResponse(content=encoded)
19
20
@pytest.mark.asyncio
async def test_decomposition_turn_runner_executes_subtasks_and_summarizes(
    temp_dir: Path,
) -> None:
    """Check the happy path: both subtasks run, then a summary prompt is dispatched.

    The backend is scripted with a two-step plan (step 2 depends on step 1).
    The test expects ``run_task`` to be called once per subtask and once more
    with the summary prompt, the runner to return that last call's reply, and
    the emitted events and session messages to match the expected sequences.
    """
    full_task = "Read the spec and implement the feature"
    # NOTE(review): this file's whitespace looks collapsed by the paste it came
    # from; confirm the leading spaces in the subtask lines against the real
    # plan formatter output.
    summary_prompt = (
        "All subtasks completed for: Read the spec and implement the feature\n\n"
        "Task: Read the spec and implement the feature\n\n"
        "Subtasks:\n"
        " ● 1. Read the spec\n"
        " Verify: Spec is understood\n"
        " ● 2. Implement the feature (after: 1)\n"
        " Verify: Tests pass\n\n"
        "Provide a brief summary of what was accomplished."
    )
    plan: list[dict[str, object]] = [
        {
            "id": "1",
            "description": "Read the spec",
            "verification": "Spec is understood",
        },
        {
            "id": "2",
            "description": "Implement the feature",
            "dependencies": ["1"],
            "verification": "Tests pass",
        },
    ]
    backend = ScriptedBackend(completions=[_decomposition_json(subtasks=plan)])
    agent = Agent(
        backend=backend,
        config=AgentConfig(auto_context=False, stream=False),
        project_root=temp_dir,
    )

    events = []
    calls: list[tuple[str, str | None, str | None]] = []
    # Replies keyed by subtask text; anything else (the summary prompt) falls
    # through to the default reply in run_task.
    replies_by_task = {
        "Read the spec": "Spec reviewed.",
        "Implement the feature": "Feature implemented.",
    }

    async def emit(event) -> None:
        events.append(event)

    async def run_task(
        task: str,
        _emit,
        _on_confirmation,
        _on_user_question,
        requested_mode: str | None,
        original_task: str | None,
    ) -> str:
        calls.append((task, requested_mode, original_task))
        return replies_by_task.get(task, "Feature shipped.")

    runner = DecompositionTurnRunner(agent, run_task=run_task)
    response = await runner.run(full_task, emit, original_task=full_task)

    # The runner's result is whatever the final (summary) call returned.
    assert response == "Feature shipped."

    dispatched_tasks = [task for task, _mode, _original in calls]
    assert dispatched_tasks == [
        "Read the spec",
        "Implement the feature",
        summary_prompt,
    ]
    assert all(mode is None for _task, mode, _original in calls)
    assert all(original == full_task for _task, _mode, original in calls)

    emitted_types = [event.type for event in events]
    assert emitted_types == [
        "thinking",
        "decomposition",
        "subtask",
        "subtask",
    ]

    decomposition_request = backend.invocations[0].messages[1].content
    assert decomposition_request == DECOMPOSITION_PROMPT.format(task=full_task)

    session_contents = [message.content for message in agent.session.messages]
    assert session_contents == [
        "Execute this subtask: Read the spec\n\nVerification: Spec is understood",
        "Execute this subtask: Implement the feature\n\nVerification: Tests pass",
        summary_prompt,
    ]
122
123
@pytest.mark.asyncio
async def test_decomposition_turn_runner_returns_partial_completion_after_failed_retries(
    temp_dir: Path,
) -> None:
    """Check the partial-completion path when the first subtask never succeeds.

    ``run_task`` hands back two scripted replies ("failed once", then
    "failed again"). The test expects the first subtask to be attempted
    exactly twice, the dependent subtask never to be dispatched, a retry
    event between the two attempts, and a response that starts with the
    partial-completion banner.
    """
    plan: list[dict[str, object]] = [
        {
            "id": "1",
            "description": "Patch the file",
            "verification": "File is updated",
        },
        {
            "id": "2",
            "description": "Run the tests",
            "dependencies": ["1"],
            "verification": "Tests are green",
        },
    ]
    backend = ScriptedBackend(completions=[_decomposition_json(subtasks=plan)])
    agent = Agent(
        backend=backend,
        config=AgentConfig(auto_context=False, stream=False),
        project_root=temp_dir,
    )

    events = []
    calls: list[str] = []
    # Exactly two replies are scripted; the test expects exactly two attempts.
    responses = iter(["failed once", "failed again"])

    async def emit(event) -> None:
        events.append(event)

    async def run_task(
        task: str,
        _emit,
        _on_confirmation,
        _on_user_question,
        _requested_mode,
        _original_task,
    ) -> str:
        calls.append(task)
        return next(responses)

    runner = DecompositionTurnRunner(agent, run_task=run_task)
    response = await runner.run("Patch the file and run the tests", emit)

    expected_prefix = "Task partially completed. Task: Patch the file and run the tests"
    assert response.startswith(expected_prefix)

    # Only the first subtask is ever dispatched: once initially, once on retry.
    assert calls == ["Patch the file", "Patch the file"]

    subtask_event_contents = [
        event.content for event in events if event.type == "subtask"
    ]
    assert subtask_event_contents == [
        "[0/2] Patch the file",
        "Retrying subtask: Patch the file",
        "[0/2] Patch the file",
    ]
180