| 1 |
"""Generic HTTPS sink — POST to push, GET to pull. |
| 2 |
|
| 3 |
Auth header is pulled from the `DLM_SHARE_AUTH` env var at call time |
| 4 |
(value is used verbatim as the full `Authorization:` header — lets users |
| 5 |
pick `Bearer <token>`, `Basic ...`, or anything else their endpoint |
| 6 |
expects). Missing env var means unauthenticated requests. |
| 7 |
|
| 8 |
Downloads stream in 1 MB chunks so multi-GB packs don't blow memory. |
| 9 |
Uploads use a streaming file handle for the same reason. |
| 10 |
""" |
| 11 |
|
| 12 |
from __future__ import annotations |
| 13 |
|
| 14 |
import logging |
| 15 |
import os |
| 16 |
import urllib.error |
| 17 |
import urllib.request |
| 18 |
from collections.abc import Callable |
| 19 |
from pathlib import Path |
| 20 |
from typing import IO |
| 21 |
|
| 22 |
from dlm.share.errors import SinkError |
| 23 |
|
| 24 |
_LOG = logging.getLogger(__name__) |
| 25 |
|
| 26 |
_CHUNK_BYTES = 1 * 1024 * 1024 # 1 MiB streaming chunks |
| 27 |
_AUTH_ENV = "DLM_SHARE_AUTH" |
| 28 |
_USER_AGENT = "dlm-share/1.0" |
| 29 |
|
| 30 |
ProgressCallback = Callable[[int, int], None] | None |
| 31 |
"""Called as progress(bytes_done, total_bytes). total_bytes may be 0 |
| 32 |
if the server doesn't set Content-Length.""" |
| 33 |
|
| 34 |
|
| 35 |
def push_url(pack_path: Path, url: str, *, progress: ProgressCallback = None) -> None:
    """Upload a `.dlm.pack` to `url` via HTTP POST.

    Request body is the raw pack bytes, streamed from disk in
    `_CHUNK_BYTES` pieces so a large pack is never held in memory in one
    piece. `Authorization` header is copied from `$DLM_SHARE_AUTH`
    verbatim when set. `Content-Type` is `application/octet-stream` — the
    endpoint is expected to handle a raw binary body, NOT a multipart form.

    `progress`, when given, is called once with 0 bytes done before the
    first chunk is produced and again after each chunk has been consumed
    by the HTTP layer.

    Raises `SinkError` when the pack file is missing, on HTTP != 2xx, or
    on network / I/O failure.
    """
    if not pack_path.is_file():
        raise SinkError(f"pack file missing: {pack_path}")

    if url.startswith("http://"):
        _LOG.warning("url sink: pushing over plaintext HTTP (%s); prefer https://", url)

    try:
        with pack_path.open("rb") as src:
            # Size the open handle (not the path) so the advertised
            # Content-Length describes the file we are actually reading.
            total = os.fstat(src.fileno()).st_size
            # The body is attached in the constructor on purpose: handing
            # `data=` to urlopen() re-assigns Request.data, which makes
            # urllib drop the explicit Content-Length header; an iterable
            # body would then go out with chunked transfer encoding.
            req = urllib.request.Request(  # noqa: S310 — intentional user-supplied URL
                url,
                data=_iter_file_chunks(src, total, progress),
                method="POST",
                headers=_build_headers(total, content_type="application/octet-stream"),
            )
            with urllib.request.urlopen(req, timeout=60) as resp:  # noqa: S310
                status = resp.status
                if status < 200 or status >= 300:
                    raise SinkError(f"url push: HTTP {status} from {url}")
                _LOG.info("url push: %s bytes → %s (HTTP %d)", total, url, status)
    except urllib.error.HTTPError as exc:
        raise SinkError(f"url push: HTTP {exc.code} from {url}: {exc.reason}") from exc
    except urllib.error.URLError as exc:
        raise SinkError(f"url push: network error contacting {url}: {exc.reason}") from exc
    except OSError as exc:
        raise SinkError(f"url push: I/O error reading {pack_path}: {exc}") from exc


def _iter_file_chunks(src: IO[bytes], total: int, progress: ProgressCallback):
    """Yield at most `total` bytes from `src` in `_CHUNK_BYTES` pieces.

    Reads are capped at `total` so the body never exceeds the
    Content-Length that was advertised for it. `progress` (when not None)
    is called with (0, total) up front and with the running byte count
    each time the consumer comes back for more data.
    """
    if progress is not None:
        progress(0, total)
    sent = 0
    while sent < total:
        chunk = src.read(min(_CHUNK_BYTES, total - sent))
        if not chunk:
            # File ended earlier than its reported size; stop producing.
            break
        sent += len(chunk)
        yield chunk
        if progress is not None:
            progress(sent, total)
