Python · 5899 bytes Raw Blame History
1 """Generic HTTPS sink — POST to push, GET to pull.
2
3 Auth header is pulled from the `DLM_SHARE_AUTH` env var at call time
4 (value is used verbatim as the full `Authorization:` header — lets users
5 pick `Bearer <token>`, `Basic ...`, or anything else their endpoint
6 expects). Missing env var means unauthenticated requests.
7
8 Downloads stream in 1 MB chunks so multi-GB packs don't blow memory.
9 Uploads use a streaming file handle for the same reason.
10 """
11
from __future__ import annotations

import logging
import os
import urllib.error
import urllib.request
from collections.abc import Callable, Iterator
from pathlib import Path
from typing import IO

from dlm.share.errors import SinkError
23
24 _LOG = logging.getLogger(__name__)
25
26 _CHUNK_BYTES = 1 * 1024 * 1024 # 1 MiB streaming chunks
27 _AUTH_ENV = "DLM_SHARE_AUTH"
28 _USER_AGENT = "dlm-share/1.0"
29
30 ProgressCallback = Callable[[int, int], None] | None
31 """Called as progress(bytes_done, total_bytes). total_bytes may be 0
32 if the server doesn't set Content-Length."""
33
34
def push_url(pack_path: Path, url: str, *, progress: ProgressCallback = None) -> None:
    """Upload a `.dlm.pack` to `url` via HTTP POST.

    The request body is the raw pack bytes with
    `Content-Type: application/octet-stream` — the endpoint must accept a
    plain binary body, NOT a multipart form. When `$DLM_SHARE_AUTH` is
    set, its value is sent verbatim as the `Authorization` header.

    Raises `SinkError` when the pack file is missing, on any non-2xx
    HTTP status, or on a network / local I/O failure.
    """
    if not pack_path.is_file():
        raise SinkError(f"pack file missing: {pack_path}")

    total = pack_path.stat().st_size
    if url.startswith("http://"):
        _LOG.warning("url sink: pushing over plaintext HTTP (%s); prefer https://", url)

    request = urllib.request.Request(  # noqa: S310 — intentional user-supplied URL
        url,
        method="POST",
        headers=_build_headers(total, content_type="application/octet-stream"),
    )
    try:
        # Keep the source handle open for the duration of the request:
        # urllib consumes the body produced by _iter_read while sending.
        with pack_path.open("rb") as src:
            body = _iter_read(src, total, progress)
            with urllib.request.urlopen(request, data=body, timeout=60) as resp:  # noqa: S310
                status = resp.status
                if not 200 <= status < 300:
                    raise SinkError(f"url push: HTTP {status} from {url}")
                _LOG.info("url push: %s bytes → %s (HTTP %d)", total, url, status)
    except urllib.error.HTTPError as exc:
        raise SinkError(f"url push: HTTP {exc.code} from {url}: {exc.reason}") from exc
    except urllib.error.URLError as exc:
        raise SinkError(f"url push: network error contacting {url}: {exc.reason}") from exc
    except OSError as exc:
        raise SinkError(f"url push: I/O error reading {pack_path}: {exc}") from exc
74
75
def pull_url(url: str, out_path: Path, *, progress: ProgressCallback = None) -> int:
    """Download `url` to `out_path`. Returns bytes written.

    Streams in `_CHUNK_BYTES` increments. If the server sets a valid
    Content-Length, `progress` fires with the real total; otherwise
    `progress` sees total=0 and reports bytes done only. When
    `$DLM_SHARE_AUTH` is set, its value is sent verbatim as the
    `Authorization` header.

    Raises `SinkError` on HTTP != 2xx, network failure, or a local
    write error.
    """
    if url.startswith("http://"):
        _LOG.warning("url sink: pulling over plaintext HTTP (%s); prefer https://", url)

    req = urllib.request.Request(  # noqa: S310 — intentional user-supplied URL
        url,
        method="GET",
        headers=_build_headers(None, content_type=None),
    )
    try:
        with urllib.request.urlopen(req, timeout=60) as resp:  # noqa: S310
            status = resp.status
            if status < 200 or status >= 300:
                raise SinkError(f"url pull: HTTP {status} from {url}")
            # A malformed Content-Length must not abort an otherwise-good
            # download; fall back to total=0 (progress-by-bytes-done only),
            # the same contract as a missing header.
            try:
                total = int(resp.headers.get("Content-Length", "0"))
            except ValueError:
                total = 0
            out_path.parent.mkdir(parents=True, exist_ok=True)
            bytes_written = _stream_to_file(resp, out_path, total, progress)
            # Fixed log format: was "%s%s", which fused URL and path into
            # one unreadable token; now matches push_url's "→" style.
            _LOG.info("url pull: %s → %s (%d bytes)", url, out_path, bytes_written)
            return bytes_written
    except urllib.error.HTTPError as exc:
        raise SinkError(f"url pull: HTTP {exc.code} from {url}: {exc.reason}") from exc
    except urllib.error.URLError as exc:
        raise SinkError(f"url pull: network error contacting {url}: {exc.reason}") from exc
    except OSError as exc:
        raise SinkError(f"url pull: I/O error writing {out_path}: {exc}") from exc
109
110
def _build_headers(content_length: int | None, *, content_type: str | None) -> dict[str, str]:
    """Assemble the request headers shared by push and pull.

    `Content-Type` / `Content-Length` are emitted only when provided
    (POST supplies both; GET supplies neither). A non-empty
    `$DLM_SHARE_AUTH` is copied verbatim into `Authorization`, letting
    the user pick the scheme (`Bearer ...`, `Basic ...`, anything).
    """
    headers: dict[str, str] = {"User-Agent": _USER_AGENT}
    for name, value in (
        ("Content-Type", content_type),
        ("Content-Length", None if content_length is None else str(content_length)),
    ):
        if value is not None:
            headers[name] = value
    token = os.environ.get(_AUTH_ENV)
    if token:
        headers["Authorization"] = token
    return headers
121
122
123 def _iter_read(src: IO[bytes], total: int, progress: ProgressCallback) -> bytes:
124 """Streaming read adapter for urllib's `data=` parameter.
125
126 urllib accepts a bytes-or-bytes-iterable. We return the full bytes
127 buffer — for very large packs, this could be refactored to use
128 `http.client.HTTPConnection.send()` directly, but for v1 a single
129 in-memory read simplifies error handling. The progress callback
130 is invoked once at 0% and once at 100% to match user expectations.
131 """
132 if progress is not None:
133 progress(0, total)
134 data: bytes = src.read()
135 if progress is not None:
136 progress(len(data), total)
137 return data
138
139
def _stream_to_file(resp: IO[bytes], out_path: Path, total: int, progress: ProgressCallback) -> int:
    """Copy `resp` to `out_path` in `_CHUNK_BYTES` reads; return bytes written.

    `progress` (when set) fires after each chunk with (bytes_done, total);
    `total` is 0 when the server omitted Content-Length.
    """
    done = 0
    with out_path.open("wb") as sink:
        while chunk := resp.read(_CHUNK_BYTES):
            sink.write(chunk)
            done += len(chunk)
            if progress is not None:
                progress(done, total)
    return done
151 return written