tenseleyflow/hyprkvm / 0092506

Browse files

Add control transfer state machine and networking integration

- Implement TransferManager with state machine (Local, Initiating, RemoteActive, ReceivedControl)
- Add transfer events channel for coordinating capture/injection
- Wire up daemon to accept/connect peer connections
- Handle Enter/EnterAck/Leave/LeaveAck message flow
- Integrate edge detection with transfer initiation
- Add message receiving loop for all connected peers
- Update config to use SocketAddr for neighbor addresses
Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
009250615761b52ab6b48fda514ea6ce6baae0a1
Parents
f0e059f
Tree
d990573

5 changed files

StatusFile+-
M config/hyprkvm.example.toml 7 6
M hyprkvm-daemon/src/config/mod.rs 2 2
M hyprkvm-daemon/src/main.rs 294 20
A hyprkvm-daemon/src/transfer/manager.rs 428 0
M hyprkvm-daemon/src/transfer/mod.rs 3 3
config/hyprkvm.example.tomlmodified
@@ -11,12 +11,13 @@ self_name = "my-machine"
1111
 # Neighbor machines
1212
 # Configure machines adjacent to this one
1313
 
14
-[[machines.neighbors]]
15
-name = "desktop"
16
-direction = "right"  # This machine is to our right
17
-address = "desktop.local:24850"
18
-# Optional: pre-trust fingerprint (skip verification prompt)
19
-# fingerprint = "SHA256:..."
14
+# Example neighbor - uncomment and modify for your setup
15
+# [[machines.neighbors]]
16
+# name = "desktop"
17
+# direction = "right"  # This machine is to our right
18
+# address = "192.168.1.100:24850"
19
+# # Optional: pre-trust fingerprint (skip verification prompt)
20
+# # fingerprint = "SHA256:..."
2021
 
2122
 # [[machines.neighbors]]
2223
 # name = "laptop"
