| 1 | """Streaming text widget for LLM responses.""" |
| 2 | |
import re

from rich.text import Text
from textual.reactive import reactive
from textual.widgets import Static
| 6 | |
| 7 | |
class StreamingText(Static):
    """Widget that displays streaming text content with a cursor.

    Chunks are accumulated in a private string buffer via :meth:`append`.
    Every method that mutates the buffer or the streaming flag requests a
    refresh with ``layout=True`` so the widget is re-measured as the text
    grows or shrinks. :meth:`render` draws the buffer as a plain
    ``Text`` object so the LLM output is never interpreted as markup.
    """

    # NOTE(review): this reactive is never read or written inside this class
    # (the displayed text lives in ``_content_buffer``). It is kept so that
    # any external code referencing it keeps working -- confirm before removing.
    content: reactive[str] = reactive("")
    # True between start_streaming() and stop_streaming(); while set,
    # render() appends the cursor indicator.
    is_streaming: reactive[bool] = reactive(False)

    # Compiled once at class creation: render() runs on every refresh (i.e.
    # once per streamed chunk), so the patterns should not be rebuilt there.
    _TOOL_CALL_BLOCK_RE = re.compile(
        r"<tool_call>.*?</tool_call>", re.DOTALL | re.IGNORECASE
    )
    _ORPHAN_TOOL_CALL_RE = re.compile(r"</?tool_call>", re.IGNORECASE)
    _THINK_TAG_RE = re.compile(r"</?think>", re.IGNORECASE)
    _EXCESS_NEWLINES_RE = re.compile(r"\n{3,}")

    def __init__(self, **kwargs) -> None:
        """Create an empty widget; ``kwargs`` are forwarded to ``Static``."""
        super().__init__(**kwargs)
        # Raw accumulated text, exactly as received (tags included).
        self._content_buffer = ""

    def render(self) -> Text:
        """Render the cleaned buffer, followed by a dim cursor while streaming.

        Returns:
            A ``Text`` built from the buffer with tool_call/think tags
            stripped. A ``Text`` object is used (rather than a markup string)
            to avoid markup interpretation of LLM output.
        """
        # Clean any tool_call tags that slipped through upstream filtering.
        text = Text(self._clean_tool_tags(self._content_buffer))
        if self.is_streaming:
            text.append("|", style="dim")  # Cursor indicator
        return text

    def _clean_tool_tags(self, content: str) -> str:
        """Remove tool_call/think tags that weren't filtered during streaming.

        Complete ``<tool_call>...</tool_call>`` blocks are removed together
        with their contents. Orphaned ``<tool_call>`` / ``</tool_call>`` tags
        are removed on their own. For ``<think>`` only the tags themselves are
        removed; the text between them is kept. Matching is case-insensitive.

        Args:
            content: Raw text to clean.

        Returns:
            The cleaned text, with any run of three or more newlines (as can
            be left behind by a removed block) collapsed to two.
        """
        content = self._TOOL_CALL_BLOCK_RE.sub("", content)
        # Orphaned tags: an opening tag whose block is still streaming, or a
        # closing tag whose opener was filtered elsewhere.
        content = self._ORPHAN_TOOL_CALL_RE.sub("", content)
        content = self._THINK_TAG_RE.sub("", content)
        return self._EXCESS_NEWLINES_RE.sub("\n\n", content)

    def append(self, chunk: str) -> None:
        """Append a chunk to the buffer and redraw."""
        self._content_buffer += chunk
        self.refresh(layout=True)  # Force layout recalculation for multi-line

    def start_streaming(self) -> None:
        """Start streaming mode: reset the buffer and show the cursor."""
        self._content_buffer = ""
        self.is_streaming = True
        self.add_class("streaming")
        self.refresh(layout=True)

    def stop_streaming(self) -> None:
        """Stop streaming mode: hide the cursor, keeping the buffer."""
        self.is_streaming = False
        self.remove_class("streaming")
        self.refresh(layout=True)

    def get_content(self) -> str:
        """Return the accumulated raw buffer (tags are NOT stripped)."""
        return self._content_buffer

    def clear(self) -> None:
        """Clear the buffer and redraw; the streaming flag is left untouched."""
        self._content_buffer = ""
        # layout=True, like every other buffer mutation in this class, so the
        # widget is re-measured now that the content is gone.
        self.refresh(layout=True)