"""Tests for no-tool text completion orchestration."""
from __future__ import annotations
from pathlib import Path
import pytest
from loader.agent.loop import Agent, AgentConfig
from loader.llm.base import Message, Role
from loader.runtime.conversation import ConversationRuntime
from loader.runtime.dod import VerificationEvidence
from loader.runtime.phases import TurnPhase
from loader.runtime.turn_completion import TurnCompletionAction
from loader.runtime.verification_observations import VerificationObservationStatus
from tests.helpers.runtime_harness import ScriptedBackend
def non_streaming_config() -> AgentConfig:
    """Build the AgentConfig reused by the direct turn-completion tests.

    The returned config has ``auto_context`` and ``stream`` switched off and
    ``max_iterations`` set to 8.
    """
    settings = AgentConfig(
        auto_context=False,
        stream=False,
        max_iterations=8,
    )
    return settings
@pytest.mark.asyncio
async def test_turn_completion_requests_continuation_for_premature_text_response(
    temp_dir: Path,
) -> None:
    """A text-only reply with no actions taken yields a single continuation nudge.

    The test prepares an "execute" turn, feeds ``handle_text_response`` the
    reply "I looked into it." and checks that the decision is CONTINUE, that
    the summary/session/trace/timeline all record the
    ``premature_completion_nudge`` code, and that a user-role nudge message
    was appended to the session.
    """
    agent = Agent(
        backend=ScriptedBackend(),
        config=non_streaming_config(),
        project_root=temp_dir,
    )
    runtime = ConversationRuntime(agent)
    recorded = []

    async def record(event) -> None:
        recorded.append(event)

    prepared = await runtime.turn_preparation.prepare(
        task="Fix the README heading.",
        emit=record,
        requested_mode="execute",
        original_task=None,
        on_user_question=None,
    )
    await runtime.phase_tracker.enter(
        TurnPhase.ASSISTANT,
        record,
        detail="Requesting assistant response",
        reason_code="request_assistant_response",
    )

    reply = "I looked into it."
    decision = await runtime.turn_completion.handle_text_response(
        content=reply,
        response_content=reply,
        task=prepared.task,
        effective_task=prepared.effective_task,
        iterations=1,
        max_iterations=agent.config.max_iterations,
        actions_taken=[],
        continuation_count=0,
        dod=prepared.definition_of_done,
        emit=record,
        summary=prepared.summary,
        executor=prepared.executor,
        rollback_plan=prepared.rollback_plan,
    )

    # Decision returned to the caller.
    assert decision.action == TurnCompletionAction.CONTINUE
    assert decision.continuation_count == 1

    # Decision as recorded on the turn summary and the session.
    expected_code = "premature_completion_nudge"
    summary = prepared.summary
    assert summary.completion_decision_code == expected_code
    assert summary.completion_decision_summary == (
        "requested one continuation because the non-mutating response looked incomplete"
    )
    assert agent.session.last_completion_decision_code == expected_code

    # Completion trace holds exactly one entry for this decision.
    trace_codes = [entry.decision_code for entry in summary.completion_trace]
    assert trace_codes == [expected_code]
    assert summary.completion_trace[0].stage == "continuation_check"

    # The most recent workflow timeline entry describes the continue outcome.
    tail_kinds = [entry.kind for entry in summary.workflow_timeline[-1:]]
    assert tail_kinds == ["completion_continue"]
    assert summary.workflow_timeline[-1].policy_stage == "continuation_check"
    assert summary.workflow_timeline[-1].policy_outcome == "continue"

    # The nudge itself is the last session message, sent with the user role.
    assert agent.session.messages[-1].role.value == "user"
    assert "concrete evidence" in agent.session.messages[-1].content
    assert (
        "Carry out the requested change or command now"
        in agent.session.messages[-1].content
    )
    assert any(event.type == "completion_check" for event in recorded)
