Rust · 22185 bytes Raw Blame History
1 //! Transfer manager - orchestrates control handoff between machines
2
3 #![allow(dead_code)]
4
5 use std::sync::atomic::{AtomicU64, Ordering};
6 use std::time::Instant;
7
8 use tokio::sync::{mpsc, RwLock};
9
10 use hyprkvm_common::protocol::{
11 CursorEntryPos, EnterAckPayload, EnterPayload, LeavePayload, Message,
12 };
13 use hyprkvm_common::{Direction, ModifierState};
14
15 /// Transfer state machine
16 #[derive(Debug, Clone)]
17 pub enum TransferState {
18 /// Normal operation, we have control
19 Local,
20
21 /// Transfer initiated, waiting for ack
22 Initiating {
23 target: Direction,
24 transfer_id: u64,
25 started_at: Instant,
26 /// True if transfer was triggered via keyboard (Super+Arrow)
27 keyboard_initiated: bool,
28 /// If we initiated from ReceivedControl, this is the original source
29 relay_from: Option<Direction>,
30 },
31
32 /// We sent control away, forwarding input (from local devices)
33 RemoteActive {
34 target: Direction,
35 transfer_id: u64,
36 entered_at: Instant,
37 /// True if transfer was triggered via keyboard (Super+Arrow)
38 keyboard_initiated: bool,
39 },
40
41 /// We are relaying input from one machine to another (no local devices)
42 Relaying {
43 from: Direction,
44 to: Direction,
45 transfer_id: u64,
46 entered_at: Instant,
47 },
48
49 /// We received control from another machine
50 ReceivedControl {
51 from: Direction,
52 transfer_id: u64,
53 entered_at: Instant,
54 },
55 }
56
57 impl TransferState {
58 pub fn is_local(&self) -> bool {
59 matches!(self, TransferState::Local)
60 }
61
62 pub fn is_remote_active(&self) -> bool {
63 matches!(self, TransferState::RemoteActive { .. })
64 }
65
66 pub fn is_relaying(&self) -> bool {
67 matches!(self, TransferState::Relaying { .. })
68 }
69
70 pub fn is_receiving(&self) -> bool {
71 matches!(self, TransferState::ReceivedControl { .. })
72 }
73 }
74
75 /// Events from the transfer manager
76 #[derive(Debug, Clone)]
77 pub enum TransferEvent {
78 /// Start capturing and forwarding input (from local devices)
79 /// `keyboard_initiated` is true if the transfer was triggered via keyboard (Super+Arrow),
80 /// false if triggered via CLI or other non-keyboard means
81 StartCapture { direction: Direction, keyboard_initiated: bool },
82 /// Stop capturing, return to local
83 StopCapture,
84 /// Start relaying input from one direction to another (no local devices needed)
85 /// Used when a deviceless machine needs to forward input through
86 StartRelay { from: Direction, to: Direction },
87 /// Stop relaying
88 StopRelay,
89 /// Start injecting received input
90 StartInjection { from: Direction },
91 /// Stop injecting
92 StopInjection,
93 /// Send a message to a peer
94 SendMessage { direction: Direction, message: Message },
95 /// Sync clipboard to remote machine
96 SyncClipboardOutgoing { direction: Direction },
97 }
98
99 /// Manages control transfer between machines
100 pub struct TransferManager {
101 state: RwLock<TransferState>,
102 transfer_id_counter: AtomicU64,
103 event_tx: mpsc::Sender<TransferEvent>,
104 machine_name: String,
105 }
106
107 impl TransferManager {
108 pub fn new(machine_name: String) -> (Self, mpsc::Receiver<TransferEvent>) {
109 let (event_tx, event_rx) = mpsc::channel(32);
110
111 (
112 Self {
113 state: RwLock::new(TransferState::Local),
114 transfer_id_counter: AtomicU64::new(1),
115 event_tx,
116 machine_name,
117 },
118 event_rx,
119 )
120 }
121
122 fn next_transfer_id(&self) -> u64 {
123 self.transfer_id_counter.fetch_add(1, Ordering::Relaxed)
124 }
125
126 /// Get current state
127 pub async fn state(&self) -> TransferState {
128 self.state.read().await.clone()
129 }
130
131 /// Initiate transfer to a direction (mouse or keyboard edge hit)
132 /// `keyboard_initiated` should be true if this was triggered via Super+Arrow keybind,
133 /// false if triggered via CLI, mouse edge, or other non-keyboard means
134 pub async fn initiate_transfer(
135 &self,
136 direction: Direction,
137 cursor_pos: (i32, i32),
138 screen_min_x: i32,
139 screen_min_y: i32,
140 screen_max_x: i32,
141 screen_max_y: i32,
142 keyboard_initiated: bool,
143 ) -> Result<(), TransferError> {
144 let mut state = self.state.write().await;
145
146 // Track if we're initiating from ReceivedControl (for relay mode)
147 let relay_from = match &*state {
148 TransferState::Local => None,
149 TransferState::ReceivedControl { from, .. } => Some(*from),
150 TransferState::Initiating { .. } => {
151 return Err(TransferError::AlreadyTransferring);
152 }
153 TransferState::RemoteActive { .. } => {
154 return Err(TransferError::InvalidState(
155 "Already in remote active state".to_string(),
156 ));
157 }
158 TransferState::Relaying { .. } => {
159 return Err(TransferError::InvalidState(
160 "Already relaying".to_string(),
161 ));
162 }
163 };
164
165 let transfer_id = self.next_transfer_id();
166
167 // Calculate edge-relative cursor position (0.0-1.0 along the edge)
168 let screen_width = (screen_max_x - screen_min_x) as f64;
169 let screen_height = (screen_max_y - screen_min_y) as f64;
170
171 let edge_relative = match direction {
172 Direction::Left | Direction::Right => {
173 // Y position relative to screen height
174 (cursor_pos.1 - screen_min_y) as f64 / screen_height
175 }
176 Direction::Up | Direction::Down => {
177 // X position relative to screen width
178 (cursor_pos.0 - screen_min_x) as f64 / screen_width
179 }
180 }.clamp(0.0, 1.0); // Ensure within valid range
181
182 tracing::info!(
183 "Initiating transfer to {:?}, transfer_id={}",
184 direction,
185 transfer_id
186 );
187
188 // Update state
189 *state = TransferState::Initiating {
190 target: direction,
191 transfer_id,
192 started_at: Instant::now(),
193 keyboard_initiated,
194 relay_from,
195 };
196
197 // Send Enter message
198 let enter = Message::Enter(EnterPayload {
199 from_direction: direction.opposite(),
200 cursor_pos: CursorEntryPos::EdgeRelative(edge_relative),
201 modifiers: ModifierState::default(), // TODO: get actual modifier state
202 transfer_id,
203 });
204
205 self.event_tx
206 .send(TransferEvent::SendMessage {
207 direction,
208 message: enter,
209 })
210 .await
211 .map_err(|_| TransferError::ChannelClosed)?;
212
213 Ok(())
214 }
215
216 /// Handle EnterAck from remote
217 pub async fn handle_enter_ack(&self, ack: EnterAckPayload) -> Result<(), TransferError> {
218 let mut state = self.state.write().await;
219
220 match &*state {
221 TransferState::Initiating {
222 target,
223 transfer_id,
224 keyboard_initiated,
225 relay_from,
226 ..
227 } => {
228 if *transfer_id != ack.transfer_id {
229 tracing::warn!(
230 "EnterAck transfer_id mismatch: expected {}, got {}",
231 transfer_id,
232 ack.transfer_id
233 );
234 return Err(TransferError::TransferIdMismatch);
235 }
236
237 if !ack.success {
238 tracing::warn!("EnterAck rejected: {:?}", ack.error);
239 // If we were relaying, go back to ReceivedControl
240 if let Some(from) = relay_from {
241 *state = TransferState::ReceivedControl {
242 from: *from,
243 transfer_id: *transfer_id,
244 entered_at: Instant::now(),
245 };
246 } else {
247 *state = TransferState::Local;
248 }
249 return Err(TransferError::Rejected(
250 ack.error.unwrap_or_else(|| "Unknown".to_string()),
251 ));
252 }
253
254 let direction = *target;
255 let tid = *transfer_id;
256 let kbd_init = *keyboard_initiated;
257 let from_dir = *relay_from;
258
259 // Check if this is a relay (initiated from ReceivedControl) or direct transfer
260 if let Some(from) = from_dir {
261 tracing::info!(
262 "Relay transfer accepted: {:?} -> {:?}, cursor at {:?}",
263 from,
264 direction,
265 ack.actual_cursor_pos
266 );
267
268 *state = TransferState::Relaying {
269 from,
270 to: direction,
271 transfer_id: tid,
272 entered_at: Instant::now(),
273 };
274
275 // Start relaying input (don't grab local devices, forward from source)
276 self.event_tx
277 .send(TransferEvent::StartRelay { from, to: direction })
278 .await
279 .map_err(|_| TransferError::ChannelClosed)?;
280 } else {
281 tracing::info!(
282 "Transfer accepted, cursor at {:?}, keyboard_initiated={}",
283 ack.actual_cursor_pos,
284 keyboard_initiated
285 );
286
287 *state = TransferState::RemoteActive {
288 target: direction,
289 transfer_id: tid,
290 entered_at: Instant::now(),
291 keyboard_initiated: kbd_init,
292 };
293
294 // Start capturing input from local devices
295 self.event_tx
296 .send(TransferEvent::StartCapture { direction, keyboard_initiated: kbd_init })
297 .await
298 .map_err(|_| TransferError::ChannelClosed)?;
299 }
300
301 // Trigger clipboard sync (if enabled, handled by main loop)
302 self.event_tx
303 .send(TransferEvent::SyncClipboardOutgoing { direction })
304 .await
305 .map_err(|_| TransferError::ChannelClosed)?;
306
307 Ok(())
308 }
309 _ => Err(TransferError::InvalidState(
310 "Not in Initiating state".to_string(),
311 )),
312 }
313 }
314
315 /// Handle incoming Enter from another machine
316 pub async fn handle_enter(
317 &self,
318 from_direction: Direction,
319 payload: EnterPayload,
320 screen_min_x: i32,
321 screen_min_y: i32,
322 screen_max_x: i32,
323 screen_max_y: i32,
324 ) -> Result<(i32, i32), TransferError> {
325 let mut state = self.state.write().await;
326
327 // Validate current state - we can only receive Enter if we're Local or ReceivedControl
328 match &*state {
329 TransferState::Local => {
330 // Normal case - we're idle, ready to receive control
331 }
332 TransferState::ReceivedControl { .. } => {
333 // Already receiving, this is a re-entry - accept it
334 tracing::info!("Re-receiving control (was already in ReceivedControl)");
335 }
336 TransferState::Initiating { .. } => {
337 // We're trying to send control, but they're also trying to send to us
338 // This is a collision - let them win (accept their Enter)
339 tracing::debug!("Enter collision: we were Initiating, accepting their Enter");
340 }
341 TransferState::RemoteActive { .. } => {
342 // We're forwarding to them, but they're sending control back to us
343 // This shouldn't happen normally - they should send Leave, not Enter
344 tracing::warn!("Received Enter while in RemoteActive - unusual but accepting");
345 }
346 TransferState::Relaying { .. } => {
347 // We're relaying input, but receiving a new Enter
348 // This is unusual but we'll accept it
349 tracing::warn!("Received Enter while Relaying - unusual but accepting");
350 }
351 }
352
353 // Calculate actual cursor position using proper screen bounds
354 let screen_width = (screen_max_x - screen_min_x) as f64;
355 let screen_height = (screen_max_y - screen_min_y) as f64;
356
357 // Offset from edge to prevent immediate re-triggering of edge detection
358 const EDGE_INSET: i32 = 30;
359
360 let cursor_pos = match payload.cursor_pos {
361 CursorEntryPos::EdgeRelative(rel) => {
362 // from_direction indicates which edge the cursor enters from
363 // e.g., from_direction=Left means cursor enters at our left edge
364 match from_direction {
365 Direction::Left => {
366 // Cursor enters from left edge - position slightly inward
367 let y = screen_min_y + (rel * screen_height) as i32;
368 (screen_min_x + EDGE_INSET, y)
369 }
370 Direction::Right => {
371 // Cursor enters from right edge - position slightly inward
372 let y = screen_min_y + (rel * screen_height) as i32;
373 (screen_max_x - EDGE_INSET, y)
374 }
375 Direction::Up => {
376 // Cursor enters from top edge - position slightly inward
377 let x = screen_min_x + (rel * screen_width) as i32;
378 (x, screen_min_y + EDGE_INSET)
379 }
380 Direction::Down => {
381 // Cursor enters from bottom edge - position slightly inward
382 let x = screen_min_x + (rel * screen_width) as i32;
383 (x, screen_max_y - EDGE_INSET)
384 }
385 }
386 }
387 CursorEntryPos::Absolute { x, y } => (x, y),
388 };
389
390 tracing::info!(
391 "Receiving control from {:?}, cursor at ({}, {})",
392 from_direction,
393 cursor_pos.0,
394 cursor_pos.1
395 );
396
397 *state = TransferState::ReceivedControl {
398 from: from_direction,
399 transfer_id: payload.transfer_id,
400 entered_at: Instant::now(),
401 };
402
403 // Start injection mode
404 self.event_tx
405 .send(TransferEvent::StartInjection {
406 from: from_direction,
407 })
408 .await
409 .map_err(|_| TransferError::ChannelClosed)?;
410
411 // Send ack
412 let ack = Message::EnterAck(EnterAckPayload {
413 success: true,
414 transfer_id: payload.transfer_id,
415 actual_cursor_pos: Some(cursor_pos),
416 error: None,
417 });
418
419 self.event_tx
420 .send(TransferEvent::SendMessage {
421 direction: from_direction,
422 message: ack,
423 })
424 .await
425 .map_err(|_| TransferError::ChannelClosed)?;
426
427 Ok(cursor_pos)
428 }
429
430 /// Return control to sender (escape hotkey or reverse edge)
431 pub async fn return_control(&self) -> Result<(), TransferError> {
432 let mut state = self.state.write().await;
433
434 match &*state {
435 TransferState::ReceivedControl {
436 from, transfer_id, ..
437 } => {
438 let direction = *from;
439 let tid = *transfer_id;
440
441 tracing::info!("Returning control to {:?}", direction);
442
443 // Trigger clipboard sync before leaving (if enabled, handled by main loop)
444 self.event_tx
445 .send(TransferEvent::SyncClipboardOutgoing { direction })
446 .await
447 .map_err(|_| TransferError::ChannelClosed)?;
448
449 // Send Leave message
450 let leave = Message::Leave(LeavePayload {
451 to_direction: direction,
452 cursor_pos: CursorEntryPos::EdgeRelative(0.5), // Center for now
453 modifiers: ModifierState::default(),
454 transfer_id: tid,
455 });
456
457 self.event_tx
458 .send(TransferEvent::SendMessage {
459 direction,
460 message: leave,
461 })
462 .await
463 .map_err(|_| TransferError::ChannelClosed)?;
464
465 // Stop injection
466 self.event_tx
467 .send(TransferEvent::StopInjection)
468 .await
469 .map_err(|_| TransferError::ChannelClosed)?;
470
471 *state = TransferState::Local;
472 Ok(())
473 }
474 _ => Err(TransferError::InvalidState(
475 "Not receiving control".to_string(),
476 )),
477 }
478 }
479
480 /// Handle incoming Leave (control returning to us)
481 pub async fn handle_leave(&self, payload: LeavePayload) -> Result<(), TransferError> {
482 let mut state = self.state.write().await;
483
484 match &*state {
485 TransferState::RemoteActive { transfer_id, .. } => {
486 if *transfer_id != payload.transfer_id {
487 tracing::warn!("Leave transfer_id mismatch");
488 }
489
490 tracing::info!("Control returned to local");
491
492 // Stop capturing
493 self.event_tx
494 .send(TransferEvent::StopCapture)
495 .await
496 .map_err(|_| TransferError::ChannelClosed)?;
497
498 // Send LeaveAck
499 let direction = payload.to_direction.opposite();
500 self.event_tx
501 .send(TransferEvent::SendMessage {
502 direction,
503 message: Message::LeaveAck,
504 })
505 .await
506 .map_err(|_| TransferError::ChannelClosed)?;
507
508 *state = TransferState::Local;
509 Ok(())
510 }
511 TransferState::Relaying { from, transfer_id, .. } => {
512 if *transfer_id != payload.transfer_id {
513 tracing::warn!("Leave transfer_id mismatch (relay)");
514 }
515
516 let from_dir = *from;
517 tracing::info!("Relay target returned control, resuming ReceivedControl from {:?}", from_dir);
518
519 // Stop relaying
520 self.event_tx
521 .send(TransferEvent::StopRelay)
522 .await
523 .map_err(|_| TransferError::ChannelClosed)?;
524
525 // Send LeaveAck to the target
526 let direction = payload.to_direction.opposite();
527 self.event_tx
528 .send(TransferEvent::SendMessage {
529 direction,
530 message: Message::LeaveAck,
531 })
532 .await
533 .map_err(|_| TransferError::ChannelClosed)?;
534
535 // Resume injection from original source
536 self.event_tx
537 .send(TransferEvent::StartInjection { from: from_dir })
538 .await
539 .map_err(|_| TransferError::ChannelClosed)?;
540
541 *state = TransferState::ReceivedControl {
542 from: from_dir,
543 transfer_id: *transfer_id,
544 entered_at: Instant::now(),
545 };
546 Ok(())
547 }
548 _ => Err(TransferError::InvalidState(
549 "Not in RemoteActive or Relaying state".to_string(),
550 )),
551 }
552 }
553
554 /// Abort any pending transfer (timeout, error)
555 pub async fn abort(&self) {
556 let mut state = self.state.write().await;
557
558 match &*state {
559 TransferState::Initiating { relay_from, .. } => {
560 tracing::warn!("Aborting pending transfer");
561 // If we were initiating from ReceivedControl, go back there
562 if let Some(from) = relay_from {
563 *state = TransferState::ReceivedControl {
564 from: *from,
565 transfer_id: 0,
566 entered_at: Instant::now(),
567 };
568 } else {
569 *state = TransferState::Local;
570 }
571 }
572 TransferState::RemoteActive { .. } => {
573 tracing::warn!("Aborting remote active state");
574 let _ = self.event_tx.send(TransferEvent::StopCapture).await;
575 *state = TransferState::Local;
576 }
577 TransferState::Relaying { from, .. } => {
578 tracing::warn!("Aborting relay state");
579 let _ = self.event_tx.send(TransferEvent::StopRelay).await;
580 // Go back to receiving from original source
581 *state = TransferState::ReceivedControl {
582 from: *from,
583 transfer_id: 0,
584 entered_at: Instant::now(),
585 };
586 }
587 TransferState::ReceivedControl { .. } => {
588 tracing::warn!("Aborting received control state");
589 let _ = self.event_tx.send(TransferEvent::StopInjection).await;
590 *state = TransferState::Local;
591 }
592 TransferState::Local => {}
593 }
594 }
595 }
596
597 #[derive(Debug, thiserror::Error)]
598 pub enum TransferError {
599 #[error("Already transferring")]
600 AlreadyTransferring,
601
602 #[error("Invalid state: {0}")]
603 InvalidState(String),
604
605 #[error("Transfer ID mismatch")]
606 TransferIdMismatch,
607
608 #[error("Transfer rejected: {0}")]
609 Rejected(String),
610
611 #[error("Channel closed")]
612 ChannelClosed,
613
614 }
615