Python · 4651 bytes Raw Blame History
1 """Tests for per-iteration turn prelude handling."""
2
3 from __future__ import annotations
4
5 from pathlib import Path
6
7 import pytest
8
9 from loader.agent.loop import Agent, AgentConfig
10 from loader.llm.base import Message, Role
11 from loader.runtime.conversation import ConversationRuntime
12 from tests.helpers.runtime_harness import ScriptedBackend
13
14
15 def non_streaming_config() -> AgentConfig:
16 """Shared config for direct turn-preamble tests."""
17
18 return AgentConfig(auto_context=False, stream=False, max_iterations=8)
19
20
21 async def _prepare_runtime(
22 runtime: ConversationRuntime,
23 *,
24 task: str,
25 ) -> tuple:
26 events = []
27
28 async def capture(event) -> None:
29 events.append(event)
30
31 prepared = await runtime.turn_preparation.prepare(
32 task=task,
33 emit=capture,
34 requested_mode="execute",
35 original_task=None,
36 on_user_question=None,
37 )
38 return prepared, events, capture
39
40
41 @pytest.mark.asyncio
42 async def test_turn_preamble_drains_steering_without_prefill_hint(
43 temp_dir: Path,
44 ) -> None:
45 backend = ScriptedBackend()
46 agent = Agent(
47 backend=backend,
48 config=non_streaming_config(),
49 project_root=temp_dir,
50 )
51 runtime = ConversationRuntime(agent)
52
53 prepared, events, capture = await _prepare_runtime(
54 runtime,
55 task="Create a README for the runtime controller.",
56 )
57 agent.messages.append(Message(role=Role.USER, content=prepared.task))
58 agent.queue_steering_message("Stay inside src/loader/runtime.")
59
60 decision = await runtime.turn_preamble.prepare_iteration(
61 task=prepared.task,
62 original_task=None,
63 iterations=1,
64 dod=prepared.definition_of_done,
65 emit=capture,
66 summary=prepared.summary,
67 on_user_question=None,
68 executor=prepared.executor,
69 )
70
71 assert not decision.should_continue
72 assert prepared.summary.iterations == 1
73 assert not any(
74 message.role.value == "assistant" and message.content == "["
75 for message in agent.session.messages
76 )
77 assert (
78 agent.session.messages[-1].content
79 == "[USER INTERRUPTION]: Stay inside src/loader/runtime."
80 )
81 assert any(
82 event.type == "steering" and event.content == "Stay inside src/loader/runtime."
83 for event in events
84 )
85
86
87 @pytest.mark.asyncio
88 async def test_turn_preamble_keeps_ephemeral_steering_out_of_model_history(
89 temp_dir: Path,
90 ) -> None:
91 backend = ScriptedBackend()
92 agent = Agent(
93 backend=backend,
94 config=non_streaming_config(),
95 project_root=temp_dir,
96 )
97 runtime = ConversationRuntime(agent)
98
99 prepared, events, capture = await _prepare_runtime(
100 runtime,
101 task="Create a README for the runtime controller.",
102 )
103 agent.messages.append(Message(role=Role.USER, content=prepared.task))
104 agent.queue_ephemeral_steering_message("Create 01-introduction.html now.")
105
106 decision = await runtime.turn_preamble.prepare_iteration(
107 task=prepared.task,
108 original_task=None,
109 iterations=1,
110 dod=prepared.definition_of_done,
111 emit=capture,
112 summary=prepared.summary,
113 on_user_question=None,
114 executor=prepared.executor,
115 )
116
117 assert not decision.should_continue
118 assert not any(
119 message.content == "[USER INTERRUPTION]: Create 01-introduction.html now."
120 for message in agent.session.messages
121 )
122 assert any(
123 event.type == "steering" and event.content == "Create 01-introduction.html now."
124 for event in events
125 )
126
127
128 @pytest.mark.asyncio
129 async def test_turn_preamble_skips_iteration_when_recovery_refreshes(
130 temp_dir: Path,
131 ) -> None:
132 backend = ScriptedBackend()
133 agent = Agent(
134 backend=backend,
135 config=non_streaming_config(),
136 project_root=temp_dir,
137 )
138 runtime = ConversationRuntime(agent)
139
140 prepared, _, capture = await _prepare_runtime(
141 runtime,
142 task="Explain the runtime controller shape.",
143 )
144 calls: list[str] = []
145
146 async def fake_refresh(**kwargs) -> bool:
147 calls.append(kwargs["task"])
148 return True
149
150 runtime.workflow_recovery.maybe_refresh_plan_for_drift = fake_refresh
151
152 decision = await runtime.turn_preamble.prepare_iteration(
153 task=prepared.task,
154 original_task="Original task text",
155 iterations=2,
156 dod=prepared.definition_of_done,
157 emit=capture,
158 summary=prepared.summary,
159 on_user_question=None,
160 executor=prepared.executor,
161 )
162
163 assert decision.should_continue
164 assert calls == ["Original task text"]