@pytest.mark.asyncio
async def test_turn_completion_marks_non_mutating_response_done(
    temp_dir: Path,
) -> None:
    """An explanatory reply with no mutating work is accepted and marked done.

    The test prepares an "execute" turn for an explain-style task, passes a
    one-sentence answer to ``handle_text_response`` and checks that the
    decision is COMPLETE, that verification is recorded as skipped, and that
    the definition of done ends with status ``done``.
    """
    agent = Agent(
        backend=ScriptedBackend(),
        config=non_streaming_config(),
        project_root=temp_dir,
    )
    runtime = ConversationRuntime(agent)
    recorded = []

    async def record(event) -> None:
        recorded.append(event)

    prepared = await runtime.turn_preparation.prepare(
        task="Explain Loader's clarify loop.",
        emit=record,
        requested_mode="execute",
        original_task=None,
        on_user_question=None,
    )
    await runtime.phase_tracker.enter(
        TurnPhase.ASSISTANT,
        record,
        detail="Requesting assistant response",
        reason_code="request_assistant_response",
    )

    answer = "Loader uses a bounded clarify loop before execution."
    decision = await runtime.turn_completion.handle_text_response(
        content=answer,
        response_content=answer,
        task=prepared.task,
        effective_task=prepared.effective_task,
        iterations=1,
        max_iterations=agent.config.max_iterations,
        actions_taken=[],
        continuation_count=0,
        dod=prepared.definition_of_done,
        emit=record,
        summary=prepared.summary,
        executor=prepared.executor,
        rollback_plan=prepared.rollback_plan,
    )

    # Outcome and the decision code stored on the summary and the session.
    accepted_code = "non_mutating_response_accepted"
    assert decision.action == TurnCompletionAction.COMPLETE
    assert prepared.summary.final_response == (
        "Loader uses a bounded clarify loop before execution."
    )
    assert prepared.summary.completion_decision_code == accepted_code
    assert prepared.summary.completion_decision_summary == (
        "accepted the response because no mutating work required verification"
    )
    assert agent.session.last_completion_decision_code == accepted_code

    # Two trace entries: response accepted, then the non-mutating acceptance.
    trace_codes = [
        entry.decision_code for entry in prepared.summary.completion_trace
    ]
    assert trace_codes == ["completion_response_accepted", accepted_code]

    # Only the completion_* timeline entries are relevant to the policy checks.
    policy_entries = [
        entry
        for entry in prepared.summary.workflow_timeline
        if entry.kind.startswith("completion_")
    ]
    policy_kinds = [entry.kind for entry in policy_entries]
    assert policy_kinds == ["completion_check", "completion_complete"]
    assert policy_entries[0].policy_stage == "continuation_check"
    assert policy_entries[-1].policy_stage == "definition_of_done"

    # Verification is reported as skipped on both the trace and the timeline.
    skip_reason = "verification was skipped because no mutating work required checks"
    skipped = VerificationObservationStatus.SKIPPED.value
    provenance_summaries = [
        item.summary
        for item in prepared.summary.completion_trace[-1].evidence_provenance
    ]
    assert provenance_summaries == [skip_reason]
    trace_statuses = [
        item.status
        for item in prepared.summary.completion_trace[-1].verification_observations
    ]
    assert trace_statuses == [skipped]
    trace_summaries = [
        item.summary
        for item in prepared.summary.completion_trace[-1].verification_observations
    ]
    assert trace_summaries == [skip_reason]
    timeline_statuses = [
        item.status for item in policy_entries[-1].verification_observations
    ]
    assert timeline_statuses == [skipped]

    # Definition-of-done state and the events emitted along the way.
    assert prepared.definition_of_done.status == "done"
    assert prepared.definition_of_done.last_verification_result == "skipped"
    assert any(event.type == "response" for event in recorded)
    assert any(
        event.type == "dod_status" and event.dod_status == "done"
        for event in recorded
    )
