Python · 23517 bytes Raw Blame History
1 """Runtime-owned helpers for public shell prompt and session lifecycle."""
2
3 from __future__ import annotations
4
5 import asyncio
6 import contextlib
7 import inspect
8 from collections import deque
9 from collections.abc import AsyncIterator, Awaitable, Callable
10 from dataclasses import dataclass
11 from pathlib import Path
12 from typing import Any, Protocol
13
14 from ..context.project import ProjectContext
15 from ..llm.base import Message, Role
16 from ..tools.base import ToolRegistry
17 from .capabilities import CapabilityProfile, resolve_backend_capability_profile
18 from .dod import DefinitionOfDoneStore
19 from .events import AgentEvent, TurnSummary
20 from .launcher import build_runtime_launcher
21 from .owner_metadata import build_runtime_owner_metadata
22 from .permissions import PermissionConfigStatus, PermissionMode, PermissionPolicy
23 from .prompt_history import PromptSnapshot
24 from .prompting import build_system_prompt_result
25 from .session import ConversationSession
26
27
@dataclass(slots=True)
class RuntimePromptState:
    """Rendered runtime prompt plus the session metadata it updates.

    Produced by ``build_runtime_system_message`` once the prompt has been
    rendered and its format and section names were recorded on the session.
    """

    # System-role message whose content is the rendered prompt text.
    system_message: Message
    # Prompt format identifier as reported by the prompt builder result.
    prompt_format: str
    # Names of the dynamic sections the prompt builder reported for this render.
    prompt_sections: list[str]
35
36
@dataclass(slots=True)
class RestoredSessionState:
    """Mutable agent-shell state reconstructed from a persisted session.

    Instances are built by ``restore_runtime_session_state`` and copied onto
    the shell owner by ``apply_runtime_session_install``.
    """

    # The session's own message list (passed through by reference, not copied).
    messages: list[Message]
    # Task recorded on the session, if any.
    current_task: str | None
    # Workflow mode recorded on the session.
    workflow_mode: str
    # Permission mode in string form; parsed with PermissionMode.from_str on apply.
    permission_mode: str
    # Prompt format recorded on the session, if any.
    prompt_format: str | None
    # Copy of the session's prompt section names.
    prompt_sections: list[str]
    # Code and summary of the last completion decision recorded on the session.
    last_completion_decision_code: str | None
    last_completion_decision_summary: str | None
    # Rebuilt only when the session's active definition-of-done file exists on
    # disk; otherwise None.
    last_turn_summary: TurnSummary | None
50
51
@dataclass(slots=True)
class RuntimeSessionInstall:
    """A persisted session plus the shell state restored from it.

    This is the unit that ``apply_runtime_session_install`` copies onto a
    shell owner.
    """

    # The created or loaded conversation session.
    session: ConversationSession
    # Shell-facing state derived from ``session``.
    restored: RestoredSessionState
58
59
@dataclass(slots=True)
class CapabilityRefresh:
    """Result of recomputing the active capability profile."""

    # The freshly resolved backend capability profile.
    capability_profile: CapabilityProfile
    # True when the refreshed profile differs from the previous one, in which
    # case cached prompt state has to be rebuilt.
    prompt_reset_required: bool
66
67
class SteeringMailbox:
    """FIFO mailbox for steering messages plus the owner's running flag."""

    def __init__(self) -> None:
        self._is_running = False
        self._pending: deque[str] = deque()

    @property
    def is_running(self) -> bool:
        """Report whether the owning shell is flagged as running."""

        return self._is_running

    def mark_running(self) -> None:
        """Flag the owning shell as running."""

        self._is_running = True

    def mark_idle(self) -> None:
        """Flag the owning shell as idle."""

        self._is_running = False

    def steer(self, message: str) -> bool:
        """Queue ``message`` if the owner is running and report acceptance.

        Returns ``True`` when the message was queued, ``False`` when it was
        rejected because the owner is idle.
        """

        if self._is_running:
            self.queue(message)
            return True
        return False

    def queue(self, message: str) -> None:
        """Append ``message`` to the mailbox without checking the running flag."""

        self._pending.append(message)

    def drain(self) -> list[str]:
        """Remove and return every pending message, oldest first."""

        backlog = self._pending
        return [backlog.popleft() for _ in range(len(backlog))]

    def clear(self) -> None:
        """Discard pending messages and flag the owner as idle."""

        self._is_running = False
        self._pending.clear()
