gardesk/garwarp / cdd3515

Browse files

add control protocol

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
cdd3515b9af65131639458b2c4af29dde693df63
Parents
52ba8f5
Tree
0194e73

2 changed files

StatusFile+-
M garwarp-ipc/src/lib.rs 200 3
M garwarpctl/src/main.rs 124 2
garwarp-ipc/src/lib.rsmodified
@@ -1,4 +1,8 @@
1
+use std::fmt;
2
+
13
 pub const PROTOCOL_VERSION: u16 = 1;
4
+pub const DEFAULT_RUNTIME_SUBDIR: &str = "garwarp";
5
+pub const DEFAULT_CONTROL_SOCKET: &str = "control.sock";
26
 
37
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
48
 pub enum HealthStatus {
@@ -8,12 +12,54 @@ pub enum HealthStatus {
812
     Stopping,
913
 }
1014
 
11
-#[derive(Debug, Clone)]
15
+impl HealthStatus {
16
+    #[must_use]
17
+    pub fn as_str(self) -> &'static str {
18
+        match self {
19
+            Self::Starting => "starting",
20
+            Self::Healthy => "healthy",
21
+            Self::Degraded => "degraded",
22
+            Self::Stopping => "stopping",
23
+        }
24
+    }
25
+
26
+    fn parse(input: &str) -> Option<Self> {
27
+        match input {
28
+            "starting" => Some(Self::Starting),
29
+            "healthy" => Some(Self::Healthy),
30
+            "degraded" => Some(Self::Degraded),
31
+            "stopping" => Some(Self::Stopping),
32
+            _ => None,
33
+        }
34
+    }
35
+}
36
+
37
+#[derive(Debug, Clone, PartialEq, Eq)]
1238
 pub enum ControlRequest {
1339
     Status,
40
+    Stop,
1441
 }
1542
 
16
-#[derive(Debug, Clone)]
43
+impl ControlRequest {
44
+    #[must_use]
45
+    pub fn as_line(&self) -> &'static str {
46
+        match self {
47
+            Self::Status => "status",
48
+            Self::Stop => "stop",
49
+        }
50
+    }
51
+
52
+    #[must_use]
53
+    pub fn parse_line(input: &str) -> Option<Self> {
54
+        match input.trim() {
55
+            "status" => Some(Self::Status),
56
+            "stop" => Some(Self::Stop),
57
+            _ => None,
58
+        }
59
+    }
60
+}
61
+
62
+#[derive(Debug, Clone, PartialEq, Eq)]
1763
 pub struct StatusResponse {
1864
     pub protocol_version: u16,
1965
     pub health: HealthStatus,
@@ -31,9 +77,154 @@ impl StatusResponse {
3177
     }
3278
 }
3379
 