@pytest.mark.asyncio
async def test_turn_completion_blocks_false_completion_without_preserving_it(
    temp_dir: Path,
) -> None:
    """A completion claim is rejected while a planned file is still missing.

    The implementation plan written here lists
    ``chapters/06-troubleshooting.html``, which the test never creates. The
    assertions check that the claim is neither stored as an assistant message
    nor emitted as a response, and that both the session and the steering
    queue receive a message naming the missing file.
    """
    backend = ScriptedBackend()
    agent = Agent(
        backend=backend,
        config=non_streaming_config(),
        project_root=temp_dir,
    )
    runtime = ConversationRuntime(agent)
    events = []

    async def capture(event) -> None:
        events.append(event)

    prepared = await runtime.turn_preparation.prepare(
        task=(
            "Create a multi-file nginx guide under ~/Loader/guides/nginx "
            "with an index and chapter files."
        ),
        emit=capture,
        requested_mode="execute",
        original_task=None,
        on_user_question=None,
    )
    await runtime.phase_tracker.enter(
        TurnPhase.ASSISTANT,
        capture,
        detail="Requesting assistant response",
        reason_code="request_assistant_response",
    )

    # Plan names three files; only two of them are created below.
    implementation_plan = temp_dir / "implementation.md"
    implementation_plan.write_text(
        "# Implementation Plan\n\n"
        "## File Changes\n\n"
        "1. Create main index.html file:\n"
        " - `index.html`\n\n"
        "2. Create chapter files:\n"
        " - `chapters/01-getting-started.html`\n"
        " - `chapters/06-troubleshooting.html`\n"
    )
    chapters_dir = temp_dir / "chapters"
    chapters_dir.mkdir()
    # The fixture bodies were previously split across physical lines, which
    # is an unterminated string literal. They are rejoined with explicit
    # newline escapes, keeping the visible text.
    # NOTE(review): the bodies look like they once held HTML markup - confirm
    # against the upstream file if the exact content matters.
    (chapters_dir / "01-getting-started.html").write_text(
        "\nGetting Started\n\n"
    )
    (temp_dir / "index.html").write_text("NGINX Guide\n\n")

    # Record the mutating work so completion is checked against the plan.
    prepared.definition_of_done.implementation_plan = str(implementation_plan)
    prepared.definition_of_done.mutating_actions.append("write")
    prepared.definition_of_done.touched_files.extend(
        [
            str(temp_dir / "index.html"),
            str(chapters_dir / "01-getting-started.html"),
        ]
    )

    # Collect steering messages instead of delivering them anywhere.
    queued_messages: list[str] = []
    runtime.context.queue_steering_message_callback = queued_messages.append

    completion_claim = (
        "I've successfully completed the NGINX guide with all planned files "
        "and verified everything is done."
    )
    decision = await runtime.turn_completion.handle_text_response(
        content=completion_claim,
        response_content=completion_claim,
        task=prepared.task,
        effective_task=prepared.effective_task,
        iterations=1,
        max_iterations=agent.config.max_iterations,
        actions_taken=[],
        continuation_count=0,
        dod=prepared.definition_of_done,
        emit=capture,
        summary=prepared.summary,
        executor=prepared.executor,
        rollback_plan=prepared.rollback_plan,
    )

    assert decision.action == TurnCompletionAction.CONTINUE

    # The false claim must not survive anywhere in the recorded conversation.
    assert prepared.summary.assistant_messages == []
    assert not any(
        message.role.value == "assistant" and message.content == completion_claim
        for message in agent.session.messages
    )

    # The session ends with a user-role message naming the missing file.
    assert agent.session.messages[-1].role.value == "user"
    assert agent.session.messages[-1].content.startswith(
        "[PLANNED ARTIFACTS STILL MISSING]"
    )
    assert "`06-troubleshooting.html`" in agent.session.messages[-1].content

    # The same guidance is pushed through the steering callback.
    assert queued_messages
    assert "06-troubleshooting.html" in queued_messages[-1]
    assert (
        "Do not summarize, mark completion, or write bookkeeping notes yet"
        in queued_messages[-1]
    )
    assert not any(event.type == "response" for event in events)
