//! gRPC client for the ZephyrFS coordinator service.
1 use anyhow::{Result, Context};
2 use std::time::Duration;
3 use tonic::transport::{Channel, Endpoint};
4 use tonic::{Request, Response, Status};
5 use tracing::{debug, warn};
6
7 use super::types::*;
8
/// Generated gRPC client code for the `zephyrfs.coordinator` proto package.
///
/// `tonic::include_proto!` pulls in the prost/tonic code produced at build
/// time (by `tonic-build` in the crate's build script), including
/// `CoordinatorServiceClient` and all request/response message types
/// aliased below.
pub mod coordinator_service {
tonic::include_proto!("zephyrfs.coordinator");
}
13
14 use coordinator_service::{
15 coordinator_service_client::CoordinatorServiceClient,
16 RegisterNodeRequest as ProtoRegisterNodeRequest,
17 RegisterNodeResponse as ProtoRegisterNodeResponse,
18 UnregisterNodeRequest as ProtoUnregisterNodeRequest,
19 UnregisterNodeResponse as ProtoUnregisterNodeResponse,
20 GetActiveNodesRequest as ProtoGetActiveNodesRequest,
21 GetActiveNodesResponse as ProtoGetActiveNodesResponse,
22 NodeHeartbeatRequest as ProtoNodeHeartbeatRequest,
23 NodeHeartbeatResponse as ProtoNodeHeartbeatResponse,
24 RegisterFileRequest as ProtoRegisterFileRequest,
25 RegisterFileResponse as ProtoRegisterFileResponse,
26 GetFileInfoRequest as ProtoGetFileInfoRequest,
27 GetFileInfoResponse as ProtoGetFileInfoResponse,
28 UpdateChunkLocationsRequest as ProtoUpdateChunkLocationsRequest,
29 UpdateChunkLocationsResponse as ProtoUpdateChunkLocationsResponse,
30 FindChunkLocationsRequest as ProtoFindChunkLocationsRequest,
31 FindChunkLocationsResponse as ProtoFindChunkLocationsResponse,
32 GetNetworkStatusRequest as ProtoGetNetworkStatusRequest,
33 GetNetworkStatusResponse as ProtoGetNetworkStatusResponse,
34 };
35
/// Coordinator gRPC client.
///
/// Thin wrapper around the generated [`CoordinatorServiceClient`] that
/// converts between the proto message types and the domain types from
/// `super::types`.
///
/// Cloning is intended to be cheap: per tonic's docs the generated client
/// just wraps a `Channel` handle — call sites below clone it for each RPC
/// because the generated methods take `&mut self`.
#[derive(Clone)]
pub struct CoordinatorClient {
client: CoordinatorServiceClient<Channel>,
}
41
42 impl CoordinatorClient {
43 /// Create a new coordinator client
44 pub async fn new(coordinator_url: &str) -> Result<Self> {
45 debug!("Connecting to coordinator at: {}", coordinator_url);
46
47 let endpoint = Endpoint::from_shared(coordinator_url.to_string())
48 .context("Invalid coordinator URL")?
49 .timeout(Duration::from_secs(10))
50 .connect_timeout(Duration::from_secs(5));
51
52 let channel = endpoint.connect().await
53 .context("Failed to connect to coordinator")?;
54
55 let client = CoordinatorServiceClient::new(channel);
56
57 debug!("Successfully connected to coordinator");
58 Ok(Self { client })
59 }
60
61 /// Register node with coordinator
62 pub async fn register_node(&self, request: RegisterNodeRequest) -> Result<RegisterNodeResponse> {
63 let proto_request = ProtoRegisterNodeRequest {
64 node_id: request.node_id,
65 addresses: request.addresses,
66 storage_capacity: request.storage_capacity,
67 capabilities: request.capabilities,
68 };
69
70 let response = self.client.clone()
71 .register_node(Request::new(proto_request))
72 .await
73 .context("gRPC call failed")?
74 .into_inner();
75
76 Ok(RegisterNodeResponse {
77 success: response.success,
78 message: response.message,
79 assigned_node_id: response.assigned_node_id,
80 bootstrap_peers: response.bootstrap_peers,
81 })
82 }
83
84 /// Unregister node from coordinator
85 pub async fn unregister_node(&self, request: UnregisterNodeRequest) -> Result<UnregisterNodeResponse> {
86 let proto_request = ProtoUnregisterNodeRequest {
87 node_id: request.node_id,
88 reason: request.reason,
89 };
90
91 let response = self.client.clone()
92 .unregister_node(Request::new(proto_request))
93 .await
94 .context("gRPC call failed")?
95 .into_inner();
96
97 Ok(UnregisterNodeResponse {
98 success: response.success,
99 message: response.message,
100 })
101 }
102
103 /// Get active nodes from coordinator
104 pub async fn get_active_nodes(&self, request: GetActiveNodesRequest) -> Result<GetActiveNodesResponse> {
105 let proto_request = ProtoGetActiveNodesRequest {
106 limit: request.limit,
107 exclude_nodes: request.exclude_nodes,
108 };
109
110 let response = self.client.clone()
111 .get_active_nodes(Request::new(proto_request))
112 .await
113 .context("gRPC call failed")?
114 .into_inner();
115
116 let nodes = response.nodes.into_iter()
117 .map(|node| NodeStatus {
118 node_id: node.node_id,
119 addresses: node.addresses,
120 stats: node.stats.map(|stats| NodeStats {
121 storage_used: stats.storage_used,
122 storage_available: stats.storage_available,
123 chunks_stored: stats.chunks_stored,
124 bandwidth_up: stats.bandwidth_up,
125 bandwidth_down: stats.bandwidth_down,
126 cpu_usage: stats.cpu_usage,
127 memory_usage: stats.memory_usage,
128 uptime_seconds: stats.uptime_seconds,
129 }),
130 last_heartbeat: node.last_heartbeat,
131 status: node.status,
132 })
133 .collect();
134
135 Ok(GetActiveNodesResponse {
136 nodes,
137 total_nodes: response.total_nodes,
138 })
139 }
140
141 /// Send heartbeat to coordinator
142 pub async fn node_heartbeat(&self, request: NodeHeartbeatRequest) -> Result<NodeHeartbeatResponse> {
143 let proto_stats = request.stats.map(|stats| coordinator_service::NodeStats {
144 storage_used: stats.storage_used,
145 storage_available: stats.storage_available,
146 chunks_stored: stats.chunks_stored,
147 bandwidth_up: stats.bandwidth_up,
148 bandwidth_down: stats.bandwidth_down,
149 cpu_usage: stats.cpu_usage,
150 memory_usage: stats.memory_usage,
151 uptime_seconds: stats.uptime_seconds,
152 });
153
154 let proto_request = ProtoNodeHeartbeatRequest {
155 node_id: request.node_id,
156 stats: proto_stats,
157 };
158
159 let response = self.client.clone()
160 .node_heartbeat(Request::new(proto_request))
161 .await
162 .context("gRPC call failed")?
163 .into_inner();
164
165 Ok(NodeHeartbeatResponse {
166 success: response.success,
167 message: response.message,
168 tasks: response.tasks,
169 })
170 }
171
172 /// Register file with coordinator
173 pub async fn register_file(&self, request: RegisterFileRequest) -> Result<RegisterFileResponse> {
174 let proto_chunks = request.chunks.into_iter()
175 .map(|chunk| coordinator_service::ChunkMetadata {
176 chunk_id: chunk.chunk_id,
177 hash: chunk.hash,
178 size: chunk.size,
179 index: chunk.index,
180 })
181 .collect();
182
183 let proto_request = ProtoRegisterFileRequest {
184 file_id: request.file_id,
185 file_name: request.file_name,
186 file_size: request.file_size,
187 file_hash: request.file_hash,
188 chunks: proto_chunks,
189 owner_node_id: request.owner_node_id,
190 };
191
192 let response = self.client.clone()
193 .register_file(Request::new(proto_request))
194 .await
195 .context("gRPC call failed")?
196 .into_inner();
197
198 let chunk_placements = response.chunk_placements.into_iter()
199 .map(|placement| ChunkPlacement {
200 chunk_id: placement.chunk_id,
201 target_nodes: placement.target_nodes,
202 replication_factor: placement.replication_factor,
203 })
204 .collect();
205
206 Ok(RegisterFileResponse {
207 success: response.success,
208 message: response.message,
209 chunk_placements,
210 })
211 }
212
213 /// Get file info from coordinator
214 pub async fn get_file_info(&self, request: GetFileInfoRequest) -> Result<GetFileInfoResponse> {
215 let proto_request = ProtoGetFileInfoRequest {
216 file_id: request.file_id,
217 };
218
219 let response = self.client.clone()
220 .get_file_info(Request::new(proto_request))
221 .await
222 .context("gRPC call failed")?
223 .into_inner();
224
225 let file_info = response.file_info.map(|info| FileRecord {
226 file_id: info.file_id,
227 file_name: info.file_name,
228 file_size: info.file_size,
229 file_hash: info.file_hash,
230 chunks: info.chunks.into_iter()
231 .map(|chunk| ChunkRecord {
232 chunk_id: chunk.chunk_id,
233 hash: chunk.hash,
234 size: chunk.size,
235 index: chunk.index,
236 stored_at_nodes: chunk.stored_at_nodes,
237 replication_count: chunk.replication_count,
238 })
239 .collect(),
240 owner_node_id: info.owner_node_id,
241 created_at: info.created_at,
242 last_accessed: info.last_accessed,
243 });
244
245 Ok(GetFileInfoResponse {
246 success: response.success,
247 message: response.message,
248 file_info,
249 })
250 }
251
252 /// Update chunk locations
253 pub async fn update_chunk_locations(&self, request: UpdateChunkLocationsRequest) -> Result<UpdateChunkLocationsResponse> {
254 let proto_request = ProtoUpdateChunkLocationsRequest {
255 chunk_id: request.chunk_id,
256 node_ids: request.node_ids,
257 operation: request.operation,
258 };
259
260 let response = self.client.clone()
261 .update_chunk_locations(Request::new(proto_request))
262 .await
263 .context("gRPC call failed")?
264 .into_inner();
265
266 Ok(UpdateChunkLocationsResponse {
267 success: response.success,
268 message: response.message,
269 })
270 }
271
272 /// Find chunk locations
273 pub async fn find_chunk_locations(&self, request: FindChunkLocationsRequest) -> Result<FindChunkLocationsResponse> {
274 let proto_request = ProtoFindChunkLocationsRequest {
275 chunk_id: request.chunk_id,
276 preferred_count: request.preferred_count,
277 };
278
279 let response = self.client.clone()
280 .find_chunk_locations(Request::new(proto_request))
281 .await
282 .context("gRPC call failed")?
283 .into_inner();
284
285 Ok(FindChunkLocationsResponse {
286 success: response.success,
287 message: response.message,
288 node_ids: response.node_ids,
289 node_addresses: response.node_addresses,
290 })
291 }
292
293 /// Get network status
294 pub async fn get_network_status(&self, request: GetNetworkStatusRequest) -> Result<GetNetworkStatusResponse> {
295 let proto_request = ProtoGetNetworkStatusRequest {};
296
297 let response = self.client.clone()
298 .get_network_status(Request::new(proto_request))
299 .await
300 .context("gRPC call failed")?
301 .into_inner();
302
303 let network_stats = response.network_stats.map(|stats| NetworkStats {
304 total_nodes: stats.total_nodes,
305 active_nodes: stats.active_nodes,
306 total_storage_capacity: stats.total_storage_capacity,
307 total_storage_used: stats.total_storage_used,
308 total_files: stats.total_files,
309 total_chunks: stats.total_chunks,
310 average_node_uptime: stats.average_node_uptime,
311 network_uptime_seconds: stats.network_uptime_seconds,
312 });
313
314 let active_nodes = response.active_nodes.into_iter()
315 .map(|node| NodeStatus {
316 node_id: node.node_id,
317 addresses: node.addresses,
318 stats: node.stats.map(|stats| NodeStats {
319 storage_used: stats.storage_used,
320 storage_available: stats.storage_available,
321 chunks_stored: stats.chunks_stored,
322 bandwidth_up: stats.bandwidth_up,
323 bandwidth_down: stats.bandwidth_down,
324 cpu_usage: stats.cpu_usage,
325 memory_usage: stats.memory_usage,
326 uptime_seconds: stats.uptime_seconds,
327 }),
328 last_heartbeat: node.last_heartbeat,
329 status: node.status,
330 })
331 .collect();
332
333 Ok(GetNetworkStatusResponse {
334 network_stats,
335 active_nodes,
336 timestamp: response.timestamp,
337 })
338 }
339 }