116
117
class RuntimeShellConfigProtocol(Protocol):
    """Typed view of shell config used for session lifecycle helpers."""

    # When true, ``resolve_runtime_shell_use_react`` selects the ReAct format
    # without consulting the capability profile.
    force_react: bool
    # Forwarded as ``rotate_after_bytes`` when sessions are created or loaded.
    session_rotate_after_bytes: int
    # Forwarded as ``auto_compaction_input_tokens_threshold`` to the session.
    session_auto_compaction_input_tokens_threshold: int
    # Forwarded as ``compaction_keep_last_messages`` to the session.
    session_compaction_keep_last_messages: int
125
126
class RuntimeShellOwner(Protocol):
    """Typed public-shell owner for session lifecycle helpers.

    The module-level helpers in this file read and mutate these attributes
    directly; any object exposing them structurally can act as the owner.
    """

    # Project root handed to session creation/loading and used as prompt cwd.
    project_root: Path
    # Backend object whose capability profile is resolved on refresh.
    backend: Any
    # Tool registry whose schemas are rendered into the system prompt.
    registry: ToolRegistry
    # Currently installed conversation session.
    session: ConversationSession
    # Message list of the active session as seen by the shell.
    messages: list[Message]
    config: RuntimeShellConfigProtocol
    project_context: ProjectContext | None
    permission_policy: PermissionPolicy
    permission_config_status: PermissionConfigStatus
    capability_profile: CapabilityProfile
    workflow_mode: str
    # Metadata of the most recently rendered prompt (None/empty after a reset).
    prompt_format: str | None
    prompt_sections: list[str]
    current_task: str | None
    last_turn_summary: TurnSummary | None
    # Steering mailbox that also tracks the running/idle flag.
    steering: SteeringMailbox
    # Only ``reset()`` is called on this object from this module.
    safeguards: Any
    # Cached system message; None forces a rebuild on next access.
    _system_message: Message | None
    # Cached tool-format decision; None means "not resolved yet".
    _use_react: bool | None

    def set_workflow_mode(self, workflow_mode: str) -> None:
        """Update the active workflow mode."""

    def queue_steering_message(self, message: str) -> None:
        """Queue one steering message for the runtime."""

    def drain_steering_messages(self) -> list[str]:
        """Drain queued steering messages."""

    def refresh_capability_profile(self) -> None:
        """Refresh the active capability profile."""

    def _get_system_message(self) -> Message:
        """Build the active system message."""

    def _get_few_shot_examples(self) -> list[Message]:
        """Build the active few-shot examples."""
167
168
def create_runtime_session(
    *,
    project_root: Path,
    messages: list[Message] | None,
    permission_policy: PermissionPolicy,
    permission_config_status: PermissionConfigStatus,
    prompt_format: str | None,
    prompt_sections: list[str],
    workflow_mode: str,
    runtime_owner_type: str | None,
    runtime_owner_path: str | None,
    rotate_after_bytes: int,
    auto_compaction_input_tokens_threshold: int,
    compaction_keep_last_messages: int,
    system_message_factory: Callable[[], Message],
    few_shot_factory: Callable[[], list[Message]],
) -> ConversationSession:
    """Build a ``ConversationSession`` from the given public shell state.

    Permission details are flattened into plain values (mode string, prompting
    flag, normalized rule counts, stringified rules source) and the prompt
    section list is copied before being handed to the session.
    """

    # A missing or empty message list becomes a fresh empty list.
    initial_messages = messages or []
    mode_name = permission_policy.active_mode.as_str()
    prompting_enabled = permission_policy.prompting_enabled
    rule_counts = _copy_rule_counts(permission_policy.rule_counts())
    rules_source = str(permission_config_status.source_path)
    section_names = list(prompt_sections)

    return ConversationSession(
        system_message_factory=system_message_factory,
        few_shot_factory=few_shot_factory,
        project_root=project_root,
        messages=initial_messages,
        runtime_owner_type=runtime_owner_type,
        runtime_owner_path=runtime_owner_path,
        permission_mode=mode_name,
        permission_prompting_enabled=prompting_enabled,
        permission_rule_counts=rule_counts,
        permission_rules_source=rules_source,
        prompt_format=prompt_format,
        prompt_sections=section_names,
        workflow_mode=workflow_mode,
        rotate_after_bytes=rotate_after_bytes,
        auto_compaction_input_tokens_threshold=auto_compaction_input_tokens_threshold,
        compaction_keep_last_messages=compaction_keep_last_messages,
    )
