"""In-memory probe queue — the push path for sway-driven training.

Probes POSTed to the RPC endpoint land here until the next cycle
boundary, at which point `drain()` hands them to `build_dataset`. The
queue is bounded (default 1000); enqueue past capacity raises
`QueueFullError` so the RPC layer can map that to HTTP 429.

The dataclass is frozen so a drained probe can be attached verbatim to
the training summary's audit trail without defensive copies.
"""

from __future__ import annotations

import threading
from collections import deque
from dataclasses import dataclass, field
from datetime import UTC, datetime


class QueueFullError(Exception):
    """Signals that `InjectedProbeQueue.enqueue` found the queue already at capacity."""


@dataclass(frozen=True)
class InjectedProbe:
    """One probe pushed over the RPC channel.

    Frozen so a drained probe can be attached verbatim to the training
    summary's audit trail without defensive copies (see module doc).

    NOTE(review): `accepted_at` is fixed at *construction* time by its
    `default_factory` — `InjectedProbeQueue.enqueue` never touches it
    (this class is frozen). It is the server-side receipt timestamp
    only if the RPC handler constructs the probe on receipt rather than
    accepting a client-supplied value; confirm against the caller.
    """

    # Probe input text; consumed by `build_dataset` after a drain.
    prompt: str
    # Reference text paired with `prompt` — presumably the expected
    # completion used for scoring; verify against build_dataset.
    reference: str
    # Optional free-form labels; immutable tuple, empty by default.
    tags: tuple[str, ...] = ()
    # Address of the pushing client ("" when not recorded).
    source_addr: str = ""
    # Timezone-aware UTC timestamp taken when the instance is created.
    accepted_at: datetime = field(default_factory=lambda: datetime.now(UTC))


class InjectedProbeQueue:
    """Bounded, lock-protected FIFO buffer for probes arriving over RPC.

    Producers (the RPC server) call `enqueue`; the consumer (the
    cycle-boundary hook) calls `drain`. A single lock serializes both
    directions — expected traffic is probes per minute, not per ms, so
    contention is a non-issue.
    """

    def __init__(self, *, capacity: int = 1000) -> None:
        """Create an empty queue holding at most `capacity` probes.

        Raises:
            ValueError: if `capacity` is zero or negative.
        """
        if capacity <= 0:
            raise ValueError(f"capacity must be positive, got {capacity}")
        self._capacity = capacity
        self._buffer: deque[InjectedProbe] = deque()
        self._mutex = threading.Lock()

    @property
    def capacity(self) -> int:
        """Maximum number of probes the queue will hold."""
        return self._capacity

    def enqueue(self, probe: InjectedProbe) -> None:
        """Append `probe` to the queue, or raise `QueueFullError`."""
        with self._mutex:
            if len(self._buffer) < self._capacity:
                self._buffer.append(probe)
                return
        # Lock released before raising; the message reads only the
        # immutable capacity, so no state can go stale here.
        raise QueueFullError(
            f"probe queue at capacity ({self._capacity}); "
            "drain the queue or raise --queue-capacity"
        )

    def drain(self) -> list[InjectedProbe]:
        """Remove and return every queued probe, oldest first.

        Called at the top of each training cycle's `build_dataset`;
        afterwards the queue is empty until the next RPC push.
        """
        with self._mutex:
            emptied: list[InjectedProbe] = []
            while self._buffer:
                emptied.append(self._buffer.popleft())
            return emptied

    def depth(self) -> int:
        """Current number of queued probes (thread-safe snapshot)."""
        with self._mutex:
            return len(self._buffer)