Python · 5699 bytes Raw Blame History
1 """Path confinement, binary-file detection, and size-cap enforcement.
2
3 Three primitives that the expansion loop composes:
4
5 - `confine_path(path, root, strict)` — resolves symlinks and, under
6 strict policy, verifies `resolved.is_relative_to(root)`. Strict
7 mode raises `DirectivePolicyError`; permissive logs a warning for
8 symlink escapes and proceeds.
9 - `is_probably_binary(data)` — NUL-byte scan of the first KiB. The
10 standard heuristic (git, grep). Catches images, archives, compiled
11 objects; misses text-encoded binaries (base64 blobs) but those are
12 legitimately training material.
13 - `enumerate_matching_files(root, include, exclude)` — deterministic
14 lexicographic walk with include/exclude glob filtering. Size and
15 count caps are enforced by the caller, not here.
16
17 All three stay pure Python (no third-party deps) so test doubles are
18 easy to set up in `tmp_path`.
19 """
20
21 from __future__ import annotations
22
23 import logging
24 import re
25 from collections.abc import Iterable, Iterator
26 from pathlib import Path
27
28 from dlm.directives.errors import DirectivePolicyError
29
30 __all__ = [
31 "confine_path",
32 "enumerate_matching_files",
33 "is_probably_binary",
34 ]
35
36 _LOG = logging.getLogger(__name__)
37
38 _BINARY_SNIFF_BYTES = 1024
39
40
41 def _compile_glob(pattern: str) -> re.Pattern[str]:
42 """Translate a `**`-aware glob to a regex matching POSIX-style paths.
43
44 Rules:
45 - `**` matches any number of path segments (including zero).
46 - `*` matches any run of non-`/` characters.
47 - `?` matches a single non-`/` character.
48 - Other characters are literal (regex-escaped).
49
50 Trailing-`/**` is treated as "anything beneath this prefix" —
51 `tests/**` matches `tests/a`, `tests/a/b`, etc.
52 """
53 i = 0
54 n = len(pattern)
55 out: list[str] = ["^"]
56 while i < n:
57 c = pattern[i]
58 if c == "*":
59 if i + 1 < n and pattern[i + 1] == "*":
60 out.append(".*")
61 i += 2
62 # consume a trailing `/` after `**` so `tests/**/x` matches
63 # both `tests/x` and `tests/a/b/x`
64 if i < n and pattern[i] == "/":
65 i += 1
66 else:
67 out.append("[^/]*")
68 i += 1
69 elif c == "?":
70 out.append("[^/]")
71 i += 1
72 else:
73 out.append(re.escape(c))
74 i += 1
75 out.append("$")
76 return re.compile("".join(out))
77
78
def confine_path(path: Path, root: Path, *, strict: bool) -> Path:
    """Resolve `path` and return it, enforcing containment when `strict`.

    Both `path` and `root` are `~`-expanded and resolved (symlinks
    followed, non-strict resolve so a missing file does not raise here).

    Strict policy: raises `DirectivePolicyError(resolved, root_resolved)`
    if the resolved path is not located under the resolved root.

    Permissive policy: never raises for containment. A single warning is
    logged only when the *unresolved* `path` is lexically under `root`
    yet its resolved form lands outside the resolved root, i.e. the path
    looked internal but escaped. Paths that were never anchored at
    `root` are treated as ordinary external sources and stay silent.
    """
    target = path.expanduser().resolve()
    boundary = root.expanduser().resolve()

    if strict:
        try:
            target.relative_to(boundary)
        except ValueError as exc:
            raise DirectivePolicyError(target, boundary) from exc
        return target

    # Permissive from here on. First decide, purely lexically, whether the
    # caller-supplied path claims to live under the caller-supplied root.
    try:
        path.relative_to(root)
    except ValueError:
        # Not anchored at root, so this is an external source, not an escape.
        return target

    # Anchored lexically; check whether resolution carried it outside.
    try:
        target.relative_to(boundary)
    except ValueError:
        _LOG.warning(
            "directive: symlink at %s escapes %s (permissive: proceeding)",
            path,
            root,
        )
    return target
119
120
def is_probably_binary(data: bytes, *, sample: int = _BINARY_SNIFF_BYTES) -> bool:
    """Report whether the leading `sample` bytes of `data` contain a NUL.

    Only the prefix `data[:sample]` is inspected; a NUL byte beyond that
    window is not seen. Empty input yields False.
    """
    head = data[:sample]
    return head.find(b"\x00") >= 0
128
129
def enumerate_matching_files(
    root: Path,
    *,
    include: tuple[str, ...],
    exclude: tuple[str, ...],
) -> Iterator[Path]:
    """Yield the files under `root` that pass the include/exclude filters.

    - `root` is a regular file: it is yielded iff its bare name passes
      the filters (patterns see only the name component).
    - `root` is a directory: every regular file beneath it is collected,
      sorted, and tested using its POSIX-style path relative to `root`.
    - Anything else (missing path, etc.): nothing is yielded.

    Output order is deterministic because candidates are sorted before
    filtering. No size or count limits are applied here.
    """
    if root.is_file():
        # Single-file root: only the file name is matched.
        if _matches_filters(root.name, include, exclude):
            yield root
    elif root.is_dir():
        files = [entry for entry in root.rglob("*") if entry.is_file()]
        files.sort()
        for entry in files:
            relative = entry.relative_to(root).as_posix()
            if _matches_filters(relative, include, exclude):
                yield entry
159
160
def _matches_filters(rel_path: str, include: Iterable[str], exclude: Iterable[str]) -> bool:
    """Return True iff `rel_path` hits an include glob and no exclude glob.

    Exclusion wins: the exclude patterns are consulted first, and a hit
    there rejects the path before any include pattern is examined. With
    no include pattern matching (or none given) the result is False.
    """
    for pattern in exclude:
        if _compile_glob(pattern).fullmatch(rel_path):
            return False
    for pattern in include:
        if _compile_glob(pattern).fullmatch(rel_path):
            return True
    return False