208
209
def create_runtime_session_install(
    *,
    project_root: Path,
    messages: list[Message] | None,
    permission_policy: PermissionPolicy,
    permission_config_status: PermissionConfigStatus,
    prompt_format: str | None,
    prompt_sections: list[str],
    workflow_mode: str,
    runtime_owner_type: str | None,
    runtime_owner_path: str | None,
    rotate_after_bytes: int,
    auto_compaction_input_tokens_threshold: int,
    compaction_keep_last_messages: int,
    system_message_factory: Callable[[], Message],
    few_shot_factory: Callable[[], list[Message]],
) -> RuntimeSessionInstall:
    """Create a new session and pair it with its restored shell state.

    All arguments are forwarded unchanged to ``create_runtime_session``; the
    resulting session is then run through ``restore_runtime_session_state``.
    """

    new_session = create_runtime_session(
        project_root=project_root,
        messages=messages,
        permission_policy=permission_policy,
        permission_config_status=permission_config_status,
        prompt_format=prompt_format,
        prompt_sections=prompt_sections,
        workflow_mode=workflow_mode,
        runtime_owner_type=runtime_owner_type,
        runtime_owner_path=runtime_owner_path,
        rotate_after_bytes=rotate_after_bytes,
        auto_compaction_input_tokens_threshold=auto_compaction_input_tokens_threshold,
        compaction_keep_last_messages=compaction_keep_last_messages,
        system_message_factory=system_message_factory,
        few_shot_factory=few_shot_factory,
    )
    shell_state = restore_runtime_session_state(
        project_root=project_root,
        session=new_session,
    )
    return RuntimeSessionInstall(session=new_session, restored=shell_state)
254
255
def apply_runtime_session_install(
    owner: RuntimeShellOwner,
    install: RuntimeSessionInstall,
) -> None:
    """Copy a restored session install onto the public shell owner.

    Clears steering state, swaps in the session and its restored fields,
    drops the cached system message, and finally rewrites the session's
    runtime-owner metadata when it does not match the current owner.
    """

    session = install.session
    restored = install.restored

    owner.steering.clear()
    owner.session = session
    owner.messages = restored.messages
    owner.current_task = restored.current_task
    owner.set_workflow_mode(restored.workflow_mode)
    owner.permission_policy.active_mode = PermissionMode.from_str(
        restored.permission_mode
    )
    owner.prompt_format = restored.prompt_format
    owner.prompt_sections = list(restored.prompt_sections)
    owner.last_turn_summary = restored.last_turn_summary
    # Force the next system-message access to re-render the prompt.
    owner._system_message = None

    metadata = build_runtime_owner_metadata(owner)
    owner_type = metadata["owner_type"]
    owner_path = metadata["owner_path"]
    already_owned = (
        session.runtime_owner_type == owner_type
        and session.runtime_owner_path == owner_path
    )
    if not already_owned:
        session.update_runtime_state(
            runtime_owner_type=owner_type,
            runtime_owner_path=owner_path,
        )