hyprkvm-daemon/src/config/mod.rsmodified
@@ -205,8 +205,8 @@ pub struct NeighborConfig {
205205
     /// Direction relative to this machine
206206
     pub direction: Direction,
207207
 
208
-    /// Address (hostname:port or ip:port)
209
-    pub address: String,
208
+    /// Address (ip:port)
209
+    pub address: SocketAddr,
210210
 
211211
     /// Pre-trusted certificate fingerprint (optional)
212212
     pub fingerprint: Option<String>,
hyprkvm-daemon/src/main.rsmodified
@@ -110,6 +110,13 @@ async fn main() -> anyhow::Result<()> {
110110
 }
111111
 
112112
 async fn run_daemon(config_path: &std::path::Path) -> anyhow::Result<()> {
113
+    use std::collections::HashMap;
114
+    use std::net::SocketAddr;
115
+    use std::sync::Arc;
116
+    use tokio::sync::RwLock;
117
+    use hyprkvm_common::Direction;
118
+    use hyprkvm_common::protocol::{Message, HelloPayload, PROTOCOL_VERSION};
119
+
113120
     // Load or create default config
114121
     let config = match Config::load(config_path) {
115122
         Ok(cfg) => cfg,
@@ -133,48 +140,314 @@ async fn run_daemon(config_path: &std::path::Path) -> anyhow::Result<()> {
133140
         info!("  {} at ({}, {}) {}x{}", mon.name, mon.x, mon.y, mon.width, mon.height);
134141
     }
135142
 
143
+    // Calculate screen bounds
144
+    let screen_width: u32 = monitors.iter().map(|m| m.x as u32 + m.width).max().unwrap_or(1920);
145
+    let screen_height: u32 = monitors.iter().map(|m| m.y as u32 + m.height).max().unwrap_or(1080);
146
+
136147
     // Determine which edges have network neighbors
137148
     let mut enabled_edges = Vec::new();
149
+    let mut neighbor_map: HashMap<Direction, SocketAddr> = HashMap::new();
138150
     for neighbor in &config.machines.neighbors {
139151
         enabled_edges.push(neighbor.direction);
140
-        info!("  Network neighbor: {} ({})", neighbor.name, neighbor.direction);
152
+        neighbor_map.insert(neighbor.direction, neighbor.address);
153
+        info!("  Network neighbor: {} ({}) at {}", neighbor.name, neighbor.direction, neighbor.address);
141154
     }
142155
 
143
-    // If no neighbors configured, enable all edges for testing
156
+    // If no neighbors configured, just run in demo mode
144157
     if enabled_edges.is_empty() {
145
-        info!("No neighbors configured, enabling all edges for testing");
146
-        enabled_edges = vec![
147
-            hyprkvm_common::Direction::Left,
148
-            hyprkvm_common::Direction::Right,
149
-        ];
158
+        info!("No neighbors configured. Add neighbors in config to enable control transfer.");
159
+        enabled_edges = vec![Direction::Left, Direction::Right];
150160
     }
151161
 
152162
     // Start edge capture
153163
     info!("Starting edge capture for: {:?}", enabled_edges);
154164
     let edge_capture = input::EdgeCapture::new(input::EdgeCaptureConfig {
155165
         barrier_size: 1,
156
-        enabled_edges,
166
+        enabled_edges: enabled_edges.clone(),
157167
     })?;
158168
 
169
+    // Create transfer manager
170
+    let (transfer_manager, mut transfer_events) = transfer::TransferManager::new(
171
+        config.machines.self_name.clone(),
172
+    );
173
+    let transfer_manager = Arc::new(transfer_manager);
174
+
175
+    // Connection storage: direction -> peer connection
176
+    let peers: Arc<RwLock<HashMap<Direction, network::FramedConnection>>> =
177
+        Arc::new(RwLock::new(HashMap::new()));
178
+
179
+    // Start network server
180
+    let listen_addr: SocketAddr = format!("0.0.0.0:{}", config.network.listen_port).parse()?;
181
+    let server = network::Server::bind(listen_addr).await?;
182
+    info!("Listening for connections on {}", server.local_addr());
183
+
184
+    // Spawn task to accept incoming connections
185
+    let peers_clone = peers.clone();
186
+    let machine_name = config.machines.self_name.clone();
187
+    let accept_handle = tokio::spawn(async move {
188
+        loop {
189
+            match server.accept().await {
190
+                Ok(mut conn) => {
191
+                    let addr = conn.remote_addr();
192
+                    info!("Incoming connection from {}", addr);
193
+
194
+                    // Receive Hello
195
+                    match conn.recv().await {
196
+                        Ok(Some(Message::Hello(hello))) => {
197
+                            info!("Peer {} connected (protocol v{})", hello.machine_name, hello.protocol_version);
198
+
199
+                            // Send HelloAck
200
+                            let ack = Message::HelloAck(hyprkvm_common::protocol::HelloAckPayload {
201
+                                accepted: true,
202
+                                protocol_version: PROTOCOL_VERSION,
203
+                                machine_name: machine_name.clone(),
204
+                                error: None,
205
+                            });
206
+                            if let Err(e) = conn.send(&ack).await {
207
+                                tracing::error!("Failed to send HelloAck: {}", e);
208
+                                continue;
209
+                            }
210
+
211
+                            // TODO: Determine direction from peer info
212
+                            // For now, assume first connection is from configured neighbor
213
+                            // In production, match by machine name
214
+                        }
215
+                        Ok(Some(other)) => {
216
+                            tracing::warn!("Expected Hello, got {:?}", other);
217
+                        }
218
+                        Ok(None) => {
219
+                            tracing::debug!("Connection closed during handshake");
220
+                        }
221
+                        Err(e) => {
222
+                            tracing::error!("Handshake error: {}", e);
223
+                        }
224
+                    }
225
+                }
226
+                Err(e) => {
227
+                    tracing::error!("Accept error: {}", e);
228
+                }
229
+            }
230
+        }
231
+    });
232
+
233
+    // Channel for incoming messages from peers
234
+    let (peer_msg_tx, mut peer_msg_rx) = tokio::sync::mpsc::channel::<(Direction, Message)>(64);
235
+
236
+    // Connect to configured peers
237
+    for neighbor in &config.machines.neighbors {
238
+        let addr = neighbor.address;
239
+        let direction = neighbor.direction;
240
+        let peers_clone = peers.clone();
241
+        let machine_name = config.machines.self_name.clone();
242
+        let msg_tx = peer_msg_tx.clone();
243
+
244
+        tokio::spawn(async move {
245
+            info!("Connecting to {} at {}...", direction, addr);
246
+            match network::connect(addr).await {
247
+                Ok(mut conn) => {
248
+                    // Send Hello
249
+                    let hello = Message::Hello(HelloPayload {
250
+                        protocol_version: PROTOCOL_VERSION,
251
+                        machine_name: machine_name.clone(),
252
+                        capabilities: vec![],
253
+                    });
254
+
255
+                    if let Err(e) = conn.send(&hello).await {
256
+                        tracing::error!("Failed to send Hello to {}: {}", direction, e);
257
+                        return;
258
+                    }
259
+
260
+                    // Wait for HelloAck
261
+                    match conn.recv().await {
262
+                        Ok(Some(Message::HelloAck(ack))) => {
263
+                            if ack.accepted {
264
+                                info!("Connected to {} ({})", ack.machine_name, direction);
265
+
266
+                                // Split connection: store for sending, spawn receiver
267
+                                // For now, just store and we'll poll in the main loop
268
+                                let mut peers = peers_clone.write().await;
269
+                                peers.insert(direction, conn);
270
+                            } else {
271
+                                tracing::error!("Connection rejected: {:?}", ack.error);
272
+                            }
273
+                        }
274
+                        Ok(Some(other)) => {
275
+                            tracing::warn!("Expected HelloAck, got {:?}", other);
276
+                        }
277
+                        Ok(None) => {
278
+                            tracing::warn!("Connection closed during handshake");
279
+                        }
280
+                        Err(e) => {
281
+                            tracing::error!("Handshake error: {}", e);
282
+                        }
283
+                    }
284
+                }
285
+                Err(e) => {
286
+                    tracing::warn!("Failed to connect to {} ({}): {}", direction, addr, e);
287
+                }
288
+            }
289
+        });
290
+    }
291
+
159292
     // Listen for Hyprland events
160293
     let mut event_stream = hyprland::events::HyprlandEventStream::connect().await?;
161294
 
162
-    info!("Daemon running. Move mouse to screen edges to test. Press Ctrl+C to stop.");
295
+    info!("Daemon running. Move mouse to screen edges to trigger transfer. Press Ctrl+C to stop.");
163296
 
164297
     loop {
165298
         tokio::select! {
166
-            // Check for edge events (non-blocking via channel)
299
+            // Check for edge events and poll peer messages
167300
             _ = tokio::time::sleep(std::time::Duration::from_millis(10)) => {
301
+                // Handle edge events
168302
                 while let Some(edge_event) = edge_capture.try_recv() {
169
-                    info!(
170
-                        "EDGE EVENT: {:?} at ({}, {})",
171
-                        edge_event.direction,
172
-                        edge_event.position.0,
173
-                        edge_event.position.1
174
-                    );
175
-
176
-                    // TODO: Sprint 4 - Trigger network switch
177
-                    // For now, just log it
303
+                    let direction = edge_event.direction;
304
+
305
+                    // Check if we have a peer in this direction
306
+                    let has_peer = {
307
+                        let peers = peers.read().await;
308
+                        peers.contains_key(&direction)
309
+                    };
310
+
311
+                    if has_peer {
312
+                        info!(
313
+                            "EDGE: {:?} at ({}, {}) - initiating transfer",
314
+                            direction,
315
+                            edge_event.position.0,
316
+                            edge_event.position.1
317
+                        );
318
+
319
+                        if let Err(e) = transfer_manager.initiate_transfer(
320
+                            direction,
321
+                            edge_event.position,
322
+                            screen_height,
323
+                            screen_width,
324
+                        ).await {
325
+                            tracing::warn!("Failed to initiate transfer: {}", e);
326
+                        }
327
+                    } else {
328
+                        tracing::debug!(
329
+                            "EDGE: {:?} but no peer connected",
330
+                            direction
331
+                        );
332
+                    }
333
+                }
334
+
335
+                // Poll for incoming messages from peers (non-blocking)
336
+                let directions: Vec<Direction> = {
337
+                    let peers = peers.read().await;
338
+                    peers.keys().cloned().collect()
339
+                };
340
+
341
+                for direction in directions {
342
+                    let mut peers = peers.write().await;
343
+                    if let Some(peer) = peers.get_mut(&direction) {
344
+                        // Try non-blocking receive using tokio timeout
345
+                        match tokio::time::timeout(
346
+                            std::time::Duration::from_millis(1),
347
+                            peer.recv()
348
+                        ).await {
349
+                            Ok(Ok(Some(msg))) => {
350
+                                tracing::debug!("Received from {:?}: {:?}", direction, msg);
351
+                                // Handle incoming message
352
+                                match msg {
353
+                                    Message::Enter(payload) => {
354
+                                        info!("Received Enter from {:?}", direction);
355
+                                        match transfer_manager.handle_enter(
356
+                                            direction,
357
+                                            payload,
358
+                                            screen_width,
359
+                                            screen_height,
360
+                                        ).await {
361
+                                            Ok(pos) => {
362
+                                                info!("Positioned cursor at {:?}", pos);
363
+                                            }
364
+                                            Err(e) => {
365
+                                                tracing::error!("Failed to handle Enter: {}", e);
366
+                                            }
367
+                                        }
368
+                                    }
369
+                                    Message::EnterAck(ack) => {
370
+                                        info!("Received EnterAck: success={}", ack.success);
371
+                                        if let Err(e) = transfer_manager.handle_enter_ack(ack).await {
372
+                                            tracing::error!("Failed to handle EnterAck: {}", e);
373
+                                        }
374
+                                    }
375
+                                    Message::Leave(payload) => {
376
+                                        info!("Received Leave from {:?}", direction);
377
+                                        if let Err(e) = transfer_manager.handle_leave(payload).await {
378
+                                            tracing::error!("Failed to handle Leave: {}", e);
379
+                                        }
380
+                                    }
381
+                                    Message::LeaveAck => {
382
+                                        info!("Received LeaveAck");
383
+                                        // Transfer complete
384
+                                    }
385
+                                    Message::InputEvent(input) => {
386
+                                        tracing::trace!("Received input event: {:?}", input);
387
+                                        // TODO: Inject input via emulation module
388
+                                    }
389
+                                    Message::Ping { timestamp } => {
390
+                                        let _ = peer.send(&Message::Pong { timestamp }).await;
391
+                                    }
392
+                                    Message::Pong { timestamp } => {
393
+                                        tracing::trace!("Pong received, rtt={}ms",
394
+                                            std::time::SystemTime::now()
395
+                                                .duration_since(std::time::UNIX_EPOCH)
396
+                                                .unwrap()
397
+                                                .as_millis() as u64 - timestamp
398
+                                        );
399
+                                    }
400
+                                    _ => {
401
+                                        tracing::debug!("Unhandled message: {:?}", msg);
402
+                                    }
403
+                                }
404
+                            }
405
+                            Ok(Ok(None)) => {
406
+                                // Connection closed
407
+                                info!("Peer {:?} disconnected", direction);
408
+                                peers.remove(&direction);
409
+                            }
410
+                            Ok(Err(e)) => {
411
+                                tracing::error!("Error receiving from {:?}: {}", direction, e);
412
+                                peers.remove(&direction);
413
+                            }
414
+                            Err(_) => {
415
+                                // Timeout - no message available, that's fine
416
+                            }
417
+                        }
418
+                    }
419
+                }
420
+            }
421
+
422
+            // Handle transfer events
423
+            Some(event) = transfer_events.recv() => {
424
+                match event {
425
+                    transfer::TransferEvent::SendMessage { direction, message } => {
426
+                        let mut peers = peers.write().await;
427
+                        if let Some(peer) = peers.get_mut(&direction) {
428
+                            tracing::debug!("Sending {:?} to {:?}", message, direction);
429
+                            if let Err(e) = peer.send(&message).await {
430
+                                tracing::error!("Failed to send message: {}", e);
431
+                            }
432
+                        } else {
433
+                            tracing::warn!("No peer for direction {:?}", direction);
434
+                        }
435
+                    }
436
+                    transfer::TransferEvent::StartCapture { direction } => {
437
+                        info!("Starting input capture for {:?}", direction);
438
+                        // TODO: Implement actual input capture
439
+                        // For now, just log
440
+                    }
441
+                    transfer::TransferEvent::StopCapture => {
442
+                        info!("Stopping input capture");
443
+                    }
444
+                    transfer::TransferEvent::StartInjection { from } => {
445
+                        info!("Starting input injection from {:?}", from);
446
+                        // TODO: Implement actual input injection
447
+                    }
448
+                    transfer::TransferEvent::StopInjection => {
449
+                        info!("Stopping input injection");
450
+                    }
178451
                 }
179452
             }
180453
 
@@ -182,7 +455,7 @@ async fn run_daemon(config_path: &std::path::Path) -> anyhow::Result<()> {
182455
             event = event_stream.next_event() => {
183456
                 match event {
184457
                     Ok(evt) => {
185
-                        tracing::debug!("Hyprland event: {:?}", evt);
458
+                        tracing::trace!("Hyprland event: {:?}", evt);
186459
                     }
187460
                     Err(e) => {
188461
                         tracing::error!("Event error: {e}");
@@ -194,6 +467,7 @@ async fn run_daemon(config_path: &std::path::Path) -> anyhow::Result<()> {
194467
             // Shutdown
195468
             _ = tokio::signal::ctrl_c() => {
196469
                 info!("Shutting down...");
470
+                accept_handle.abort();
197471
                 break;
198472
             }
199473
         }
hyprkvm-daemon/src/transfer/manager.rsadded
@@ -0,0 +1,428 @@
1
+//! Transfer manager - orchestrates control handoff between machines
2
+
3
+use std::sync::atomic::{AtomicU64, Ordering};
4
+use std::sync::Arc;
5
+use std::time::{Duration, Instant};
6
+
7
+use tokio::sync::{mpsc, RwLock};
8
+
9
+use hyprkvm_common::protocol::{
10
+    CursorEntryPos, EnterAckPayload, EnterPayload, LeavePayload, Message,
11
+};
12
+use hyprkvm_common::{Direction, ModifierState};
13
+
14
+/// Transfer state machine
15
+#[derive(Debug, Clone)]
16
+pub enum TransferState {
17
+    /// Normal operation, we have control
18
+    Local,
19
+
20
+    /// Transfer initiated, waiting for ack
21
+    Initiating {
22
+        target: Direction,
23
+        transfer_id: u64,
24
+        started_at: Instant,
25
+    },
26
+
27
+    /// We sent control away, forwarding input
28
+    RemoteActive {
29
+        target: Direction,
30
+        transfer_id: u64,
31
+        entered_at: Instant,
32
+    },
33
+
34
+    /// We received control from another machine
35
+    ReceivedControl {
36
+        from: Direction,
37
+        transfer_id: u64,
38
+        entered_at: Instant,
39
+    },
40
+}
41
+
42
+impl TransferState {
43
+    pub fn is_local(&self) -> bool {
44
+        matches!(self, TransferState::Local)
45
+    }
46
+
47
+    pub fn is_remote_active(&self) -> bool {
48
+        matches!(self, TransferState::RemoteActive { .. })
49
+    }
50
+
51
+    pub fn is_receiving(&self) -> bool {
52
+        matches!(self, TransferState::ReceivedControl { .. })
53
+    }
54
+}
55
+
56
+/// Events from the transfer manager
57
+#[derive(Debug, Clone)]
58
+pub enum TransferEvent {
59
+    /// Start capturing and forwarding input
60
+    StartCapture { direction: Direction },
61
+    /// Stop capturing, return to local
62
+    StopCapture,
63
+    /// Start injecting received input
64
+    StartInjection { from: Direction },
65
+    /// Stop injecting
66
+    StopInjection,
67
+    /// Send a message to a peer
68
+    SendMessage { direction: Direction, message: Message },
69
+}
70
+
71
+/// Manages control transfer between machines
72
+pub struct TransferManager {
73
+    state: RwLock<TransferState>,
74
+    transfer_id_counter: AtomicU64,
75
+    event_tx: mpsc::Sender<TransferEvent>,
76
+    machine_name: String,
77
+}
78
+
79
+impl TransferManager {
80
+    pub fn new(machine_name: String) -> (Self, mpsc::Receiver<TransferEvent>) {
81
+        let (event_tx, event_rx) = mpsc::channel(32);
82
+
83
+        (
84
+            Self {
85
+                state: RwLock::new(TransferState::Local),
86
+                transfer_id_counter: AtomicU64::new(1),
87
+                event_tx,
88
+                machine_name,
89
+            },
90
+            event_rx,
91
+        )
92
+    }
93
+
94
+    fn next_transfer_id(&self) -> u64 {
95
+        self.transfer_id_counter.fetch_add(1, Ordering::Relaxed)
96
+    }
97
+
98
+    /// Get current state
99
+    pub async fn state(&self) -> TransferState {
100
+        self.state.read().await.clone()
101
+    }
102
+
103
+    /// Initiate transfer to a direction (mouse or keyboard edge hit)
104
+    pub async fn initiate_transfer(
105
+        &self,
106
+        direction: Direction,
107
+        cursor_pos: (i32, i32),
108
+        screen_height: u32,
109
+        screen_width: u32,
110
+    ) -> Result<(), TransferError> {
111
+        let mut state = self.state.write().await;
112
+
113
+        // Only transfer from local or received state
114
+        match &*state {
115
+            TransferState::Local | TransferState::ReceivedControl { .. } => {}
116
+            TransferState::Initiating { .. } => {
117
+                return Err(TransferError::AlreadyTransferring);
118
+            }
119
+            TransferState::RemoteActive { .. } => {
120
+                return Err(TransferError::InvalidState(
121
+                    "Already in remote active state".to_string(),
122
+                ));
123
+            }
124
+        }
125
+
126
+        let transfer_id = self.next_transfer_id();
127
+
128
+        // Calculate edge-relative cursor position
129
+        let edge_relative = match direction {
130
+            Direction::Left | Direction::Right => {
131
+                cursor_pos.1 as f64 / screen_height as f64
132
+            }
133
+            Direction::Up | Direction::Down => {
134
+                cursor_pos.0 as f64 / screen_width as f64
135
+            }
136
+        };
137
+
138
+        tracing::info!(
139
+            "Initiating transfer to {:?}, transfer_id={}",
140
+            direction,
141
+            transfer_id
142
+        );
143
+
144
+        // Update state
145
+        *state = TransferState::Initiating {
146
+            target: direction,
147
+            transfer_id,
148
+            started_at: Instant::now(),
149
+        };
150
+
151
+        // Send Enter message
152
+        let enter = Message::Enter(EnterPayload {
153
+            from_direction: direction.opposite(),
154
+            cursor_pos: CursorEntryPos::EdgeRelative(edge_relative),
155
+            modifiers: ModifierState::default(), // TODO: get actual modifier state
156
+            transfer_id,
157
+        });
158
+
159
+        self.event_tx
160
+            .send(TransferEvent::SendMessage {
161
+                direction,
162
+                message: enter,
163
+            })
164
+            .await
165
+            .map_err(|_| TransferError::ChannelClosed)?;
166
+
167
+        Ok(())
168
+    }
169
+
170
+    /// Handle EnterAck from remote
171
+    pub async fn handle_enter_ack(&self, ack: EnterAckPayload) -> Result<(), TransferError> {
172
+        let mut state = self.state.write().await;
173
+
174
+        match &*state {
175
+            TransferState::Initiating {
176
+                target,
177
+                transfer_id,
178
+                ..
179
+            } => {
180
+                if *transfer_id != ack.transfer_id {
181
+                    tracing::warn!(
182
+                        "EnterAck transfer_id mismatch: expected {}, got {}",
183
+                        transfer_id,
184
+                        ack.transfer_id
185
+                    );
186
+                    return Err(TransferError::TransferIdMismatch);
187
+                }
188
+
189
+                if !ack.success {
190
+                    tracing::warn!("EnterAck rejected: {:?}", ack.error);
191
+                    *state = TransferState::Local;
192
+                    return Err(TransferError::Rejected(
193
+                        ack.error.unwrap_or_else(|| "Unknown".to_string()),
194
+                    ));
195
+                }
196
+
197
+                tracing::info!(
198
+                    "Transfer accepted, cursor at {:?}",
199
+                    ack.actual_cursor_pos
200
+                );
201
+
202
+                let direction = *target;
203
+                let tid = *transfer_id;
204
+
205
+                *state = TransferState::RemoteActive {
206
+                    target: direction,
207
+                    transfer_id: tid,
208
+                    entered_at: Instant::now(),
209
+                };
210
+
211
+                // Start capturing input
212
+                self.event_tx
213
+                    .send(TransferEvent::StartCapture { direction })
214
+                    .await
215
+                    .map_err(|_| TransferError::ChannelClosed)?;
216
+
217
+                Ok(())
218
+            }
219
+            _ => Err(TransferError::InvalidState(
220
+                "Not in Initiating state".to_string(),
221
+            )),
222
+        }
223
+    }
224
+
225
+    /// Handle incoming Enter from another machine
226
+    pub async fn handle_enter(
227
+        &self,
228
+        from_direction: Direction,
229
+        payload: EnterPayload,
230
+        screen_width: u32,
231
+        screen_height: u32,
232
+    ) -> Result<(i32, i32), TransferError> {
233
+        let mut state = self.state.write().await;
234
+
235
+        // Calculate actual cursor position
236
+        let cursor_pos = match payload.cursor_pos {
237
+            CursorEntryPos::EdgeRelative(rel) => {
238
+                // Entry is from the perspective of the sender
239
+                // So if they say "from_direction = Right", they're to our right
240
+                // and we should position cursor at our right edge
241
+                match from_direction {
242
+                    Direction::Left => {
243
+                        // They're to our left, cursor enters from left edge
244
+                        let y = (rel * screen_height as f64) as i32;
245
+                        (0, y)
246
+                    }
247
+                    Direction::Right => {
248
+                        let y = (rel * screen_height as f64) as i32;
249
+                        (screen_width as i32 - 1, y)
250
+                    }
251
+                    Direction::Up => {
252
+                        let x = (rel * screen_width as f64) as i32;
253
+                        (x, 0)
254
+                    }
255
+                    Direction::Down => {
256
+                        let x = (rel * screen_width as f64) as i32;
257
+                        (x, screen_height as i32 - 1)
258
+                    }
259
+                }
260
+            }
261
+            CursorEntryPos::Absolute { x, y } => (x, y),
262
+        };
263
+
264
+        tracing::info!(
265
+            "Receiving control from {:?}, cursor at ({}, {})",
266
+            from_direction,
267
+            cursor_pos.0,
268
+            cursor_pos.1
269
+        );
270
+
271
+        *state = TransferState::ReceivedControl {
272
+            from: from_direction,
273
+            transfer_id: payload.transfer_id,
274
+            entered_at: Instant::now(),
275
+        };
276
+
277
+        // Start injection mode
278
+        self.event_tx
279
+            .send(TransferEvent::StartInjection {
280
+                from: from_direction,
281
+            })
282
+            .await
283
+            .map_err(|_| TransferError::ChannelClosed)?;
284
+
285
+        // Send ack
286
+        let ack = Message::EnterAck(EnterAckPayload {
287
+            success: true,
288
+            transfer_id: payload.transfer_id,
289
+            actual_cursor_pos: Some(cursor_pos),
290
+            error: None,
291
+        });
292
+
293
+        self.event_tx
294
+            .send(TransferEvent::SendMessage {
295
+                direction: from_direction,
296
+                message: ack,
297
+            })
298
+            .await
299
+            .map_err(|_| TransferError::ChannelClosed)?;
300
+
301
+        Ok(cursor_pos)
302
+    }
303
+
304
+    /// Return control to sender (escape hotkey or reverse edge)
305
+    pub async fn return_control(&self) -> Result<(), TransferError> {
306
+        let mut state = self.state.write().await;
307
+
308
+        match &*state {
309
+            TransferState::ReceivedControl {
310
+                from, transfer_id, ..
311
+            } => {
312
+                let direction = *from;
313
+                let tid = *transfer_id;
314
+
315
+                tracing::info!("Returning control to {:?}", direction);
316
+
317
+                // Send Leave message
318
+                let leave = Message::Leave(LeavePayload {
319
+                    to_direction: direction,
320
+                    cursor_pos: CursorEntryPos::EdgeRelative(0.5), // Center for now
321
+                    modifiers: ModifierState::default(),
322
+                    transfer_id: tid,
323
+                });
324
+
325
+                self.event_tx
326
+                    .send(TransferEvent::SendMessage {
327
+                        direction,
328
+                        message: leave,
329
+                    })
330
+                    .await
331
+                    .map_err(|_| TransferError::ChannelClosed)?;
332
+
333
+                // Stop injection
334
+                self.event_tx
335
+                    .send(TransferEvent::StopInjection)
336
+                    .await
337
+                    .map_err(|_| TransferError::ChannelClosed)?;
338
+
339
+                *state = TransferState::Local;
340
+                Ok(())
341
+            }
342
+            _ => Err(TransferError::InvalidState(
343
+                "Not receiving control".to_string(),
344
+            )),
345
+        }
346
+    }
347
+
348
+    /// Handle incoming Leave (control returning to us)
349
+    pub async fn handle_leave(&self, payload: LeavePayload) -> Result<(), TransferError> {
350
+        let mut state = self.state.write().await;
351
+
352
+        match &*state {
353
+            TransferState::RemoteActive { transfer_id, .. } => {
354
+                if *transfer_id != payload.transfer_id {
355
+                    tracing::warn!("Leave transfer_id mismatch");
356
+                }
357
+
358
+                tracing::info!("Control returned to local");
359
+
360
+                // Stop capturing
361
+                self.event_tx
362
+                    .send(TransferEvent::StopCapture)
363
+                    .await
364
+                    .map_err(|_| TransferError::ChannelClosed)?;
365
+
366
+                // Send LeaveAck
367
+                let direction = payload.to_direction.opposite();
368
+                self.event_tx
369
+                    .send(TransferEvent::SendMessage {
370
+                        direction,
371
+                        message: Message::LeaveAck,
372
+                    })
373
+                    .await
374
+                    .map_err(|_| TransferError::ChannelClosed)?;
375
+
376
+                *state = TransferState::Local;
377
+                Ok(())
378
+            }
379
+            _ => Err(TransferError::InvalidState(
380
+                "Not in RemoteActive state".to_string(),
381
+            )),
382
+        }
383
+    }
384
+
385
+    /// Abort any pending transfer (timeout, error)
386
+    pub async fn abort(&self) {
387
+        let mut state = self.state.write().await;
388
+
389
+        match &*state {
390
+            TransferState::Initiating { .. } => {
391
+                tracing::warn!("Aborting pending transfer");
392
+                *state = TransferState::Local;
393
+            }
394
+            TransferState::RemoteActive { .. } => {
395
+                tracing::warn!("Aborting remote active state");
396
+                let _ = self.event_tx.send(TransferEvent::StopCapture).await;
397
+                *state = TransferState::Local;
398
+            }
399
+            TransferState::ReceivedControl { .. } => {
400
+                tracing::warn!("Aborting received control state");
401
+                let _ = self.event_tx.send(TransferEvent::StopInjection).await;
402
+                *state = TransferState::Local;
403
+            }
404
+            TransferState::Local => {}
405
+        }
406
+    }
407
+}
408
+
409
+#[derive(Debug, thiserror::Error)]
410
+pub enum TransferError {
411
+    #[error("Already transferring")]
412
+    AlreadyTransferring,
413
+
414
+    #[error("Invalid state: {0}")]
415
+    InvalidState(String),
416
+
417
+    #[error("Transfer ID mismatch")]
418
+    TransferIdMismatch,
419
+
420
+    #[error("Transfer rejected: {0}")]
421
+    Rejected(String),
422
+
423
+    #[error("Channel closed")]
424
+    ChannelClosed,
425
+
426
+    #[error("Timeout")]
427
+    Timeout,
428
+}
hyprkvm-daemon/src/transfer/mod.rsmodified
@@ -2,6 +2,6 @@
22
 //!
33
 //! Manages the transfer of keyboard/mouse control between machines.
44
 
5
-// TODO: Sprint 4 - Implement control transfer
6
-// pub mod manager;
7
-// pub mod state_machine;
5
+pub mod manager;
6
+
7
+pub use manager::{TransferError, TransferEvent, TransferManager, TransferState};