| 74 |
|
| 75 |
|
| 76 |
def pull_url(url: str, out_path: Path, *, progress: ProgressCallback = None) -> int:
    """Download `url` to `out_path`. Returns bytes written.

    Streams in `_CHUNK_BYTES` increments. If the server sets a usable
    Content-Length, `progress` fires with the real total; otherwise
    (header missing, non-numeric, or negative) `progress` sees total=0
    and reports bytes done only.

    Parent directories of `out_path` are created as needed.

    Raises `SinkError` on HTTP != 2xx, on network / I/O failure, or when
    the body ends before the declared Content-Length is reached. In the
    failure cases whatever was already written to `out_path` is left in
    place.
    """
    if url.startswith("http://"):
        _LOG.warning("url sink: pulling over plaintext HTTP (%s); prefer https://", url)

    req = urllib.request.Request(  # noqa: S310 — intentional user-supplied URL
        url,
        method="GET",
        headers=_build_headers(None, content_type=None),
    )
    try:
        with urllib.request.urlopen(req, timeout=60) as resp:  # noqa: S310
            status = resp.status
            if status < 200 or status >= 300:
                raise SinkError(f"url pull: HTTP {status} from {url}")
            # A malformed Content-Length must not abort the download with
            # a raw ValueError; fall back to "total unknown" instead.
            try:
                total = max(int(resp.headers.get("Content-Length", "0")), 0)
            except (TypeError, ValueError):
                total = 0
            out_path.parent.mkdir(parents=True, exist_ok=True)
            bytes_written = _stream_to_file(resp, out_path, total, progress)
    except urllib.error.HTTPError as exc:
        raise SinkError(f"url pull: HTTP {exc.code} from {url}: {exc.reason}") from exc
    except urllib.error.URLError as exc:
        raise SinkError(f"url pull: network error contacting {url}: {exc.reason}") from exc
    except OSError as exc:
        raise SinkError(f"url pull: I/O error writing {out_path}: {exc}") from exc

    # A connection dropped mid-body simply ends the read loop, so compare
    # against the declared length to avoid reporting a truncated file as
    # a successful download.
    if total and bytes_written != total:
        raise SinkError(
            f"url pull: truncated download from {url}: "
            f"got {bytes_written} of {total} bytes"
        )
    _LOG.info("url pull: %s → %s (%d bytes)", url, out_path, bytes_written)
    return bytes_written
| 109 |
|
| 110 |
|
| 111 |
def _build_headers(content_length: int | None, *, content_type: str | None) -> dict[str, str]:
    """Assemble the request headers shared by push and pull.

    `User-Agent` is always present. `Content-Type` and `Content-Length`
    are included only when the matching argument is not None, and
    `Authorization` only when `$DLM_SHARE_AUTH` is set to a non-empty
    value (read from the environment on every call).
    """
    optional_fields = (
        ("Content-Type", content_type),
        ("Content-Length", None if content_length is None else str(content_length)),
        # An empty env value counts the same as an unset one.
        ("Authorization", os.environ.get(_AUTH_ENV) or None),
    )
    result = {"User-Agent": _USER_AGENT}
    result.update((name, value) for name, value in optional_fields if value is not None)
    return result
| 121 |
|
| 122 |
|
| 123 |
def _iter_read(src: IO[bytes], total: int, progress: ProgressCallback) -> bytes:
    """Read all of `src` for use as urllib's `data=` argument.

    Despite the name, nothing is iterated: the remaining content of `src`
    is pulled into a single in-memory `bytes` object and returned, which
    keeps error handling simple at the cost of memory for very large
    packs. `progress`, when not None, is called exactly twice — with
    (0, total) before the read and with (bytes_read, total) after it.
    """
    if progress is None:
        # Stand-in so the two reporting points below need no branching.
        def report(_done: int, _total: int) -> None:
            return None
    else:
        report = progress

    report(0, total)
    payload: bytes = src.read()
    report(len(payload), total)
    return payload
| 138 |
|
| 139 |
|
| 140 |
def _stream_to_file(resp: IO[bytes], out_path: Path, total: int, progress: ProgressCallback) -> int:
    """Copy `resp` into `out_path` chunk by chunk and return the byte count.

    `out_path` is opened in "wb" mode, so existing content is replaced.
    Data is read `_CHUNK_BYTES` at a time until a read comes back empty.
    After every chunk written, `progress` (when not None) is called with
    the running byte count and the caller-supplied `total`.
    """
    copied = 0
    with out_path.open("wb") as sink:
        while block := resp.read(_CHUNK_BYTES):
            sink.write(block)
            copied += len(block)
            if progress is not None:
                progress(copied, total)
    return copied