283
284
def build_fresh_runtime_session_install(
    owner: RuntimeShellOwner,
    *,
    messages: list[Message] | None = None,
    workflow_mode: str | None = None,
) -> RuntimeSessionInstall:
    """Create a brand-new session install seeded from the owner's state.

    ``workflow_mode`` falls back to the owner's current mode when it is not
    given (or is empty); everything else is read from the owner.
    """

    metadata = build_runtime_owner_metadata(owner)
    config = owner.config
    effective_mode = workflow_mode if workflow_mode else owner.workflow_mode

    return create_runtime_session_install(
        project_root=owner.project_root,
        messages=messages,
        permission_policy=owner.permission_policy,
        permission_config_status=owner.permission_config_status,
        prompt_format=owner.prompt_format,
        prompt_sections=list(owner.prompt_sections),
        workflow_mode=effective_mode,
        runtime_owner_type=metadata["owner_type"],
        runtime_owner_path=metadata["owner_path"],
        rotate_after_bytes=config.session_rotate_after_bytes,
        auto_compaction_input_tokens_threshold=(
            config.session_auto_compaction_input_tokens_threshold
        ),
        compaction_keep_last_messages=config.session_compaction_keep_last_messages,
        system_message_factory=owner._get_system_message,
        few_shot_factory=owner._get_few_shot_examples,
    )
312
313
def restore_runtime_session_state(
    *,
    project_root: Path,
    session: ConversationSession,
) -> RestoredSessionState:
    """Derive the public shell state that corresponds to ``session``.

    A ``TurnSummary`` is rebuilt only when the session names an active
    definition-of-done file that exists on disk; otherwise the restored
    ``last_turn_summary`` is ``None``.
    """

    summary: TurnSummary | None = None
    recorded_dod = session.active_dod_path
    if recorded_dod:
        dod_file = Path(recorded_dod)
        if dod_file.exists():
            definition = DefinitionOfDoneStore(project_root).load(dod_file)
            summary = TurnSummary(
                final_response="",
                definition_of_done=definition,
                workflow_mode=session.workflow_mode,
                workflow_reason_code=session.workflow_reason_code,
                workflow_reason_summary=session.workflow_reason_summary,
                workflow_decision_kind=session.workflow_decision_kind,
                completion_decision_code=session.last_completion_decision_code,
                completion_decision_summary=session.last_completion_decision_summary,
                completion_trace=list(session.completion_trace),
                workflow_timeline=list(session.workflow_timeline),
                session_id=session.session_id,
                cumulative_usage=dict(session.usage_totals),
            )

    return RestoredSessionState(
        # Shared by reference so shell and session see the same message list.
        messages=session.messages,
        current_task=session.current_task,
        workflow_mode=session.workflow_mode,
        permission_mode=session.permission_mode,
        prompt_format=session.prompt_format,
        prompt_sections=list(session.prompt_sections),
        last_completion_decision_code=session.last_completion_decision_code,
        last_completion_decision_summary=session.last_completion_decision_summary,
        last_turn_summary=summary,
    )
352
353
def load_runtime_session_install(
    *,
    project_root: Path,
    system_message_factory: Callable[[], Message],
    few_shot_factory: Callable[[], list[Message]],
    session_id: str | None = None,
    rotate_after_bytes: int,
    auto_compaction_input_tokens_threshold: int,
    compaction_keep_last_messages: int,
) -> RuntimeSessionInstall | None:
    """Load a persisted session and pair it with its restored shell state.

    Returns ``None`` when ``ConversationSession.load`` yields no session for
    the given ``session_id`` (or for the default lookup when it is ``None``).
    """

    persisted = ConversationSession.load(
        project_root=project_root,
        system_message_factory=system_message_factory,
        few_shot_factory=few_shot_factory,
        session_id=session_id,
        rotate_after_bytes=rotate_after_bytes,
        auto_compaction_input_tokens_threshold=auto_compaction_input_tokens_threshold,
        compaction_keep_last_messages=compaction_keep_last_messages,
    )
    if persisted is None:
        return None

    shell_state = restore_runtime_session_state(
        project_root=project_root,
        session=persisted,
    )
    return RuntimeSessionInstall(session=persisted, restored=shell_state)
