| 1 | """Integration test: HF backend scoring methods on a real tiny model. |
| 2 | |
| 3 | Covers ``logprob_of`` / ``rolling_logprob`` / ``next_token_dist`` for |
| 4 | both base and ft views — the surface area sway probes hammer hardest |
| 5 | and the area Audit 01 flagged as 21% covered (C2). |
| 6 | |
The zero-token-completion path of ``logprob_of`` (which raises
``ProbeError``) is exercised here directly; otherwise its only coverage
would be the full CLI integration test, which can catch it for the
wrong reason.
| 10 | |
Marked ``slow`` and ``online``.
| 12 | """ |
| 13 | |
| 14 | from __future__ import annotations |
| 15 | |
import math
from collections.abc import Iterator
from pathlib import Path
| 18 | |
| 19 | import numpy as np |
| 20 | import pytest |
| 21 | |
| 22 | from dlm_sway.backends.hf import HuggingFaceDifferentialBackend |
| 23 | from dlm_sway.core.errors import ProbeError |
| 24 | from dlm_sway.core.model import ModelSpec |
| 25 | |
| 26 | pytestmark = [pytest.mark.slow, pytest.mark.online] |
| 27 | |
| 28 | |
| 29 | def _build_random_lora_adapter(base_dir: Path, out_dir: Path) -> None: |
| 30 | """Same shape as the toggle-test adapter.""" |
| 31 | import torch |
| 32 | from peft import LoraConfig, get_peft_model |
| 33 | from transformers import AutoModelForCausalLM, AutoTokenizer |
| 34 | |
| 35 | torch.manual_seed(0) |
| 36 | tokenizer = AutoTokenizer.from_pretrained(str(base_dir)) |
| 37 | if tokenizer.pad_token_id is None: |
| 38 | tokenizer.pad_token = tokenizer.eos_token |
| 39 | base = AutoModelForCausalLM.from_pretrained(str(base_dir), torch_dtype=torch.float32) |
| 40 | cfg = LoraConfig( |
| 41 | r=8, |
| 42 | lora_alpha=16, |
| 43 | target_modules=["q_proj", "v_proj"], |
| 44 | lora_dropout=0.0, |
| 45 | bias="none", |
| 46 | task_type="CAUSAL_LM", |
| 47 | ) |
| 48 | peft_model = get_peft_model(base, cfg) |
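    # PEFT zero-initializes lora_B, so a fresh adapter leaves the logits
    # untouched; overwrite it with small random values to get a real
    # base-vs-ft delta without any training.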
| 49 | with torch.no_grad(): |
| 50 | for name, param in peft_model.named_parameters(): |
| 51 | if "lora_B" in name: |
| 52 | param.copy_(torch.randn_like(param) * 0.05) |
| 53 | peft_model.save_pretrained(str(out_dir)) |
| 54 | tokenizer.save_pretrained(str(out_dir)) |
| 55 | |
| 56 | |
| 57 | @pytest.fixture(scope="module") |
| 58 | def random_adapter(tiny_model_dir: Path, tmp_path_factory: pytest.TempPathFactory) -> Path: |
| 59 | adapter_dir = tmp_path_factory.mktemp("scoring-random-adapter") |
| 60 | _build_random_lora_adapter(tiny_model_dir, adapter_dir) |
| 61 | return adapter_dir |
| 62 | |
| 63 | |
| 64 | @pytest.fixture(scope="module") |
def hf_backend(
    tiny_model_dir: Path, random_adapter: Path
) -> Iterator[HuggingFaceDifferentialBackend]:
| 66 | backend = HuggingFaceDifferentialBackend( |
| 67 | base_spec=ModelSpec(base=str(tiny_model_dir), kind="hf", dtype="fp32", device="cpu"), |
| 68 | adapter_path=random_adapter, |
| 69 | ) |
| 70 | yield backend |
| 71 | backend.close() |
| 72 | |
| 73 | |
| 74 | _PROMPTS_AND_COMPLETIONS = [ |
| 75 | ("The capital of France is", " Paris"), |
| 76 | ("Two plus two equals", " four"), |
| 77 | ("The quick brown fox jumps over the", " lazy dog"), |
| 78 | ] |
| 79 | |
| 80 | |
| 81 | class TestLogprobOf: |
| 82 | @pytest.mark.parametrize(("prompt", "completion"), _PROMPTS_AND_COMPLETIONS) |
| 83 | def test_finite_negative_for_real_completions( |
| 84 | self, |
| 85 | hf_backend: HuggingFaceDifferentialBackend, |
| 86 | prompt: str, |
| 87 | completion: str, |
| 88 | ) -> None: |
| 89 | with hf_backend.as_base() as b: |
| 90 | lp_base = b.logprob_of(prompt, completion) |
| 91 | with hf_backend.as_finetuned() as f: |
| 92 | lp_ft = f.logprob_of(prompt, completion) |
| 93 | assert math.isfinite(lp_base) |
| 94 | assert lp_base < 0.0 |
| 95 | assert math.isfinite(lp_ft) |
| 96 | assert lp_ft < 0.0 |
| 97 | |
| 98 | def test_zero_token_completion_raises_probe_error( |
| 99 | self, hf_backend: HuggingFaceDifferentialBackend |
| 100 | ) -> None: |
| 101 | """Empty completion tokenizes to zero new tokens — the entry |
| 102 | point must reject it loudly so a probe can route to ERROR.""" |
| 103 | with hf_backend.as_base() as b: |
| 104 | with pytest.raises(ProbeError, match="completion tokenized to zero"): |
| 105 | b.logprob_of("hello", "") |
| 106 | |
| 107 | def test_longer_completion_is_more_negative( |
| 108 | self, hf_backend: HuggingFaceDifferentialBackend |
| 109 | ) -> None: |
| 110 | """Sanity: extending a completion can only add negative logprob.""" |
| 111 | with hf_backend.as_base() as b: |
| 112 | short = b.logprob_of("the prefix is", " short") |
| 113 | longer = b.logprob_of("the prefix is", " short and gets longer here") |
| 114 | assert longer < short, f"longer={longer}, short={short}" |
| 115 | |
| 116 | |
| 117 | class TestRollingLogprob: |
| 118 | def test_returns_per_position_logprobs_and_finite_summary( |
| 119 | self, hf_backend: HuggingFaceDifferentialBackend |
| 120 | ) -> None: |
| 121 | with hf_backend.as_base() as b: |
| 122 | r = b.rolling_logprob("Hello world. This is a sentence.") |
| 123 | assert r.num_tokens >= 2 |
| 124 | assert r.logprobs.size == r.num_tokens - 1 |
| 125 | assert math.isfinite(r.total_logprob) |
| 126 | assert math.isfinite(r.mean_logprob) |
| 127 | assert math.isfinite(r.perplexity) |
        assert r.perplexity > 1.0  # PPL exceeds 1 unless the model is perfectly certain
| 129 | |
| 130 | def test_short_text_under_two_tokens_returns_empty( |
| 131 | self, hf_backend: HuggingFaceDifferentialBackend |
| 132 | ) -> None: |
| 133 | """Single-token text has no per-position predictions to gather.""" |
| 134 | with hf_backend.as_base() as b: |
| 135 | r = b.rolling_logprob("a") |
| 136 | assert r.logprobs.size == 0 |
| 137 | assert r.total_logprob == 0.0 |
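
    def test_summary_stats_are_internally_consistent(
        self, hf_backend: HuggingFaceDifferentialBackend
    ) -> None:
        """Hedged cross-check, assuming the conventional definitions:
        ``total = sum(logprobs)``, ``mean = total / (num_tokens - 1)``,
        and ``ppl = exp(-mean)``. If the backend defines these
        differently, relax or drop these assertions."""
        with hf_backend.as_base() as b:
            r = b.rolling_logprob("Hello world. This is a sentence.")
        assert r.total_logprob == pytest.approx(float(r.logprobs.sum()), rel=1e-5)
        assert r.mean_logprob == pytest.approx(r.total_logprob / (r.num_tokens - 1), rel=1e-5)
        assert r.perplexity == pytest.approx(math.exp(-r.mean_logprob), rel=1e-5)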
| 138 | |
| 139 | |
| 140 | class TestGenerate: |
| 141 | def test_greedy_generation_returns_string( |
| 142 | self, hf_backend: HuggingFaceDifferentialBackend |
| 143 | ) -> None: |
| 144 | with hf_backend.as_base() as b: |
| 145 | out = b.generate("Hello", max_new_tokens=8, seed=0) |
| 146 | assert isinstance(out, str) |
| 147 | assert len(out) > 0 |
| 148 | |
| 149 | def test_sampled_generation_obeys_seed( |
| 150 | self, hf_backend: HuggingFaceDifferentialBackend |
| 151 | ) -> None: |
| 152 | """``temperature > 0`` engages the sampling path (do_sample=True).""" |
        with hf_backend.as_base() as b:
            first = b.generate("The future of AI is", max_new_tokens=8, temperature=0.7, seed=7)
            second = b.generate("The future of AI is", max_new_tokens=8, temperature=0.7, seed=7)
        assert first == second, (
            f"sampled generation not deterministic at seed=7: {first!r} vs {second!r}"
        )
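
    def test_greedy_generation_is_deterministic(
        self, hf_backend: HuggingFaceDifferentialBackend
    ) -> None:
        """Hedged: assumes the no-``temperature`` call path is greedy
        decoding, as the first test's name implies, so repeated calls
        must match byte-for-byte."""
        with hf_backend.as_base() as b:
            first = b.generate("Hello", max_new_tokens=8, seed=0)
            second = b.generate("Hello", max_new_tokens=8, seed=0)
        assert first == second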
| 157 | |
| 158 | |
| 159 | class TestNextTokenDist: |
| 160 | def test_top_k_dist_finite_and_sorted(self, hf_backend: HuggingFaceDifferentialBackend) -> None: |
| 161 | with hf_backend.as_base() as b: |
| 162 | d = b.next_token_dist("The capital of France is", top_k=64) |
| 163 | assert d.token_ids.shape == (64,) |
| 164 | assert d.logprobs.shape == (64,) |
| 165 | assert np.all(np.isfinite(d.logprobs)) |
| 166 | # Top-k must arrive in descending probability order. |
| 167 | assert np.all(np.diff(d.logprobs) <= 1e-7) |
| 168 | assert d.vocab_size > 64 |
| 169 | # B6: tail_logprob is None (k covers vocab — won't happen here), |
| 170 | # 0.0 (underflow), or a finite negative log-prob. |
| 171 | assert d.tail_logprob is None or math.isfinite(d.tail_logprob) |
| 172 | |
| 173 | def test_dist_changes_under_adapter(self, hf_backend: HuggingFaceDifferentialBackend) -> None: |
| 174 | prompt = "the adapter influences" |
| 175 | with hf_backend.as_base() as b: |
| 176 | base_dist = b.next_token_dist(prompt, top_k=32) |
| 177 | with hf_backend.as_finetuned() as f: |
| 178 | ft_dist = f.next_token_dist(prompt, top_k=32) |
| 179 | # Either the top-32 token IDs reordered, or at least one logprob |
| 180 | # moved by more than fp32 noise. |
| 181 | same_ids = np.array_equal(base_dist.token_ids, ft_dist.token_ids) |
| 182 | if same_ids: |
| 183 | assert not np.allclose(base_dist.logprobs, ft_dist.logprobs, atol=1e-5) |
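
    def test_top_k_mass_is_at_most_one(
        self, hf_backend: HuggingFaceDifferentialBackend
    ) -> None:
        """Hedged cross-check: assumes ``logprobs`` are log-probabilities
        over the full-vocab softmax (as the field name suggests), so the
        top-64 probabilities sum to at most 1."""
        with hf_backend.as_base() as b:
            d = b.next_token_dist("The capital of France is", top_k=64)
        top_mass = float(np.exp(d.logprobs).sum())
        assert 0.0 < top_mass <= 1.0 + 1e-5  # small fp32 slack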