| 1 |
use anyhow::{Result, Context}; |
| 2 |
use std::time::{Duration, SystemTime, UNIX_EPOCH}; |
| 3 |
use tokio::time::{interval, sleep}; |
| 4 |
use tracing::{debug, info, warn, error}; |
| 5 |
use uuid::Uuid; |
| 6 |
|
| 7 |
pub mod client; |
| 8 |
pub mod types; |
| 9 |
|
| 10 |
pub use client::CoordinatorClient; |
| 11 |
pub use types::*; |
| 12 |
|
| 13 |
/// Coordinator integration for node registration and coordination |
| 14 |
pub struct CoordinatorManager { |
| 15 |
client: CoordinatorClient, |
| 16 |
node_id: String, |
| 17 |
coordinator_url: String, |
| 18 |
heartbeat_interval: Duration, |
| 19 |
registration_status: RegistrationStatus, |
| 20 |
} |
| 21 |
|
| 22 |
/// Registration state of a node with respect to the coordinator.
///
/// `PartialEq`/`Eq` are derived so that callers can compare states directly,
/// e.g. `status == RegistrationStatus::Registered`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RegistrationStatus {
    /// The node is not registered with the coordinator.
    NotRegistered,
    /// A registration request has been issued and has not completed yet.
    Registering,
    /// The coordinator accepted the registration.
    Registered,
    /// The last registration attempt failed; the payload is the error message.
    Failed(String),
}
| 29 |
|
| 30 |
impl CoordinatorManager { |
| 31 |
/// Create a new coordinator manager |
| 32 |
pub async fn new(coordinator_url: String) -> Result<Self> { |
| 33 |
let client = CoordinatorClient::new(&coordinator_url).await |
| 34 |
.context("Failed to create coordinator client")?; |
| 35 |
|
| 36 |
let node_id = Uuid::new_v4().to_string(); |
| 37 |
let heartbeat_interval = Duration::from_secs(10); |
| 38 |
|
| 39 |
Ok(Self { |
| 40 |
client, |
| 41 |
node_id, |
| 42 |
coordinator_url, |
| 43 |
heartbeat_interval, |
| 44 |
registration_status: RegistrationStatus::NotRegistered, |
| 45 |
}) |
| 46 |
} |
| 47 |
|
| 48 |
/// Register this node with the coordinator |
| 49 |
pub async fn register_node( |
| 50 |
&mut self, |
| 51 |
addresses: Vec<String>, |
| 52 |
storage_capacity: u64, |
| 53 |
capabilities: std::collections::HashMap<String, String>, |
| 54 |
) -> Result<RegisterNodeResponse> { |
| 55 |
info!("Registering node {} with coordinator at {}", self.node_id, self.coordinator_url); |
| 56 |
self.registration_status = RegistrationStatus::Registering; |
| 57 |
|
| 58 |
let request = RegisterNodeRequest { |
| 59 |
node_id: self.node_id.clone(), |
| 60 |
addresses, |
| 61 |
storage_capacity: storage_capacity as i64, |
| 62 |
capabilities, |
| 63 |
}; |
| 64 |
|
| 65 |
match self.client.register_node(request).await { |
| 66 |
Ok(response) => { |
| 67 |
if response.success { |
| 68 |
self.registration_status = RegistrationStatus::Registered; |
| 69 |
info!("Successfully registered with coordinator. Assigned ID: {}", response.assigned_node_id); |
| 70 |
|
| 71 |
// Update node ID if coordinator assigned a different one |
| 72 |
if !response.assigned_node_id.is_empty() { |
| 73 |
self.node_id = response.assigned_node_id.clone(); |
| 74 |
} |
| 75 |
|
| 76 |
let success_response = RegisterNodeResponse { |
| 77 |
success: response.success, |
| 78 |
message: response.message, |
| 79 |
assigned_node_id: response.assigned_node_id, |
| 80 |
bootstrap_peers: response.bootstrap_peers, |
| 81 |
}; |
| 82 |
Ok(success_response) |
| 83 |
} else { |
| 84 |
let error_msg = format!("Registration failed: {}", response.message); |
| 85 |
self.registration_status = RegistrationStatus::Failed(error_msg.clone()); |
| 86 |
warn!("{}", error_msg); |
| 87 |
Ok(response) |
| 88 |
} |
| 89 |
} |
| 90 |
Err(e) => { |
| 91 |
let error_msg = format!("Failed to register with coordinator: {}", e); |
| 92 |
self.registration_status = RegistrationStatus::Failed(error_msg.clone()); |
| 93 |
error!("{}", error_msg); |
| 94 |
Err(e) |
| 95 |
} |
| 96 |
} |
| 97 |
} |
| 98 |
|
| 99 |
/// Start heartbeat loop |
| 100 |
pub async fn start_heartbeat(&self, stats_provider: impl Fn() -> NodeStats + Send + 'static) { |
| 101 |
let client = self.client.clone(); |
| 102 |
let node_id = self.node_id.clone(); |
| 103 |
let heartbeat_interval = self.heartbeat_interval; |
| 104 |
|
| 105 |
tokio::spawn(async move { |
| 106 |
let mut interval = interval(heartbeat_interval); |
| 107 |
|
| 108 |
loop { |
| 109 |
interval.tick().await; |
| 110 |
|
| 111 |
let stats = stats_provider(); |
| 112 |
let request = NodeHeartbeatRequest { |
| 113 |
node_id: node_id.clone(), |
| 114 |
stats: Some(stats), |
| 115 |
}; |
| 116 |
|
| 117 |
match client.node_heartbeat(request).await { |
| 118 |
Ok(response) => { |
| 119 |
if response.success { |
| 120 |
debug!("Heartbeat sent successfully"); |
| 121 |
if !response.tasks.is_empty() { |
| 122 |
debug!("Coordinator assigned {} tasks", response.tasks.len()); |
| 123 |
// TODO: Handle assigned tasks |
| 124 |
} |
| 125 |
} else { |
| 126 |
warn!("Heartbeat failed: {}", response.message); |
| 127 |
} |
| 128 |
} |
| 129 |
Err(e) => { |
| 130 |
warn!("Failed to send heartbeat: {}", e); |
| 131 |
// TODO: Implement exponential backoff |
| 132 |
sleep(Duration::from_secs(5)).await; |
| 133 |
} |
| 134 |
} |
| 135 |
} |
| 136 |
}); |
| 137 |
} |
| 138 |
|
| 139 |
/// Register a file with the coordinator |
| 140 |
pub async fn register_file( |
| 141 |
&self, |
| 142 |
file_id: String, |
| 143 |
file_name: String, |
| 144 |
file_size: u64, |
| 145 |
file_hash: String, |
| 146 |
chunks: Vec<ChunkMetadata>, |
| 147 |
) -> Result<RegisterFileResponse> { |
| 148 |
debug!("Registering file {} with coordinator", file_id); |
| 149 |
|
| 150 |
let request = RegisterFileRequest { |
| 151 |
file_id, |
| 152 |
file_name, |
| 153 |
file_size: file_size as i64, |
| 154 |
file_hash, |
| 155 |
chunks, |
| 156 |
owner_node_id: self.node_id.clone(), |
| 157 |
}; |
| 158 |
|
| 159 |
self.client.register_file(request).await |
| 160 |
.context("Failed to register file with coordinator") |
| 161 |
} |
| 162 |
|
| 163 |
/// Find chunk locations from coordinator |
| 164 |
pub async fn find_chunk_locations(&self, chunk_id: String, preferred_count: i32) -> Result<FindChunkLocationsResponse> { |
| 165 |
debug!("Finding locations for chunk {} from coordinator", chunk_id); |
| 166 |
|
| 167 |
let request = FindChunkLocationsRequest { |
| 168 |
chunk_id, |
| 169 |
preferred_count, |
| 170 |
}; |
| 171 |
|
| 172 |
self.client.find_chunk_locations(request).await |
| 173 |
.context("Failed to find chunk locations from coordinator") |
| 174 |
} |
| 175 |
|
| 176 |
/// Get active nodes from coordinator |
| 177 |
pub async fn get_active_nodes(&self, limit: Option<i32>, exclude_nodes: Vec<String>) -> Result<GetActiveNodesResponse> { |
| 178 |
debug!("Getting active nodes from coordinator"); |
| 179 |
|
| 180 |
let request = GetActiveNodesRequest { |
| 181 |
limit: limit.unwrap_or(10), |
| 182 |
exclude_nodes, |
| 183 |
}; |
| 184 |
|
| 185 |
self.client.get_active_nodes(request).await |
| 186 |
.context("Failed to get active nodes from coordinator") |
| 187 |
} |
| 188 |
|
| 189 |
/// Get network status from coordinator |
| 190 |
pub async fn get_network_status(&self) -> Result<GetNetworkStatusResponse> { |
| 191 |
debug!("Getting network status from coordinator"); |
| 192 |
|
| 193 |
let request = GetNetworkStatusRequest {}; |
| 194 |
|
| 195 |
self.client.get_network_status(request).await |
| 196 |
.context("Failed to get network status from coordinator") |
| 197 |
} |
| 198 |
|
| 199 |
/// Update chunk locations |
| 200 |
pub async fn update_chunk_locations( |
| 201 |
&self, |
| 202 |
chunk_id: String, |
| 203 |
node_ids: Vec<String>, |
| 204 |
operation: String, |
| 205 |
) -> Result<UpdateChunkLocationsResponse> { |
| 206 |
debug!("Updating chunk locations for {} (operation: {})", chunk_id, operation); |
| 207 |
|
| 208 |
let request = UpdateChunkLocationsRequest { |
| 209 |
chunk_id, |
| 210 |
node_ids, |
| 211 |
operation, |
| 212 |
}; |
| 213 |
|
| 214 |
self.client.update_chunk_locations(request).await |
| 215 |
.context("Failed to update chunk locations") |
| 216 |
} |
| 217 |
|
| 218 |
/// Unregister this node from coordinator |
| 219 |
pub async fn unregister_node(&mut self, reason: Option<String>) -> Result<UnregisterNodeResponse> { |
| 220 |
info!("Unregistering node {} from coordinator", self.node_id); |
| 221 |
|
| 222 |
let request = UnregisterNodeRequest { |
| 223 |
node_id: self.node_id.clone(), |
| 224 |
reason: reason.unwrap_or_else(|| "Normal shutdown".to_string()), |
| 225 |
}; |
| 226 |
|
| 227 |
match self.client.unregister_node(request).await { |
| 228 |
Ok(response) => { |
| 229 |
if response.success { |
| 230 |
self.registration_status = RegistrationStatus::NotRegistered; |
| 231 |
info!("Successfully unregistered from coordinator"); |
| 232 |
} else { |
| 233 |
warn!("Unregistration failed: {}", response.message); |
| 234 |
} |
| 235 |
Ok(response) |
| 236 |
} |
| 237 |
Err(e) => { |
| 238 |
error!("Failed to unregister from coordinator: {}", e); |
| 239 |
Err(e) |
| 240 |
} |
| 241 |
} |
| 242 |
} |
| 243 |
|
| 244 |
/// Get current registration status |
| 245 |
pub fn get_registration_status(&self) -> &RegistrationStatus { |
| 246 |
&self.registration_status |
| 247 |
} |
| 248 |
|
| 249 |
/// Get node ID |
| 250 |
pub fn get_node_id(&self) -> &str { |
| 251 |
&self.node_id |
| 252 |
} |
| 253 |
} |
| 254 |
|
| 255 |
/// Convert a [`SystemTime`] to whole seconds relative to the Unix epoch.
///
/// Times at or after the epoch yield the number of fully elapsed seconds.
/// Times before the epoch yield a negative value rounded toward negative
/// infinity (so half a second before the epoch is `-1`), instead of being
/// collapsed to `0`. Values outside the `i64` range saturate rather than
/// wrap.
pub fn system_time_to_unix_timestamp(time: SystemTime) -> i64 {
    match time.duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_secs()).unwrap_or(i64::MAX),
        Err(before_epoch) => {
            // `duration()` is the positive distance from `time` up to the epoch.
            let gap = before_epoch.duration();
            let whole_secs = i64::try_from(gap.as_secs()).unwrap_or(i64::MAX);
            // A fractional part pushes the timestamp one second further back.
            let magnitude = if gap.subsec_nanos() > 0 {
                whole_secs.saturating_add(1)
            } else {
                whole_secs
            };
            // `magnitude` is at most `i64::MAX`, so negation cannot overflow.
            -magnitude
        }
    }
}