80
+#[derive(Debug, Clone, PartialEq, Eq)]
81
+pub enum ControlResponse {
82
+    Status(StatusResponse),
83
+    AckStopping,
84
+    Error { reason: String },
85
+}
86
+
87
+impl ControlResponse {
88
+    #[must_use]
89
+    pub fn to_line(&self) -> String {
90
+        match self {
91
+            Self::Status(status) => format!(
92
+                "status protocol={} health={} in_flight={}\n",
93
+                status.protocol_version,
94
+                status.health.as_str(),
95
+                status.in_flight_requests
96
+            ),
97
+            Self::AckStopping => "ack stopping\n".to_string(),
98
+            Self::Error { reason } => format!("error reason={}\n", reason),
99
+        }
100
+    }
101
+
102
+    pub fn parse_line(input: &str) -> Result<Self, ParseError> {
103
+        let trimmed = input.trim();
104
+        let mut parts = trimmed.split_whitespace();
105
+
106
+        match parts.next() {
107
+            Some("status") => {
108
+                let mut protocol_version = None;
109
+                let mut health = None;
110
+                let mut in_flight_requests = None;
111
+
112
+                for part in parts {
113
+                    let (key, value) = part
114
+                        .split_once('=')
115
+                        .ok_or(ParseError::InvalidField(part.to_string()))?;
116
+                    match key {
117
+                        "protocol" => {
118
+                            protocol_version = Some(
119
+                                value
120
+                                    .parse::<u16>()
121
+                                    .map_err(|_| ParseError::InvalidField(part.to_string()))?,
122
+                            );
123
+                        }
124
+                        "health" => {
125
+                            health = HealthStatus::parse(value);
126
+                            if health.is_none() {
127
+                                return Err(ParseError::InvalidField(part.to_string()));
128
+                            }
129
+                        }
130
+                        "in_flight" => {
131
+                            in_flight_requests = Some(
132
+                                value
133
+                                    .parse::<usize>()
134
+                                    .map_err(|_| ParseError::InvalidField(part.to_string()))?,
135
+                            );
136
+                        }
137
+                        _ => return Err(ParseError::InvalidField(part.to_string())),
138
+                    }
139
+                }
140
+
141
+                let status = StatusResponse {
142
+                    protocol_version: protocol_version
143
+                        .ok_or(ParseError::MissingField("protocol"))?,
144
+                    health: health.ok_or(ParseError::MissingField("health"))?,
145
+                    in_flight_requests: in_flight_requests
146
+                        .ok_or(ParseError::MissingField("in_flight"))?,
147
+                };
148
+                Ok(Self::Status(status))
149
+            }
150
+            Some("ack") => match parts.next() {
151
+                Some("stopping") => Ok(Self::AckStopping),
152
+                Some(other) => Err(ParseError::UnknownToken(other.to_string())),
153
+                None => Err(ParseError::MissingField("ack")),
154
+            },
155
+            Some("error") => match parts.next() {
156
+                Some(reason_field) => {
157
+                    let (key, value) = reason_field
158
+                        .split_once('=')
159
+                        .ok_or(ParseError::InvalidField(reason_field.to_string()))?;
160
+                    if key != "reason" {
161
+                        return Err(ParseError::InvalidField(reason_field.to_string()));
162
+                    }
163
+                    Ok(Self::Error {
164
+                        reason: value.to_string(),
165
+                    })
166
+                }
167
+                None => Err(ParseError::MissingField("reason")),
168
+            },
169
+            Some(other) => Err(ParseError::UnknownToken(other.to_string())),
170
+            None => Err(ParseError::Empty),
171
+        }
172
+    }
173
+}
174
+
175
+#[derive(Debug, Clone, PartialEq, Eq)]
176
+pub enum ParseError {
177
+    Empty,
178
+    MissingField(&'static str),
179
+    InvalidField(String),
180
+    UnknownToken(String),
181
+}
182
+
183
+impl fmt::Display for ParseError {
184
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
185
+        match self {
186
+            Self::Empty => write!(f, "empty input"),
187
+            Self::MissingField(field) => write!(f, "missing field: {field}"),
188
+            Self::InvalidField(field) => write!(f, "invalid field: {field}"),
189
+            Self::UnknownToken(token) => write!(f, "unknown token: {token}"),
190
+        }
191
+    }
192
+}
193
+
194
+impl std::error::Error for ParseError {}
195
+
34196
 #[cfg(test)]
