Python · 4718 bytes Raw Blame History
1 """Parse `### Q` / `### A` pairs out of an `::instruction::` section body.
2
3 Grammar (strict):
4
5 ### Q
6 <question body, one or more lines, first blank line ends it>
7 ### A
8 <answer body, same rule>
9 (blank line)
10 ### Q
11 ...
12
13 Rules:
14
15 - Headers must be `### Q` / `### A` alone on their line (leading/trailing
16 whitespace tolerated). Inline content like `### Q what's this?` is a
17 parse error — the body begins on the *next* line.
18 - Every `### Q` must be followed (after its body) by a matching `### A`.
19 An unterminated question, two questions in a row, or a bare `### A`
20 raises `InstructionParseError` with the 1-indexed section-relative line
21 where the violation was detected.
22 - Empty question or empty answer bodies are errors — training on an
23 empty turn is almost always a mistake.
24 - Non-header, non-blank lines outside a field body are errors; prose
25 that isn't part of a turn belongs in a default PROSE section.
26 """
27
28 from __future__ import annotations
29
30 from dataclasses import dataclass
31
32 from dlm.data.errors import InstructionParseError
33
# Exact text a line must equal, once surrounding whitespace is stripped, to
# count as the question header.
_Q_HEADER = "### Q"
# Exact text a line must equal, once surrounding whitespace is stripped, to
# count as the answer header.
_A_HEADER = "### A"
36
37
@dataclass(frozen=True)
class QAPair:
    """One question/answer turn extracted from an instruction section.

    Instances are immutable (`frozen=True`); both fields hold the body text
    exactly as the parser produced it.
    """

    # Text found under the `### Q` header.
    question: str
    # Text found under the matching `### A` header.
    answer: str
44
45
def parse_instruction_body(body: str, *, section_id: str) -> list[QAPair]:
    """Extract every `### Q` / `### A` turn from `body`, in document order.

    Args:
        body: Raw text of the instruction section; it is split into lines
            with `str.splitlines()`.
        section_id: Identifier attached to any `InstructionParseError` so
            the caller can point the user back at the offending section.

    Returns:
        A non-empty list of `QAPair` objects.

    Raises:
        InstructionParseError: If the body contains no pairs at all
            (reported at section line 1), or if `_parse_pair` rejects a
            malformed turn.
    """
    cursor = _PeekableLines(body.splitlines())
    turns: list[QAPair] = []

    # Blank lines before the first turn and between turns are skipped.
    cursor.skip_blank()
    while not cursor.eof():
        turn = _parse_pair(cursor, section_id=section_id)
        turns.append(turn)
        cursor.skip_blank()

    if turns:
        return turns

    raise InstructionParseError(
        "instruction block has no ### Q / ### A pairs",
        section_id=section_id,
        section_line=1,
    )
68
69
def _parse_pair(it: _PeekableLines, *, section_id: str) -> QAPair:
    """Consume one `### Q` ... `### A` turn starting at the cursor.

    The cursor must be positioned on a non-blank line. On success it is left
    just past the answer body (including the blank line that ended it, if
    there was one).

    Raises:
        InstructionParseError: If the current line is not a bare `### Q`
            header, the question body is empty, the section ends before an
            `### A` header, the line after the question body is not a bare
            `### A` header, or the answer body is empty. The reported line
            is the cursor's 1-indexed position when the problem is noticed,
            which may be just past the offending text.
    """

    def violation(message: str) -> InstructionParseError:
        # Read the cursor position at detection time, not at entry.
        return InstructionParseError(
            message,
            section_id=section_id,
            section_line=it.line_no(),
        )

    header = it.peek_line()
    if not _is_header(header, _Q_HEADER):
        raise violation(
            f"expected `{_Q_HEADER}` header alone on its line, got {header!r}"
        )
    it.advance()

    question = _read_field_body(it)
    if not question:
        raise violation("### Q body is empty")

    header = it.peek_line()
    if header is None:
        raise violation(f"### Q without matching `{_A_HEADER}` at end of section")
    if not _is_header(header, _A_HEADER):
        raise violation(
            f"### Q must be followed by `{_A_HEADER}` alone on its line, got {header!r}"
        )
    it.advance()

    answer = _read_field_body(it)
    if not answer:
        raise violation("### A body is empty")

    return QAPair(question=question, answer=answer)
112
113
def _read_field_body(it: _PeekableLines) -> str:
    """Collect body lines up to the first blank line, header, or end of input.

    A terminating blank line is consumed, so the caller sees whatever follows
    it. A terminating `### Q` / `### A` header is left in place for the
    caller to inspect.

    Returns:
        The collected lines joined with "\\n" and stripped of surrounding
        whitespace; an empty string when no body lines were found.
    """
    collected: list[str] = []
    while True:
        line = it.peek_line()
        if line is None:
            # Ran off the end of the section.
            break
        stripped = line.strip()
        if not stripped:
            # Swallow the blank separator before stopping.
            it.advance()
            break
        if stripped in (_Q_HEADER, _A_HEADER):
            # Leave the header for the caller.
            break
        collected.append(line)
        it.advance()
    return "\n".join(collected).strip()
132
133
134 def _is_header(line: str | None, header: str) -> bool:
135 return line is not None and line.strip() == header
136
137
138 class _PeekableLines:
139 """Minimal line-at-a-time iterator with 1-indexed line tracking."""
140
141 def __init__(self, lines: list[str]) -> None:
142 self._lines = lines
143 self._i = 0
144
145 def peek_line(self) -> str | None:
146 if self._i >= len(self._lines):
147 return None
148 return self._lines[self._i]
149
150 def advance(self) -> None:
151 self._i += 1
152
153 def eof(self) -> bool:
154 return self._i >= len(self._lines)
155
156 def line_no(self) -> int:
157 return self._i + 1
158
159 def skip_blank(self) -> None:
160 while not self.eof():
161 line = self.peek_line()
162 if line is None or line.strip() != "":
163 return
164 self.advance()