386
387
def resume_runtime_shell_session(
    owner: RuntimeShellOwner,
    *,
    session_id: str | None = None,
) -> bool:
    """Load a persisted session and install it on the public shell.

    Returns ``True`` when a session was found and applied to ``owner``, and
    ``False`` when nothing could be loaded (the owner is left untouched).
    """

    config = owner.config
    install = load_runtime_session_install(
        project_root=owner.project_root,
        system_message_factory=owner._get_system_message,
        few_shot_factory=owner._get_few_shot_examples,
        session_id=session_id,
        rotate_after_bytes=config.session_rotate_after_bytes,
        auto_compaction_input_tokens_threshold=(
            config.session_auto_compaction_input_tokens_threshold
        ),
        compaction_keep_last_messages=config.session_compaction_keep_last_messages,
    )
    if install is not None:
        apply_runtime_session_install(owner, install)
        return True
    return False
410
411
def clear_runtime_shell_history(owner: RuntimeShellOwner) -> None:
    """Wipe shell history and move the owner onto a brand-new session.

    The owner's prompt and task state is reset first so the fresh session is
    seeded from a clean slate, then the new session is installed and the
    safeguards are reset.
    """

    reset_mode = "execute"

    owner.messages = []
    owner.prompt_format = None
    owner.prompt_sections = []
    owner.current_task = None
    owner.last_turn_summary = None
    owner.set_workflow_mode(reset_mode)

    fresh_install = build_fresh_runtime_session_install(
        owner,
        messages=owner.messages,
        workflow_mode=reset_mode,
    )
    apply_runtime_session_install(owner, fresh_install)

    owner._system_message = None
    owner.safeguards.reset()
431
432
def resolve_runtime_shell_use_react(owner: RuntimeShellOwner) -> bool:
    """Decide (and cache on the owner) whether the ReAct format is used.

    A previously cached decision wins. Otherwise ``config.force_react``
    forces ReAct, and failing that ReAct is used exactly when the capability
    profile does not support native tools.
    """

    decision = owner._use_react
    if decision is None:
        if owner.config.force_react:
            decision = True
        else:
            decision = not owner.capability_profile.supports_native_tools
        owner._use_react = decision
    return decision
445
446
def set_runtime_shell_workflow_mode(
    owner: RuntimeShellOwner,
    workflow_mode: str,
) -> None:
    """Switch the owner's workflow mode, dropping the cached prompt on change.

    Nothing is touched when ``workflow_mode`` equals the current mode, so an
    already-rendered system message stays cached.
    """

    if owner.workflow_mode != workflow_mode:
        owner.workflow_mode = workflow_mode
        owner._system_message = None
457
458
def get_runtime_shell_system_message(owner: RuntimeShellOwner) -> Message:
    """Return the owner's system message, rendering it when none is cached.

    On a cache miss the prompt is rebuilt from the owner's current state and
    the resulting prompt format, section names, and message are stored back
    on the owner.
    """

    cached = owner._system_message
    if cached is not None:
        return cached

    rendered = build_runtime_system_message(
        registry=owner.registry,
        use_react=resolve_runtime_shell_use_react(owner),
        project_context=owner.project_context,
        workflow_mode=owner.workflow_mode,
        permission_mode=owner.permission_policy.active_mode.as_str(),
        cwd=owner.project_root,
        current_task=owner.current_task,
        session=owner.session,
    )
    owner.prompt_format = rendered.prompt_format
    owner.prompt_sections = list(rendered.prompt_sections)
    owner._system_message = rendered.system_message
    return rendered.system_message
477
478
def get_runtime_shell_few_shot_examples(owner: RuntimeShellOwner) -> list[Message]:
    """Build the few-shot examples matching the owner's resolved tool format."""

    react_enabled = resolve_runtime_shell_use_react(owner)
    return build_runtime_few_shot_examples(use_react=react_enabled)
