fortrangoingonforty/cue / ef96b06


forgot to commit the driver

Authored by espadonne
SHA: ef96b06e783e722ecba6ca825d3911096be092df
Parents: f1b1d29
Tree: 8c178b6

1 changed file

A (added)  src/gesture_dsp/cli.py  +404  -0
@@ -0,0 +1,404 @@
+#!/usr/bin/env python3
+#
+# Hi; I know this is inefficient;
+# Hi; I know it even hangs on run;
+# Hi; You know this is insufficient;
+# Hey, but you know it runs!
+#
+#
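+# usage sketch (file names are placeholders; assumes the package is
+# installed so the module path resolves):
+#   python -m gesture_dsp.cli track.wav --ir hall_ir.wav
+#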
+import argparse
+import cv2
+import numpy as np
+import pyaudio
+import wave
+import os
+from collections import deque
+import time
+
+# for convolution and threading
+#
+from scipy.signal import fftconvolve
+from concurrent.futures import ThreadPoolExecutor
+
+import soundfile as sf
+
+from gesture_dsp.dsp_effects import (
+    mid_side,
+    convolution_reverb,
+    bitcrush,
+    filters,
+    spectral_freeze,
+    delay,
+    pitch_shift,
+)
+
+
+def parse_args():
+    parser = argparse.ArgumentParser(
+        description="Gesture-based DSP demo: apply DSP effects via camera gestures."
+    )
+    parser.add_argument(
+        "wav",
+        help="Path to input stereo WAV file"
+    )
+    parser.add_argument(
+        "--ir",
+        help="Path to impulse response WAV file (for convolution reverb)",
+        default=None
+    )
+    return parser.parse_args()
+
+
+# :::
+# :::: WRAPPERS FOR THE FO(UR)TRAN MODULES ::::
+# ::::: ::::::::::::::::::::::::::::::::::: :::::
+#
+# NOTE: most of the f2py wrappers want outbuffa as return
+#
+
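+# (each wrapper below just hands the numpy buffer to the compiled
+#  module and returns the processed buffer; n is passed explicitly
+#  where the Fortran side wants an array length)
+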
+def apply_mid_side(audio_in: np.ndarray, side_gain: float) -> np.ndarray:
+    n = audio_in.size // 2
+    return mid_side.mid_side(audio_in, side_gain, n)
+
+
+def apply_bitcrush(audio_in: np.ndarray, bit_depth: int = 8) -> np.ndarray:
+    return bitcrush.bitcrush_mod.bitcrush(audio_in, bit_depth)
+
+
+def apply_lp_filter(audio_in: np.ndarray, cutoff: float, fs: float) -> np.ndarray:
+    return filters.filters_mod.lp_filter(audio_in, cutoff, fs)
+
+
+def apply_hp_filter(audio_in: np.ndarray, cutoff: float, fs: float) -> np.ndarray:
+    return filters.filters_mod.hp_filter(audio_in, cutoff, fs)
+
+
+def apply_delay(audio_in: np.ndarray, delay_samps: int = 4410, feedback: float = 0.5) -> np.ndarray:
+    return delay.delay_mod.delay(audio_in, delay_samps, feedback)
+
+
+def apply_convolution_reverb(audio_in: np.ndarray, ir: np.ndarray) -> np.ndarray:
+    # scipy's fftconvolve, not the compiled convolution_reverb module
+    return fftconvolve(audio_in, ir, mode='same')
+
+
+def apply_spectral_freeze(audio_in: np.ndarray) -> np.ndarray:
+    n = audio_in.size
+    return spectral_freeze.spectral_freeze_mod.spectral_freeze(audio_in, n)
+
+
+def apply_pitch_shift(audio_in: np.ndarray, semitones: int = 4) -> np.ndarray:
+    return pitch_shift.pitch_shift_mod.pitch_shift(audio_in, semitones)
+
+
+# (name, callable) pairs; each callable maps (buf, fs, ir, p) -> processed
+# buffer, where p is the 0..1 gesture parameter scaled per effect
+EFFECTS = [
+    ("mid_side",    lambda buf, fs, ir, p: apply_mid_side(buf, side_gain=min(max(p*2.0, 0.0), 2.0))),
+    ("bitcrush",    lambda buf, fs, ir, p: apply_bitcrush(buf, bit_depth=int((p**3)*32)+1)),
+    ("lowpass",     lambda buf, fs, ir, p: apply_lp_filter(buf, cutoff=p*6000+400, fs=fs)),
+    ("highpass",    lambda buf, fs, ir, p: apply_hp_filter(buf, cutoff=p*6000+200, fs=fs)),
+    ("delay",       lambda buf, fs, ir, p: apply_delay(buf, delay_samps=int(fs * (0.2 + 0.6 * p)), feedback=0.3 + (0.6 * p))),
+    ("reverb",      lambda buf, fs, ir, p: apply_convolution_reverb(buf, ir)),
+    ("spectral_freeze", lambda buf, fs, ir, p: apply_spectral_freeze(buf)),
+    ("pitch_shift", lambda buf, fs, ir, p: apply_pitch_shift(buf, semitones=int(p*60-30))),
+]
+EFFECT_NAMES = [name for name, _ in EFFECTS]
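+# e.g. at p = 0.5: lowpass cutoff = 0.5*6000 + 400 = 3400 Hz,
+# bitcrush depth = int(0.5**3 * 32) + 1 = 5 bits,
+# pitch shift = int(0.5*60 - 30) = 0 semitones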
+
+
+class GestureDSP:
+    def __init__(self, wav_path: str, ir_path: str = None):
+        self.wf = wave.open(wav_path, 'rb')
+        assert self.wf.getnchannels() == 2, "Need stereo WAV"
+        self.fs = self.wf.getframerate()
+        self.p = pyaudio.PyAudio()
+
+        self.frames_per_buffer = 4096
+        self.stream = self.p.open(
+            format=pyaudio.paInt16,
+            channels=2,
+            rate=self.fs,
+            output=True,
+            frames_per_buffer=self.frames_per_buffer,
+            stream_callback=self._callback
+        )
+
+        num_channels = self.wf.getnchannels()
+        max_samples = self.frames_per_buffer * num_channels
+        self._in_buffer = np.empty(max_samples, dtype=np.float64)
+        self._f64_clip_buffer = np.empty(max_samples, dtype=np.float64)
+        self._int16_out_buffer = np.empty(max_samples, dtype=np.int16)
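+        # (4096 frames per callback is ~93 ms at a 44.1 kHz file, so
+        #  effect and parameter changes land with roughly that granularity)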
+
+        # load IR if provided
+        if ir_path and os.path.exists(ir_path):
+            try:
+                with wave.open(ir_path, 'rb') as ir_wf:
+                    data = ir_wf.readframes(ir_wf.getnframes())
+                    self.ir = np.frombuffer(data, dtype=np.int16).astype(np.float64)
+            except wave.Error:
+                # fallback for non-PCM WAV formats
+                # (note: the IR sample rate is never checked against fs)
+                data, sr = sf.read(ir_path, dtype='float64')
+
+                # flatten multi-channel IR
+                if data.ndim > 1:
+                    data = data.flatten()
+
+                # convert float [-1,1] to int16 range
+                self.ir = (data * 32767).astype(np.float64)
+        else:
+            # no IR: fall back to a unit impulse so the reverb zone
+            # passes audio through (a zeros kernel would convolve
+            # everything to silence)
+            self.ir = np.ones(1, dtype=np.float64)
+
+        # from the get-go with number one (or zero if that
+        # floats your boat...)
+        self.current_idx = 0
+        self.current_param = 0.5
+    def _callback(self, in_data, frame_count, time_info, status):
+        raw = self.wf.readframes(frame_count)
+        if not raw:
+            return (raw, pyaudio.paComplete)
+
+        # convert int16 input into pre-allocated float buffer;
+        # the final chunk of the file may be shorter than frame_count,
+        # so track the actual sample count and slice everywhere below
+        in_int16 = np.frombuffer(raw, dtype=np.int16)
+        n = in_int16.size
+        self._in_buffer[:n] = in_int16.astype(np.float64)
+
+        # apply the currently selected DSP effect
+        _, func = EFFECTS[self.current_idx]
+        audio_out = func(self._in_buffer[:n], self.fs, self.ir, self.current_param)
+
+        # clip and convert to int16 into pre-allocated buffer
+        np.clip(audio_out, -32768, 32767, out=self._f64_clip_buffer[:n])
+        self._int16_out_buffer[:n] = self._f64_clip_buffer[:n].astype(np.int16)
+
+        return (self._int16_out_buffer[:n].tobytes(), pyaudio.paContinue)
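+
+    # note: the effects see raw int16-scale floats (±32768), not a
+    # normalized [-1, 1] signal -- which is why the IR loader above
+    # scales soundfile data by 32767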
+
+    def start(self):
+        self.stream.start_stream()
+
+    def stop(self):
+        self.stream.stop_stream()
+        self.stream.close()
+        self.wf.close()
+        self.p.terminate()
+
+    def set_effect_by_zone(self, x_center: float, frame_w: int):
+        """Divide frame into N vertical zones to pick effect."""
+        zone_width = frame_w / len(EFFECT_NAMES)
+        idx = int(x_center // zone_width)
+        self.current_idx = max(0, min(idx, len(EFFECT_NAMES)-1))
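+        # e.g. 8 effects on a 640 px frame gives 80 px zones, so
+        # x_center = 500 -> 500 // 80 = zone 6 ("spectral_freeze")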
+
+
+# :::
+# :::: HAND-DETECTION HELPERS ::::
+# ::::: :::::::::::::::::::::: :::::
+#
+
+# god this is ugly;
+# god; forgive me;
+# god; and I don't
+# even believe in you;
+# ;
+def is_hand_raised(frame):
+    blur = cv2.GaussianBlur(frame, (5, 5), 0)
+    hsv = cv2.cvtColor(blur, cv2.COLOR_BGR2HSV)
+    # OpenCV hue runs 0-179, so the pink/red target wraps around the
+    # ends of the range; mask both ends and OR them together
+    lb1, ub1 = np.array([140,50,50]), np.array([179,255,255])
+    lb2, ub2 = np.array([0,50,50]), np.array([10,255,255])
+    m1 = cv2.inRange(hsv, lb1, ub1)
+    m2 = cv2.inRange(hsv, lb2, ub2)
+    mask = cv2.morphologyEx(
+        cv2.bitwise_or(m1, m2),
+        cv2.MORPH_OPEN,
+        np.ones((2,2), np.uint8)
+    )
+
+    cnts, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
+    if not cnts:
+        return None
+
+    c = max(cnts, key=cv2.contourArea)
+    if cv2.contourArea(c) < 8:
+        return None
+
+    # "raised" = box centre sits in the upper 70% of the frame
+    x,y,w,h = cv2.boundingRect(c)
+    if (y + h/2) < frame.shape[0] * 0.70:
+        return (x, y, w, h)
+    return None
+
+def is_second_glove_raised(frame):
+    blur = cv2.GaussianBlur(frame, (5, 5), 0)
+    hsv = cv2.cvtColor(blur, cv2.COLOR_BGR2HSV)
+    # hue 35-85 sits squarely in OpenCV's green band
+    lb = np.array([35, 50, 50])
+    ub = np.array([85, 255, 255])
+    mask = cv2.inRange(hsv, lb, ub)
+    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, np.ones((2,2), np.uint8))
+
+    cnts, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
+    if not cnts:
+        return None
+
+    # pick the largest contour to avoid multiple detections
+    c = max(cnts, key=cv2.contourArea)
+    if cv2.contourArea(c) < 100:
+        return None
+
+    x, y, w, h = cv2.boundingRect(c)
+    return (x, y, w, h)
+
+
+# :::
+# :::: MAIN LOOP ::::
+# ::::: ::::::::: :::::
+#
+
+def main():
+    args = parse_args()
+
+    cap = cv2.VideoCapture(0)
+    if not cap.isOpened():
+        print("🔴 Cannot open camera")
+        return
+
+    # offload hand-detection to a worker thread
+    # not a clue if I'm doing this right...
+    # ...yet!
+    # this ain't the therac, chill with your thread shrieks
+    executor = ThreadPoolExecutor(max_workers=1)
+    prev_future = None
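+    # (detection results are consumed one frame late: each loop
+    #  iteration submits the current frame and reads the previous
+    #  frame's result, so the UI loop never blocks on detection)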
+
+    # parameter window thresholds for mapping glove height
+    param_min_ratio = 0.2
+    param_max_ratio = 0.8
+
+    # smoothing factor for parameter changes
+    param_smooth = 0.2
+    smoothed_param = 0.0
+
+    player = GestureDSP(args.wav, ir_path=args.ir)
+    player.start()
+
+    # track recent effect indices for cue detection
+    history = deque(maxlen=10)
+    cue_mode = False
+
+    # empty precompute cues
+    # idk what i want to do with this really
+    precomputed_cues = {}
+    last_cue_time = 0
+
+    try:
+        while True:
+            ret, frame = cap.read()
+            if not ret:
+                break
+            frame = cv2.flip(frame, 1)
+
+            # schedule detection off the main thread
+            future = executor.submit(lambda f: (is_hand_raised(f), is_second_glove_raised(f)), frame.copy())
+            if prev_future and prev_future.done():
+                hand_box, glove_box = prev_future.result()
+            else:
+                hand_box, glove_box = None, None
+            prev_future = future
+
+            if hand_box:
+                x, y, w, h = hand_box
+                cx = x + w/2
+            else:
+                cx = None
+            if glove_box:
+                x2, y2, w2, h2 = glove_box
+                cy = y2 + h2/2
+            else:
+                cy = None
+
+            if cx is not None:
+                # effect selection
+                player.set_effect_by_zone(cx, frame.shape[1])
+                effect_idx = player.current_idx
+                effect_name = EFFECT_NAMES[effect_idx]
+                history.append(effect_idx)
+            else:
+                effect_name = "none"
+
+            # map second glove vertical pos to parameter with clamping and smoothing
+            if cy is not None:
+                frame_h = frame.shape[0]
+                min_y = param_min_ratio * frame_h
+                max_y = param_max_ratio * frame_h
+                if cy <= min_y:
+                    raw_param = 1.0
+                elif cy >= max_y:
+                    raw_param = 0.0
+                else:
+                    raw_param = 1.0 - (cy - min_y) / (max_y - min_y)
+            else:
+                raw_param = 0.5
+
+            # exponential smoothing
+            smoothed_param = param_smooth * smoothed_param + (1.0 - param_smooth) * raw_param
+            player.current_param = smoothed_param
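+            # (param_smooth weights the OLD value, so at 0.2 roughly
+            #  80% of any parameter jump lands on the very next frame --
+            #  light smoothing, mostly to tame detection jitter)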
+
+            # detect a simple cue: the last 10 detections all picked
+            # the same effect, rate-limited to once every 2 s
+            # (the flag drops again on the very next frame)
+            # NOTE: I hate this whole thing now
+            #
+            now = time.time()
+            if len(history) == history.maxlen and len(set(history)) == 1 and now - last_cue_time > 2:
+                cue_mode = True
+                last_cue_time = now
+            else:
+                cue_mode = False
+
+            # draw detection rectangles
+            if hand_box:
+                x, y, w, h = hand_box
+                cv2.rectangle(frame, (x, y), (x+w, y+h), (0,255,0), 2)
+            if glove_box:
+                x2, y2, w2, h2 = glove_box
+                cv2.rectangle(frame, (x2, y2), (x2+w2, y2+h2), (180,105,255), 2)
+
+            # draw effect name
+            cv2.putText(frame, f"Effect: {effect_name}", (10,30),
+                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255,0), 2)
+
+            # draw param bar on the right edge
+            ph = int(player.current_param * frame.shape[0])
+            cv2.rectangle(frame,
+                          (frame.shape[1]-50, frame.shape[0]),
+                          (frame.shape[1]-10, frame.shape[0]-ph),
+                          (255,0,0), -1)
+
+            # cue overlay
+            if cue_mode:
+                cv2.putText(frame, "CUE TRIGGERED", (frame.shape[1]//2-100,50),
+                            cv2.FONT_HERSHEY_COMPLEX, 1.5, (0,0,255), 3)
+
+            cv2.imshow("Gesture-DSP Demo", frame)
+            key = cv2.waitKey(1) & 0xFF
+            if key in (ord('q'), 27):
+                break
+            if not player.stream.is_active():
+                break
+
+    finally:
+        player.stop()
+        cap.release()
+        cv2.destroyAllWindows()
+        # don't leave the detection worker holding up interpreter exit
+        executor.shutdown(wait=False)
+
+
+if __name__ == "__main__":
+    main()