Rust · 5561 bytes Raw Blame History
1 //! LSP server process management
2 //!
3 //! Handles spawning and communicating with language server processes.
4 //!
5 //! Note: Some process methods are for planned features.
6 #![allow(dead_code)]
7
8 use anyhow::{anyhow, Result};
9 use std::io::{Read, Write};
10 use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
11 use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
12 use std::thread;
13
/// A running language server process.
///
/// Owns the child process handle, the write end of its stdin pipe, and the
/// receiving end of the channel fed by the background stdout reader thread.
///
/// `Debug` is derived so the handle can be logged and embedded in other
/// `#[derive(Debug)]` types; every field already implements it.
#[derive(Debug)]
pub struct ServerProcess {
    /// Handle to the spawned server process (used for status checks, kill, pid).
    child: Child,
    /// Write end of the server's stdin; outgoing messages are written here.
    stdin: ChildStdin,
    /// Receives raw text chunks forwarded from the server's stdout.
    message_rx: Receiver<String>,
    /// Buffer for incomplete messages: text received but not yet returned as
    /// a complete framed message.
    read_buffer: String,
}
22
23 impl ServerProcess {
24 /// Spawn a new language server process
25 pub fn spawn(command: &[String]) -> Result<Self> {
26 if command.is_empty() {
27 return Err(anyhow!("Empty command"));
28 }
29
30 let mut cmd = Command::new(&command[0]);
31 if command.len() > 1 {
32 cmd.args(&command[1..]);
33 }
34
35 let mut child = cmd
36 .stdin(Stdio::piped())
37 .stdout(Stdio::piped())
38 .stderr(Stdio::piped())
39 .spawn()
40 .map_err(|e| anyhow!("Failed to spawn LSP server '{}': {}", command[0], e))?;
41
42 let stdin = child.stdin.take().ok_or_else(|| anyhow!("No stdin"))?;
43 let stdout = child.stdout.take().ok_or_else(|| anyhow!("No stdout"))?;
44
45 // Spawn a thread to read from stdout asynchronously
46 let (tx, rx) = mpsc::channel();
47 spawn_reader_thread(stdout, tx);
48
49 Ok(Self {
50 child,
51 stdin,
52 message_rx: rx,
53 read_buffer: String::new(),
54 })
55 }
56
57 /// Send a message to the server
58 pub fn send(&mut self, message: &str) -> Result<()> {
59 self.stdin.write_all(message.as_bytes())?;
60 self.stdin.flush()?;
61 Ok(())
62 }
63
64 /// Try to receive a complete message from the server (non-blocking)
65 pub fn try_recv(&mut self) -> Option<String> {
66 // Drain all available data from the channel into our buffer
67 loop {
68 match self.message_rx.try_recv() {
69 Ok(data) => self.read_buffer.push_str(&data),
70 Err(TryRecvError::Empty) => break,
71 Err(TryRecvError::Disconnected) => break,
72 }
73 }
74
75 // Try to parse a complete message from the buffer
76 self.parse_message()
77 }
78
79 /// Block until a message is received (with timeout in ms)
80 pub fn recv_timeout(&mut self, timeout_ms: u64) -> Option<String> {
81 use std::time::{Duration, Instant};
82 let deadline = Instant::now() + Duration::from_millis(timeout_ms);
83
84 loop {
85 // First check if we have a complete message buffered
86 if let Some(msg) = self.parse_message() {
87 return Some(msg);
88 }
89
90 // Wait for more data
91 let remaining = deadline.saturating_duration_since(Instant::now());
92 if remaining.is_zero() {
93 return None;
94 }
95
96 match self.message_rx.recv_timeout(remaining) {
97 Ok(data) => self.read_buffer.push_str(&data),
98 Err(_) => return None,
99 }
100 }
101 }
102
103 /// Parse a complete LSP message from the buffer
104 fn parse_message(&mut self) -> Option<String> {
105 // Look for Content-Length header
106 let header_end = self.read_buffer.find("\r\n\r\n")?;
107 let header = &self.read_buffer[..header_end];
108
109 // Parse Content-Length
110 let content_length: usize = header
111 .lines()
112 .find(|line| line.to_lowercase().starts_with("content-length:"))
113 .and_then(|line| line.split(':').nth(1))
114 .and_then(|len| len.trim().parse().ok())?;
115
116 // Check if we have the full message
117 let message_start = header_end + 4;
118 let message_end = message_start + content_length;
119
120 if self.read_buffer.len() < message_end {
121 return None;
122 }
123
124 // Extract the message
125 let message = self.read_buffer[message_start..message_end].to_string();
126
127 // Remove from buffer
128 self.read_buffer = self.read_buffer[message_end..].to_string();
129
130 Some(message)
131 }
132
133 /// Check if the process is still running
134 pub fn is_running(&mut self) -> bool {
135 match self.child.try_wait() {
136 Ok(Some(_)) => false, // Process has exited
137 Ok(None) => true, // Still running
138 Err(_) => false, // Error checking status
139 }
140 }
141
142 /// Kill the server process
143 pub fn kill(&mut self) -> Result<()> {
144 let _ = self.child.kill();
145 Ok(())
146 }
147
148 /// Get the process ID
149 pub fn pid(&self) -> u32 {
150 self.child.id()
151 }
152 }
153
154 impl Drop for ServerProcess {
155 fn drop(&mut self) {
156 let _ = self.kill();
157 }
158 }
159
/// Spawn a thread to read from the server's stdout.
///
/// The thread forwards everything the server prints as UTF-8 text chunks on
/// `tx`. It stops at end-of-file, on a non-recoverable read error, or once
/// the receiving side of the channel has been dropped. The thread is
/// detached; its join handle is not kept.
fn spawn_reader_thread(stdout: ChildStdout, tx: Sender<String>) {
    thread::spawn(move || forward_utf8_chunks(stdout, &tx));
}