35197
 mod tests {
36
-    use super::{HealthStatus, PROTOCOL_VERSION, StatusResponse};
198
+    use super::{ControlRequest, ControlResponse, HealthStatus, PROTOCOL_VERSION, StatusResponse};
199
+
200
+    #[test]
201
+    fn request_parse_roundtrip() {
202
+        for request in [ControlRequest::Status, ControlRequest::Stop] {
203
+            let line = request.as_line();
204
+            let parsed = ControlRequest::parse_line(line);
205
+            assert_eq!(parsed, Some(request));
206
+        }
207
+    }
208
+
209
+    #[test]
210
+    fn response_status_roundtrip() {
211
+        let response = ControlResponse::Status(StatusResponse {
212
+            protocol_version: PROTOCOL_VERSION,
213
+            health: HealthStatus::Healthy,
214
+            in_flight_requests: 7,
215
+        });
216
+        let line = response.to_line();
217
+        let parsed = ControlResponse::parse_line(&line).expect("response should parse");
218
+        assert_eq!(parsed, response);
219
+    }
220
+
221
+    #[test]
222
+    fn response_ack_roundtrip() {
223
+        let response = ControlResponse::AckStopping;
224
+        let line = response.to_line();
225
+        let parsed = ControlResponse::parse_line(&line).expect("response should parse");
226
+        assert_eq!(parsed, response);
227
+    }
37228
 
38229
     #[test]
39230
     fn healthy_response_uses_protocol_version() {
@@ -42,4 +233,10 @@ mod tests {
42233
         assert_eq!(response.health, HealthStatus::Healthy);
43234
         assert_eq!(response.in_flight_requests, 0);
44235
     }
236
+
237
+    #[test]
238
+    fn malformed_status_is_rejected() {
239
+        let parsed = ControlResponse::parse_line("status protocol=one health=healthy in_flight=0");
240
+        assert!(parsed.is_err());
241
+    }
45242
 }
garwarpctl/src/main.rsmodified
@@ -1,5 +1,127 @@
1
-use garwarp_ipc::PROTOCOL_VERSION;
1
+use std::env;
2
+use std::io::{self, BufRead, BufReader, Write};
3
+use std::os::unix::net::UnixStream;
4
+use std::path::PathBuf;
5
+
6
+use garwarp_ipc::{
7
+    ControlRequest, ControlResponse, DEFAULT_CONTROL_SOCKET, DEFAULT_RUNTIME_SUBDIR,
8
+    PROTOCOL_VERSION,
9
+};
210
 
311
 fn main() {
4
-    println!("garwarpctl protocol v{PROTOCOL_VERSION}");
12
+    let command = parse_command(env::args().nth(1).as_deref());
13
+    if let Err(error) = run(command) {
14
+        eprintln!("garwarpctl error: {error}");
15
+        std::process::exit(1);
16
+    }
17
+}
18
+
19
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
20
+enum Command {
21
+    Status,
22
+    Stop,
23
+    Version,
24
+    Help,
25
+}
26
+
27
+fn parse_command(input: Option<&str>) -> Command {
28
+    match input {
29
+        Some("status") | None => Command::Status,
30
+        Some("stop") => Command::Stop,
31
+        Some("version") | Some("--version") | Some("-V") => Command::Version,
32
+        Some("help") | Some("--help") | Some("-h") => Command::Help,
33
+        Some(_) => Command::Help,
34
+    }
35
+}
36
+
37
+fn run(command: Command) -> io::Result<()> {
38
+    match command {
39
+        Command::Status => {
40
+            let response = send_request(ControlRequest::Status)?;
41
+            match response {
42
+                ControlResponse::Status(status) => {
43
+                    println!("protocol={}", status.protocol_version);
44
+                    println!("health={}", status.health.as_str());
45
+                    println!("in_flight={}", status.in_flight_requests);
46
+                    Ok(())
47
+                }
48
+                ControlResponse::Error { reason } => {
49
+                    Err(io::Error::other(format!("daemon error: {reason}")))
50
+                }
51
+                other => Err(io::Error::new(
52
+                    io::ErrorKind::InvalidData,
53
+                    format!("unexpected response: {other:?}"),
54
+                )),
55
+            }
56
+        }
57
+        Command::Stop => {
58
+            let response = send_request(ControlRequest::Stop)?;
59
+            match response {
60
+                ControlResponse::AckStopping => {
61
+                    println!("stopping");
62
+                    Ok(())
63
+                }
64
+                ControlResponse::Error { reason } => {
65
+                    Err(io::Error::other(format!("daemon error: {reason}")))
66
+                }
67
+                other => Err(io::Error::new(
68
+                    io::ErrorKind::InvalidData,
69
+                    format!("unexpected response: {other:?}"),
70
+                )),
71
+            }
72
+        }
73
+        Command::Version => {
74
+            println!("garwarpctl protocol v{PROTOCOL_VERSION}");
75
+            Ok(())
76
+        }
77
+        Command::Help => {
78
+            print_help();
79
+            Ok(())
80
+        }
81
+    }
82
+}
83
+
84
+fn send_request(request: ControlRequest) -> io::Result<ControlResponse> {
85
+    let socket_path = control_socket_path();
86
+    let mut stream = UnixStream::connect(&socket_path)?;
87
+    stream.write_all(request.as_line().as_bytes())?;
88
+    stream.write_all(b"\n")?;
89
+    stream.flush()?;
90
+
91
+    let mut reader = BufReader::new(stream);
92
+    let mut line = String::new();
93
+    reader.read_line(&mut line)?;
94
+    ControlResponse::parse_line(&line)
95
+        .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))
96
+}
97
+
98
+fn control_socket_path() -> PathBuf {
99
+    runtime_dir().join(DEFAULT_CONTROL_SOCKET)
100
+}
101
+
102
+fn runtime_dir() -> PathBuf {
103
+    let base = env::var_os("XDG_RUNTIME_DIR")
104
+        .map(PathBuf::from)
105
+        .unwrap_or_else(env::temp_dir);
106
+    base.join(DEFAULT_RUNTIME_SUBDIR)
107
+}
108
+
109
+fn print_help() {
110
+    println!("garwarpctl <command>");
111
+    println!("commands: status (default), stop, version, help");
112
+}
113
+
114
+#[cfg(test)]
115
+mod tests {
116
+    use super::{Command, parse_command};
117
+
118
+    #[test]
119
+    fn status_is_default_command() {
120
+        assert_eq!(parse_command(None), Command::Status);
121
+    }
122
+
123
+    #[test]
124
+    fn help_for_unknown_command() {
125
+        assert_eq!(parse_command(Some("bogus")), Command::Help);
126
+    }
5127
 }