485
486
def build_event_emitter(
    on_event: Callable[[AgentEvent], None] | Callable[[AgentEvent], Awaitable[None]] | None,
) -> Callable[[AgentEvent], Awaitable[None]]:
    """Normalize public-shell event callbacks into one async emitter.

    Args:
        on_event: Optional callback invoked once per event. It may be a plain
            function or return any awaitable (coroutine, future, task, or an
            object implementing ``__await__``).

    Returns:
        An async callable that forwards each event to ``on_event`` and awaits
        the callback's result when that result is awaitable. When ``on_event``
        is ``None`` the emitter is a no-op.
    """

    async def emit(event: AgentEvent) -> None:
        if on_event is None:
            return
        result = on_event(event)
        # ``isawaitable`` also covers futures, tasks and ``__await__`` objects;
        # checking only for coroutine objects would silently drop those.
        if inspect.isawaitable(result):
            await result

    return emit
500
501
async def run_runtime_shell(
    owner: RuntimeShellOwner,
    user_message: str,
    *,
    on_event: Callable[[AgentEvent], None]
    | Callable[[AgentEvent], Awaitable[None]]
    | None = None,
    on_confirmation: Callable[[str, str, str], Awaitable[bool]] | None = None,
    on_user_question: Callable[[str, list[str] | None], Awaitable[str]] | None = None,
    use_plan: bool | None = None,
) -> str:
    """Drive one user message through the runtime launcher and return its reply.

    The owner's steering mailbox is flagged as running for the duration of
    the call and is always flagged idle again afterwards, including when the
    launcher raises.
    """

    event_sink = build_event_emitter(on_event)
    owner.steering.mark_running()
    try:
        runtime_launcher = build_runtime_launcher(owner)
        reply = await runtime_launcher.run_user_message(
            user_message,
            event_sink,
            on_confirmation=on_confirmation,
            on_user_question=on_user_question,
            use_plan=use_plan,
        )
    finally:
        owner.steering.mark_idle()
    return reply
528
529
async def stream_runtime_shell(
    owner: RuntimeShellOwner,
    user_message: str,
) -> AsyncIterator[AgentEvent]:
    """Stream the events of one shell run as an async generator.

    The run executes in a background task that feeds a queue; this generator
    drains the queue, yielding events and re-raising any exception the run
    produced. The task is cancelled if the consumer stops early.
    """

    # Queue protocol: events are yielded, an exception instance is re-raised,
    # and ``None`` marks the end of the stream.
    events: asyncio.Queue[AgentEvent | BaseException | None] = asyncio.Queue()

    async def enqueue(event: AgentEvent) -> None:
        await events.put(event)

    async def drive_shell() -> None:
        try:
            await run_runtime_shell(owner, user_message, on_event=enqueue)
        except BaseException as failure:  # pragma: no cover - propagated below
            await events.put(failure)
        finally:
            await events.put(None)

    runner = asyncio.create_task(drive_shell())
    try:
        while (entry := await events.get()) is not None:
            if isinstance(entry, BaseException):
                raise entry
            yield entry
        await runner
    finally:
        # Reached on normal completion, on error, and when the consumer closes
        # the generator early; only a still-running task needs cancelling.
        if not runner.done():
            runner.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await runner
564
565
async def run_runtime_shell_explore(
    owner: RuntimeShellOwner,
    user_message: str,
    *,
    on_event: Callable[[AgentEvent], None]
    | Callable[[AgentEvent], Awaitable[None]]
    | None = None,
    fresh: bool = False,
) -> str:
    """Run one explore query via the runtime launcher and return its reply.

    The turn summary produced by the launcher is stored on the owner as
    ``last_turn_summary``; its ``final_response`` is the return value.
    """

    event_sink = build_event_emitter(on_event)
    explore_launcher = build_runtime_launcher(owner)
    summary = await explore_launcher.run_explore(
        user_message,
        event_sink,
        fresh=fresh,
    )
    owner.last_turn_summary = summary
    return summary.final_response
