Go · 12798 bytes Raw Blame History
1 package health
2
3 import (
4 "context"
5 "fmt"
6 "net/http"
7 "runtime"
8 "time"
9
10 "github.com/gin-gonic/gin"
11 "github.com/sirupsen/logrus"
12
13 "github.com/ZephyrFS/zephyrfs-coordinator/internal/config"
14 "github.com/ZephyrFS/zephyrfs-coordinator/internal/coordinator"
15 )
16
17 // Monitor represents the health monitoring system
// Monitor represents the health monitoring system. It periodically samples
// system/application metrics and, when enabled, serves them over HTTP.
//
// NOTE(review): metrics is written by the collectMetrics goroutine and read
// concurrently by the HTTP handlers with no synchronization — looks like a
// data race; confirm and consider guarding with a mutex.
type Monitor struct {
	coordinator *coordinator.Coordinator // coordinator queried for network/file stats
	config      config.HealthConfig      // check interval, metrics port, enable flags
	metrics     *Metrics                 // most recently collected snapshot
	startTime   time.Time                // process start, used for uptime and grace periods
}
24
25 // Metrics represents collected health metrics
// Metrics represents collected health metrics, refreshed on every check
// interval by updateMetrics and exposed verbatim on the /metrics endpoint.
type Metrics struct {
	// System metrics
	MemoryUsage    MemoryStats `json:"memory_usage"`
	CPUUsage       float64     `json:"cpu_usage"`
	GoroutineCount int         `json:"goroutine_count"`

	// Application metrics
	RequestCount  int64             `json:"request_count"`
	ErrorCount    int64             `json:"error_count"`
	ResponseTimes ResponseTimeStats `json:"response_times"`
	DatabaseStats DatabaseStats     `json:"database_stats"`

	// Network metrics
	NetworkStats NetworkHealthStats `json:"network_stats"`

	// Coordinator-specific metrics
	CoordinatorStats CoordinatorHealthStats `json:"coordinator_stats"`

	// Timestamps
	LastUpdated time.Time `json:"last_updated"` // zero until the first collection tick
	Uptime      string    `json:"uptime"`       // human-readable time.Duration string
}
48
49 // MemoryStats represents memory usage statistics
// MemoryStats represents memory usage statistics, populated from
// runtime.MemStats by collectSystemMetrics.
type MemoryStats struct {
	Allocated      uint64 `json:"allocated"`       // bytes allocated and still in use (runtime Alloc)
	TotalAllocated uint64 `json:"total_allocated"` // cumulative bytes allocated, even if freed (TotalAlloc)
	SystemMemory   uint64 `json:"system_memory"`   // bytes obtained from the OS (Sys)
	GCCount        uint32 `json:"gc_count"`        // number of completed garbage collections (NumGC)
	HeapSize       uint64 `json:"heap_size"`       // heap bytes obtained from the OS (HeapSys)
	HeapInUse      uint64 `json:"heap_in_use"`     // heap bytes in use (HeapInuse)
}
58
59 // ResponseTimeStats represents response time statistics
// ResponseTimeStats represents response time statistics.
// Units are not fixed by this file; presumably milliseconds — TODO confirm
// against the middleware that is meant to populate these.
type ResponseTimeStats struct {
	Average float64 `json:"average"`
	Min     float64 `json:"min"`
	Max     float64 `json:"max"`
	P50     float64 `json:"p50"` // median
	P95     float64 `json:"p95"`
	P99     float64 `json:"p99"`
}
68
69 // DatabaseStats represents database health statistics
// DatabaseStats represents database health statistics.
// Currently filled with placeholder values by collectApplicationMetrics.
type DatabaseStats struct {
	ConnectionCount int64   `json:"connection_count"`
	QueryCount      int64   `json:"query_count"`
	ErrorCount      int64   `json:"error_count"` // drives the /ready database check
	AverageLatency  float64 `json:"average_latency"`
}
76
77 // NetworkHealthStats represents network health statistics
// NetworkHealthStats represents network health statistics derived from the
// coordinator's network status response.
type NetworkHealthStats struct {
	ActiveNodes       int   `json:"active_nodes"`       // drives the /health network check
	InactiveNodes     int   `json:"inactive_nodes"`     // total minus active
	TotalConnections  int64 `json:"total_connections"`  // currently a placeholder (set from TotalFiles)
	FailedConnections int64 `json:"failed_connections"` // not yet tracked; always 0
}
84
85 // CoordinatorHealthStats represents coordinator-specific health metrics
// CoordinatorHealthStats represents coordinator-specific health metrics.
type CoordinatorHealthStats struct {
	RegisteredNodes  int   `json:"registered_nodes"`
	ActiveFiles      int   `json:"active_files"`
	TotalChunks      int   `json:"total_chunks"`
	ReplicationTasks int   `json:"replication_tasks"` // not yet tracked; always 0
	LastHeartbeat    int64 `json:"last_heartbeat"`    // unix seconds of the last successful collection
}
93
94 // NewMonitor creates a new health monitor
95 func NewMonitor(coord *coordinator.Coordinator, cfg config.HealthConfig) *Monitor {
96 return &Monitor{
97 coordinator: coord,
98 config: cfg,
99 metrics: &Metrics{},
100 startTime: time.Now(),
101 }
102 }
103
104 // StartMonitoring starts the health monitoring background process
105 func StartMonitoring(ctx context.Context, coord *coordinator.Coordinator, cfg config.HealthConfig) {
106 monitor := NewMonitor(coord, cfg)
107
108 // Start metrics collection
109 go monitor.collectMetrics(ctx)
110
111 // Start metrics HTTP server if enabled
112 if cfg.MetricsEnabled {
113 go monitor.startMetricsServer(ctx)
114 }
115
116 logrus.WithFields(logrus.Fields{
117 "check_interval": cfg.CheckInterval,
118 "metrics_enabled": cfg.MetricsEnabled,
119 "metrics_port": cfg.MetricsPort,
120 }).Info("Health monitoring started")
121 }
122
123 // collectMetrics runs the periodic metrics collection
124 func (m *Monitor) collectMetrics(ctx context.Context) {
125 ticker := time.NewTicker(m.config.CheckInterval)
126 defer ticker.Stop()
127
128 for {
129 select {
130 case <-ctx.Done():
131 logrus.Info("Stopping health metrics collection")
132 return
133 case <-ticker.C:
134 m.updateMetrics()
135 }
136 }
137 }
138
139 // updateMetrics collects current system and application metrics
140 func (m *Monitor) updateMetrics() {
141 m.metrics.LastUpdated = time.Now()
142 m.metrics.Uptime = time.Since(m.startTime).String()
143
144 // Collect system metrics
145 m.collectSystemMetrics()
146
147 // Collect application metrics
148 m.collectApplicationMetrics()
149
150 // Collect network metrics
151 m.collectNetworkMetrics()
152
153 // Collect coordinator-specific metrics
154 m.collectCoordinatorMetrics()
155
156 // Log summary metrics periodically
157 if time.Since(m.startTime).Minutes() > 1 &&
158 int(time.Since(m.startTime).Minutes()) % 5 == 0 {
159 m.logMetricsSummary()
160 }
161 }
162
163 // collectSystemMetrics gathers system-level metrics
164 func (m *Monitor) collectSystemMetrics() {
165 var memStats runtime.MemStats
166 runtime.ReadMemStats(&memStats)
167
168 m.metrics.MemoryUsage = MemoryStats{
169 Allocated: memStats.Alloc,
170 TotalAllocated: memStats.TotalAlloc,
171 SystemMemory: memStats.Sys,
172 GCCount: memStats.NumGC,
173 HeapSize: memStats.HeapSys,
174 HeapInUse: memStats.HeapInuse,
175 }
176
177 m.metrics.GoroutineCount = runtime.NumGoroutine()
178 }
179
180 // collectApplicationMetrics gathers application-level metrics
181 func (m *Monitor) collectApplicationMetrics() {
182 // These would be populated by middleware and other components
183 // For now, we'll set placeholder values
184
185 m.metrics.ResponseTimes = ResponseTimeStats{
186 Average: 25.5,
187 Min: 1.0,
188 Max: 150.0,
189 P50: 20.0,
190 P95: 75.0,
191 P99: 120.0,
192 }
193
194 m.metrics.DatabaseStats = DatabaseStats{
195 ConnectionCount: 1,
196 QueryCount: m.metrics.DatabaseStats.QueryCount + 1,
197 ErrorCount: 0,
198 AverageLatency: 2.5,
199 }
200 }
201
202 // collectNetworkMetrics gathers network-related metrics
203 func (m *Monitor) collectNetworkMetrics() {
204 // Get network status from coordinator
205 if resp, err := m.coordinator.GetNetworkStatus(context.Background()); err == nil {
206 m.metrics.NetworkStats = NetworkHealthStats{
207 ActiveNodes: int(resp.NetworkStats.ActiveNodes),
208 InactiveNodes: int(resp.NetworkStats.TotalNodes - resp.NetworkStats.ActiveNodes),
209 TotalConnections: resp.NetworkStats.TotalFiles, // Placeholder
210 FailedConnections: 0, // Would need to track this
211 }
212 }
213 }
214
215 // collectCoordinatorMetrics gathers coordinator-specific metrics
216 func (m *Monitor) collectCoordinatorMetrics() {
217 if resp, err := m.coordinator.GetNetworkStatus(context.Background()); err == nil {
218 m.metrics.CoordinatorStats = CoordinatorHealthStats{
219 RegisteredNodes: int(resp.NetworkStats.TotalNodes),
220 ActiveFiles: int(resp.NetworkStats.TotalFiles),
221 TotalChunks: int(resp.NetworkStats.TotalChunks),
222 ReplicationTasks: 0, // Would need to track this
223 LastHeartbeat: time.Now().Unix(),
224 }
225 }
226 }
227
228 // logMetricsSummary logs a summary of current metrics
229 func (m *Monitor) logMetricsSummary() {
230 logrus.WithFields(logrus.Fields{
231 "uptime": m.metrics.Uptime,
232 "memory_allocated_mb": m.metrics.MemoryUsage.Allocated / 1024 / 1024,
233 "heap_size_mb": m.metrics.MemoryUsage.HeapSize / 1024 / 1024,
234 "goroutines": m.metrics.GoroutineCount,
235 "gc_count": m.metrics.MemoryUsage.GCCount,
236 "active_nodes": m.metrics.NetworkStats.ActiveNodes,
237 "total_files": m.metrics.CoordinatorStats.ActiveFiles,
238 "total_chunks": m.metrics.CoordinatorStats.TotalChunks,
239 }).Info("Health metrics summary")
240 }
241
242 // startMetricsServer starts the HTTP server for metrics exposure
243 func (m *Monitor) startMetricsServer(ctx context.Context) {
244 gin.SetMode(gin.ReleaseMode)
245 router := gin.New()
246 router.Use(gin.Recovery())
247
248 // Metrics endpoints
249 router.GET("/metrics", m.handleMetrics)
250 router.GET("/health", m.handleHealth)
251 router.GET("/ready", m.handleReadiness)
252 router.GET("/live", m.handleLiveness)
253
254 server := &http.Server{
255 Addr: fmt.Sprintf(":%d", m.config.MetricsPort),
256 Handler: router,
257 }
258
259 // Start server in goroutine
260 go func() {
261 logrus.WithField("port", m.config.MetricsPort).Info("Starting metrics HTTP server")
262 if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
263 logrus.WithError(err).Error("Metrics server failed")
264 }
265 }()
266
267 // Wait for context cancellation
268 <-ctx.Done()
269
270 // Shutdown server gracefully
271 shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
272 defer cancel()
273
274 if err := server.Shutdown(shutdownCtx); err != nil {
275 logrus.WithError(err).Error("Failed to shutdown metrics server")
276 } else {
277 logrus.Info("Metrics server stopped")
278 }
279 }
280
281 // HTTP handlers for metrics endpoints
282
// handleMetrics serves the current metrics snapshot as JSON with status 200.
func (m *Monitor) handleMetrics(c *gin.Context) {
	c.JSON(http.StatusOK, m.metrics)
}
286
287 func (m *Monitor) handleHealth(c *gin.Context) {
288 health := m.calculateHealthStatus()
289
290 if health.Status == "healthy" {
291 c.JSON(http.StatusOK, health)
292 } else {
293 c.JSON(http.StatusServiceUnavailable, health)
294 }
295 }
296
297 func (m *Monitor) handleReadiness(c *gin.Context) {
298 readiness := m.calculateReadinessStatus()
299
300 if readiness.Ready {
301 c.JSON(http.StatusOK, readiness)
302 } else {
303 c.JSON(http.StatusServiceUnavailable, readiness)
304 }
305 }
306
307 func (m *Monitor) handleLiveness(c *gin.Context) {
308 liveness := m.calculateLivenessStatus()
309
310 if liveness.Alive {
311 c.JSON(http.StatusOK, liveness)
312 } else {
313 c.JSON(http.StatusServiceUnavailable, liveness)
314 }
315 }
316
317 // Health status calculation
318
// HealthStatus is the response body for the /health endpoint, aggregating
// individual named check results into an overall status string.
type HealthStatus struct {
	Status    string                 `json:"status"` // "healthy" or "unhealthy"
	Timestamp time.Time              `json:"timestamp"`
	Uptime    string                 `json:"uptime"`
	Version   string                 `json:"version"` // currently hard-coded "1.0.0"
	Checks    map[string]CheckResult `json:"checks"`  // keyed by check name (memory, goroutines, network)
}
326
// CheckResult is the outcome of a single named health or readiness check.
type CheckResult struct {
	Status  string      `json:"status"`            // "healthy" or "unhealthy"
	Message string      `json:"message,omitempty"` // human-readable detail
	Data    interface{} `json:"data,omitempty"`    // optional structured payload
}
332
333 func (m *Monitor) calculateHealthStatus() HealthStatus {
334 checks := make(map[string]CheckResult)
335 overallHealthy := true
336
337 // Memory check
338 memoryHealthy := m.metrics.MemoryUsage.HeapInUse < m.metrics.MemoryUsage.HeapSize*80/100
339 checks["memory"] = CheckResult{
340 Status: statusFromBool(memoryHealthy),
341 Message: fmt.Sprintf("Heap usage: %d MB / %d MB",
342 m.metrics.MemoryUsage.HeapInUse/1024/1024,
343 m.metrics.MemoryUsage.HeapSize/1024/1024),
344 }
345 overallHealthy = overallHealthy && memoryHealthy
346
347 // Goroutine check
348 goroutineHealthy := m.metrics.GoroutineCount < 1000 // Arbitrary threshold
349 checks["goroutines"] = CheckResult{
350 Status: statusFromBool(goroutineHealthy),
351 Message: fmt.Sprintf("Active goroutines: %d", m.metrics.GoroutineCount),
352 }
353 overallHealthy = overallHealthy && goroutineHealthy
354
355 // Network check
356 networkHealthy := m.metrics.NetworkStats.ActiveNodes > 0
357 checks["network"] = CheckResult{
358 Status: statusFromBool(networkHealthy),
359 Message: fmt.Sprintf("Active nodes: %d", m.metrics.NetworkStats.ActiveNodes),
360 }
361 overallHealthy = overallHealthy && networkHealthy
362
363 return HealthStatus{
364 Status: statusFromBool(overallHealthy),
365 Timestamp: time.Now(),
366 Uptime: m.metrics.Uptime,
367 Version: "1.0.0",
368 Checks: checks,
369 }
370 }
371
// ReadinessStatus is the response body for the /ready endpoint.
type ReadinessStatus struct {
	Ready     bool                   `json:"ready"` // true only if every check passes
	Timestamp time.Time              `json:"timestamp"`
	Checks    map[string]CheckResult `json:"checks"` // keyed by check name (database, coordinator)
}
377
378 func (m *Monitor) calculateReadinessStatus() ReadinessStatus {
379 checks := make(map[string]CheckResult)
380 overallReady := true
381
382 // Database readiness
383 dbReady := m.metrics.DatabaseStats.ErrorCount == 0
384 checks["database"] = CheckResult{
385 Status: statusFromBool(dbReady),
386 Message: fmt.Sprintf("Error count: %d", m.metrics.DatabaseStats.ErrorCount),
387 }
388 overallReady = overallReady && dbReady
389
390 // Coordinator readiness
391 coordReady := time.Since(m.startTime) > 10*time.Second // Grace period
392 checks["coordinator"] = CheckResult{
393 Status: statusFromBool(coordReady),
394 Message: fmt.Sprintf("Running for: %s", m.metrics.Uptime),
395 }
396 overallReady = overallReady && coordReady
397
398 return ReadinessStatus{
399 Ready: overallReady,
400 Timestamp: time.Now(),
401 Checks: checks,
402 }
403 }
404
// LivenessStatus is the response body for the /live endpoint.
type LivenessStatus struct {
	Alive     bool      `json:"alive"`
	Timestamp time.Time `json:"timestamp"`
	LastCheck time.Time `json:"last_check"` // when metrics were last collected (zero before first tick)
}
410
411 func (m *Monitor) calculateLivenessStatus() LivenessStatus {
412 // Simple liveness check - if we can execute this function, we're alive
413 // In a more complex system, this might check for deadlocks, etc.
414
415 alive := time.Since(m.metrics.LastUpdated) < m.config.CheckInterval*2
416
417 return LivenessStatus{
418 Alive: alive,
419 Timestamp: time.Now(),
420 LastCheck: m.metrics.LastUpdated,
421 }
422 }
423
424 // Helper functions
425
// statusFromBool maps a boolean check verdict onto its JSON status label:
// "healthy" for true, "unhealthy" for false.
func statusFromBool(healthy bool) string {
	status := "unhealthy"
	if healthy {
		status = "healthy"
	}
	return status
}