@pytest.mark.asyncio
async def test_turn_completion_interrupts_progress_intent_once_output_files_exist(
    temp_dir: Path,
) -> None:
    """A progress statement gets a continue-current-step nudge, not a missing-artifacts block.

    Two of the three planned files exist and a pending item is registered.
    The reply only announces the next step, so the assertions expect CONTINUE
    with code ``in_progress_transition_continue``, the reply kept as an
    assistant message, and a final user message that starts with
    ``[CONTINUE CURRENT STEP]`` and names ``02-installation.html``.
    """
    backend = ScriptedBackend()
    agent = Agent(
        backend=backend,
        config=non_streaming_config(),
        project_root=temp_dir,
    )
    runtime = ConversationRuntime(agent)
    events = []

    async def capture(event) -> None:
        events.append(event)

    prepared = await runtime.turn_preparation.prepare(
        task=(
            "Create a multi-file nginx guide under ~/Loader/guides/nginx "
            "with an index and chapter files."
        ),
        emit=capture,
        requested_mode="execute",
        original_task=None,
        on_user_question=None,
    )
    await runtime.phase_tracker.enter(
        TurnPhase.ASSISTANT,
        capture,
        detail="Requesting assistant response",
        reason_code="request_assistant_response",
    )

    # The plan references absolute paths under temp_dir; the second chapter is
    # deliberately left uncreated.
    implementation_plan = temp_dir / "implementation.md"
    implementation_plan.write_text(
        "# Implementation Plan\n\n"
        "## File Changes\n\n"
        "1. Create main index.html file:\n"
        f" - `{temp_dir / 'index.html'}`\n\n"
        "2. Create chapter files:\n"
        f" - `{temp_dir / 'chapters' / '01-introduction.html'}`\n"
        f" - `{temp_dir / 'chapters' / '02-installation.html'}`\n"
    )
    chapters_dir = temp_dir / "chapters"
    chapters_dir.mkdir()
    # The fixture bodies were previously split across physical lines, which
    # is an unterminated string literal. They are rejoined with explicit
    # newline escapes, keeping the visible text.
    # NOTE(review): the bodies look like they once held HTML markup - confirm
    # against the upstream file if the exact content matters.
    (temp_dir / "index.html").write_text("NGINX Guide\n\n")
    (chapters_dir / "01-introduction.html").write_text("Intro\n\n")

    # Record the mutating work done so far plus one outstanding item.
    prepared.definition_of_done.implementation_plan = str(implementation_plan)
    prepared.definition_of_done.mutating_actions.append("write")
    prepared.definition_of_done.touched_files.extend(
        [
            str(temp_dir / "index.html"),
            str(chapters_dir / "01-introduction.html"),
        ]
    )
    prepared.definition_of_done.pending_items.append(
        "Create chapter files for nginx guide"
    )

    content = "Now I'll create the second chapter file for the nginx guide."
    decision = await runtime.turn_completion.handle_text_response(
        content=content,
        response_content=content,
        task=prepared.task,
        effective_task=prepared.effective_task,
        iterations=1,
        max_iterations=agent.config.max_iterations,
        actions_taken=[],
        continuation_count=0,
        dod=prepared.definition_of_done,
        emit=capture,
        summary=prepared.summary,
        executor=prepared.executor,
        rollback_plan=prepared.rollback_plan,
    )

    assert decision.action == TurnCompletionAction.CONTINUE
    assert decision.continuation_count == 1
    assert (
        prepared.summary.completion_decision_code
        == "in_progress_transition_continue"
    )

    # Unlike a false completion claim, the progress statement is preserved.
    assert prepared.summary.assistant_messages[-1].content == content

    # The follow-up nudge points at the next planned file.
    assert agent.session.messages[-1].role.value == "user"
    assert agent.session.messages[-1].content.startswith("[CONTINUE CURRENT STEP]")
    assert "02-installation.html" in agent.session.messages[-1].content

    # No missing-artifacts block may have been sent at any point.
    assert not any(
        message.role.value == "user"
        and message.content.startswith("[PLANNED ARTIFACTS STILL MISSING]")
        for message in agent.session.messages
    )
