Python · 4827 bytes Raw Blame History
1 """Runtime-owned task decomposition orchestration."""
2
3 from __future__ import annotations
4
5 from collections.abc import Awaitable, Callable
6
7 from ..llm.base import Message, Role
8 from .bootstrap import RuntimeBootstrapSource
9 from .conversation import ConfirmationHandler, EventSink, UserQuestionHandler
10 from .deliberation import DECOMPOSITION_PROMPT, parse_decomposition
11 from .events import AgentEvent
12
# Signature of the callback that executes a single task turn.
# ``DecompositionTurnRunner.run`` passes, positionally: the task text, the
# event sink, the confirmation handler, the user-question handler, the
# requested mode, and the original task. Awaiting the callback yields the
# response text.
RunTaskCallback = Callable[
    [str, EventSink, ConfirmationHandler, UserQuestionHandler, str | None, str | None],
    Awaitable[str],
]
24
25
class DecompositionTurnRunner:
    """Own runtime-managed task decomposition and subtask execution."""

    # Substrings (matched case-insensitively) that mark a subtask response as
    # a failure.
    # NOTE(review): this is a plain text heuristic, so a successful response
    # such as "no errors found" is also classified as a failure.
    _FAILURE_MARKERS = ("error", "failed")

    def __init__(
        self,
        source: RuntimeBootstrapSource,
        *,
        run_task: RunTaskCallback,
    ) -> None:
        """Store the runtime source and the callback that executes one task.

        Args:
            source: Runtime object whose ``session`` and ``backend`` this
                runner reads and appends to.
            run_task: Awaitable callback invoked for the direct task, for
                each subtask, and for the final summary turn.
        """
        self.source = source
        self.run_task = run_task

    async def run(
        self,
        task: str,
        emit: EventSink,
        *,
        on_confirmation: ConfirmationHandler = None,
        on_user_question: UserQuestionHandler = None,
        requested_mode: str | None = None,
        original_task: str | None = None,
    ) -> str:
        """Run one decomposition flow or fall back to the direct task path.

        Emits a ``thinking`` event, asks the backend for a decomposition and
        then either runs the task directly (one subtask or fewer) or executes
        the subtasks one at a time, finishing with a summary turn.

        Args:
            task: The task text to decompose and execute.
            emit: Awaitable sink that receives every ``AgentEvent``.
            on_confirmation: Forwarded unchanged to ``run_task``.
            on_user_question: Forwarded unchanged to ``run_task``.
            requested_mode: Forwarded to ``run_task`` on the direct path only;
                subtask and summary turns pass ``None`` instead.
            original_task: Forwarded unchanged to every ``run_task`` call.

        Returns:
            The direct response, the summary response, or a
            ``"Task partially completed. ..."`` message when the loop stops
            before the decomposition reports itself complete.
        """
        await emit(AgentEvent(type="thinking", content="Analyzing task complexity..."))
        decomposition = await self._decompose_task(task)

        if len(decomposition.subtasks) <= 1:
            # Nothing to split: record the task as a user turn and run it as-is.
            self.source.session.append(Message(role=Role.USER, content=task))
            return await self.run_task(
                task,
                emit,
                on_confirmation,
                on_user_question,
                requested_mode,
                original_task,
            )

        await emit(
            AgentEvent(
                type="decomposition",
                content=decomposition.to_prompt(),
                decomposition=decomposition,
            )
        )

        # Stop as soon as the plan is complete, reports a failure, or has no
        # further subtask to hand out.
        while not decomposition.is_complete() and not decomposition.has_failures():
            subtask = decomposition.next_subtask()
            if subtask is None:
                break
            await self._run_subtask(
                decomposition,
                subtask,
                emit,
                on_confirmation,
                on_user_question,
                original_task,
            )

        if not decomposition.is_complete():
            return f"Task partially completed. {decomposition.to_prompt()}"

        return await self._summarize(
            task,
            decomposition,
            emit,
            on_confirmation,
            on_user_question,
            original_task,
        )

    async def _run_subtask(
        self,
        decomposition,
        subtask,
        emit: EventSink,
        on_confirmation: ConfirmationHandler,
        on_user_question: UserQuestionHandler,
        original_task: str | None,
    ) -> None:
        """Execute one subtask and record its outcome on the decomposition."""
        subtask.status = "in_progress"
        await emit(
            AgentEvent(
                type="subtask",
                content=f"{decomposition.progress_str()} {subtask.description}",
                subtask=subtask,
            )
        )

        # The session message carries the verification criteria; the callback
        # itself only receives the bare description.
        self.source.session.append(
            Message(
                role=Role.USER,
                content=(
                    f"Execute this subtask: {subtask.description}\n\n"
                    f"Verification: {subtask.verification}"
                ),
            )
        )
        response = await self.run_task(
            subtask.description,
            emit,
            on_confirmation,
            on_user_question,
            None,
            original_task,
        )

        if not self._looks_like_failure(response):
            decomposition.mark_completed(subtask.id, response)
            return

        decomposition.mark_failed(subtask.id, response)
        if decomposition.can_retry(subtask.id):
            decomposition.reset_for_retry(subtask.id)
            await emit(
                AgentEvent(
                    type="subtask",
                    content=f"Retrying subtask: {subtask.description}",
                    subtask=subtask,
                )
            )

    async def _summarize(
        self,
        task: str,
        decomposition,
        emit: EventSink,
        on_confirmation: ConfirmationHandler,
        on_user_question: UserQuestionHandler,
        original_task: str | None,
    ) -> str:
        """Append the summary prompt to the session and run it as a final turn."""
        summary_prompt = (
            f"All subtasks completed for: {task}\n\n"
            f"{decomposition.to_prompt()}\n\n"
            "Provide a brief summary of what was accomplished."
        )
        self.source.session.append(Message(role=Role.USER, content=summary_prompt))
        return await self.run_task(
            summary_prompt,
            emit,
            on_confirmation,
            on_user_question,
            None,
            original_task,
        )

    @staticmethod
    def _looks_like_failure(response: str) -> bool:
        """Return True when *response* contains a failure marker, ignoring case."""
        lowered = response.lower()
        return any(marker in lowered for marker in DecompositionTurnRunner._FAILURE_MARKERS)

    async def _decompose_task(self, task: str):
        """Request one structured decomposition from the active backend.

        The request consists of the session's system message plus the
        formatted decomposition prompt; it is built locally and is not
        appended to the session. Returns whatever ``parse_decomposition``
        produces from the response content and the original task.
        """
        prompt = DECOMPOSITION_PROMPT.format(task=task)
        response = await self.source.backend.complete(
            messages=[
                self.source.session.system_message_factory(),
                Message(role=Role.USER, content=prompt),
            ],
            tools=None,
            temperature=0.3,
            max_tokens=1000,
        )
        return parse_decomposition(response.content, task)