585
586
def refresh_runtime_capability_state(
    *,
    backend,
    current_profile: CapabilityProfile,
) -> CapabilityRefresh:
    """Resolve the backend's capability profile and compare it to the current one.

    The returned refresh flags ``prompt_reset_required`` whenever the newly
    resolved profile is not equal to ``current_profile``.
    """

    latest_profile = resolve_backend_capability_profile(backend)
    profile_changed = latest_profile != current_profile
    return CapabilityRefresh(
        capability_profile=latest_profile,
        prompt_reset_required=profile_changed,
    )
599
600
def refresh_runtime_shell_capability_profile(
    owner: RuntimeShellOwner,
) -> CapabilityRefresh:
    """Re-resolve the owner's capability profile and drop stale prompt caches.

    The refreshed profile is always stored on the owner. The cached system
    message and tool-format decision are cleared only when the profile
    actually changed.
    """

    outcome = refresh_runtime_capability_state(
        backend=owner.backend,
        current_profile=owner.capability_profile,
    )
    owner.capability_profile = outcome.capability_profile
    if not outcome.prompt_reset_required:
        return outcome

    owner._system_message = None
    owner._use_react = None
    return outcome
615
616
def build_runtime_system_message(
    *,
    registry: ToolRegistry,
    use_react: bool,
    project_context: ProjectContext | None,
    workflow_mode: str,
    permission_mode: str,
    cwd: Path,
    current_task: str | None,
    session: ConversationSession,
) -> RuntimePromptState:
    """Render the system prompt and record it on the session.

    After rendering, the prompt format and dynamic section names are written
    to the session's runtime state and a prompt snapshot is appended, before
    the prompt is returned wrapped in a system-role message.
    """

    rendered = build_system_prompt_result(
        tools=registry.get_schemas(),
        use_react=use_react,
        project_context=project_context,
        workflow_mode=workflow_mode,
        permission_mode=permission_mode,
        cwd=cwd,
        current_task=current_task,
    )
    prompt_format = rendered.prompt_format
    prompt_text = rendered.content
    section_names = list(rendered.dynamic_section_names)

    session.update_runtime_state(
        prompt_format=prompt_format,
        prompt_sections=section_names,
    )
    snapshot = PromptSnapshot.create(
        workflow_mode=workflow_mode,
        permission_mode=permission_mode,
        current_task=current_task,
        prompt_format=prompt_format,
        prompt_sections=section_names,
        content=prompt_text,
    )
    session.append_prompt_snapshot(snapshot)

    return RuntimePromptState(
        system_message=Message(role=Role.SYSTEM, content=prompt_text),
        prompt_format=prompt_format,
        prompt_sections=section_names,
    )
662
663
def build_runtime_few_shot_examples(*, use_react: bool) -> list[Message]:
    """Build the four-message few-shot exchange for the active tool format.

    Both variants share the same user request, tool result, and closing
    assistant reply; only the assistant's tool-call message differs with
    ``use_react``.
    """

    if use_react:
        tool_call_text = (
            '<tool_call>\n'
            '{"name": "write", "arguments": {"file_path": "hello.py", '
            '"content": "print(\'hello\')"}}\n'
            "</tool_call>"
        )
    else:
        tool_call_text = '[write: file_path="hello.py", content="print(\'hello\')"]'

    return [
        Message(
            role=Role.USER,
            content="Create a file called hello.py that prints hello",
        ),
        Message(role=Role.ASSISTANT, content=tool_call_text),
        Message(role=Role.TOOL, content="Created hello.py"),
        Message(role=Role.ASSISTANT, content="Done."),
    ]
697
698
699 def _copy_rule_counts(rule_counts: dict[str, int]) -> dict[str, int]:
700 return {
701 "allow": int(rule_counts.get("allow", 0)),
702 "deny": int(rule_counts.get("deny", 0)),
703 "ask": int(rule_counts.get("ask", 0)),
704 }