/// Reads `reader` to the end and sends its contents over `tx` as text.
///
/// A single read may stop in the middle of a multi-byte UTF-8 character.
/// Validating each read on its own would reject such a chunk (and the next
/// one, which starts with the remaining bytes), silently losing data. The
/// incomplete trailing bytes are therefore kept back and decoded together
/// with the following read.
fn forward_utf8_chunks<R: Read>(mut reader: R, tx: &Sender<String>) {
    use std::io::ErrorKind;

    let mut chunk = [0u8; 8192];
    // Bytes read but not yet forwarded. After each iteration this holds at
    // most one incomplete UTF-8 sequence.
    let mut pending: Vec<u8> = Vec::new();

    loop {
        let read_len = match reader.read(&mut chunk) {
            Ok(0) => break,
            Ok(n) => n,
            // An interrupted read is not an error; just try again.
            Err(e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(_) => break,
        };
        pending.extend_from_slice(&chunk[..read_len]);

        let text = take_decoded_prefix(&mut pending);
        if !text.is_empty() && tx.send(text).is_err() {
            // Receiver dropped: nobody is listening any more.
            return;
        }
    }

    // The stream ended inside a character; forward what is left rather than
    // dropping it without a trace.
    if !pending.is_empty() {
        let _ = tx.send(String::from_utf8_lossy(&pending).into_owned());
    }
}

/// Removes every decodable byte from the front of `pending` and returns the
/// decoded text.
///
/// Invalid bytes are replaced with U+FFFD. An incomplete sequence at the very
/// end is left in `pending` so it can be completed by the next read.
fn take_decoded_prefix(pending: &mut Vec<u8>) -> String {
    let mut text = String::new();
    let mut consumed = 0;

    while consumed < pending.len() {
        match std::str::from_utf8(&pending[consumed..]) {
            Ok(valid) => {
                text.push_str(valid);
                consumed = pending.len();
            }
            Err(err) => {
                // Everything up to `valid_up_to` is known to be valid UTF-8.
                let valid_end = consumed + err.valid_up_to();
                text.push_str(&String::from_utf8_lossy(&pending[consumed..valid_end]));
                consumed = valid_end;

                match err.error_len() {
                    // Definitely invalid bytes: substitute and skip them.
                    Some(invalid_len) => {
                        text.push(char::REPLACEMENT_CHARACTER);
                        consumed += invalid_len;
                    }
                    // Input ended mid-character: wait for more bytes.
                    None => break,
                }
            }
        }
    }

    pending.drain(..consumed);
    text
}
182