1 """Deterministic runtime harness utilities for Loader tests."""
2
3 from __future__ import annotations
4
5 from dataclasses import dataclass
6 from pathlib import Path
7 from typing import Any
8
9 from loader.agent.loop import AgentConfig
10 from loader.llm.base import CompletionResponse, LLMBackend, Message, StreamChunk
11 from loader.runtime.events import AgentEvent
12 from loader.runtime.runtime_handle import RuntimeHandle
13 from loader.tools.base import ToolRegistry, create_default_registry
14
15
@dataclass
class BackendInvocation:
    """Record of one backend request made by the agent.

    One instance is appended to ``ScriptedBackend.invocations`` per call to
    ``complete`` or ``stream``, capturing exactly what the agent sent so
    tests can assert on it afterwards.
    """

    # Which backend entry point was used: "complete" or "stream".
    mode: str
    # Snapshot of the conversation passed to the backend for this request.
    messages: list[Message]
    # Tool schemas offered to the model, or None when no tools were passed.
    tools: list[dict[str, Any]] | None
    # Sampling temperature requested for this call.
    temperature: float
    # Token budget requested for this call.
    max_tokens: int
26
@dataclass
class ScenarioRun:
    """Captured result of a scripted runtime-owner scenario.

    Bundles everything a test needs to assert on after ``run_scenario`` /
    ``run_explore_scenario`` completes.
    """

    # Final text response returned by the agent run.
    response: str
    # Every AgentEvent emitted during the run, in emission order.
    events: list[AgentEvent]
    # Copy of the backend's recorded requests at the time the run finished.
    invocations: list[BackendInvocation]
    # The RuntimeHandle used for the run, for further inspection.
    agent: RuntimeHandle
36
class ScriptedBackend(LLMBackend):
    """LLM backend that replays scripted completions or stream chunks.

    Responses are consumed front-to-back from the scripts supplied at
    construction time; every request the agent makes is recorded in
    ``invocations`` for later assertions.
    """

    def __init__(
        self,
        *,
        completions: list[CompletionResponse] | None = None,
        streams: list[list[StreamChunk]] | None = None,
        supports_native_tools: bool = True,
    ) -> None:
        # Copy the scripts so pop(0) never mutates the caller's lists.
        self._completions = [] if completions is None else list(completions)
        self._streams = [] if streams is None else list(streams)
        self._supports_native_tools = supports_native_tools
        self.invocations: list[BackendInvocation] = []

    def supports_native_tools(self) -> bool:
        """Mirror Ollama's native-tool capability surface."""

        return self._supports_native_tools

    def _record(
        self,
        mode: str,
        messages: list[Message],
        tools: list[dict[str, Any]] | None,
        temperature: float,
        max_tokens: int,
    ) -> None:
        # Snapshot the request arguments; messages is copied so later
        # mutation by the agent cannot change what the test observes.
        self.invocations.append(
            BackendInvocation(
                mode=mode,
                messages=list(messages),
                tools=tools,
                temperature=temperature,
                max_tokens=max_tokens,
            )
        )

    async def complete(
        self,
        messages: list[Message],
        tools: list[dict[str, Any]] | None = None,
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ) -> CompletionResponse:
        """Return the next scripted completion, or a benign terminal reply."""

        self._record("complete", messages, tools, temperature, max_tokens)
        if self._completions:
            return self._completions.pop(0)
        # Script exhausted: answer with a harmless final message so the
        # agent loop can wind down instead of erroring.
        return CompletionResponse(content="Done.")

    async def stream(
        self,
        messages: list[Message],
        tools: list[dict[str, Any]] | None = None,
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ):
        """Yield the chunks of the next scripted stream.

        Unlike ``complete``, running out of scripted streams is treated as
        a test-authoring error and raises.
        """

        self._record("stream", messages, tools, temperature, max_tokens)
        if not self._streams:
            raise AssertionError("No scripted stream left for this scenario")
        for chunk in self._streams.pop(0):
            yield chunk

    async def health_check(self) -> bool:
        # The harness has no real transport, so it is always "healthy".
        return True
101
async def run_scenario(
    prompt: str,
    backend: ScriptedBackend,
    *,
    registry: ToolRegistry | None = None,
    config: AgentConfig | None = None,
    project_root: Path | str | None = None,
    on_confirmation=None,
    on_user_question=None,
) -> ScenarioRun:
    """Run a scripted runtime scenario and collect emitted events.

    Builds a RuntimeHandle around *backend* (defaulting the registry and
    config when not supplied), runs *prompt* through it while recording
    every emitted AgentEvent, and returns the whole capture as a
    ScenarioRun.
    """

    collected: list[AgentEvent] = []

    async def _collect(event: AgentEvent) -> None:
        collected.append(event)

    handle = RuntimeHandle(
        backend=backend,
        registry=registry or create_default_registry(project_root),
        # auto_context is disabled by default to keep runs deterministic.
        config=config or AgentConfig(auto_context=False),
        project_root=project_root,
    )
    answer = await handle.run(
        prompt,
        on_event=_collect,
        on_confirmation=on_confirmation,
        on_user_question=on_user_question,
    )
    return ScenarioRun(
        response=answer,
        events=collected,
        # Copy so later backend use cannot mutate this run's record.
        invocations=list(backend.invocations),
        agent=handle,
    )
137
138
139 async def run_explore_scenario(
140 prompt: str,
141 backend: ScriptedBackend,
142 *,
143 config: AgentConfig | None = None,
144 project_root: Path | str | None = None,
145 ) -> ScenarioRun:
146 """Run a scripted explore query through the runtime-first harness."""
147
148 agent = RuntimeHandle(
149 backend=backend,
150 config=config or AgentConfig(auto_context=False),
151 project_root=project_root,
152 )
153 events: list[AgentEvent] = []
154
155 async def capture(event: AgentEvent) -> None:
156 events.append(event)
157
158 response = await agent.run_explore(
159 prompt,
160 on_event=capture,
161 )
162 return ScenarioRun(
163 response=response,
164 events=events,
165 invocations=list(backend.invocations),
166 agent=agent,
167 )