Rust · 11981 bytes Raw Blame History
1 use anyhow::{Context, Result};
2 use futures::StreamExt;
3 use libp2p::{
4 core::upgrade,
5 identity,
6 mdns,
7 noise,
8 swarm::{SwarmEvent, Swarm, Config as SwarmConfig},
9 tcp,
10 yamux,
11 Multiaddr,
12 PeerId,
13 Transport,
14 };
15 use std::time::Duration;
16 use tokio::sync::mpsc;
17 use tracing::{debug, info, warn, error};
18
19 use crate::config::Config;
20
21 pub mod behaviour;
22 pub mod peer_manager;
23 pub mod message_handler;
24
25 use behaviour::ZephyrBehaviour;
26 use peer_manager::PeerManager;
27 use message_handler::MessageHandler;
28
29 /// NetworkManager handles all P2P networking for ZephyrFS
30 ///
31 /// Safety: All connections use encrypted transport with Noise protocol
32 /// Privacy: Peer discovery respects privacy settings and reputation
33 /// Transparency: All network events are logged for auditability
34 pub struct NetworkManager {
35 swarm: Swarm<ZephyrBehaviour>,
36 peer_manager: PeerManager,
37 message_handler: MessageHandler,
38 config: Config,
39 shutdown_tx: Option<mpsc::Sender<()>>,
40 }
41
42 impl NetworkManager {
43 /// Create a new NetworkManager instance
44 ///
45 /// Safety: Generates cryptographically secure keypair for node identity
46 pub async fn new(config: Config) -> Result<Self> {
47 info!("Initializing NetworkManager with security-first design");
48
49 // Generate or load node keypair (Privacy: never logged)
50 let keypair = identity::Keypair::generate_ed25519();
51 let peer_id = PeerId::from(keypair.public());
52 info!("Node PeerID: {}", peer_id);
53
54 // Create authenticated and encrypted transport
55 // Safety: All communications use Noise encryption + TLS-like security
56 let transport = tcp::tokio::Transport::default()
57 .upgrade(upgrade::Version::V1)
58 .authenticate(noise::Config::new(&keypair)?)
59 .multiplex(yamux::Config::default())
60 .timeout(Duration::from_secs(30))
61 .boxed();
62
63 // Initialize network behaviour
64 let behaviour = ZephyrBehaviour::new(&config, peer_id, &keypair).await?;
65
66 // Create swarm with secure transport
67 let swarm = Swarm::new(transport, behaviour, peer_id, SwarmConfig::with_tokio_executor());
68
69 // Initialize managers
70 let peer_manager = PeerManager::new(&config);
71 let message_handler = MessageHandler::new(&config);
72
73 Ok(Self {
74 swarm,
75 peer_manager,
76 message_handler,
77 config,
78 shutdown_tx: None,
79 })
80 }
81
82 /// Start the network manager
83 ///
84 /// Transparency: All startup steps are logged
85 pub async fn start(&mut self) -> Result<()> {
86 info!("Starting ZephyrFS networking with privacy and security protections");
87
88 // Start listening on configured port
89 let listen_addr = format!("/ip4/0.0.0.0/tcp/{}", self.config.network.p2p_port);
90 self.swarm.listen_on(listen_addr.parse()?)
91 .context("Failed to start listening")?;
92
93 info!("Listening on port {}", self.config.network.p2p_port);
94
95 // Connect to bootstrap peers if configured
96 self.connect_bootstrap_peers().await?;
97
98 // Setup shutdown channel
99 let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
100 self.shutdown_tx = Some(shutdown_tx);
101
102 // Main event loop
103 loop {
104 tokio::select! {
105 event = self.swarm.select_next_some() => {
106 if let Err(e) = self.handle_swarm_event(event).await {
107 error!("Error handling swarm event: {}", e);
108 }
109 }
110
111 _ = shutdown_rx.recv() => {
112 info!("Shutdown signal received, stopping network manager");
113 break;
114 }
115 }
116 }
117
118 Ok(())
119 }
120
121 /// Connect to bootstrap peers for initial network discovery
122 ///
123 /// Safety: Only connects to explicitly configured peers
124 async fn connect_bootstrap_peers(&mut self) -> Result<()> {
125 if self.config.network.bootstrap_peers.is_empty() {
126 info!("No bootstrap peers configured, relying on mDNS discovery");
127 return Ok(());
128 }
129
130 info!("Connecting to {} bootstrap peers", self.config.network.bootstrap_peers.len());
131
132 for peer_addr in &self.config.network.bootstrap_peers {
133 match peer_addr.parse::<Multiaddr>() {
134 Ok(addr) => {
135 info!("Connecting to bootstrap peer: {}", peer_addr);
136 if let Err(e) = self.swarm.dial(addr) {
137 warn!("Failed to dial bootstrap peer {}: {}", peer_addr, e);
138 }
139 }
140 Err(e) => {
141 warn!("Invalid bootstrap peer address {}: {}", peer_addr, e);
142 }
143 }
144 }
145
146 Ok(())
147 }
148
149 /// Handle swarm events with comprehensive logging
150 ///
151 /// Transparency: All network events are logged for audit trail
152 async fn handle_swarm_event(&mut self, event: SwarmEvent<behaviour::ZephyrBehaviourEvent>) -> Result<()> {
153 match event {
154 SwarmEvent::NewListenAddr { address, .. } => {
155 info!("Listening on address: {}", address);
156 }
157
158 SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => {
159 info!("Connection established with peer: {} via {}", peer_id, endpoint.get_remote_address());
160 self.peer_manager.add_peer(peer_id, endpoint.get_remote_address().clone()).await;
161 }
162
163 SwarmEvent::ConnectionClosed { peer_id, cause, .. } => {
164 match cause {
165 Some(error) => warn!("Connection closed with peer {}: {}", peer_id, error),
166 None => info!("Connection closed with peer {}", peer_id),
167 }
168 self.peer_manager.remove_peer(&peer_id).await;
169 }
170
171 SwarmEvent::IncomingConnection { local_addr, send_back_addr, .. } => {
172 debug!("Incoming connection from {} to {}", send_back_addr, local_addr);
173 // Safety: Let behaviour handle connection acceptance based on security rules
174 }
175
176 SwarmEvent::IncomingConnectionError { local_addr, send_back_addr, error, .. } => {
177 warn!("Incoming connection error from {} to {}: {}", send_back_addr, local_addr, error);
178 }
179
180 SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
181 match peer_id {
182 Some(peer) => warn!("Outgoing connection error to {}: {}", peer, error),
183 None => warn!("Outgoing connection error: {}", error),
184 }
185 }
186
187 SwarmEvent::Behaviour(event) => {
188 self.handle_behaviour_event(event).await?;
189 }
190
191 _ => {
192 debug!("Unhandled swarm event: {:?}", event);
193 }
194 }
195
196 Ok(())
197 }
198
199 /// Handle behaviour-specific events
200 ///
201 /// Privacy: Peer discovery events respect reputation and trust settings
202 async fn handle_behaviour_event(&mut self, event: behaviour::ZephyrBehaviourEvent) -> Result<()> {
203 match event {
204 behaviour::ZephyrBehaviourEvent::Mdns(mdns::Event::Discovered(list)) => {
205 for (peer_id, multiaddr) in list {
206 info!("Discovered peer via mDNS: {} at {}", peer_id, multiaddr);
207
208 // Safety: Apply peer reputation check before connecting
209 if self.peer_manager.should_connect_to_peer(&peer_id).await {
210 if let Err(e) = self.swarm.dial(multiaddr.clone()) {
211 warn!("Failed to dial discovered peer {}: {}", peer_id, e);
212 }
213 } else {
214 debug!("Skipping connection to peer {} due to security policy", peer_id);
215 }
216 }
217 }
218
219 behaviour::ZephyrBehaviourEvent::Mdns(mdns::Event::Expired(list)) => {
220 for (peer_id, multiaddr) in list {
221 debug!("mDNS entry expired for peer: {} at {}", peer_id, multiaddr);
222 }
223 }
224
225 behaviour::ZephyrBehaviourEvent::Ping(ping_event) => {
226 match ping_event {
227 libp2p::ping::Event {
228 peer,
229 result: Ok(rtt),
230 ..
231 } => {
232 debug!("Ping to {} successful: {:?}", peer, rtt);
233 self.peer_manager.update_peer_stats(&peer, rtt).await;
234 }
235
236 libp2p::ping::Event {
237 peer,
238 result: Err(error),
239 ..
240 } => {
241 warn!("Ping to {} failed: {}", peer, error);
242 self.peer_manager.handle_peer_failure(&peer).await;
243 }
244 }
245 }
246
247 behaviour::ZephyrBehaviourEvent::Identify(identify_event) => {
248 match identify_event {
249 libp2p::identify::Event::Received { peer_id, info, .. } => {
250 info!("Identified peer {}: protocol_version={}, agent_version={}",
251 peer_id, info.protocol_version, info.agent_version);
252
253 // Safety: Validate peer compatibility before full trust
254 self.peer_manager.validate_peer_identity(&peer_id, &info).await?;
255 }
256
257 libp2p::identify::Event::Sent { peer_id, .. } => {
258 debug!("Sent identification to peer: {}", peer_id);
259 }
260
261 libp2p::identify::Event::Pushed { peer_id, .. } => {
262 debug!("Pushed identification to peer: {}", peer_id);
263 }
264
265 libp2p::identify::Event::Error { peer_id, error, .. } => {
266 warn!("Identify error with peer {}: {}", peer_id, error);
267 }
268 }
269 }
270 }
271
272 Ok(())
273 }
274
275 /// Shutdown the network manager gracefully
276 ///
277 /// Transparency: Shutdown process is fully logged
278 pub async fn shutdown(&mut self) -> Result<()> {
279 info!("Shutting down NetworkManager gracefully");
280
281 if let Some(tx) = self.shutdown_tx.take() {
282 let _ = tx.send(()).await;
283 }
284
285 // Close all connections cleanly
286 let connected_peers: Vec<_> = self.swarm.connected_peers().cloned().collect();
287 for peer in connected_peers {
288 info!("Disconnecting from peer: {}", peer);
289 let _ = self.swarm.disconnect_peer_id(peer);
290 }
291
292 info!("NetworkManager shutdown complete");
293 Ok(())
294 }
295
296 /// Get current network statistics for monitoring
297 ///
298 /// Transparency: Network stats available for health monitoring
299 pub fn get_network_stats(&self) -> NetworkStats {
300 NetworkStats {
301 connected_peers: self.swarm.connected_peers().count(),
302 listening_addresses: self.swarm.listeners().count(),
303 pending_connections: self.swarm.network_info().num_peers(),
304 }
305 }
306 }
307
/// Point-in-time network counters exposed for monitoring and transparency.
///
/// Snapshots can be compared with `==` and a zeroed snapshot is available
/// through `Default`, which makes them convenient to diff and to use in
/// tests.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct NetworkStats {
    /// Number of peers with at least one open connection.
    pub connected_peers: usize,
    /// Number of addresses the node is listening on.
    pub listening_addresses: usize,
    /// Number of connections that have not finished being established, as
    /// intended by the field name; the value is whatever the producer of
    /// the snapshot stores here.
    pub pending_connections: usize,
}