tenseleyflow/loader / e7a1225

Browse files

Extract runtime decomposition lane

Authored by espadonne
SHA
e7a1225b0e9af62ea6d2bf8e850c214a8c143ae4
Parents
916d43e
Tree
8550fa5

3 changed files

StatusFile+-
M src/loader/agent/loop.py 9 89
A src/loader/runtime/decomposition_lane.py 149 0
M src/loader/runtime/launcher.py 44 0
src/loader/agent/loop.pymodified
@@ -9,11 +9,7 @@ from pathlib import Path
99
 from ..context.project import ProjectContext, detect_project
1010
 from ..llm.base import LLMBackend, Message, Role
1111
 from ..runtime.capabilities import resolve_backend_capability_profile
12
-from ..runtime.deliberation import (
13
-    DECOMPOSITION_PROMPT,
14
-    parse_decomposition,
15
-    should_decompose,
16
-)
12
+from ..runtime.deliberation import should_decompose
1713
 from ..runtime.dod import DefinitionOfDoneStore
1814
 from ..runtime.events import AgentEvent, TurnSummary
1915
 from ..runtime.launcher import build_runtime_launcher
@@ -23,7 +19,6 @@ from ..runtime.permissions import (
2319
     load_permission_rules,
2420
 )
2521
 from ..runtime.prompt_history import PromptSnapshot
26
-from ..runtime.reasoning_types import TaskDecomposition
2722
 from ..runtime.safeguards import RuntimeSafeguards
2823
 from ..runtime.session import ConversationSession
2924
 from ..runtime.task_classification import is_conversational
@@ -359,22 +354,6 @@ class Agent:
359354
                 Message(role=Role.ASSISTANT, content="Done."),
360355
             ]
361356
 
