| 1 |
"""Path confinement, binary-file detection, and size-cap enforcement. |
| 2 |
|
| 3 |
Three primitives that the expansion loop composes: |
| 4 |
|
| 5 |
- `confine_path(path, root, strict)` — resolves symlinks and, under |
| 6 |
strict policy, verifies `resolved.is_relative_to(root)`. Strict |
| 7 |
mode raises `DirectivePolicyError`; permissive logs a warning for |
| 8 |
symlink escapes and proceeds. |
| 9 |
- `is_probably_binary(data)` — NUL-byte scan of the first KiB. The |
| 10 |
standard heuristic (git, grep). Catches images, archives, compiled |
| 11 |
objects; misses text-encoded binaries (base64 blobs) but those are |
| 12 |
legitimately training material. |
| 13 |
- `enumerate_matching_files(root, include, exclude)` — deterministic
  lexicographic walk with include/exclude glob filtering. Size and
  count caps are enforced by the caller.
| 16 |
|
| 17 |
All three stay pure Python (no third-party deps) so test doubles are |
| 18 |
easy to set up in `tmp_path`. |
| 19 |
""" |
| 20 |
|
| 21 |
from __future__ import annotations |
| 22 |
|
| 23 |
import logging |
| 24 |
import re |
| 25 |
from collections.abc import Iterable, Iterator |
| 26 |
from pathlib import Path |
| 27 |
|
| 28 |
from dlm.directives.errors import DirectivePolicyError |
| 29 |
|
| 30 |
# Names exported by `from <this module> import *`; the `_`-prefixed
# helpers defined in this file are intentionally left out.
__all__ = [
    "confine_path",
    "enumerate_matching_files",
    "is_probably_binary",
]

# Module-scoped logger; `confine_path` uses it for the permissive-mode
# symlink-escape warning.
_LOG = logging.getLogger(__name__)

# Default size (in bytes) of the prefix that `is_probably_binary` scans
# for a NUL byte: 1 KiB.
_BINARY_SNIFF_BYTES = 1024
| 39 |
|
| 40 |
|
| 41 |
def _compile_glob(pattern: str) -> re.Pattern[str]:
    """Translate a `**`-aware glob to a regex matching POSIX-style paths.

    Rules:
    - `**/` at the start of a path segment matches zero or more whole
      directories: `tests/**/x` matches `tests/x` and `tests/a/b/x`,
      but not `tests/ax`.
    - Any other `**` (for example the trailing one in `tests/**`)
      matches any run of characters, `/` included, so `tests/**`
      matches `tests/a`, `tests/a/b`, etc.
    - `*` matches any run of non-`/` characters.
    - `?` matches a single non-`/` character.
    - Other characters are literal (regex-escaped).

    The returned pattern is anchored at both ends, so it behaves the
    same under `match` + full-length input and under `fullmatch`.
    """
    parts: list[str] = ["^"]
    pos = 0
    end = len(pattern)
    while pos < end:
        ch = pattern[pos]
        if ch == "*" and pattern.startswith("**", pos):
            at_segment_start = pos == 0 or pattern[pos - 1] == "/"
            pos += 2
            if at_segment_start and pattern.startswith("/", pos):
                # Fold the separator into an optional "dirs/" group so
                # the segment boundary survives. Emitting a bare `.*`
                # and dropping the `/` would let `**/x` match `ax`.
                parts.append("(?:.*/)?")
                pos += 1
            else:
                parts.append(".*")
        elif ch == "*":
            parts.append("[^/]*")
            pos += 1
        elif ch == "?":
            parts.append("[^/]")
            pos += 1
        else:
            parts.append(re.escape(ch))
            pos += 1
    parts.append("$")
    return re.compile("".join(parts))
| 77 |
|
| 78 |
|
| 79 |
def confine_path(path: Path, root: Path, *, strict: bool) -> Path:
    """Return `path` fully resolved, optionally enforcing that it lives
    under `root`.

    Both `path` and `root` have `~` expanded and symlinks resolved.
    Resolution is non-strict, so a missing file does not raise here;
    existence checks are left to the caller.

    Strict policy: if the resolved path is not beneath the resolved
    root, raise `DirectivePolicyError(resolved, resolved_root)` chained
    to the underlying `ValueError`.

    Permissive policy: containment is never enforced. A single warning
    is logged when the path *as given* sits under `root` *as given* yet
    resolves outside the resolved root (a symlink escape). A path that
    was never anchored at `root` is just an external source and is not
    reported.
    """
    target = path.expanduser().resolve()
    anchor = root.expanduser().resolve()

    if strict:
        try:
            target.relative_to(anchor)
        except ValueError as exc:
            raise DirectivePolicyError(target, anchor) from exc
        return target

    def _is_under(child: Path, parent: Path) -> bool:
        # Purely lexical containment test; never touches the filesystem.
        try:
            child.relative_to(parent)
        except ValueError:
            return False
        return True

    # Only an originally-inside path that ends up outside is an escape.
    if _is_under(path, root) and not _is_under(target, anchor):
        _LOG.warning(
            "directive: symlink at %s escapes %s (permissive: proceeding)",
            path,
            root,
        )
    return target
| 119 |
|
| 120 |
|
| 121 |
def is_probably_binary(data: bytes, *, sample: int = _BINARY_SNIFF_BYTES) -> bool:
    """Report whether the leading `sample` bytes of `data` hold a NUL.

    Only the prefix `data[:sample]` is inspected, so the cost is bounded
    by `sample` regardless of how large `data` is. A NUL byte in that
    prefix is taken as the sign of binary content.
    """
    head = data[:sample]
    return b"\x00" in head
| 128 |
|
| 129 |
|
| 130 |
def enumerate_matching_files(
    root: Path,
    *,
    include: tuple[str, ...],
    exclude: tuple[str, ...],
) -> Iterator[Path]:
    """Yield the files at or below `root` that pass the glob filters,
    in sorted (deterministic) order.

    - `root` is a regular file: it is yielded iff its bare name passes
      the include/exclude filters.
    - `root` is a directory: every file found by a recursive walk is
      tested using its POSIX-style path relative to `root`.
    - Anything else (e.g. a missing path): nothing is yielded.

    No size or count caps are applied here; the caller enforces them so
    it can record skip counts in provenance.
    """
    if root.is_file():
        if _matches_filters(root.name, include, exclude):
            yield root
    elif root.is_dir():
        found = [entry for entry in root.rglob("*") if entry.is_file()]
        found.sort()
        for entry in found:
            relative = entry.relative_to(root).as_posix()
            if _matches_filters(relative, include, exclude):
                yield entry
| 159 |
|
| 160 |
|
| 161 |
def _matches_filters(rel_path: str, include: Iterable[str], exclude: Iterable[str]) -> bool:
    """Decide whether `rel_path` passes the filters.

    A path passes when no `exclude` glob matches it in full and at
    least one `include` glob does. Excludes are evaluated first; the
    includes are not consulted once an exclude has matched.
    """

    def _any_hit(globs: Iterable[str]) -> bool:
        for glob in globs:
            if _compile_glob(glob).fullmatch(rel_path):
                return True
        return False

    return not _any_hit(exclude) and _any_hit(include)