| 1 |
use anyhow::{Result, Context}; |
| 2 |
use std::time::Duration; |
| 3 |
use tonic::transport::{Channel, Endpoint}; |
| 4 |
use tonic::{Request, Response, Status}; |
| 5 |
use tracing::{debug, warn}; |
| 6 |
|
| 7 |
use super::types::*; |
| 8 |
|
| 9 |
/// Generated gRPC client code
///
/// Expands to the Rust types and client stubs generated from the
/// `zephyrfs.coordinator` protobuf package. The code is produced at build
/// time by `tonic`'s codegen (typically configured in this crate's
/// `build.rs` — confirm there for the exact proto source paths).
pub mod coordinator_service {
    tonic::include_proto!("zephyrfs.coordinator");
}
| 13 |
|
| 14 |
use coordinator_service::{ |
| 15 |
coordinator_service_client::CoordinatorServiceClient, |
| 16 |
RegisterNodeRequest as ProtoRegisterNodeRequest, |
| 17 |
RegisterNodeResponse as ProtoRegisterNodeResponse, |
| 18 |
UnregisterNodeRequest as ProtoUnregisterNodeRequest, |
| 19 |
UnregisterNodeResponse as ProtoUnregisterNodeResponse, |
| 20 |
GetActiveNodesRequest as ProtoGetActiveNodesRequest, |
| 21 |
GetActiveNodesResponse as ProtoGetActiveNodesResponse, |
| 22 |
NodeHeartbeatRequest as ProtoNodeHeartbeatRequest, |
| 23 |
NodeHeartbeatResponse as ProtoNodeHeartbeatResponse, |
| 24 |
RegisterFileRequest as ProtoRegisterFileRequest, |
| 25 |
RegisterFileResponse as ProtoRegisterFileResponse, |
| 26 |
GetFileInfoRequest as ProtoGetFileInfoRequest, |
| 27 |
GetFileInfoResponse as ProtoGetFileInfoResponse, |
| 28 |
UpdateChunkLocationsRequest as ProtoUpdateChunkLocationsRequest, |
| 29 |
UpdateChunkLocationsResponse as ProtoUpdateChunkLocationsResponse, |
| 30 |
FindChunkLocationsRequest as ProtoFindChunkLocationsRequest, |
| 31 |
FindChunkLocationsResponse as ProtoFindChunkLocationsResponse, |
| 32 |
GetNetworkStatusRequest as ProtoGetNetworkStatusRequest, |
| 33 |
GetNetworkStatusResponse as ProtoGetNetworkStatusResponse, |
| 34 |
}; |
| 35 |
|
| 36 |
/// Coordinator gRPC client
///
/// Thin wrapper around the generated `CoordinatorServiceClient` that maps
/// between the domain types in `super::types` and the protobuf messages.
/// `Clone` is derived because tonic's `Channel` is a cheap, shareable handle
/// designed to be cloned per call.
#[derive(Clone)]
pub struct CoordinatorClient {
    // Generated tonic client over a shared HTTP/2 channel.
    client: CoordinatorServiceClient<Channel>,
}
| 41 |
|
| 42 |
impl CoordinatorClient { |
| 43 |
/// Create a new coordinator client |
| 44 |
pub async fn new(coordinator_url: &str) -> Result<Self> { |
| 45 |
debug!("Connecting to coordinator at: {}", coordinator_url); |
| 46 |
|
| 47 |
let endpoint = Endpoint::from_shared(coordinator_url.to_string()) |
| 48 |
.context("Invalid coordinator URL")? |
| 49 |
.timeout(Duration::from_secs(10)) |
| 50 |
.connect_timeout(Duration::from_secs(5)); |
| 51 |
|
| 52 |
let channel = endpoint.connect().await |
| 53 |
.context("Failed to connect to coordinator")?; |
| 54 |
|
| 55 |
let client = CoordinatorServiceClient::new(channel); |
| 56 |
|
| 57 |
debug!("Successfully connected to coordinator"); |
| 58 |
Ok(Self { client }) |
| 59 |
} |
| 60 |
|
| 61 |
/// Register node with coordinator |
| 62 |
pub async fn register_node(&self, request: RegisterNodeRequest) -> Result<RegisterNodeResponse> { |
| 63 |
let proto_request = ProtoRegisterNodeRequest { |
| 64 |
node_id: request.node_id, |
| 65 |
addresses: request.addresses, |
| 66 |
storage_capacity: request.storage_capacity, |
| 67 |
capabilities: request.capabilities, |
| 68 |
}; |
| 69 |
|
| 70 |
let response = self.client.clone() |
| 71 |
.register_node(Request::new(proto_request)) |
| 72 |
.await |
| 73 |
.context("gRPC call failed")? |
| 74 |
.into_inner(); |
| 75 |
|
| 76 |
Ok(RegisterNodeResponse { |
| 77 |
success: response.success, |
| 78 |
message: response.message, |
| 79 |
assigned_node_id: response.assigned_node_id, |
| 80 |
bootstrap_peers: response.bootstrap_peers, |
| 81 |
}) |
| 82 |
} |
| 83 |
|
| 84 |
/// Unregister node from coordinator |
| 85 |
pub async fn unregister_node(&self, request: UnregisterNodeRequest) -> Result<UnregisterNodeResponse> { |
| 86 |
let proto_request = ProtoUnregisterNodeRequest { |
| 87 |
node_id: request.node_id, |
| 88 |
reason: request.reason, |
| 89 |
}; |
| 90 |
|
| 91 |
let response = self.client.clone() |
| 92 |
.unregister_node(Request::new(proto_request)) |
| 93 |
.await |
| 94 |
.context("gRPC call failed")? |
| 95 |
.into_inner(); |
| 96 |
|
| 97 |
Ok(UnregisterNodeResponse { |
| 98 |
success: response.success, |
| 99 |
message: response.message, |
| 100 |
}) |
| 101 |
} |
| 102 |
|
| 103 |
/// Get active nodes from coordinator |
| 104 |
pub async fn get_active_nodes(&self, request: GetActiveNodesRequest) -> Result<GetActiveNodesResponse> { |
| 105 |
let proto_request = ProtoGetActiveNodesRequest { |
| 106 |
limit: request.limit, |
| 107 |
exclude_nodes: request.exclude_nodes, |
| 108 |
}; |
| 109 |
|
| 110 |
let response = self.client.clone() |
| 111 |
.get_active_nodes(Request::new(proto_request)) |
| 112 |
.await |
| 113 |
.context("gRPC call failed")? |
| 114 |
.into_inner(); |
| 115 |
|
| 116 |
let nodes = response.nodes.into_iter() |
| 117 |
.map(|node| NodeStatus { |
| 118 |
node_id: node.node_id, |
| 119 |
addresses: node.addresses, |
| 120 |
stats: node.stats.map(|stats| NodeStats { |
| 121 |
storage_used: stats.storage_used, |
| 122 |
storage_available: stats.storage_available, |
| 123 |
chunks_stored: stats.chunks_stored, |
| 124 |
bandwidth_up: stats.bandwidth_up, |
| 125 |
bandwidth_down: stats.bandwidth_down, |
| 126 |
cpu_usage: stats.cpu_usage, |
| 127 |
memory_usage: stats.memory_usage, |
| 128 |
uptime_seconds: stats.uptime_seconds, |
| 129 |
}), |
| 130 |
last_heartbeat: node.last_heartbeat, |
| 131 |
status: node.status, |
| 132 |
}) |
| 133 |
.collect(); |
| 134 |
|
| 135 |
Ok(GetActiveNodesResponse { |
| 136 |
nodes, |
| 137 |
total_nodes: response.total_nodes, |
| 138 |
}) |
| 139 |
} |
| 140 |
|
| 141 |
/// Send heartbeat to coordinator |
| 142 |
pub async fn node_heartbeat(&self, request: NodeHeartbeatRequest) -> Result<NodeHeartbeatResponse> { |
| 143 |
let proto_stats = request.stats.map(|stats| coordinator_service::NodeStats { |
| 144 |
storage_used: stats.storage_used, |
| 145 |
storage_available: stats.storage_available, |
| 146 |
chunks_stored: stats.chunks_stored, |
| 147 |
bandwidth_up: stats.bandwidth_up, |
| 148 |
bandwidth_down: stats.bandwidth_down, |
| 149 |
cpu_usage: stats.cpu_usage, |
| 150 |
memory_usage: stats.memory_usage, |
| 151 |
uptime_seconds: stats.uptime_seconds, |
| 152 |
}); |
| 153 |
|
| 154 |
let proto_request = ProtoNodeHeartbeatRequest { |
| 155 |
node_id: request.node_id, |
| 156 |
stats: proto_stats, |
| 157 |
}; |
| 158 |
|
| 159 |
let response = self.client.clone() |
| 160 |
.node_heartbeat(Request::new(proto_request)) |
| 161 |
.await |
| 162 |
.context("gRPC call failed")? |
| 163 |
.into_inner(); |
| 164 |
|
| 165 |
Ok(NodeHeartbeatResponse { |
| 166 |
success: response.success, |
| 167 |
message: response.message, |
| 168 |
tasks: response.tasks, |
| 169 |
}) |
| 170 |
} |
| 171 |
|
| 172 |
/// Register file with coordinator |
| 173 |
pub async fn register_file(&self, request: RegisterFileRequest) -> Result<RegisterFileResponse> { |
| 174 |
let proto_chunks = request.chunks.into_iter() |
| 175 |
.map(|chunk| coordinator_service::ChunkMetadata { |
| 176 |
chunk_id: chunk.chunk_id, |
| 177 |
hash: chunk.hash, |
| 178 |
size: chunk.size, |
| 179 |
index: chunk.index, |
| 180 |
}) |
| 181 |
.collect(); |
| 182 |
|
| 183 |
let proto_request = ProtoRegisterFileRequest { |
| 184 |
file_id: request.file_id, |
| 185 |
file_name: request.file_name, |
| 186 |
file_size: request.file_size, |
| 187 |
file_hash: request.file_hash, |
| 188 |
chunks: proto_chunks, |
| 189 |
owner_node_id: request.owner_node_id, |
| 190 |
}; |
| 191 |
|
| 192 |
let response = self.client.clone() |
| 193 |
.register_file(Request::new(proto_request)) |
| 194 |
.await |
| 195 |
.context("gRPC call failed")? |
| 196 |
.into_inner(); |
| 197 |
|
| 198 |
let chunk_placements = response.chunk_placements.into_iter() |
| 199 |
.map(|placement| ChunkPlacement { |
| 200 |
chunk_id: placement.chunk_id, |
| 201 |
target_nodes: placement.target_nodes, |
| 202 |
replication_factor: placement.replication_factor, |
| 203 |
}) |
| 204 |
.collect(); |
| 205 |
|
| 206 |
Ok(RegisterFileResponse { |
| 207 |
success: response.success, |
| 208 |
message: response.message, |
| 209 |
chunk_placements, |
| 210 |
}) |
| 211 |
} |
| 212 |
|
| 213 |
/// Get file info from coordinator |
| 214 |
pub async fn get_file_info(&self, request: GetFileInfoRequest) -> Result<GetFileInfoResponse> { |
| 215 |
let proto_request = ProtoGetFileInfoRequest { |
| 216 |
file_id: request.file_id, |
| 217 |
}; |
| 218 |
|
| 219 |
let response = self.client.clone() |
| 220 |
.get_file_info(Request::new(proto_request)) |
| 221 |
.await |
| 222 |
.context("gRPC call failed")? |
| 223 |
.into_inner(); |
| 224 |
|
| 225 |
let file_info = response.file_info.map(|info| FileRecord { |
| 226 |
file_id: info.file_id, |
| 227 |
file_name: info.file_name, |
| 228 |
file_size: info.file_size, |
| 229 |
file_hash: info.file_hash, |
| 230 |
chunks: info.chunks.into_iter() |
| 231 |
.map(|chunk| ChunkRecord { |
| 232 |
chunk_id: chunk.chunk_id, |
| 233 |
hash: chunk.hash, |
| 234 |
size: chunk.size, |
| 235 |
index: chunk.index, |
| 236 |
stored_at_nodes: chunk.stored_at_nodes, |
| 237 |
replication_count: chunk.replication_count, |
| 238 |
}) |
| 239 |
.collect(), |
| 240 |
owner_node_id: info.owner_node_id, |
| 241 |
created_at: info.created_at, |
| 242 |
last_accessed: info.last_accessed, |
| 243 |
}); |
| 244 |
|
| 245 |
Ok(GetFileInfoResponse { |
| 246 |
success: response.success, |
| 247 |
message: response.message, |
| 248 |
file_info, |
| 249 |
}) |
| 250 |
} |
| 251 |
|
| 252 |
/// Update chunk locations |
| 253 |
pub async fn update_chunk_locations(&self, request: UpdateChunkLocationsRequest) -> Result<UpdateChunkLocationsResponse> { |
| 254 |
let proto_request = ProtoUpdateChunkLocationsRequest { |
| 255 |
chunk_id: request.chunk_id, |
| 256 |
node_ids: request.node_ids, |
| 257 |
operation: request.operation, |
| 258 |
}; |
| 259 |
|
| 260 |
let response = self.client.clone() |
| 261 |
.update_chunk_locations(Request::new(proto_request)) |
| 262 |
.await |
| 263 |
.context("gRPC call failed")? |
| 264 |
.into_inner(); |
| 265 |
|
| 266 |
Ok(UpdateChunkLocationsResponse { |
| 267 |
success: response.success, |
| 268 |
message: response.message, |
| 269 |
}) |
| 270 |
} |
| 271 |
|
| 272 |
/// Find chunk locations |
| 273 |
pub async fn find_chunk_locations(&self, request: FindChunkLocationsRequest) -> Result<FindChunkLocationsResponse> { |
| 274 |
let proto_request = ProtoFindChunkLocationsRequest { |
| 275 |
chunk_id: request.chunk_id, |
| 276 |
preferred_count: request.preferred_count, |
| 277 |
}; |
| 278 |
|
| 279 |
let response = self.client.clone() |
| 280 |
.find_chunk_locations(Request::new(proto_request)) |
| 281 |
.await |
| 282 |
.context("gRPC call failed")? |
| 283 |
.into_inner(); |
| 284 |
|
| 285 |
Ok(FindChunkLocationsResponse { |
| 286 |
success: response.success, |
| 287 |
message: response.message, |
| 288 |
node_ids: response.node_ids, |
| 289 |
node_addresses: response.node_addresses, |
| 290 |
}) |
| 291 |
} |
| 292 |
|
| 293 |
/// Get network status |
| 294 |
pub async fn get_network_status(&self, request: GetNetworkStatusRequest) -> Result<GetNetworkStatusResponse> { |
| 295 |
let proto_request = ProtoGetNetworkStatusRequest {}; |
| 296 |
|
| 297 |
let response = self.client.clone() |
| 298 |
.get_network_status(Request::new(proto_request)) |
| 299 |
.await |
| 300 |
.context("gRPC call failed")? |
| 301 |
.into_inner(); |
| 302 |
|
| 303 |
let network_stats = response.network_stats.map(|stats| NetworkStats { |
| 304 |
total_nodes: stats.total_nodes, |
| 305 |
active_nodes: stats.active_nodes, |
| 306 |
total_storage_capacity: stats.total_storage_capacity, |
| 307 |
total_storage_used: stats.total_storage_used, |
| 308 |
total_files: stats.total_files, |
| 309 |
total_chunks: stats.total_chunks, |
| 310 |
average_node_uptime: stats.average_node_uptime, |
| 311 |
network_uptime_seconds: stats.network_uptime_seconds, |
| 312 |
}); |
| 313 |
|
| 314 |
let active_nodes = response.active_nodes.into_iter() |
| 315 |
.map(|node| NodeStatus { |
| 316 |
node_id: node.node_id, |
| 317 |
addresses: node.addresses, |
| 318 |
stats: node.stats.map(|stats| NodeStats { |
| 319 |
storage_used: stats.storage_used, |
| 320 |
storage_available: stats.storage_available, |
| 321 |
chunks_stored: stats.chunks_stored, |
| 322 |
bandwidth_up: stats.bandwidth_up, |
| 323 |
bandwidth_down: stats.bandwidth_down, |
| 324 |
cpu_usage: stats.cpu_usage, |
| 325 |
memory_usage: stats.memory_usage, |
| 326 |
uptime_seconds: stats.uptime_seconds, |
| 327 |
}), |
| 328 |
last_heartbeat: node.last_heartbeat, |
| 329 |
status: node.status, |
| 330 |
}) |
| 331 |
.collect(); |
| 332 |
|
| 333 |
Ok(GetNetworkStatusResponse { |
| 334 |
network_stats, |
| 335 |
active_nodes, |
| 336 |
timestamp: response.timestamp, |
| 337 |
}) |
| 338 |
} |
| 339 |
} |