362
-    # === Reasoning Stage Methods ===
363
-
364
-    async def _decompose_task(self, task: str) -> TaskDecomposition:
365
-        """Decompose a complex task into atomic subtasks."""
366
-        prompt = DECOMPOSITION_PROMPT.format(task=task)
367
-        response = await self.backend.complete(
368
-            messages=[
369
-                self._get_system_message(),
370
-                Message(role=Role.USER, content=prompt),
371
-            ],
372
-            tools=None,
373
-            temperature=0.3,  # Lower temp for structured output
374
-            max_tokens=1000,
375
-        )
376
-        return parse_decomposition(response.content, task)
377
-
378357
     async def run(
379358
         self,
380359
         user_message: str,
@@ -440,73 +419,14 @@ class Agent:
440419
 
441420
         # Check if we should decompose the task (higher priority than planning)
442421
         if cfg.decomposition and should_decompose(user_message):
443
-            await emit(AgentEvent(type="thinking", content="Analyzing task complexity..."))
444
-            decomposition = await self._decompose_task(user_message)
445
-
446
-            if len(decomposition.subtasks) > 1:
447
-                await emit(AgentEvent(
448
-                    type="decomposition",
449
-                    content=decomposition.to_prompt(),
450
-                    decomposition=decomposition,
451
-                ))
452
-
453
-                # Execute each subtask
454
-                while not decomposition.is_complete() and not decomposition.has_failures():
455
-                    subtask = decomposition.next_subtask()
456
-                    if not subtask:
457
-                        break
458
-
459
-                    subtask.status = "in_progress"
460
-                    await emit(AgentEvent(
461
-                        type="subtask",
462
-                        content=f"{decomposition.progress_str()} {subtask.description}",
463
-                        subtask=subtask,
464
-                    ))
465
-
466
-                    # Run the subtask
467
-                    self.session.append(Message(
468
-                        role=Role.USER,
469
-                        content=f"Execute this subtask: {subtask.description}\n\n"
470
-                                f"Verification: {subtask.verification}",
471
-                    ))
472
-                    subtask_response = await self._run_inner(
473
-                        subtask.description,
474
-                        emit,
475
-                        on_confirmation,
476
-                        on_user_question=on_user_question,
477
-                        original_task=self._current_task,
478
-                    )
479
-
480
-                    # Mark based on result (simple heuristic)
481
-                    if "error" in subtask_response.lower() or "failed" in subtask_response.lower():
482
-                        decomposition.mark_failed(subtask.id, subtask_response)
483
-                        if decomposition.can_retry(subtask.id):
484
-                            decomposition.reset_for_retry(subtask.id)
485
-                            await emit(AgentEvent(
486
-                                type="subtask",
487
-                                content=f"Retrying subtask: {subtask.description}",
488
-                                subtask=subtask,
489
-                            ))
490
-                    else:
491
-                        decomposition.mark_completed(subtask.id, subtask_response)
492
-
493
-                # Final summary
494
-                if decomposition.is_complete():
495
-                    summary_prompt = (
496
-                        f"All subtasks completed for: {user_message}\n\n"
497
-                        f"{decomposition.to_prompt()}\n\n"
498
-                        "Provide a brief summary of what was accomplished."
499
-                    )
500
-                    self.session.append(Message(role=Role.USER, content=summary_prompt))
501
-                    return await self._run_inner(
502
-                        summary_prompt,
503
-                        emit,
504
-                        on_confirmation,
505
-                        on_user_question=on_user_question,
506
-                        original_task=self._current_task,
507
-                    )
508
-                else:
509
-                    return f"Task partially completed. {decomposition.to_prompt()}"
422
+            return await launcher.run_decomposed(
423
+                user_message,
424
+                emit,
425
+                on_confirmation=on_confirmation,
426
+                on_user_question=on_user_question,
427
+                requested_mode=self._requested_workflow_mode(use_plan),
428
+                original_task=self._current_task,
429
+            )
510430
 
511431
         # No planning or decomposition - run directly
512432
         self.session.append(Message(role=Role.USER, content=user_message))
src/loader/runtime/decomposition_lane.pyadded
@@ -0,0 +1,149 @@
1
+"""Runtime-owned task decomposition orchestration."""
2
+
3
+from __future__ import annotations
4
+
5
+from collections.abc import Awaitable, Callable
6
+
7
+from ..llm.base import Message, Role
8
+from .bootstrap import RuntimeBootstrapSource
9
+from .conversation import ConfirmationHandler, EventSink, UserQuestionHandler
10
+from .deliberation import DECOMPOSITION_PROMPT, parse_decomposition
11
+from .events import AgentEvent
12
+
13
+RunTaskCallback = Callable[
14
+    [
15
+        str,
16
+        EventSink,
17
+        ConfirmationHandler,
18
+        UserQuestionHandler,
19
+        str | None,
20
+        str | None,
21
+    ],
22
+    Awaitable[str],
23
+]
24
+
25
+
26
+class DecompositionTurnRunner:
27
+    """Own runtime-managed task decomposition and subtask execution."""
28
+
29
+    def __init__(
30
+        self,
31
+        source: RuntimeBootstrapSource,
32
+        *,
33
+        run_task: RunTaskCallback,
34
+    ) -> None:
35
+        self.source = source
36
+        self.run_task = run_task
37
+
38
+    async def run(
39
+        self,
40
+        task: str,
41
+        emit: EventSink,
42
+        *,
43
+        on_confirmation: ConfirmationHandler = None,
44
+        on_user_question: UserQuestionHandler = None,
45
+        requested_mode: str | None = None,
46
+        original_task: str | None = None,
47
+    ) -> str:
48
+        """Run one decomposition flow or fall back to the direct task path."""
49
+
50
+        await emit(AgentEvent(type="thinking", content="Analyzing task complexity..."))
51
+        decomposition = await self._decompose_task(task)
52
+
53
+        if len(decomposition.subtasks) <= 1:
54
+            self.source.session.append(Message(role=Role.USER, content=task))
55
+            return await self.run_task(
56
+                task,
57
+                emit,
58
+                on_confirmation,
59
+                on_user_question,
60
+                requested_mode,
61
+                original_task,
62
+            )
63
+
64
+        await emit(
65
+            AgentEvent(
66
+                type="decomposition",
67
+                content=decomposition.to_prompt(),
68
+                decomposition=decomposition,
69
+            )
70
+        )
71
+
72
+        while not decomposition.is_complete() and not decomposition.has_failures():
73
+            subtask = decomposition.next_subtask()
74
+            if subtask is None:
75
+                break
76
+
77
+            subtask.status = "in_progress"
78
+            await emit(
79
+                AgentEvent(
80
+                    type="subtask",
81
+                    content=f"{decomposition.progress_str()} {subtask.description}",
82
+                    subtask=subtask,
83
+                )
84
+            )
85
+
86
+            self.source.session.append(
87
+                Message(
88
+                    role=Role.USER,
89
+                    content=(
90
+                        f"Execute this subtask: {subtask.description}\n\n"
91
+                        f"Verification: {subtask.verification}"
92
+                    ),
93
+                )
94
+            )
95
+            subtask_response = await self.run_task(
96
+                subtask.description,
97
+                emit,
98
+                on_confirmation,
99
+                on_user_question,
100
+                None,
101
+                original_task,
102
+            )
103
+
104
+            if "error" in subtask_response.lower() or "failed" in subtask_response.lower():
105
+                decomposition.mark_failed(subtask.id, subtask_response)
106
+                if decomposition.can_retry(subtask.id):
107
+                    decomposition.reset_for_retry(subtask.id)
108
+                    await emit(
109
+                        AgentEvent(
110
+                            type="subtask",
111
+                            content=f"Retrying subtask: {subtask.description}",
112
+                            subtask=subtask,
113
+                        )
114
+                    )
115
+            else:
116
+                decomposition.mark_completed(subtask.id, subtask_response)
117
+
118
+        if not decomposition.is_complete():
119
+            return f"Task partially completed. {decomposition.to_prompt()}"
120
+
121
+        summary_prompt = (
122
+            f"All subtasks completed for: {task}\n\n"
123
+            f"{decomposition.to_prompt()}\n\n"
124
+            "Provide a brief summary of what was accomplished."
125
+        )
126
+        self.source.session.append(Message(role=Role.USER, content=summary_prompt))
127
+        return await self.run_task(
128
+            summary_prompt,
129
+            emit,
130
+            on_confirmation,
131
+            on_user_question,
132
+            None,
133
+            original_task,
134
+        )
135
+
136
+    async def _decompose_task(self, task: str):
137
+        """Request one structured decomposition from the active backend."""
138
+
139
+        prompt = DECOMPOSITION_PROMPT.format(task=task)
140
+        response = await self.source.backend.complete(
141
+            messages=[
142
+                self.source.session.system_message_factory(),
143
+                Message(role=Role.USER, content=prompt),
144
+            ],
145
+            tools=None,
146
+            temperature=0.3,
147
+            max_tokens=1000,
148
+        )
149
+        return parse_decomposition(response.content, task)
src/loader/runtime/launcher.pymodified
@@ -5,6 +5,7 @@ from __future__ import annotations
55
 from .bootstrap import RuntimeBootstrapSource
66
 from .chat_lane import ConversationalTurnRunner
77
 from .conversation import ConfirmationHandler, ConversationRuntime, EventSink, UserQuestionHandler
8
+from .decomposition_lane import DecompositionTurnRunner
89
 from .events import TurnSummary
910
 from .explore import ExploreRuntime
1011
 
@@ -47,6 +48,28 @@ class RuntimeLauncher:
4748
             original_task=original_task,
4849
         )
