| 1 | """Runtime-owned task decomposition orchestration.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | from collections.abc import Awaitable, Callable |
| 6 | |
| 7 | from ..llm.base import Message, Role |
| 8 | from .bootstrap import RuntimeBootstrapSource |
| 9 | from .conversation import ConfirmationHandler, EventSink, UserQuestionHandler |
| 10 | from .deliberation import DECOMPOSITION_PROMPT, parse_decomposition |
| 11 | from .events import AgentEvent |
| 12 | |
# Shape of the coroutine callback that executes a single task.
# Positional arguments, in the order the runner passes them: the task text,
# the event sink, the confirmation handler, the user-question handler, the
# requested mode (or None) and the original task (or None).
# Awaiting the callback yields the textual response.
RunTaskCallback = Callable[
    [str, EventSink, ConfirmationHandler, UserQuestionHandler, str | None, str | None],
    Awaitable[str],
]
| 24 | |
| 25 | |
class DecompositionTurnRunner:
    """Split a task into subtasks at runtime and execute them one by one."""

    def __init__(
        self,
        source: RuntimeBootstrapSource,
        *,
        run_task: RunTaskCallback,
    ) -> None:
        """Remember the runtime source and the callback used to execute tasks."""
        self.source = source
        self.run_task = run_task

    async def run(
        self,
        task: str,
        emit: EventSink,
        *,
        on_confirmation: ConfirmationHandler = None,
        on_user_question: UserQuestionHandler = None,
        requested_mode: str | None = None,
        original_task: str | None = None,
    ) -> str:
        """Decompose ``task`` and run its subtasks, or run it directly.

        A "thinking" event is emitted first, then the backend is asked for a
        decomposition. With at most one subtask the task is handed to
        ``run_task`` unchanged, and ``requested_mode`` is forwarded. Otherwise
        a "decomposition" event is emitted and subtasks are executed in the
        order ``next_subtask()`` yields them. Subtask and summary runs always
        pass ``None`` as the mode.

        Returns:
            The direct ``run_task`` response, a "Task partially completed."
            report when the decomposition did not finish, or the response to
            the final summary prompt.
        """
        await emit(AgentEvent(type="thinking", content="Analyzing task complexity..."))
        plan = await self._decompose_task(task)

        # Nothing worth splitting: take the ordinary single-task path.
        if len(plan.subtasks) <= 1:
            return await self._record_and_run(
                task,
                task,
                emit,
                on_confirmation,
                on_user_question,
                requested_mode,
                original_task,
            )

        await emit(
            AgentEvent(
                type="decomposition",
                content=plan.to_prompt(),
                decomposition=plan,
            )
        )

        # Keep going until everything is done or a failure is still recorded.
        while not (plan.is_complete() or plan.has_failures()):
            current = plan.next_subtask()
            if current is None:
                break

            current.status = "in_progress"
            await emit(
                AgentEvent(
                    type="subtask",
                    content=f"{plan.progress_str()} {current.description}",
                    subtask=current,
                )
            )

            # The session sees the verification hint as well; the callback
            # itself only receives the bare description.
            session_text = (
                f"Execute this subtask: {current.description}\n\n"
                f"Verification: {current.verification}"
            )
            reply = await self._record_and_run(
                session_text,
                current.description,
                emit,
                on_confirmation,
                on_user_question,
                None,
                original_task,
            )

            # Failure is detected purely by keyword match on the response text.
            lowered = reply.lower()
            if "error" not in lowered and "failed" not in lowered:
                plan.mark_completed(current.id, reply)
                continue

            plan.mark_failed(current.id, reply)
            if plan.can_retry(current.id):
                plan.reset_for_retry(current.id)
                await emit(
                    AgentEvent(
                        type="subtask",
                        content=f"Retrying subtask: {current.description}",
                        subtask=current,
                    )
                )

        if not plan.is_complete():
            return f"Task partially completed. {plan.to_prompt()}"

        wrap_up = (
            f"All subtasks completed for: {task}\n\n"
            f"{plan.to_prompt()}\n\n"
            "Provide a brief summary of what was accomplished."
        )
        return await self._record_and_run(
            wrap_up,
            wrap_up,
            emit,
            on_confirmation,
            on_user_question,
            None,
            original_task,
        )

    async def _record_and_run(
        self,
        session_text: str,
        task_text: str,
        emit: EventSink,
        on_confirmation: ConfirmationHandler,
        on_user_question: UserQuestionHandler,
        requested_mode: str | None,
        original_task: str | None,
    ) -> str:
        """Append ``session_text`` as a user message, then run ``task_text``."""
        self.source.session.append(Message(role=Role.USER, content=session_text))
        return await self.run_task(
            task_text,
            emit,
            on_confirmation,
            on_user_question,
            requested_mode,
            original_task,
        )

    async def _decompose_task(self, task: str):
        """Ask the active backend for a decomposition of ``task`` and parse it.

        The request holds only the session's system message and the formatted
        decomposition prompt; no tools are offered.
        """
        user_prompt = DECOMPOSITION_PROMPT.format(task=task)
        conversation = [
            self.source.session.system_message_factory(),
            Message(role=Role.USER, content=user_prompt),
        ]
        backend_reply = await self.source.backend.complete(
            messages=conversation,
            tools=None,
            temperature=0.3,
            max_tokens=1000,
        )
        return parse_decomposition(backend_reply.content, task)