"""Parse `### Q` / `### A` pairs out of an `::instruction::` section body.

Grammar (strict):

    ### Q
    <question body, one or more lines, first blank line ends it>
    ### A
    <answer body, same rule>
    (blank line)
    ### Q
    ...

Rules:

- Headers must be `### Q` / `### A` alone on their line (leading/trailing
  whitespace tolerated). Inline content like `### Q what's this?` is a
  parse error — the body begins on the *next* line.
- Every `### Q` must be followed (after its body) by a matching `### A`.
  An unterminated question, two questions in a row, or a bare `### A`
  raises `InstructionParseError` with the 1-indexed section-relative line
  where the violation was detected.
- Empty question or empty answer bodies are errors — training on an
  empty turn is almost always a mistake.
- Non-header, non-blank lines outside a field body are errors; prose
  that isn't part of a turn belongs in a default PROSE section.
"""
| 27 |
|
| 28 |
from __future__ import annotations |
| 29 |
|
| 30 |
from dataclasses import dataclass |
| 31 |
|
| 32 |
from dlm.data.errors import InstructionParseError |
| 33 |
|
| 34 |
# Exact header text for the question and answer fields of a turn. A line is
# treated as a header only when it equals one of these after being stripped
# of surrounding whitespace.
_Q_HEADER = "### Q"
_A_HEADER = "### A"
| 36 |
|
| 37 |
|
| 38 |
@dataclass(frozen=True)
class QAPair:
    """Immutable question/answer turn taken from an instruction section."""

    # Text of the `### Q` field.
    question: str
    # Text of the `### A` field.
    answer: str
| 44 |
|
| 45 |
|
| 46 |
def parse_instruction_body(body: str, *, section_id: str) -> list[QAPair]:
    """Parse every `### Q` / `### A` turn found in `body`.

    Args:
        body: Raw text of the instruction section; it is split into lines
            with `str.splitlines()`.
        section_id: Identifier attached to any `InstructionParseError`
            raised here or while parsing an individual turn.

    Returns:
        The parsed turns in the order they appear; never empty.

    Raises:
        InstructionParseError: If a turn is malformed, or if the body holds
            no turns at all (reported at section line 1).
    """
    cursor = _PeekableLines(body.splitlines())
    turns: list[QAPair] = []

    while True:
        # Blank lines before the first turn, between turns, and after the
        # last one carry no meaning.
        cursor.skip_blank()
        if cursor.eof():
            break
        turns.append(_parse_pair(cursor, section_id=section_id))

    if turns:
        return turns

    raise InstructionParseError(
        "instruction block has no ### Q / ### A pairs",
        section_id=section_id,
        section_line=1,
    )
| 68 |
|
| 69 |
|
| 70 |
def _parse_pair(it: _PeekableLines, *, section_id: str) -> QAPair:
    """Consume one complete turn starting at the cursor's current line.

    The current line must be a `### Q` header. Its body is read, then a
    `### A` header is required, then the answer body is read. The cursor is
    left just past whatever the answer body consumed.

    Raises:
        InstructionParseError: If the current line is not a `### Q` header,
            if the section ends or a non-`### A` line appears where the
            answer header is required, or if either body is empty. The
            reported line is the cursor position at the moment the problem
            is noticed.
    """

    def failure(message: str) -> InstructionParseError:
        # Built on demand so `section_line` reflects where the cursor is
        # when the check fails, not where parsing of the turn began.
        return InstructionParseError(
            message,
            section_id=section_id,
            section_line=it.line_no(),
        )

    # --- question ---------------------------------------------------------
    q_line = it.peek_line()
    if not _is_header(q_line, _Q_HEADER):
        raise failure(f"expected `{_Q_HEADER}` header alone on its line, got {q_line!r}")
    it.advance()

    question = _read_field_body(it)
    if not question:
        raise failure("### Q body is empty")

    # --- answer -----------------------------------------------------------
    a_line = it.peek_line()
    if a_line is None:
        raise failure(f"### Q without matching `{_A_HEADER}` at end of section")
    if not _is_header(a_line, _A_HEADER):
        raise failure(
            f"### Q must be followed by `{_A_HEADER}` alone on its line, got {a_line!r}"
        )
    it.advance()

    answer = _read_field_body(it)
    if not answer:
        raise failure("### A body is empty")

    return QAPair(question=question, answer=answer)
| 112 |
|
| 113 |
|
| 114 |
def _read_field_body(it: _PeekableLines) -> str:
    """Collect the lines of one field body and return them as a single string.

    Reading stops at the first blank line, at a `### Q` / `### A` header, or
    at the end of input. A terminating blank line is consumed; a terminating
    header is left in place for the caller to inspect.

    Returns:
        The collected lines joined with newlines and stripped of surrounding
        whitespace; the empty string when no lines were collected.
    """
    collected: list[str] = []
    while True:
        current = it.peek_line()
        if current is None:
            # Ran off the end of the section.
            break

        bare = current.strip()
        if not bare:
            # Swallow the blank terminator so the caller lands on the line
            # that follows it.
            it.advance()
            break
        if bare in (_Q_HEADER, _A_HEADER):
            # Headers belong to the caller; do not consume them.
            break

        collected.append(current)
        it.advance()

    return "\n".join(collected).strip()
| 132 |
|
| 133 |
|
| 134 |
def _is_header(line: str | None, header: str) -> bool:
    """Report whether `line`, ignoring surrounding whitespace, is exactly `header`.

    `None` (no line available) is never a header.
    """
    if line is None:
        return False
    return line.strip() == header
| 136 |
|
| 137 |
|
| 138 |
class _PeekableLines:
    """Forward-only cursor over a list of lines with 1-indexed positions."""

    def __init__(self, lines: list[str]) -> None:
        self._lines = lines
        self._i = 0  # 0-based index of the current (not yet consumed) line

    def peek_line(self) -> str | None:
        """Return the current line without consuming it, or `None` at the end."""
        return None if self.eof() else self._lines[self._i]

    def advance(self) -> None:
        """Move the cursor one line forward."""
        self._i += 1

    def eof(self) -> bool:
        """Report whether the cursor is past the last line."""
        return self._i >= len(self._lines)

    def line_no(self) -> int:
        """Return the 1-indexed number of the current line."""
        return self._i + 1

    def skip_blank(self) -> None:
        """Advance past consecutive whitespace-only lines, if any."""
        while not self.eof() and self._lines[self._i].strip() == "":
            self._i += 1