4950
 
51
+    async def run_decomposed(
52
+        self,
53
+        task: str,
54
+        emit: EventSink,
55
+        *,
56
+        on_confirmation: ConfirmationHandler = None,
57
+        on_user_question: UserQuestionHandler = None,
58
+        requested_mode: str | None = None,
59
+        original_task: str | None = None,
60
+    ) -> str:
61
+        """Run a decomposition-guided task through the shared launcher seam."""
62
+
63
+        runner = DecompositionTurnRunner(self.source, run_task=self._run_task_response)
64
+        return await runner.run(
65
+            task,
66
+            emit,
67
+            on_confirmation=on_confirmation,
68
+            on_user_question=on_user_question,
69
+            requested_mode=requested_mode,
70
+            original_task=original_task,
71
+        )
72
+
5073
     async def run_explore(
5174
         self,
5275
         prompt: str,
@@ -57,6 +80,27 @@ class RuntimeLauncher:
5780
         runtime = ExploreRuntime(self.source)
5881
         return await runtime.run_query(prompt, emit)
5982
 
83
+    async def _run_task_response(
84
+        self,
85
+        task: str,
86
+        emit: EventSink,
87
+        on_confirmation: ConfirmationHandler = None,
88
+        on_user_question: UserQuestionHandler = None,
89
+        requested_mode: str | None = None,
90
+        original_task: str | None = None,
91
+    ) -> str:
92
+        """Run one runtime turn and return only the final response text."""
93
+
94
+        summary = await self.run_turn(
95
+            task,
96
+            emit,
97
+            on_confirmation=on_confirmation,
98
+            on_user_question=on_user_question,
99
+            requested_mode=requested_mode,
100
+            original_task=original_task,
101
+        )
102
+        return summary.final_response
103
+
60104
 
61105
 def build_runtime_launcher(source: RuntimeBootstrapSource) -> RuntimeLauncher:
62106
     """Build a public runtime launcher from the shared bootstrap source."""