Rust · 8974 bytes Raw Blame History
1 use anyhow::{Result, Context};
2 use std::time::{Duration, SystemTime, UNIX_EPOCH};
3 use tokio::time::{interval, sleep};
4 use tracing::{debug, info, warn, error};
5 use uuid::Uuid;
6
7 pub mod client;
8 pub mod types;
9
10 pub use client::CoordinatorClient;
11 pub use types::*;
12
13 /// Coordinator integration for node registration and coordination
14 pub struct CoordinatorManager {
15 client: CoordinatorClient,
16 node_id: String,
17 coordinator_url: String,
18 heartbeat_interval: Duration,
19 registration_status: RegistrationStatus,
20 }
21
/// Registration state of this node with the coordinator.
///
/// Derives `PartialEq`/`Eq` so callers can compare a status directly, e.g.
/// `*manager.get_registration_status() == RegistrationStatus::Registered`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RegistrationStatus {
    /// No registration has been attempted yet, or the node was unregistered
    /// successfully.
    NotRegistered,
    /// A registration request is currently in flight.
    Registering,
    /// The coordinator accepted the registration.
    Registered,
    /// The last registration attempt failed; carries a human-readable reason.
    Failed(String),
}
29
30 impl CoordinatorManager {
31 /// Create a new coordinator manager
32 pub async fn new(coordinator_url: String) -> Result<Self> {
33 let client = CoordinatorClient::new(&coordinator_url).await
34 .context("Failed to create coordinator client")?;
35
36 let node_id = Uuid::new_v4().to_string();
37 let heartbeat_interval = Duration::from_secs(10);
38
39 Ok(Self {
40 client,
41 node_id,
42 coordinator_url,
43 heartbeat_interval,
44 registration_status: RegistrationStatus::NotRegistered,
45 })
46 }
47
48 /// Register this node with the coordinator
49 pub async fn register_node(
50 &mut self,
51 addresses: Vec<String>,
52 storage_capacity: u64,
53 capabilities: std::collections::HashMap<String, String>,
54 ) -> Result<RegisterNodeResponse> {
55 info!("Registering node {} with coordinator at {}", self.node_id, self.coordinator_url);
56 self.registration_status = RegistrationStatus::Registering;
57
58 let request = RegisterNodeRequest {
59 node_id: self.node_id.clone(),
60 addresses,
61 storage_capacity: storage_capacity as i64,
62 capabilities,
63 };
64
65 match self.client.register_node(request).await {
66 Ok(response) => {
67 if response.success {
68 self.registration_status = RegistrationStatus::Registered;
69 info!("Successfully registered with coordinator. Assigned ID: {}", response.assigned_node_id);
70
71 // Update node ID if coordinator assigned a different one
72 if !response.assigned_node_id.is_empty() {
73 self.node_id = response.assigned_node_id.clone();
74 }
75
76 let success_response = RegisterNodeResponse {
77 success: response.success,
78 message: response.message,
79 assigned_node_id: response.assigned_node_id,
80 bootstrap_peers: response.bootstrap_peers,
81 };
82 Ok(success_response)
83 } else {
84 let error_msg = format!("Registration failed: {}", response.message);
85 self.registration_status = RegistrationStatus::Failed(error_msg.clone());
86 warn!("{}", error_msg);
87 Ok(response)
88 }
89 }
90 Err(e) => {
91 let error_msg = format!("Failed to register with coordinator: {}", e);
92 self.registration_status = RegistrationStatus::Failed(error_msg.clone());
93 error!("{}", error_msg);
94 Err(e)
95 }
96 }
97 }
98
99 /// Start heartbeat loop
100 pub async fn start_heartbeat(&self, stats_provider: impl Fn() -> NodeStats + Send + 'static) {
101 let client = self.client.clone();
102 let node_id = self.node_id.clone();
103 let heartbeat_interval = self.heartbeat_interval;
104
105 tokio::spawn(async move {
106 let mut interval = interval(heartbeat_interval);
107
108 loop {
109 interval.tick().await;
110
111 let stats = stats_provider();
112 let request = NodeHeartbeatRequest {
113 node_id: node_id.clone(),
114 stats: Some(stats),
115 };
116
117 match client.node_heartbeat(request).await {
118 Ok(response) => {
119 if response.success {
120 debug!("Heartbeat sent successfully");
121 if !response.tasks.is_empty() {
122 debug!("Coordinator assigned {} tasks", response.tasks.len());
123 // TODO: Handle assigned tasks
124 }
125 } else {
126 warn!("Heartbeat failed: {}", response.message);
127 }
128 }
129 Err(e) => {
130 warn!("Failed to send heartbeat: {}", e);
131 // TODO: Implement exponential backoff
132 sleep(Duration::from_secs(5)).await;
133 }
134 }
135 }
136 });
137 }
138
139 /// Register a file with the coordinator
140 pub async fn register_file(
141 &self,
142 file_id: String,
143 file_name: String,
144 file_size: u64,
145 file_hash: String,
146 chunks: Vec<ChunkMetadata>,
147 ) -> Result<RegisterFileResponse> {
148 debug!("Registering file {} with coordinator", file_id);
149
150 let request = RegisterFileRequest {
151 file_id,
152 file_name,
153 file_size: file_size as i64,
154 file_hash,
155 chunks,
156 owner_node_id: self.node_id.clone(),
157 };
158
159 self.client.register_file(request).await
160 .context("Failed to register file with coordinator")
161 }
162
163 /// Find chunk locations from coordinator
164 pub async fn find_chunk_locations(&self, chunk_id: String, preferred_count: i32) -> Result<FindChunkLocationsResponse> {
165 debug!("Finding locations for chunk {} from coordinator", chunk_id);
166
167 let request = FindChunkLocationsRequest {
168 chunk_id,
169 preferred_count,
170 };
171
172 self.client.find_chunk_locations(request).await
173 .context("Failed to find chunk locations from coordinator")
174 }
175
176 /// Get active nodes from coordinator
177 pub async fn get_active_nodes(&self, limit: Option<i32>, exclude_nodes: Vec<String>) -> Result<GetActiveNodesResponse> {
178 debug!("Getting active nodes from coordinator");
179
180 let request = GetActiveNodesRequest {
181 limit: limit.unwrap_or(10),
182 exclude_nodes,
183 };
184
185 self.client.get_active_nodes(request).await
186 .context("Failed to get active nodes from coordinator")
187 }
188
189 /// Get network status from coordinator
190 pub async fn get_network_status(&self) -> Result<GetNetworkStatusResponse> {
191 debug!("Getting network status from coordinator");
192
193 let request = GetNetworkStatusRequest {};
194
195 self.client.get_network_status(request).await
196 .context("Failed to get network status from coordinator")
197 }
198
199 /// Update chunk locations
200 pub async fn update_chunk_locations(
201 &self,
202 chunk_id: String,
203 node_ids: Vec<String>,
204 operation: String,
205 ) -> Result<UpdateChunkLocationsResponse> {
206 debug!("Updating chunk locations for {} (operation: {})", chunk_id, operation);
207
208 let request = UpdateChunkLocationsRequest {
209 chunk_id,
210 node_ids,
211 operation,
212 };
213
214 self.client.update_chunk_locations(request).await
215 .context("Failed to update chunk locations")
216 }
217
218 /// Unregister this node from coordinator
219 pub async fn unregister_node(&mut self, reason: Option<String>) -> Result<UnregisterNodeResponse> {
220 info!("Unregistering node {} from coordinator", self.node_id);
221
222 let request = UnregisterNodeRequest {
223 node_id: self.node_id.clone(),
224 reason: reason.unwrap_or_else(|| "Normal shutdown".to_string()),
225 };
226
227 match self.client.unregister_node(request).await {
228 Ok(response) => {
229 if response.success {
230 self.registration_status = RegistrationStatus::NotRegistered;
231 info!("Successfully unregistered from coordinator");
232 } else {
233 warn!("Unregistration failed: {}", response.message);
234 }
235 Ok(response)
236 }
237 Err(e) => {
238 error!("Failed to unregister from coordinator: {}", e);
239 Err(e)
240 }
241 }
242 }
243
244 /// Get current registration status
245 pub fn get_registration_status(&self) -> &RegistrationStatus {
246 &self.registration_status
247 }
248
249 /// Get node ID
250 pub fn get_node_id(&self) -> &str {
251 &self.node_id
252 }
253 }
254
/// Convert a [`SystemTime`] to a Unix timestamp in whole seconds.
///
/// The result is the largest whole second not after `time`. Times after the
/// epoch are truncated, so 42.9s gives `42`. Times before the epoch yield
/// negative values that are floored the same way, so 1.5s before the epoch
/// gives `-2`. This replaces the previous behaviour of collapsing every
/// pre-epoch time to `0`. Magnitudes beyond the `i64` range are clamped
/// rather than wrapped.
pub fn system_time_to_unix_timestamp(time: SystemTime) -> i64 {
    match time.duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs().min(i64::MAX as u64) as i64,
        Err(err) => {
            // `time` is earlier than the epoch; `err.duration()` is how much earlier.
            let before_epoch = err.duration();
            let whole_secs = before_epoch.as_secs().min(i64::MAX as u64) as i64;
            // Floor (not truncate toward zero) so a partial second rounds to the
            // earlier second, consistent with the post-epoch branch.
            // Cannot overflow: whole_secs <= i64::MAX, so -whole_secs - 1 >= i64::MIN.
            if before_epoch.subsec_nanos() == 0 {
                -whole_secs
            } else {
                -whole_secs - 1
            }
        }
    }
}