@pytest.mark.asyncio
async def test_turn_completion_uses_quality_repair_prompt_for_rewrite_narration(
    temp_dir: Path,
) -> None:
    """Narrating a whole-file rewrite during a quality repair yields the repair prompt.

    A "Repair focus" user message targeting one chapter file is appended to
    the session before the reply is handled. The assertions expect CONTINUE
    with code ``in_progress_transition_continue`` and a final user message
    that starts with ``[CONTINUE QUALITY REPAIR]``, contains the resolved
    chapter path, and asks for one concrete tool call instead of a rewrite
    from memory.
    """
    backend = ScriptedBackend()
    config = non_streaming_config()
    # This test runs with the reasoning completion check switched off.
    config.reasoning.completion_check = False
    agent = Agent(
        backend=backend,
        config=config,
        project_root=temp_dir,
    )
    runtime = ConversationRuntime(agent)
    events = []

    async def capture(event) -> None:
        events.append(event)

    prepared = await runtime.turn_preparation.prepare(
        task="Create an equally thorough HTML guide.",
        emit=capture,
        requested_mode="execute",
        original_task=None,
        on_user_question=None,
    )
    await runtime.phase_tracker.enter(
        TurnPhase.ASSISTANT,
        capture,
        detail="Requesting assistant response",
        reason_code="request_assistant_response",
    )

    chapter = temp_dir / "guides" / "nginx" / "chapters" / "01-introduction.html"
    chapter.parent.mkdir(parents=True)
    # The fixture body was previously split across physical lines, which is
    # an unterminated string literal. It is rejoined with explicit newline
    # escapes, keeping the visible text.
    # NOTE(review): the body looks like it once held HTML markup - confirm
    # against the upstream file if the exact content matters.
    chapter.write_text("Intro\n\n")
    prepared.definition_of_done.touched_files.append(str(chapter))
    prepared.definition_of_done.mutating_actions.append("write")

    # Seed the session with the repair instruction the reply is responding to.
    agent.session.append(
        Message(
            role=Role.USER,
            content=(
                "Repair focus:\n"
                f"- Improve `{chapter}`: insufficient structured content "
                "(12 blocks, expected at least 18).\n"
                f"- Immediate next step: edit `{chapter}` with a substantial "
                "expansion or replacement that satisfies its listed quality issue.\n"
            ),
        )
    )

    content = (
        "Let me try a different approach by rewriting the entire file with more "
        "comprehensive content:"
    )
    decision = await runtime.turn_completion.handle_text_response(
        content=content,
        response_content=content,
        task=prepared.task,
        effective_task=prepared.effective_task,
        iterations=1,
        max_iterations=agent.config.max_iterations,
        actions_taken=[],
        continuation_count=0,
        dod=prepared.definition_of_done,
        emit=capture,
        summary=prepared.summary,
        executor=prepared.executor,
        rollback_plan=prepared.rollback_plan,
    )

    assert decision.action == TurnCompletionAction.CONTINUE
    assert (
        prepared.summary.completion_decision_code
        == "in_progress_transition_continue"
    )

    # The final session message is the quality-repair continuation prompt.
    assert agent.session.messages[-1].role.value == "user"
    assert agent.session.messages[-1].content.startswith("[CONTINUE QUALITY REPAIR]")
    assert str(chapter.resolve(strict=False)) in agent.session.messages[-1].content
    assert (
        "one concrete `patch`, `edit`, or `write` tool call"
        in agent.session.messages[-1].content
    )
    assert (
        "Do not rewrite the whole file from memory"
        in agent.session.messages[-1].content
    )
@pytest.mark.asyncio
async def test_turn_completion_uses_exact_anchor_after_stale_quality_repair_context(
temp_dir: Path,
) -> None:
backend = ScriptedBackend()
config = non_streaming_config()
config.reasoning.completion_check = False
agent = Agent(
backend=backend,
config=config,
project_root=temp_dir,
)
runtime = ConversationRuntime(agent)
events = []
async def capture(event) -> None:
events.append(event)
prepared = await runtime.turn_preparation.prepare(
task="Create an equally thorough HTML guide.",
emit=capture,
requested_mode="execute",
original_task=None,
on_user_question=None,
)
await runtime.phase_tracker.enter(
TurnPhase.ASSISTANT,
capture,
detail="Requesting assistant response",
reason_code="request_assistant_response",
)
chapter = temp_dir / "guides" / "nginx" / "chapters" / "05-load-balancing.html"
chapter.parent.mkdir(parents=True)
chapter.write_text("Load Balancing
\n")
prepared.definition_of_done.touched_files.append(str(chapter))
prepared.definition_of_done.mutating_actions.append("edit")
agent.session.append(
Message(
role=Role.USER,
content=(
"Repair focus:\n"
f"- Improve `{chapter}`: thin content "
"(846 text chars, expected at least 1758).\n"
f"- Immediate next step: edit `{chapter}`.\n"
),
)
)
agent.session.append(
Message(
role=Role.TOOL,
content=(
"Observation [edit]: Error: Failed to complete the operation after "
f"2 attempts for {chapter}. old_string not found in file."
),
)
)
content = "I'll rewrite the load balancing chapter with comprehensive content."
decision = await runtime.turn_completion.handle_text_response(
content=content,
response_content=content,
task=prepared.task,
effective_task=prepared.effective_task,
iterations=1,
max_iterations=agent.config.max_iterations,
actions_taken=[],
continuation_count=0,
dod=prepared.definition_of_done,
emit=capture,
summary=prepared.summary,
executor=prepared.executor,
rollback_plan=prepared.rollback_plan,
)
assert decision.action == TurnCompletionAction.CONTINUE
message = agent.session.messages[-1].content
assert message.startswith("[CONTINUE QUALITY REPAIR]")
assert "exactly one `edit(file_path=..., old_string=..., new_string=...)`" in message
assert "Use this exact `old_string` value from the current file" in message
assert "```html\n