Python · 2663 bytes Raw Blame History
1 """In-memory probe queue — the push path for sway-driven training.
2
3 Probes POSTed to the RPC endpoint land here until the next cycle
4 boundary, at which point `drain()` hands them to `build_dataset`. The
5 queue is bounded (default 1000); enqueue past capacity raises
6 `QueueFullError` so the RPC layer can map that to HTTP 429.
7
8 The dataclass is frozen so a drained probe can be attached verbatim to
9 the training summary's audit trail without defensive copies.
10 """
11
12 from __future__ import annotations
13
14 import threading
15 from collections import deque
16 from dataclasses import dataclass, field
17 from datetime import UTC, datetime
18
19
class QueueFullError(Exception):
    """Signal that the probe queue cannot accept another entry.

    `InjectedProbeQueue.enqueue` raises this when the queue already holds
    `capacity` probes. Per the module docstring, the RPC layer is expected
    to translate it into an HTTP 429 response.
    """
22
23
24 @dataclass(frozen=True)
25 class InjectedProbe:
26 """One probe pushed over the RPC channel.
27
28 `accepted_at` is set by the queue when the probe is enqueued; it's
29 the server-side receipt timestamp, not whatever the client claimed.
30 """
31
32 prompt: str
33 reference: str
34 tags: tuple[str, ...] = ()
35 source_addr: str = ""
36 accepted_at: datetime = field(default_factory=lambda: datetime.now(UTC))
37
38
class InjectedProbeQueue:
    """Bounded, lock-protected FIFO buffer for probes pushed over RPC.

    Producers call `enqueue`; the consumer calls `drain` to take every
    pending probe at once. A single `threading.Lock` serialises all
    access to the underlying deque, which the original author judged
    sufficient for the expected rate (probes per minute, not per ms).
    """

    def __init__(self, *, capacity: int = 1000) -> None:
        """Create an empty queue that holds at most `capacity` probes.

        Raises:
            ValueError: If `capacity` is zero or negative.
        """
        if capacity <= 0:
            message = f"capacity must be positive, got {capacity}"
            raise ValueError(message)
        self._lock = threading.Lock()
        self._items: deque[InjectedProbe] = deque()
        self._capacity = capacity

    @property
    def capacity(self) -> int:
        """Maximum number of probes held before `enqueue` starts rejecting."""
        return self._capacity

    def enqueue(self, probe: InjectedProbe) -> None:
        """Add `probe` at the tail of the queue.

        Raises:
            QueueFullError: If the queue already holds `capacity` probes.
        """
        with self._lock:
            is_full = len(self._items) >= self._capacity
            if not is_full:
                self._items.append(probe)
                return
            raise QueueFullError(
                f"probe queue at capacity ({self._capacity}); "
                "drain the queue or raise --queue-capacity"
            )

    def drain(self) -> list[InjectedProbe]:
        """Empty the queue and return its former contents, oldest first.

        The copy and the clear happen under the same lock acquisition, so
        a probe enqueued concurrently is either in the returned list or
        still queued afterwards -- never lost.
        """
        with self._lock:
            snapshot = [*self._items]
            self._items.clear()
        return snapshot

    def depth(self) -> int:
        """Return how many probes are currently waiting in the queue."""
        with self._lock:
            pending = len(self._items)
        return pending