Rust · 96650 bytes Raw Blame History
1 //! HyprKVM Daemon - Main entry point
2 //!
3 //! The daemon handles:
4 //! - Hyprland IPC communication
5 //! - Edge detection (mouse and keyboard)
6 //! - Network connections to peer machines
7 //! - Input capture and injection
8
9 use clap::{Parser, Subcommand};
10 use tracing::{info, Level};
11 use tracing_subscriber::FmtSubscriber;
12
13 mod config;
14 mod hyprland;
15 mod input;
16 mod ipc;
17 mod network;
18 mod state;
19 mod transfer;
20
21 use config::Config;
22
23 /// Convert keycode to human-readable name for logging
24 fn keycode_to_name(keycode: u32) -> &'static str {
25 match keycode {
26 1 => "ESC",
27 14 => "BACKSPACE",
28 15 => "TAB",
29 28 => "ENTER",
30 29 => "LEFTCTRL",
31 42 => "LEFTSHIFT",
32 54 => "RIGHTSHIFT",
33 56 => "LEFTALT",
34 57 => "SPACE",
35 58 => "CAPSLOCK",
36 97 => "RIGHTCTRL",
37 100 => "RIGHTALT",
38 103 => "UP",
39 105 => "LEFT",
40 106 => "RIGHT",
41 108 => "DOWN",
42 125 => "LEFTMETA",
43 126 => "RIGHTMETA",
44 _ => "OTHER",
45 }
46 }
47
48 #[derive(Parser)]
49 #[command(name = "hyprkvm")]
50 #[command(about = "Hyprland-native software KVM switch")]
51 #[command(version)]
52 struct Cli {
53 /// Config file path
54 #[arg(short, long)]
55 config: Option<std::path::PathBuf>,
56
57 /// Increase log verbosity (-v, -vv, -vvv)
58 #[arg(short, long, action = clap::ArgAction::Count)]
59 verbose: u8,
60
61 #[command(subcommand)]
62 command: Commands,
63 }
64
65 #[derive(Subcommand)]
66 enum Commands {
67 /// Start the HyprKVM daemon
68 Daemon,
69
70 /// Show daemon status
71 Status,
72
73 /// Handle a move request (called by keybinding script)
74 Move {
75 /// Direction to move
76 direction: String,
77 },
78
79 /// Configuration management
80 Config {
81 #[command(subcommand)]
82 action: ConfigAction,
83 },
84 }
85
86 #[derive(Subcommand)]
87 enum ConfigAction {
88 /// Show current configuration
89 Show,
90 /// Reload configuration
91 Reload,
92 }
93
94 #[tokio::main]
95 async fn main() -> anyhow::Result<()> {
96 let cli = Cli::parse();
97
98 // Set up logging
99 let log_level = match cli.verbose {
100 0 => Level::INFO,
101 1 => Level::DEBUG,
102 _ => Level::TRACE,
103 };
104
105 FmtSubscriber::builder()
106 .with_max_level(log_level)
107 .with_target(false)
108 .init();
109
110 // Load configuration
111 let config_path = cli.config.unwrap_or_else(|| {
112 dirs::config_dir()
113 .unwrap_or_else(|| std::path::PathBuf::from("."))
114 .join("hyprkvm")
115 .join("hyprkvm.toml")
116 });
117
118 match cli.command {
119 Commands::Daemon => {
120 info!("Starting HyprKVM daemon...");
121 run_daemon(&config_path).await
122 }
123 Commands::Status => {
124 show_status().await
125 }
126 Commands::Move { direction } => {
127 handle_move(&direction).await
128 }
129 Commands::Config { action } => {
130 match action {
131 ConfigAction::Show => show_config(&config_path),
132 ConfigAction::Reload => reload_config().await,
133 }
134 }
135 }
136 }
137
138 async fn run_daemon(config_path: &std::path::Path) -> anyhow::Result<()> {
139 use std::collections::HashMap;
140 use std::net::SocketAddr;
141 use std::sync::Arc;
142 use tokio::sync::RwLock;
143 use hyprkvm_common::Direction;
144 use hyprkvm_common::protocol::{Message, HelloPayload, PROTOCOL_VERSION};
145
146 // Load or create default config
147 let config = match Config::load(config_path) {
148 Ok(cfg) => cfg,
149 Err(e) => {
150 tracing::warn!("Failed to load config: {e}, using defaults");
151 Config::default()
152 }
153 };
154
155 info!("Machine name: {}", config.machines.self_name);
156 info!("Listening on port: {}", config.network.listen_port);
157
158 // Track daemon start time for uptime reporting
159 let daemon_start_time = std::time::Instant::now();
160
161 // State flags for CLI control
162 let barrier_enabled = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
163 let shutdown_requested = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
164
165 // Connect to Hyprland
166 info!("Connecting to Hyprland...");
167 let hypr_client = hyprland::ipc::HyprlandClient::new().await?;
168
169 // Query monitors to validate connection
170 let monitors = hypr_client.monitors().await?;
171 info!("Connected to Hyprland. Monitors: {}", monitors.len());
172 for mon in &monitors {
173 info!(" {} at ({}, {}) {}x{}", mon.name, mon.x, mon.y, mon.width, mon.height);
174 }
175
176 // Calculate screen bounds
177 let screen_width: u32 = monitors.iter().map(|m| m.x as u32 + m.width).max().unwrap_or(1920);
178 let screen_height: u32 = monitors.iter().map(|m| m.y as u32 + m.height).max().unwrap_or(1080);
179
180 // Determine which edges have network neighbors
181 let mut enabled_edges = Vec::new();
182 let mut neighbor_map: HashMap<Direction, SocketAddr> = HashMap::new();
183 for neighbor in &config.machines.neighbors {
184 enabled_edges.push(neighbor.direction);
185 neighbor_map.insert(neighbor.direction, neighbor.address);
186 info!(" Network neighbor: {} ({}) at {}", neighbor.name, neighbor.direction, neighbor.address);
187 }
188
189 // If no neighbors configured, just run in demo mode
190 if enabled_edges.is_empty() {
191 info!("No neighbors configured. Add neighbors in config to enable control transfer.");
192 enabled_edges = vec![Direction::Left, Direction::Right];
193 }
194
195 // Start edge capture
196 info!("Starting edge capture for: {:?}", enabled_edges);
197 let monitor_infos: Vec<input::MonitorInfo> = monitors.iter().map(|m| input::MonitorInfo {
198 name: m.name.clone(),
199 x: m.x,
200 y: m.y,
201 width: m.width,
202 height: m.height,
203 }).collect();
204 let edge_capture = input::EdgeCapture::new(input::EdgeCaptureConfig {
205 barrier_size: 1,
206 enabled_edges: enabled_edges.clone(),
207 monitors: monitor_infos,
208 })?;
209
210 // Create input grabber (for when we send control elsewhere)
211 // Use evdev-based grabber for reliable input capture at kernel level
212 let input_grabber = input::EvdevGrabber::new()?;
213
214 // Create input emulator (for when we receive control from elsewhere)
215 // This is created lazily when we first need to inject
216 let mut input_emulator: Option<input::InputEmulator> = None;
217
218 // Create transfer manager
219 let (transfer_manager, mut transfer_events) = transfer::TransferManager::new(
220 config.machines.self_name.clone(),
221 );
222 let transfer_manager = Arc::new(transfer_manager);
223
224 // Track which direction we're capturing for
225 let mut capture_direction: Option<Direction> = None;
226 let mut input_sequence: u64 = 0;
227
228 // Escape key detection
229 // KEY_SCROLLLOCK = 70 in Linux evdev keycodes
230 const KEY_SCROLLLOCK: u32 = 70;
231 const KEY_LEFTSHIFT: u32 = 42;
232 const KEY_RIGHTSHIFT: u32 = 54;
233 let mut shift_tap_times: Vec<std::time::Instant> = Vec::new();
234 let triple_tap_window = std::time::Duration::from_millis(
235 config.input.escape_hotkey.triple_tap_window_ms
236 );
237
238 // Cursor-based edge detection state
239 let mut last_cursor_pos: Option<(i32, i32)> = None;
240 let mut edge_dwell_start: Option<(Direction, std::time::Instant)> = None;
241 const EDGE_THRESHOLD: i32 = 2; // Pixels from edge to count as "at edge"
242 const EDGE_DWELL_MS: u64 = 50; // How long cursor must be at edge to trigger
243
244 // Cooldown after control returns to prevent immediate bounce-back
245 let mut last_control_return: Option<std::time::Instant> = None;
246 const CONTROL_RETURN_COOLDOWN_MS: u64 = 500; // 500ms cooldown after control returns
247
248 // Connection storage: direction -> peer connection
249 let peers: Arc<RwLock<HashMap<Direction, network::FramedConnection>>> =
250 Arc::new(RwLock::new(HashMap::new()));
251
252 // Start network server
253 let listen_addr: SocketAddr = format!("0.0.0.0:{}", config.network.listen_port).parse()?;
254 let server = network::Server::bind(listen_addr).await?;
255 info!("Listening for connections on {}", server.local_addr());
256
257 // Spawn task to accept incoming connections
258 let machine_name = config.machines.self_name.clone();
259 let neighbors_for_accept = config.machines.neighbors.clone();
260 let peers_for_accept = peers.clone();
261 let accept_handle = tokio::spawn(async move {
262 loop {
263 match server.accept().await {
264 Ok(mut conn) => {
265 let addr = conn.remote_addr();
266 info!("Incoming connection from {}", addr);
267
268 // Receive Hello
269 match conn.recv().await {
270 Ok(Some(Message::Hello(hello))) => {
271 info!("Peer {} connected (protocol v{})", hello.machine_name, hello.protocol_version);
272
273 // Send HelloAck
274 let ack = Message::HelloAck(hyprkvm_common::protocol::HelloAckPayload {
275 accepted: true,
276 protocol_version: PROTOCOL_VERSION,
277 machine_name: machine_name.clone(),
278 error: None,
279 });
280 if let Err(e) = conn.send(&ack).await {
281 tracing::error!("Failed to send HelloAck: {}", e);
282 continue;
283 }
284
285 // Determine direction based on peer's machine name
286 let direction = neighbors_for_accept
287 .iter()
288 .find(|n| n.name == hello.machine_name)
289 .map(|n| n.direction);
290
291 if let Some(dir) = direction {
292 let mut peers = peers_for_accept.write().await;
293 if peers.contains_key(&dir) {
294 info!("Already have connection for {:?}, dropping incoming from {}", dir, hello.machine_name);
295 // Drop the incoming connection, keep the existing one
296 } else {
297 info!("Storing incoming connection from {} as {:?}", hello.machine_name, dir);
298 peers.insert(dir, conn);
299 }
300 } else {
301 tracing::warn!(
302 "Unknown peer '{}' connected - not in neighbors list",
303 hello.machine_name
304 );
305 // Connection will be dropped
306 }
307 }
308 Ok(Some(other)) => {
309 tracing::warn!("Expected Hello, got {:?}", other);
310 }
311 Ok(None) => {
312 tracing::debug!("Connection closed during handshake");
313 }
314 Err(e) => {
315 tracing::error!("Handshake error: {}", e);
316 }
317 }
318 }
319 Err(e) => {
320 tracing::error!("Accept error: {}", e);
321 }
322 }
323 }
324 });
325
326 // Connect to configured peers (with retry loop)
327 for neighbor in &config.machines.neighbors {
328 let addr = neighbor.address;
329 let direction = neighbor.direction;
330 let peers_clone = peers.clone();
331 let machine_name = config.machines.self_name.clone();
332
333 tokio::spawn(async move {
334 loop {
335 // Check if already connected
336 {
337 let peers = peers_clone.read().await;
338 if peers.contains_key(&direction) {
339 // Already connected, wait and check again
340 drop(peers);
341 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
342 continue;
343 }
344 }
345
346 tracing::debug!("Connecting to {} at {}...", direction, addr);
347 match network::connect(addr).await {
348 Ok(mut conn) => {
349 // Send Hello
350 let hello = Message::Hello(HelloPayload {
351 protocol_version: PROTOCOL_VERSION,
352 machine_name: machine_name.clone(),
353 capabilities: vec![],
354 });
355
356 if let Err(e) = conn.send(&hello).await {
357 tracing::error!("Failed to send Hello to {}: {}", direction, e);
358 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
359 continue;
360 }
361
362 // Wait for HelloAck
363 match conn.recv().await {
364 Ok(Some(Message::HelloAck(ack))) => {
365 if ack.accepted {
366 let mut peers = peers_clone.write().await;
367 if peers.contains_key(&direction) {
368 info!("Already have connection for {:?}, dropping outbound to {}", direction, ack.machine_name);
369 // Drop this connection, keep the existing one
370 } else {
371 info!("Connected to {} ({})", ack.machine_name, direction);
372 peers.insert(direction, conn);
373 }
374 // Stay in loop to reconnect if connection drops
375 } else {
376 tracing::error!("Connection rejected: {:?}", ack.error);
377 }
378 }
379 Ok(Some(other)) => {
380 tracing::warn!("Expected HelloAck, got {:?}", other);
381 }
382 Ok(None) => {
383 tracing::warn!("Connection closed during handshake");
384 }
385 Err(e) => {
386 tracing::error!("Handshake error: {}", e);
387 }
388 }
389 }
390 Err(e) => {
391 tracing::debug!("Failed to connect to {} ({}): {}", direction, addr, e);
392 }
393 }
394
395 // Retry after delay
396 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
397 }
398 });
399 }
400
401 // Listen for Hyprland events
402 let mut event_stream = hyprland::events::HyprlandEventStream::connect().await?;
403
404 // Start IPC server for CLI commands
405 let (ipc_tx, mut ipc_rx) = tokio::sync::mpsc::channel::<(
406 hyprkvm_common::protocol::IpcRequest,
407 tokio::sync::oneshot::Sender<hyprkvm_common::protocol::IpcResponse>,
408 )>(16);
409
410 tokio::spawn(async move {
411 let server = match ipc::IpcServer::bind().await {
412 Ok(s) => s,
413 Err(e) => {
414 tracing::error!("Failed to start IPC server: {}", e);
415 return;
416 }
417 };
418
419 loop {
420 match server.accept().await {
421 Ok(mut conn) => {
422 tracing::debug!("IPC: connection accepted");
423 let ipc_tx = ipc_tx.clone();
424 tokio::spawn(async move {
425 match conn.recv().await {
426 Ok(Some(request)) => {
427 tracing::debug!("IPC: received {:?}", request);
428 let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
429 if ipc_tx.send((request, resp_tx)).await.is_ok() {
430 tracing::debug!("IPC: sent to main loop, awaiting response");
431 match resp_rx.await {
432 Ok(response) => {
433 tracing::debug!("IPC: got response, sending to client");
434 if let Err(e) = conn.send(&response).await {
435 tracing::error!("IPC: failed to send response: {}", e);
436 }
437 }
438 Err(e) => {
439 tracing::error!("IPC: response channel error: {}", e);
440 }
441 }
442 } else {
443 tracing::error!("IPC: failed to send request to main loop");
444 }
445 }
446 Ok(None) => {
447 tracing::debug!("IPC: connection closed by client");
448 }
449 Err(e) => {
450 tracing::debug!("IPC recv error: {}", e);
451 }
452 }
453 });
454 }
455 Err(e) => {
456 tracing::error!("IPC accept error: {}", e);
457 }
458 }
459 }
460 });
461
462 info!("Daemon running. Move mouse to screen edges to trigger transfer. Press Ctrl+C to stop.");
463
464 loop {
465 tokio::select! {
466 // Check for edge events, grabber events, and poll peer messages
467 _ = tokio::time::sleep(std::time::Duration::from_micros(100)) => {
468 // Forward grabbed input to remote peer
469 if let Some(cap_dir) = capture_direction {
470 let mut should_escape = false;
471
472 // Coalesce motion events - drain queue and accumulate
473 let mut motion_dx: f64 = 0.0;
474 let mut motion_dy: f64 = 0.0;
475 let mut scroll_h: f64 = 0.0;
476 let mut scroll_v: f64 = 0.0;
477 let mut other_events: Vec<input::GrabEvent> = Vec::new();
478
479 while let Some(grab_event) = input_grabber.try_recv() {
480 // Check for escape key before forwarding
481 match &grab_event {
482 input::GrabEvent::KeyDown { keycode } => {
483 tracing::debug!("CAPTURE KeyDown: keycode={} ({})",
484 keycode, keycode_to_name(*keycode));
485
486 // Check for scroll_lock
487 if *keycode == KEY_SCROLLLOCK {
488 info!("Scroll Lock pressed - returning control to local");
489 should_escape = true;
490 continue; // Don't forward this key
491 }
492
493 // Check for triple-tap shift
494 if config.input.escape_hotkey.triple_tap_enabled {
495 if *keycode == KEY_LEFTSHIFT || *keycode == KEY_RIGHTSHIFT {
496 let now = std::time::Instant::now();
497 // Remove old taps outside the window
498 shift_tap_times.retain(|t| now.duration_since(*t) < triple_tap_window);
499 shift_tap_times.push(now);
500
501 if shift_tap_times.len() >= 3 {
502 info!("Triple-tap Shift detected - returning control to local");
503 should_escape = true;
504 shift_tap_times.clear();
505 continue;
506 }
507 }
508 }
509 other_events.push(grab_event);
510 }
511 input::GrabEvent::KeyUp { keycode } => {
512 tracing::debug!("CAPTURE KeyUp: keycode={} ({})",
513 keycode, keycode_to_name(*keycode));
514 other_events.push(grab_event);
515 }
516 input::GrabEvent::PointerMotion { dx, dy } => {
517 motion_dx += dx;
518 motion_dy += dy;
519 }
520 input::GrabEvent::PointerButton { .. } => {
521 other_events.push(grab_event);
522 }
523 input::GrabEvent::Scroll { horizontal, vertical } => {
524 scroll_h += horizontal;
525 scroll_v += vertical;
526 }
527 input::GrabEvent::ModifiersChanged { .. } => {
528 other_events.push(grab_event);
529 }
530 input::GrabEvent::RecoveryHotkey { .. } => {
531 // Should not happen during capture, ignore
532 tracing::warn!("RecoveryHotkey received during capture, ignoring");
533 }
534 }
535 }
536
537 // Send non-motion events first (preserve order for key events)
538 {
539 let mut peers_guard = peers.write().await;
540 if let Some(peer) = peers_guard.get_mut(&cap_dir) {
541 for event in other_events {
542 let payload = event.to_protocol(input_sequence);
543 input_sequence += 1;
544 if let Err(e) = peer.send(&Message::InputEvent(payload)).await {
545 tracing::error!("Failed to send input event: {}", e);
546 }
547 }
548
549 // Send coalesced motion as single event
550 if motion_dx != 0.0 || motion_dy != 0.0 {
551 let motion_event = input::GrabEvent::PointerMotion { dx: motion_dx, dy: motion_dy };
552 let payload = motion_event.to_protocol(input_sequence);
553 input_sequence += 1;
554 if let Err(e) = peer.send(&Message::InputEvent(payload)).await {
555 tracing::error!("Failed to send motion event: {}", e);
556 }
557 }
558
559 // Send coalesced scroll as single event
560 if scroll_h != 0.0 || scroll_v != 0.0 {
561 let scroll_event = input::GrabEvent::Scroll { horizontal: scroll_h, vertical: scroll_v };
562 let payload = scroll_event.to_protocol(input_sequence);
563 input_sequence += 1;
564 if let Err(e) = peer.send(&Message::InputEvent(payload)).await {
565 tracing::error!("Failed to send scroll event: {}", e);
566 }
567 }
568 }
569 }
570
571 // If escape was triggered, stop capture and send Leave
572 if should_escape {
573 info!("Escape triggered - stopping capture");
574 capture_direction = None;
575 input_grabber.stop(None); // No recovery needed for escape
576
577 // Send Leave message - we're leaving in the opposite direction (returning to us)
578 let leave = Message::Leave(hyprkvm_common::protocol::LeavePayload {
579 to_direction: cap_dir.opposite(),
580 cursor_pos: hyprkvm_common::protocol::CursorEntryPos::EdgeRelative(0.5),
581 modifiers: hyprkvm_common::ModifierState::default(),
582 transfer_id: input_sequence, // Use as a simple unique ID
583 });
584 let mut peers_guard = peers.write().await;
585 if let Some(peer) = peers_guard.get_mut(&cap_dir) {
586 if let Err(e) = peer.send(&leave).await {
587 tracing::error!("Failed to send Leave: {}", e);
588 }
589 }
590 }
591 } else {
592 // Not capturing - check for RecoveryHotkey events from recovery mode
593 // These bypass libinput's stale state by detecting keypresses directly at evdev level
594 while let Some(grab_event) = input_grabber.try_recv() {
595 if let input::GrabEvent::RecoveryHotkey { direction } = grab_event {
596 info!("RECOVERY HOTKEY: Super+{:?} detected via evdev", direction);
597
598 // Same at_edge check as IPC Move - only transfer if at edge monitor+window
599 let at_edge = 'edge_check: {
600 // Get monitors and find focused one
601 let monitors = match hypr_client.monitors().await {
602 Ok(m) => m,
603 Err(e) => {
604 info!(" RECOVERY edge_check: monitors query failed: {}", e);
605 break 'edge_check false;
606 }
607 };
608 let focused_monitor = match monitors.iter().find(|m| m.focused) {
609 Some(m) => m,
610 None => {
611 info!(" RECOVERY edge_check: no focused monitor found");
612 break 'edge_check false;
613 }
614 };
615
616 // Check if there's another monitor in the requested direction
617 let has_monitor_in_direction = monitors.iter().any(|m| {
618 if m.id == focused_monitor.id { return false; }
619 match direction {
620 Direction::Left => m.x + m.width as i32 <= focused_monitor.x,
621 Direction::Right => m.x >= focused_monitor.x + focused_monitor.width as i32,
622 Direction::Up => m.y + m.height as i32 <= focused_monitor.y,
623 Direction::Down => m.y >= focused_monitor.y + focused_monitor.height as i32,
624 }
625 });
626
627 if has_monitor_in_direction {
628 info!(" RECOVERY edge_check: has monitor in direction {:?}", direction);
629 break 'edge_check false;
630 }
631
632 // On edge monitor. Check if at edge window.
633 let active_window: serde_json::Value = match hypr_client.query("activewindow").await {
634 Ok(w) => w,
635 Err(e) => {
636 info!(" RECOVERY edge_check: activewindow query failed: {}", e);
637 break 'edge_check false;
638 }
639 };
640
641 let win_x = active_window.get("at").and_then(|a| a.get(0)).and_then(|x| x.as_i64()).unwrap_or(0) as i32;
642 let win_y = active_window.get("at").and_then(|a| a.get(1)).and_then(|y| y.as_i64()).unwrap_or(0) as i32;
643 let win_w = active_window.get("size").and_then(|s| s.get(0)).and_then(|w| w.as_i64()).unwrap_or(100) as i32;
644 let win_h = active_window.get("size").and_then(|s| s.get(1)).and_then(|h| h.as_i64()).unwrap_or(100) as i32;
645
646 // Get all clients (windows)
647 let clients: Vec<serde_json::Value> = match hypr_client.query("clients").await {
648 Ok(c) => c,
649 Err(e) => {
650 info!(" RECOVERY edge_check: clients query failed: {}", e);
651 break 'edge_check false;
652 }
653 };
654
655 // Check if any window is further in the requested direction on same monitor
656 let has_window_in_direction = clients.iter().any(|client| {
657 let mon = client.get("monitor").and_then(|m| m.as_i64()).unwrap_or(-1) as i32;
658 if mon != focused_monitor.id { return false; }
659
660 let cx = client.get("at").and_then(|a| a.get(0)).and_then(|x| x.as_i64()).unwrap_or(0) as i32;
661 let cy = client.get("at").and_then(|a| a.get(1)).and_then(|y| y.as_i64()).unwrap_or(0) as i32;
662 let cw = client.get("size").and_then(|s| s.get(0)).and_then(|w| w.as_i64()).unwrap_or(0) as i32;
663 let ch = client.get("size").and_then(|s| s.get(1)).and_then(|h| h.as_i64()).unwrap_or(0) as i32;
664
665 match direction {
666 Direction::Left => cx + cw < win_x + 10,
667 Direction::Right => cx > win_x + win_w - 10,
668 Direction::Up => cy + ch < win_y + 10,
669 Direction::Down => cy > win_y + win_h - 10,
670 }
671 });
672
673 info!(" RECOVERY edge_check: has_window_in_direction={} -> at_edge={}", has_window_in_direction, !has_window_in_direction);
674 !has_window_in_direction
675 };
676
677 // Check if we have a peer in this direction
678 let has_peer = {
679 let peers = peers.read().await;
680 peers.contains_key(&direction)
681 };
682
683 if at_edge && has_peer {
684 // Get cursor position for transfer
685 let cursor_pos = hypr_client.cursor_pos().await
686 .map(|c| (c.x, c.y))
687 .unwrap_or((0, 0));
688
689 info!("RECOVERY HOTKEY: At edge with peer, initiating transfer to {:?}", direction);
690 if let Err(e) = transfer_manager.initiate_transfer(
691 direction,
692 cursor_pos,
693 screen_height,
694 screen_width,
695 ).await {
696 tracing::error!("Failed to initiate transfer from recovery hotkey: {}", e);
697 }
698 } else if !at_edge {
699 // Not at edge - need to do movefocus ourselves because libinput
700 // DROPPED the keypress due to stale state (it thinks the arrow key
701 // is still pressed from before the grab). This is the whole reason
702 // recovery mode exists.
703 let hypr_dir = match direction {
704 Direction::Left => "l",
705 Direction::Right => "r",
706 Direction::Up => "u",
707 Direction::Down => "d",
708 };
709 info!("RECOVERY HOTKEY: Not at edge, doing movefocus {} (libinput dropped the keypress)", hypr_dir);
710 match hypr_client.dispatch("movefocus", hypr_dir).await {
711 Ok(()) => info!(" RECOVERY movefocus succeeded"),
712 Err(e) => tracing::error!(" RECOVERY movefocus failed: {}", e),
713 }
714 } else {
715 info!("RECOVERY HOTKEY: No peer in direction {:?}", direction);
716 }
717 }
718 }
719 }
720
721 // Handle edge events from layer-shell barriers (for inter-monitor edges)
722 while let Some(edge_event) = edge_capture.try_recv() {
723 let direction = edge_event.direction;
724
725 // Check if we have a peer in this direction
726 let has_peer = {
727 let peers = peers.read().await;
728 peers.contains_key(&direction)
729 };
730
731 if has_peer {
732 // Check if we're in ReceivedControl state from this direction
733 // If so, return control instead of initiating a new transfer
734 let current_state = transfer_manager.state().await;
735 if let transfer::TransferState::ReceivedControl { from, .. } = current_state {
736 if from == direction {
737 info!(
738 "EDGE: {:?} at ({}, {}) - returning control",
739 direction,
740 edge_event.position.0,
741 edge_event.position.1
742 );
743 if let Err(e) = transfer_manager.return_control().await {
744 tracing::warn!("Failed to return control: {}", e);
745 }
746 continue;
747 }
748 }
749
750 // Check cooldown to prevent bounce-back loops
751 if let Some(last_return) = last_control_return {
752 if last_return.elapsed().as_millis() < CONTROL_RETURN_COOLDOWN_MS as u128 {
753 tracing::debug!("EDGE: {:?} - in cooldown, ignoring", direction);
754 continue;
755 }
756 }
757
758 info!(
759 "EDGE: {:?} at ({}, {}) - initiating transfer",
760 direction,
761 edge_event.position.0,
762 edge_event.position.1
763 );
764
765 if let Err(e) = transfer_manager.initiate_transfer(
766 direction,
767 edge_event.position,
768 screen_height,
769 screen_width,
770 ).await {
771 tracing::warn!("Failed to initiate transfer: {}", e);
772 }
773 } else {
774 tracing::debug!(
775 "EDGE: {:?} but no peer connected",
776 direction
777 );
778 }
779 }
780
781 // Cursor-based edge detection (for absolute screen edges)
782 // This catches the case where cursor is at the edge and can't go further
783 if capture_direction.is_none() {
784 if let Ok(cursor) = hypr_client.cursor_pos().await {
785 let (cx, cy) = (cursor.x, cursor.y);
786
787 // Determine if cursor is at a screen edge
788 let at_edge: Option<Direction> = if cx <= EDGE_THRESHOLD {
789 Some(Direction::Left)
790 } else if cx >= screen_width as i32 - EDGE_THRESHOLD {
791 Some(Direction::Right)
792 } else if cy <= EDGE_THRESHOLD {
793 Some(Direction::Up)
794 } else if cy >= screen_height as i32 - EDGE_THRESHOLD {
795 Some(Direction::Down)
796 } else {
797 None
798 };
799
800 // Check if we should trigger based on dwell time and movement
801 if let Some(edge_dir) = at_edge {
802 // Only care about edges with neighbors
803 if enabled_edges.contains(&edge_dir) {
804 let now = std::time::Instant::now();
805
806 // Check if cursor is moving toward the edge (or staying at it)
807 let moving_toward_edge = if let Some((last_x, last_y)) = last_cursor_pos {
808 match edge_dir {
809 Direction::Left => cx <= last_x,
810 Direction::Right => cx >= last_x,
811 Direction::Up => cy <= last_y,
812 Direction::Down => cy >= last_y,
813 }
814 } else {
815 true
816 };
817
818 if moving_toward_edge {
819 match &edge_dwell_start {
820 Some((dir, start)) if *dir == edge_dir => {
821 // Already tracking this edge, check if dwell time exceeded
822 if now.duration_since(*start).as_millis() >= EDGE_DWELL_MS as u128 {
823 // Trigger!
824 let has_peer = {
825 let peers = peers.read().await;
826 peers.contains_key(&edge_dir)
827 };
828
829 if has_peer {
830 // Check if we're in ReceivedControl state from this direction
831 let current_state = transfer_manager.state().await;
832 if let transfer::TransferState::ReceivedControl { from, .. } = current_state {
833 if from == edge_dir {
834 info!(
835 "CURSOR EDGE: {:?} at ({}, {}) - returning control",
836 edge_dir, cx, cy
837 );
838 if let Err(e) = transfer_manager.return_control().await {
839 tracing::warn!("Failed to return control: {}", e);
840 }
841 edge_dwell_start = None;
842 continue;
843 }
844 }
845
846 // Check cooldown to prevent bounce-back
847 if let Some(last_return) = last_control_return {
848 if last_return.elapsed().as_millis() < CONTROL_RETURN_COOLDOWN_MS as u128 {
849 tracing::debug!("CURSOR EDGE: {:?} - in cooldown", edge_dir);
850 edge_dwell_start = None;
851 continue;
852 }
853 }
854
855 info!(
856 "CURSOR EDGE: {:?} at ({}, {}) - initiating transfer",
857 edge_dir, cx, cy
858 );
859
860 if let Err(e) = transfer_manager.initiate_transfer(
861 edge_dir,
862 (cx, cy),
863 screen_height,
864 screen_width,
865 ).await {
866 tracing::warn!("Failed to initiate transfer: {}", e);
867 }
868 } else {
869 tracing::debug!(
870 "CURSOR EDGE: {:?} at ({}, {}) but no peer connected",
871 edge_dir, cx, cy
872 );
873 }
874
875 // Reset to avoid repeated triggers
876 edge_dwell_start = None;
877 }
878 }
879 _ => {
880 // Start tracking this edge
881 tracing::trace!("Started edge dwell tracking for {:?} at ({}, {})", edge_dir, cx, cy);
882 edge_dwell_start = Some((edge_dir, now));
883 }
884 }
885 } else {
886 // Moving away from edge, reset
887 edge_dwell_start = None;
888 }
889 }
890 } else {
891 // Not at any edge, reset
892 edge_dwell_start = None;
893 }
894
895 last_cursor_pos = Some((cx, cy));
896 }
897 }
898
899 // Check for transfer timeout (stuck in Initiating state)
900 if let transfer::TransferState::Initiating { started_at, .. } = transfer_manager.state().await {
901 const TRANSFER_TIMEOUT_MS: u128 = 3000;
902 if started_at.elapsed().as_millis() > TRANSFER_TIMEOUT_MS {
903 tracing::warn!("Transfer timed out after {}ms, aborting", TRANSFER_TIMEOUT_MS);
904 transfer_manager.abort().await;
905 }
906 }
907
908 // Poll for incoming messages from peers (non-blocking)
909 let directions: Vec<Direction> = {
910 let peers = peers.read().await;
911 peers.keys().cloned().collect()
912 };
913
914 // Debug: log state and peers occasionally (every ~5 seconds at 100μs polling)
915 static POLL_COUNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
916 let count = POLL_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
917 if count % 50000 == 0 {
918 let state = transfer_manager.state().await;
919 tracing::info!("Poll #{}: state={:?}, peers={:?}", count, state, directions);
920 }
921
922 for direction in directions {
923 let mut peers = peers.write().await;
924 if let Some(peer) = peers.get_mut(&direction) {
925 // Try non-blocking receive using tokio timeout
926 // Use minimal timeout to avoid blocking the event loop
927 match tokio::time::timeout(
928 std::time::Duration::from_micros(50),
929 peer.recv()
930 ).await {
931 Ok(Ok(Some(msg))) => {
932 tracing::debug!("Received from {:?}: {:?}", direction, msg);
933 // Handle incoming message
934 match msg {
935 Message::Enter(payload) => {
936 info!("Received Enter from {:?}", direction);
937 match transfer_manager.handle_enter(
938 direction,
939 payload,
940 screen_width,
941 screen_height,
942 ).await {
943 Ok(pos) => {
944 info!("Positioned cursor at {:?}", pos);
945 }
946 Err(e) => {
947 tracing::error!("Failed to handle Enter: {}", e);
948 }
949 }
950 }
951 Message::EnterAck(ack) => {
952 info!("Received EnterAck: success={}", ack.success);
953 if let Err(e) = transfer_manager.handle_enter_ack(ack).await {
954 // Usually a benign race condition (collision resolved)
955 tracing::debug!("Failed to handle EnterAck: {}", e);
956 }
957 }
958 Message::Leave(payload) => {
959 info!("Received Leave from {:?}", direction);
960 if let Err(e) = transfer_manager.handle_leave(payload).await {
961 // Usually a benign race condition
962 tracing::debug!("Failed to handle Leave: {}", e);
963 }
964 // Set cooldown to prevent bounce-back loop
965 // When we receive Leave, control is returning to us
966 last_control_return = Some(std::time::Instant::now());
967 tracing::debug!("Set control return cooldown");
968 }
969 Message::LeaveAck => {
970 info!("Received LeaveAck");
971 // Transfer complete
972 }
973 Message::InputEvent(input_payload) => {
974 // Inject input via emulation module
975 if let Some(ref mut emu) = input_emulator {
976 use hyprkvm_common::protocol::InputEventType;
977 match input_payload.event {
978 InputEventType::KeyDown { keycode } => {
979 tracing::debug!("RECV KeyDown: keycode={} ({})",
980 keycode, keycode_to_name(keycode));
981 emu.keyboard.key(keycode, hyprkvm_common::KeyState::Pressed);
982 }
983 InputEventType::KeyUp { keycode } => {
984 tracing::debug!("RECV KeyUp: keycode={} ({})",
985 keycode, keycode_to_name(keycode));
986 emu.keyboard.key(keycode, hyprkvm_common::KeyState::Released);
987 }
988 InputEventType::PointerMotion { dx, dy } => {
989 emu.pointer.motion(dx, dy);
990 }
991 InputEventType::PointerButton { button, pressed } => {
992 let state = if pressed {
993 hyprkvm_common::ButtonState::Pressed
994 } else {
995 hyprkvm_common::ButtonState::Released
996 };
997 emu.pointer.button(button, state);
998 }
999 InputEventType::Scroll { horizontal, vertical } => {
1000 emu.pointer.scroll(horizontal, vertical);
1001 }
1002 InputEventType::ModifierState { .. } => {
1003 // Modifier state is informational
1004 }
1005 }
1006 }
1007 }
1008 Message::Ping { timestamp } => {
1009 let _ = peer.send(&Message::Pong { timestamp }).await;
1010 }
1011 Message::Pong { timestamp } => {
1012 tracing::trace!("Pong received, rtt={}ms",
1013 std::time::SystemTime::now()
1014 .duration_since(std::time::UNIX_EPOCH)
1015 .unwrap()
1016 .as_millis() as u64 - timestamp
1017 );
1018 }
1019 _ => {
1020 tracing::debug!("Unhandled message: {:?}", msg);
1021 }
1022 }
1023 }
1024 Ok(Ok(None)) => {
1025 // Connection closed
1026 info!("Peer {:?} disconnected", direction);
1027 peers.remove(&direction);
1028 }
1029 Ok(Err(e)) => {
1030 tracing::error!("Error receiving from {:?}: {}", direction, e);
1031 peers.remove(&direction);
1032 }
1033 Err(_) => {
1034 // Timeout - no message available, that's fine
1035 }
1036 }
1037 }
1038 }
1039 }
1040
1041 // Handle transfer events
1042 Some(event) = transfer_events.recv() => {
1043 match event {
1044 transfer::TransferEvent::SendMessage { direction, message } => {
1045 let mut peers = peers.write().await;
1046 if let Some(peer) = peers.get_mut(&direction) {
1047 info!("Sending {:?} to {:?}", message, direction);
1048 if let Err(e) = peer.send(&message).await {
1049 tracing::error!("Failed to send message to {:?}: {}", direction, e);
1050 // If send fails, abort the transfer
1051 transfer_manager.abort().await;
1052 }
1053 } else {
1054 tracing::warn!("No peer for direction {:?}, aborting transfer", direction);
1055 transfer_manager.abort().await;
1056 }
1057 }
1058 transfer::TransferEvent::StartCapture { direction: cap_dir } => {
1059 info!("Starting input capture for {:?}", cap_dir);
1060 capture_direction = Some(cap_dir);
1061
1062 // Send synthetic Super key-down as first event.
1063 // The transfer was likely initiated via Super+Arrow keybinding,
1064 // which means Super was already held when the grab started.
1065 // The evdev grabber won't see the initial Super key-down,
1066 // so we need to send it explicitly so the destination knows
1067 // Super is pressed for subsequent keybindings.
1068 {
1069 let mut peers_guard = peers.write().await;
1070 if let Some(peer) = peers_guard.get_mut(&cap_dir) {
1071 let super_down = input::GrabEvent::KeyDown { keycode: 125 }; // KEY_LEFTMETA
1072 let payload = super_down.to_protocol(input_sequence);
1073 input_sequence += 1;
1074 tracing::debug!("Sending synthetic Super key-down to destination");
1075 if let Err(e) = peer.send(&Message::InputEvent(payload)).await {
1076 tracing::error!("Failed to send synthetic Super: {}", e);
1077 }
1078 }
1079 }
1080
1081 input_grabber.start();
1082 }
1083 transfer::TransferEvent::StopCapture => {
1084 info!("Stopping input capture");
1085 let was_capturing_direction = capture_direction;
1086 capture_direction = None;
1087
1088 // Release the evdev grab and enter recovery mode for the stale direction
1089 // The stale key is the arrow key used to initiate the original outgoing transfer
1090 input_grabber.stop(was_capturing_direction);
1091
1092 // Drain any remaining events
1093 while input_grabber.try_recv().is_some() {}
1094
1095 // CRITICAL FIX: After releasing the evdev grab, libinput has stale state.
1096 // The arrow key that initiated the original transfer (before we went remote)
1097 // is still seen as "pressed" by libinput because it never saw the release.
1098 //
1099 // We use uinput to create a virtual keyboard and send synthetic key-up
1100 // events for ALL arrow keys. This gives libinput fresh key-up events,
1101 // which should clear the stale state.
1102 if let Some(dir) = was_capturing_direction {
1103 // The stale key is the one used to initiate the OUTGOING transfer
1104 let stale_keycode: u16 = match dir {
1105 Direction::Left => 105, // KEY_LEFT
1106 Direction::Right => 106, // KEY_RIGHT
1107 Direction::Up => 103, // KEY_UP
1108 Direction::Down => 108, // KEY_DOWN
1109 };
1110
1111 tracing::info!("Sending synthetic key-ups via uinput to clear stale libinput state");
1112
1113 // Send key-ups for all arrow keys to be safe
1114 let all_arrows: [u16; 4] = [103, 105, 106, 108];
1115 if let Err(e) = input::send_synthetic_key_ups(&all_arrows) {
1116 tracing::warn!("Failed to send synthetic key-ups: {}", e);
1117 }
1118
1119 // Also inject via virtual keyboard for Wayland-level cleanup
1120 if input_emulator.is_none() {
1121 if let Ok(emu) = input::InputEmulator::new() {
1122 input_emulator = Some(emu);
1123 }
1124 }
1125 if let Some(ref mut emu) = input_emulator {
1126 emu.keyboard.key(stale_keycode as u32, hyprkvm_common::KeyState::Released);
1127 emu.keyboard.reset_all_keys();
1128 }
1129 }
1130 }
1131 transfer::TransferEvent::StartInjection { from } => {
1132 info!("Starting input injection from {:?}", from);
1133 // Create input emulator if not exists
1134 if input_emulator.is_none() {
1135 match input::InputEmulator::new() {
1136 Ok(emu) => {
1137 info!("Input emulator created");
1138 input_emulator = Some(emu);
1139 }
1140 Err(e) => {
1141 tracing::error!("Failed to create input emulator: {}", e);
1142 }
1143 }
1144 }
1145 }
1146 transfer::TransferEvent::StopInjection => {
1147 info!("Stopping input injection");
1148 // Reset ALL pressed keys so next session starts clean
1149 // This prevents Hyprland from seeing stale key state
1150 // (e.g., arrow key that triggered return was never released)
1151 if let Some(ref mut emu) = input_emulator {
1152 emu.keyboard.reset_all_keys();
1153 }
1154 }
1155 }
1156 }
1157
1158 // Hyprland events
1159 event = event_stream.next_event() => {
1160 match event {
1161 Ok(evt) => {
1162 tracing::trace!("Hyprland event: {:?}", evt);
1163 }
1164 Err(e) => {
1165 tracing::error!("Event error: {e}");
1166 break;
1167 }
1168 }
1169 }
1170
1171 // Handle IPC requests from CLI
1172 Some((request, response_tx)) = ipc_rx.recv() => {
1173 use hyprkvm_common::protocol::{IpcRequest, IpcResponse};
1174
1175 let response = match request {
1176 IpcRequest::Move { direction } => {
1177 // Log current state for debugging
1178 let current_state = transfer_manager.state().await;
1179 info!("IPC Move {:?}: state={:?}", direction, current_state);
1180
1181 // For keyboard navigation, check if we're at the absolute edge:
1182 // 1. On edge monitor (no monitor in that direction)
1183 // 2. On edge window of that monitor (no window further in that direction)
1184
1185 let at_edge = 'edge_check: {
1186 // Get monitors and find focused one
1187 let monitors = match hypr_client.monitors().await {
1188 Ok(m) => m,
1189 Err(e) => {
1190 info!(" edge_check: monitors query failed: {}", e);
1191 break 'edge_check false;
1192 }
1193 };
1194 let focused_monitor = match monitors.iter().find(|m| m.focused) {
1195 Some(m) => m,
1196 None => {
1197 info!(" edge_check: no focused monitor found");
1198 break 'edge_check false;
1199 }
1200 };
1201
1202 // Check if there's another monitor in the requested direction
1203 let has_monitor_in_direction = monitors.iter().any(|m| {
1204 if m.id == focused_monitor.id { return false; }
1205 match direction {
1206 Direction::Left => m.x + m.width as i32 <= focused_monitor.x,
1207 Direction::Right => m.x >= focused_monitor.x + focused_monitor.width as i32,
1208 Direction::Up => m.y + m.height as i32 <= focused_monitor.y,
1209 Direction::Down => m.y >= focused_monitor.y + focused_monitor.height as i32,
1210 }
1211 });
1212
1213 if has_monitor_in_direction {
1214 // There's a monitor in that direction, not at edge
1215 info!(" edge_check: has monitor in direction {:?}", direction);
1216 break 'edge_check false;
1217 }
1218
1219 // We're on the edge monitor. Now check if we're on the edge window.
1220 // Get active window position
1221 let active_window: serde_json::Value = match hypr_client.query("activewindow").await {
1222 Ok(w) => w,
1223 Err(e) => {
1224 info!(" edge_check: activewindow query failed: {}", e);
1225 break 'edge_check false;
1226 }
1227 };
1228
1229 let win_x = active_window.get("at").and_then(|a| a.get(0)).and_then(|x| x.as_i64()).unwrap_or(0) as i32;
1230 let win_y = active_window.get("at").and_then(|a| a.get(1)).and_then(|y| y.as_i64()).unwrap_or(0) as i32;
1231 let win_w = active_window.get("size").and_then(|s| s.get(0)).and_then(|w| w.as_i64()).unwrap_or(100) as i32;
1232 let win_h = active_window.get("size").and_then(|s| s.get(1)).and_then(|h| h.as_i64()).unwrap_or(100) as i32;
1233
1234 // Get all clients (windows)
1235 let clients: Vec<serde_json::Value> = match hypr_client.query("clients").await {
1236 Ok(c) => c,
1237 Err(e) => {
1238 info!(" edge_check: clients query failed: {}", e);
1239 break 'edge_check false;
1240 }
1241 };
1242
1243 info!(" edge_check: active window at ({},{}) size {}x{}, {} clients on monitor",
1244 win_x, win_y, win_w, win_h,
1245 clients.iter().filter(|c| c.get("monitor").and_then(|m| m.as_i64()).unwrap_or(-1) as i32 == focused_monitor.id).count());
1246
1247 // Check if any window is further in the requested direction on same monitor
1248 let has_window_in_direction = clients.iter().any(|client| {
1249 let mon = client.get("monitor").and_then(|m| m.as_i64()).unwrap_or(-1) as i32;
1250 if mon != focused_monitor.id { return false; }
1251
1252 let cx = client.get("at").and_then(|a| a.get(0)).and_then(|x| x.as_i64()).unwrap_or(0) as i32;
1253 let cy = client.get("at").and_then(|a| a.get(1)).and_then(|y| y.as_i64()).unwrap_or(0) as i32;
1254 let cw = client.get("size").and_then(|s| s.get(0)).and_then(|w| w.as_i64()).unwrap_or(0) as i32;
1255 let ch = client.get("size").and_then(|s| s.get(1)).and_then(|h| h.as_i64()).unwrap_or(0) as i32;
1256
1257 match direction {
1258 Direction::Left => cx + cw < win_x + 10, // Window is to the left
1259 Direction::Right => cx > win_x + win_w - 10, // Window is to the right
1260 Direction::Up => cy + ch < win_y + 10,
1261 Direction::Down => cy > win_y + win_h - 10,
1262 }
1263 });
1264
1265 info!(" edge_check: has_window_in_direction={} -> at_edge={}", has_window_in_direction, !has_window_in_direction);
1266 !has_window_in_direction
1267 };
1268
1269 // Check if we have a peer in this direction
1270 let has_peer = {
1271 let peers = peers.read().await;
1272 peers.contains_key(&direction)
1273 };
1274
1275 // Get neighbor name if configured
1276 let neighbor_name = config.machines.neighbors
1277 .iter()
1278 .find(|n| n.direction == direction)
1279 .map(|n| n.name.clone());
1280
1281 info!("IPC Move {:?}: at_edge={}, has_peer={}, neighbor={:?}", direction, at_edge, has_peer, neighbor_name);
1282
1283 // At edge with peer: either return control or initiate transfer
1284 if at_edge && has_peer && neighbor_name.is_some() {
1285 // Check if we're in ReceivedControl state from this direction
1286 if let transfer::TransferState::ReceivedControl { from, .. } = current_state {
1287 if from == direction {
1288 // Return control to source machine
1289 tracing::info!("Keyboard return: at edge, returning control to {:?}", direction);
1290 if let Err(e) = transfer_manager.return_control().await {
1291 tracing::warn!("Failed to return control: {}", e);
1292 IpcResponse::Error { message: format!("Return failed: {}", e) }
1293 } else {
1294 IpcResponse::Transferred { to_machine: neighbor_name.unwrap() }
1295 }
1296 } else {
1297 // At edge with peer but received control from different direction
1298 // Initiate new transfer
1299 let cursor_pos = hypr_client.cursor_pos().await
1300 .map(|c| (c.x, c.y))
1301 .unwrap_or((0, 0));
1302
1303 if let Err(e) = transfer_manager.initiate_transfer(
1304 direction,
1305 cursor_pos,
1306 screen_height,
1307 screen_width,
1308 ).await {
1309 IpcResponse::Error { message: format!("Transfer failed: {}", e) }
1310 } else {
1311 IpcResponse::Transferred { to_machine: neighbor_name.unwrap() }
1312 }
1313 }
1314 } else {
1315 // Not in ReceivedControl - initiate new transfer
1316 let cursor_pos = hypr_client.cursor_pos().await
1317 .map(|c| (c.x, c.y))
1318 .unwrap_or((0, 0));
1319
1320 if let Err(e) = transfer_manager.initiate_transfer(
1321 direction,
1322 cursor_pos,
1323 screen_height,
1324 screen_width,
1325 ).await {
1326 IpcResponse::Error { message: format!("Transfer failed: {}", e) }
1327 } else {
1328 IpcResponse::Transferred { to_machine: neighbor_name.unwrap() }
1329 }
1330 }
1331 } else {
1332 // Either not at edge, or at edge but no peer - do local movefocus
1333 let hypr_dir = match direction {
1334 Direction::Left => "l",
1335 Direction::Right => "r",
1336 Direction::Up => "u",
1337 Direction::Down => "d",
1338 };
1339 info!("IPC Move {:?}: doing local movefocus {}", direction, hypr_dir);
1340 match hypr_client.dispatch("movefocus", hypr_dir).await {
1341 Ok(()) => info!(" movefocus succeeded"),
1342 Err(e) => tracing::error!(" movefocus failed: {}", e),
1343 }
1344 IpcResponse::DoLocalMove
1345 }
1346 }
1347 IpcRequest::Status => {
1348 let state = format!("{:?}", transfer_manager.state().await);
1349 let connected_peers: Vec<String> = {
1350 let peers = peers.read().await;
1351 config.machines.neighbors
1352 .iter()
1353 .filter(|n| peers.contains_key(&n.direction))
1354 .map(|n| n.name.clone())
1355 .collect()
1356 };
1357 let uptime_secs = daemon_start_time.elapsed().as_secs();
1358 IpcResponse::Status {
1359 state,
1360 connected_peers,
1361 uptime_secs,
1362 machine_name: config.machines.self_name.clone(),
1363 }
1364 }
1365 IpcRequest::ListPeers => {
1366 let peers_guard = peers.read().await;
1367 let peer_list: Vec<hyprkvm_common::protocol::PeerInfo> = config.machines.neighbors
1368 .iter()
1369 .map(|n| {
1370 let connected = peers_guard.contains_key(&n.direction);
1371 let status = if connected {
1372 "connected".to_string()
1373 } else {
1374 "disconnected".to_string()
1375 };
1376 hyprkvm_common::protocol::PeerInfo {
1377 name: n.name.clone(),
1378 direction: n.direction,
1379 connected,
1380 address: n.address.to_string(),
1381 status,
1382 }
1383 })
1384 .collect();
1385 IpcResponse::Peers { peers: peer_list }
1386 }
1387 IpcRequest::PingPeer { peer_name } => {
1388 // Find the peer by name
1389 let neighbor = config.machines.neighbors
1390 .iter()
1391 .find(|n| n.name == peer_name);
1392
1393 match neighbor {
1394 Some(n) => {
1395 let direction = n.direction;
1396 let mut peers_guard = peers.write().await;
1397
1398 if let Some(peer_conn) = peers_guard.get_mut(&direction) {
1399 // Send Ping with current timestamp
1400 let timestamp = std::time::SystemTime::now()
1401 .duration_since(std::time::UNIX_EPOCH)
1402 .unwrap()
1403 .as_millis() as u64;
1404
1405 if let Err(e) = peer_conn.send(&Message::Ping { timestamp }).await {
1406 IpcResponse::PingResult {
1407 peer_name,
1408 latency_ms: None,
1409 error: Some(format!("Send failed: {}", e)),
1410 }
1411 } else {
1412 // Wait for Pong with timeout
1413 match tokio::time::timeout(
1414 std::time::Duration::from_secs(5),
1415 peer_conn.recv()
1416 ).await {
1417 Ok(Ok(Some(Message::Pong { timestamp: pong_ts }))) => {
1418 let now = std::time::SystemTime::now()
1419 .duration_since(std::time::UNIX_EPOCH)
1420 .unwrap()
1421 .as_millis() as u64;
1422 let latency = now.saturating_sub(pong_ts);
1423 IpcResponse::PingResult {
1424 peer_name,
1425 latency_ms: Some(latency),
1426 error: None,
1427 }
1428 }
1429 Ok(Ok(Some(_))) => {
1430 IpcResponse::PingResult {
1431 peer_name,
1432 latency_ms: None,
1433 error: Some("Unexpected response".to_string()),
1434 }
1435 }
1436 Ok(Ok(None)) => {
1437 // Connection closed
1438 peers_guard.remove(&direction);
1439 IpcResponse::PingResult {
1440 peer_name,
1441 latency_ms: None,
1442 error: Some("Connection closed".to_string()),
1443 }
1444 }
1445 Ok(Err(e)) => {
1446 IpcResponse::PingResult {
1447 peer_name,
1448 latency_ms: None,
1449 error: Some(format!("Receive error: {}", e)),
1450 }
1451 }
1452 Err(_) => {
1453 IpcResponse::PingResult {
1454 peer_name,
1455 latency_ms: None,
1456 error: Some("Timeout".to_string()),
1457 }
1458 }
1459 }
1460 }
1461 } else {
1462 IpcResponse::PingResult {
1463 peer_name,
1464 latency_ms: None,
1465 error: Some("Peer not connected".to_string()),
1466 }
1467 }
1468 }
1469 None => {
1470 IpcResponse::Error {
1471 message: format!("Unknown peer: {}", peer_name),
1472 }
1473 }
1474 }
1475 }
1476
1477 // ================================================================
1478 // CLI Expansion: Control Transfer
1479 // ================================================================
1480
1481 IpcRequest::Switch { target } => {
1482 use hyprkvm_common::protocol::SwitchTarget;
1483
1484 // Resolve target to a direction
1485 let direction = match &target {
1486 SwitchTarget::Direction(dir) => Some(*dir),
1487 SwitchTarget::MachineName(name) => {
1488 config.machines.neighbors
1489 .iter()
1490 .find(|n| &n.name == name)
1491 .map(|n| n.direction)
1492 }
1493 };
1494
1495 match direction {
1496 Some(dir) => {
1497 let peers_guard = peers.read().await;
1498 if peers_guard.get(&dir).is_some() {
1499 drop(peers_guard);
1500
1501 // Get cursor position and screen size from Hyprland
1502 let (cursor_pos, screen_width, screen_height) = match hypr_client.monitors().await {
1503 Ok(monitors) => {
1504 if let Some(focused) = monitors.iter().find(|m| m.focused) {
1505 // Use center of screen as cursor position for switch
1506 let cx = focused.x + focused.width as i32 / 2;
1507 let cy = focused.y + focused.height as i32 / 2;
1508 ((cx, cy), focused.width, focused.height)
1509 } else {
1510 ((0, 0), 1920, 1080) // Fallback
1511 }
1512 }
1513 Err(_) => ((0, 0), 1920, 1080), // Fallback
1514 };
1515
1516 // Initiate transfer
1517 match transfer_manager.initiate_transfer(dir, cursor_pos, screen_height, screen_width).await {
1518 Ok(()) => {
1519 let machine_name = config.machines.neighbors
1520 .iter()
1521 .find(|n| n.direction == dir)
1522 .map(|n| n.name.clone())
1523 .unwrap_or_else(|| format!("{:?}", dir));
1524 IpcResponse::Transferred { to_machine: machine_name }
1525 }
1526 Err(e) => IpcResponse::Error {
1527 message: format!("Transfer failed: {}", e),
1528 }
1529 }
1530 } else {
1531 IpcResponse::Error {
1532 message: format!("Peer not connected in direction {:?}", dir),
1533 }
1534 }
1535 }
1536 None => {
1537 let name = match target {
1538 SwitchTarget::MachineName(n) => n,
1539 _ => "unknown".to_string(),
1540 };
1541 IpcResponse::Error {
1542 message: format!("Unknown machine: {}", name),
1543 }
1544 }
1545 }
1546 }
1547
1548 IpcRequest::Return => {
1549 match transfer_manager.return_control().await {
1550 Ok(()) => IpcResponse::Ok {
1551 message: "Control returned".to_string(),
1552 },
1553 Err(e) => IpcResponse::Error {
1554 message: format!("Return failed: {}", e),
1555 }
1556 }
1557 }
1558
1559 // ================================================================
1560 // CLI Expansion: Input Management
1561 // ================================================================
1562
1563 IpcRequest::Release => {
1564 // Stop input grabbing
1565 input_grabber.stop(None);
1566 // Abort any pending transfer
1567 transfer_manager.abort().await;
1568 IpcResponse::Ok {
1569 message: "Input released".to_string(),
1570 }
1571 }
1572
1573 IpcRequest::SetBarrier { enabled } => {
1574 barrier_enabled.store(enabled, std::sync::atomic::Ordering::SeqCst);
1575 let status = if enabled { "enabled" } else { "disabled" };
1576 IpcResponse::Ok {
1577 message: format!("Barrier {}", status),
1578 }
1579 }
1580
1581 // ================================================================
1582 // CLI Expansion: Connection Management
1583 // ================================================================
1584
1585 IpcRequest::Disconnect { peer_name } => {
1586 let neighbor = config.machines.neighbors
1587 .iter()
1588 .find(|n| n.name == peer_name);
1589
1590 match neighbor {
1591 Some(n) => {
1592 let direction = n.direction;
1593 let mut peers_guard = peers.write().await;
1594 if let Some(mut peer_conn) = peers_guard.remove(&direction) {
1595 // Send goodbye before disconnecting
1596 let _ = peer_conn.send(&Message::Goodbye).await;
1597 IpcResponse::Ok {
1598 message: format!("Disconnected from {}", peer_name),
1599 }
1600 } else {
1601 IpcResponse::Error {
1602 message: format!("Peer {} not connected", peer_name),
1603 }
1604 }
1605 }
1606 None => IpcResponse::Error {
1607 message: format!("Unknown peer: {}", peer_name),
1608 }
1609 }
1610 }
1611
1612 IpcRequest::Reconnect { peer_name } => {
1613 let neighbor = config.machines.neighbors
1614 .iter()
1615 .find(|n| n.name == peer_name)
1616 .cloned();
1617
1618 match neighbor {
1619 Some(n) => {
1620 let direction = n.direction;
1621 let addr = n.address;
1622 // Remove existing connection if any
1623 {
1624 let mut peers_guard = peers.write().await;
1625 if let Some(mut peer_conn) = peers_guard.remove(&direction) {
1626 let _ = peer_conn.send(&Message::Goodbye).await;
1627 }
1628 }
1629 // Spawn reconnection task (same logic as initial connection)
1630 let peers_clone = peers.clone();
1631 let machine_name = config.machines.self_name.clone();
1632 let neighbor_name = n.name.clone();
1633 tokio::spawn(async move {
1634 match network::connect(addr).await {
1635 Ok(mut conn) => {
1636 // Send Hello
1637 let hello = Message::Hello(HelloPayload {
1638 protocol_version: PROTOCOL_VERSION,
1639 machine_name,
1640 capabilities: vec![],
1641 });
1642 if let Err(e) = conn.send(&hello).await {
1643 tracing::error!("Reconnect: failed to send Hello: {}", e);
1644 return;
1645 }
1646 // Wait for HelloAck
1647 match conn.recv().await {
1648 Ok(Some(Message::HelloAck(ack))) if ack.accepted => {
1649 let mut peers_guard = peers_clone.write().await;
1650 peers_guard.insert(direction, conn);
1651 info!("Reconnected to {}", neighbor_name);
1652 }
1653 Ok(Some(Message::HelloAck(ack))) => {
1654 tracing::error!("Reconnect rejected: {:?}", ack.error);
1655 }
1656 _ => {
1657 tracing::error!("Reconnect: handshake failed");
1658 }
1659 }
1660 }
1661 Err(e) => {
1662 tracing::error!("Reconnect: connection failed: {}", e);
1663 }
1664 }
1665 });
1666 IpcResponse::Ok {
1667 message: format!("Reconnecting to {}", peer_name),
1668 }
1669 }
1670 None => IpcResponse::Error {
1671 message: format!("Unknown peer: {}", peer_name),
1672 }
1673 }
1674 }
1675
1676 // ================================================================
1677 // CLI Expansion: Configuration
1678 // ================================================================
1679
1680 IpcRequest::GetConfig => {
1681 match toml::to_string_pretty(&config) {
1682 Ok(toml_str) => IpcResponse::Config { toml: toml_str },
1683 Err(e) => IpcResponse::Error {
1684 message: format!("Failed to serialize config: {}", e),
1685 }
1686 }
1687 }
1688
1689 IpcRequest::Reload => {
1690 // TODO: Implement config hot-reload
1691 IpcResponse::Error {
1692 message: "Config reload not yet implemented".to_string(),
1693 }
1694 }
1695
1696 // ================================================================
1697 // CLI Expansion: Daemon Control
1698 // ================================================================
1699
1700 IpcRequest::Shutdown => {
1701 info!("Shutdown requested via IPC");
1702 shutdown_requested.store(true, std::sync::atomic::Ordering::SeqCst);
1703 IpcResponse::Ok {
1704 message: "Shutting down...".to_string(),
1705 }
1706 }
1707
1708 IpcRequest::GetLogs { lines, follow: _ } => {
1709 // Read from log file
1710 let log_path = dirs::data_local_dir()
1711 .unwrap_or_else(|| std::path::PathBuf::from("/tmp"))
1712 .join("hyprkvm")
1713 .join("daemon.log");
1714
1715 if log_path.exists() {
1716 match std::fs::read_to_string(&log_path) {
1717 Ok(content) => {
1718 let n = lines.unwrap_or(50) as usize;
1719 let log_lines: Vec<String> = content
1720 .lines()
1721 .rev()
1722 .take(n)
1723 .map(|s| s.to_string())
1724 .collect::<Vec<_>>()
1725 .into_iter()
1726 .rev()
1727 .collect();
1728 IpcResponse::Logs { lines: log_lines }
1729 }
1730 Err(e) => IpcResponse::Error {
1731 message: format!("Failed to read log file: {}", e),
1732 }
1733 }
1734 } else {
1735 IpcResponse::Logs {
1736 lines: vec!["Log file not found. File logging may not be configured.".to_string()],
1737 }
1738 }
1739 }
1740 };
1741
1742 let _ = response_tx.send(response);
1743 }
1744
1745 // Shutdown (Ctrl+C or IPC request)
1746 _ = tokio::signal::ctrl_c() => {
1747 info!("Shutting down (Ctrl+C)...");
1748 accept_handle.abort();
1749 break;
1750 }
1751
1752 // Check for IPC shutdown request
1753 _ = async {
1754 while !shutdown_requested.load(std::sync::atomic::Ordering::SeqCst) {
1755 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1756 }
1757 } => {
1758 info!("Shutting down (IPC request)...");
1759 accept_handle.abort();
1760 break;
1761 }
1762 }
1763 }
1764
1765 Ok(())
1766 }
1767
1768 async fn show_status() -> anyhow::Result<()> {
1769 // TODO: Connect to running daemon via IPC and get status
1770 println!("HyprKVM Status");
1771 println!("==============");
1772 println!("Daemon: not implemented yet");
1773 Ok(())
1774 }
1775
1776 async fn handle_move(direction: &str) -> anyhow::Result<()> {
1777 use hyprkvm_common::Direction;
1778 use hyprkvm_common::protocol::{IpcRequest, IpcResponse};
1779
1780 let dir: Direction = direction.parse()?;
1781
1782 // Try to connect to daemon
1783 match ipc::IpcClient::connect().await {
1784 Ok(mut client) => {
1785 // Ask daemon to handle the move (it does movefocus internally)
1786 let request = IpcRequest::Move { direction: dir };
1787 match client.request(&request).await {
1788 Ok(IpcResponse::Transferred { to_machine }) => {
1789 tracing::info!("Transferred control to {}", to_machine);
1790 }
1791 Ok(IpcResponse::DoLocalMove) => {
1792 // Daemon handled it
1793 }
1794 Ok(IpcResponse::Error { message }) => {
1795 tracing::warn!("Daemon error: {}", message);
1796 }
1797 Ok(_) => {
1798 tracing::warn!("Unexpected response from daemon");
1799 }
1800 Err(e) => {
1801 tracing::debug!("IPC request failed: {}, falling back to local", e);
1802 do_local_move(dir).await?;
1803 }
1804 }
1805 }
1806 Err(e) => {
1807 tracing::debug!("Daemon not running ({}), doing local move", e);
1808 do_local_move(dir).await?;
1809 }
1810 }
1811
1812 Ok(())
1813 }
1814
1815 async fn do_local_move(dir: hyprkvm_common::Direction) -> anyhow::Result<()> {
1816 use hyprkvm_common::Direction;
1817
1818 let hypr_dir = match dir {
1819 Direction::Left => "l",
1820 Direction::Right => "r",
1821 Direction::Up => "u",
1822 Direction::Down => "d",
1823 };
1824
1825 let output = tokio::process::Command::new("hyprctl")
1826 .args(["dispatch", "movefocus", hypr_dir])
1827 .output()
1828 .await?;
1829
1830 if !output.status.success() {
1831 let stderr = String::from_utf8_lossy(&output.stderr);
1832 tracing::error!("hyprctl failed: {}", stderr);
1833 }
1834
1835 Ok(())
1836 }
1837
1838 fn show_config(config_path: &std::path::Path) -> anyhow::Result<()> {
1839 if config_path.exists() {
1840 let content = std::fs::read_to_string(config_path)?;
1841 println!("{}", content);
1842 } else {
1843 println!("No config file at {:?}", config_path);
1844 println!("\nDefault configuration:");
1845 let default = Config::default();
1846 println!("{}", toml::to_string_pretty(&default)?);
1847 }
1848 Ok(())
1849 }
1850
1851 async fn reload_config() -> anyhow::Result<()> {
1852 // TODO: Send reload signal to daemon
1853 println!("Config reload not implemented yet");
1854 Ok(())
1855 }
1856