diff --git a/deployments/grafana/dashboards/events.json b/deployments/grafana/dashboards/events.json index 55e0a8b..d329a46 100644 --- a/deployments/grafana/dashboards/events.json +++ b/deployments/grafana/dashboards/events.json @@ -318,7 +318,7 @@ "targets": [ { "editorMode": "code", - "expr": "100 * (sum(sms_events_sent_total) / (sum(sms_events_sent_total) + sum(sms_events_failed_total)))", + "expr": "100 * (sum(rate(sms_events_sent_total[5m])) / (sum(rate(sms_events_sent_total[5m])) + sum(rate(sms_events_failed_total[5m]))))", "format": "table", "instant": true, "refId": "A" diff --git a/deployments/grafana/dashboards/sse.json b/deployments/grafana/dashboards/sse.json new file mode 100644 index 0000000..d843380 --- /dev/null +++ b/deployments/grafana/dashboards/sse.json @@ -0,0 +1,531 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "datasource", + "uid": "grafana" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": 13, + "links": [], + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "edqp0a73uh2bka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "yellow", + "value": 10 + }, + { + "color": "red", + "value": 50 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 0 + }, + "id": 1, + "options": { + "minVizHeight": 75, + "minVizWidth": 75, + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showThresholdLabels": false, + "showThresholdMarkers": true, + "sizing": "auto", + "text": {} + }, + "pluginVersion": "12.2.0-16711121739", + "targets": [ + { + 
"datasource": { + "uid": "Prometheus" + }, + "expr": "sms_sse_active_connections", + "interval": "", + "legendFormat": "Active Connections", + "refId": "A" + } + ], + "title": "Active Connections", + "type": "gauge" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edqp0a73uh2bka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "Events per Second", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "vis": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "reqps" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 0 + }, + "id": 2, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.2.0-16711121739", + "targets": [ + { + "datasource": { + "uid": "Prometheus" + }, + "expr": "rate(sms_sse_events_sent_total[5m])", + "interval": "", + "legendFormat": "{{event_type}}", + "refId": "A" + } + ], + "title": "Events Sent by Type", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edqp0a73uh2bka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + 
"axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "Errors per Second", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "vis": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "reqps" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 8 + }, + "id": 3, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.2.0-16711121739", + "targets": [ + { + "datasource": { + "uid": "Prometheus" + }, + "expr": "rate(sms_sse_connection_errors_total[5m])", + "interval": "", + "legendFormat": "{{error_type}}", + "refId": "A" + } + ], + "title": "Connection Errors by Type", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edqp0a73uh2bka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "Latency (seconds)", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "vis": false, + "viz": false + }, + "insertNulls": false, + 
"lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 8 + }, + "id": 4, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.2.0-16711121739", + "targets": [ + { + "datasource": { + "uid": "Prometheus" + }, + "editorMode": "code", + "exemplar": false, + "expr": "histogram_quantile(0.95, rate(sms_sse_event_delivery_latency_seconds_bucket[5m]))", + "format": "time_series", + "instant": false, + "interval": "", + "legendFormat": "95th percentile", + "range": true, + "refId": "A" + }, + { + "datasource": { + "uid": "Prometheus" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.50, rate(sms_sse_event_delivery_latency_seconds_bucket[5m]))", + "interval": "", + "legendFormat": "50th percentile", + "range": true, + "refId": "B" + }, + { + "datasource": { + "uid": "Prometheus" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.99, rate(sms_sse_event_delivery_latency_seconds_bucket[5m]))", + "interval": "", + "legendFormat": "99th percentile", + "range": true, + "refId": "C" + } + ], + "title": "Event Delivery Latency", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edqp0a73uh2bka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "Keepalives 
per Second", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "vis": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "reqps" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 16 + }, + "id": 5, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.2.0-16711121739", + "targets": [ + { + "datasource": { + "uid": "Prometheus" + }, + "editorMode": "code", + "expr": "irate(sms_sse_keepalives_sent_total[5m])", + "interval": "", + "legendFormat": "Keepalives Sent", + "range": true, + "refId": "A" + } + ], + "title": "Keepalives Sent", + "type": "timeseries" + } + ], + "preload": false, + "refresh": "auto", + "schemaVersion": 41, + "tags": [ + "sse", + "push" + ], + "templating": { + "list": [] + }, + "time": { + "from": "now-24h", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "SSE Metrics", + "uid": "sse-metrics", + "version": 6 +} \ No newline at end of file diff --git a/deployments/prometheus/alerts/sse-alerts.yml b/deployments/prometheus/alerts/sse-alerts.yml new file mode 100644 index 0000000..d2144e0 --- /dev/null +++ b/deployments/prometheus/alerts/sse-alerts.yml @@ -0,0 +1,48 @@ +groups: + - name: sse-alerts + rules: + - alert: HighSSEErrorRate + expr: | + 
(sum(rate(sms_sse_connection_errors_total[5m])) / sum(rate(sms_sse_events_sent_total[5m]))) > 0.05 + for: 5m + labels: + severity: warning + annotations: + summary: "High SSE error rate ({{ $value | humanizePercentage }})" + description: "SSE error rate has exceeded 5% for 5 minutes. This may indicate client connectivity issues or server-side processing problems." + dashboard: "https://grafana.example.com/d/sse-dashboard" + runbook: "https://internal.dev-docs/server/sse-troubleshooting#high-error-rate" + + - alert: SSEConnectionLoss + expr: sms_sse_active_connections == 0 + for: 5m + labels: + severity: critical + annotations: + summary: "All SSE connections lost" + description: "No active SSE connections detected for 5 minutes. This indicates complete service disruption for real-time updates." + dashboard: "https://grafana.example.com/d/sse-dashboard" + runbook: "https://internal.dev-docs/server/sse-troubleshooting#connection-loss" + + - alert: HighSSELatency + expr: | + histogram_quantile(0.95, rate(sms_sse_event_delivery_latency_seconds_bucket[5m])) > 10 + for: 5m + labels: + severity: critical + annotations: + summary: "High SSE event latency (95th percentile > 10s)" + description: "95th percentile of event delivery latency has exceeded 10 seconds for 5 minutes. Clients may experience significant delays in receiving real-time updates." + dashboard: "https://grafana.example.com/d/sse-dashboard" + runbook: "https://internal.dev-docs/server/sse-troubleshooting#high-latency" + + - alert: HighConnectionChurn + expr: rate(sms_sse_connections_opened_total[5m]) > 10 + for: 5m + labels: + severity: warning + annotations: + summary: "High SSE connection churn ({{ $value | humanize }} connections/sec)" + description: "SSE connection churn rate has exceeded 10 connections per second for 5 minutes. This may indicate unstable client connections or aggressive reconnection logic." 
+ dashboard: "https://grafana.example.com/d/sse-dashboard" + runbook: "https://internal.dev-docs/server/sse-troubleshooting#connection-churn" diff --git a/internal/sms-gateway/modules/sse/metrics.go b/internal/sms-gateway/modules/sse/metrics.go new file mode 100644 index 0000000..d939e8a --- /dev/null +++ b/internal/sms-gateway/modules/sse/metrics.go @@ -0,0 +1,97 @@ +package sse + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// Metric constants +const ( + MetricActiveConnections = "active_connections" + MetricEventsSent = "events_sent_total" + MetricConnectionErrors = "connection_errors_total" + MetricEventLatency = "event_delivery_latency_seconds" + MetricKeepalivesSent = "keepalives_sent_total" + + LabelEventType = "event_type" + LabelErrorType = "error_type" + + ErrorTypeBufferFull = "buffer_full" + ErrorTypeNoConnection = "no_connection" + ErrorTypeWriteFailure = "write_failure" + ErrorTypeMarshalError = "marshal_error" +) + +// metrics contains all Prometheus metrics for the SSE module +type metrics struct { + activeConnections *prometheus.GaugeVec + eventsSent *prometheus.CounterVec + connectionErrors *prometheus.CounterVec + eventDeliveryLatency *prometheus.HistogramVec + keepalivesSent *prometheus.CounterVec +} + +// newMetrics creates and initializes all SSE metrics +func newMetrics() *metrics { + metrics := &metrics{ + activeConnections: promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "sms", + Subsystem: "sse", + Name: MetricActiveConnections, + Help: "Current number of active SSE connections", + }, []string{}), + eventsSent: promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "sms", + Subsystem: "sse", + Name: MetricEventsSent, + Help: "Total number of SSE events sent, labeled by event type", + }, []string{LabelEventType}), + connectionErrors: promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "sms", + Subsystem: "sse", + Name: 
MetricConnectionErrors, + Help: "Total number of SSE connection errors, labeled by error type", + }, []string{LabelErrorType}), + eventDeliveryLatency: promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "sms", + Subsystem: "sse", + Name: MetricEventLatency, + Help: "Event delivery latency in seconds", + Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30}, + }, []string{}), + keepalivesSent: promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "sms", + Subsystem: "sse", + Name: MetricKeepalivesSent, + Help: "Total keepalive messages sent", + }, []string{}), + } + + return metrics +} + +func (m *metrics) IncrementActiveConnections() { + m.activeConnections.WithLabelValues().Inc() +} + +func (m *metrics) DecrementActiveConnections() { + m.activeConnections.WithLabelValues().Dec() +} + +func (m *metrics) IncrementEventsSent(eventType string) { + m.eventsSent.WithLabelValues(eventType).Inc() +} + +func (m *metrics) IncrementConnectionErrors(errorType string) { + m.connectionErrors.WithLabelValues(errorType).Inc() +} + +func (m *metrics) ObserveEventDeliveryLatency(f func()) { + timer := prometheus.NewTimer(m.eventDeliveryLatency.WithLabelValues()) + defer timer.ObserveDuration() + f() +} + +func (m *metrics) IncrementKeepalivesSent() { + m.keepalivesSent.WithLabelValues().Inc() +} diff --git a/internal/sms-gateway/modules/sse/module.go b/internal/sms-gateway/modules/sse/module.go index 48a035f..fb18d52 100644 --- a/internal/sms-gateway/modules/sse/module.go +++ b/internal/sms-gateway/modules/sse/module.go @@ -12,7 +12,13 @@ var Module = fx.Module( fx.Decorate(func(log *zap.Logger) *zap.Logger { return log.Named("sse") }), - fx.Provide(NewService), + fx.Provide( + newMetrics, + fx.Private, + ), + fx.Provide( + NewService, + ), fx.Invoke(func(lc fx.Lifecycle, svc *Service) { lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { diff --git a/internal/sms-gateway/modules/sse/service.go b/internal/sms-gateway/modules/sse/service.go index 
4c58574..e5cfe5e 100644 --- a/internal/sms-gateway/modules/sse/service.go +++ b/internal/sms-gateway/modules/sse/service.go @@ -20,7 +20,8 @@ type Service struct { mu sync.RWMutex connections map[string][]*sseConnection - logger *zap.Logger + logger *zap.Logger + metrics *metrics } type sseConnection struct { @@ -34,13 +35,14 @@ type eventWrapper struct { data []byte } -func NewService(config Config, logger *zap.Logger) *Service { +func NewService(config Config, logger *zap.Logger, metrics *metrics) *Service { return &Service{ config: config, connections: make(map[string][]*sseConnection), - logger: logger, + logger: logger, + metrics: metrics, } } @@ -50,11 +52,15 @@ func (s *Service) Send(deviceID string, event Event) error { connections, exists := s.connections[deviceID] if !exists { + // Increment connection errors metric for no connection + s.metrics.IncrementConnectionErrors(ErrorTypeNoConnection) return fmt.Errorf("no connection for device %s", deviceID) } data, err := json.Marshal(event.Data) if err != nil { + // Increment connection errors metric for marshaling error + s.metrics.IncrementConnectionErrors(ErrorTypeMarshalError) return fmt.Errorf("can't marshal event: %w", err) } @@ -68,13 +74,20 @@ func (s *Service) Send(deviceID string, event Event) error { s.logger.Warn("Connection closed while sending event", zap.String("device_id", deviceID), zap.String("connection_id", conn.id)) default: s.logger.Warn("Connection buffer full while sending event", zap.String("device_id", deviceID), zap.String("connection_id", conn.id)) + // Increment connection errors metric for buffer full + s.metrics.IncrementConnectionErrors(ErrorTypeBufferFull) } } if sent == 0 { + // Increment connection errors metric for no active connection + s.metrics.IncrementConnectionErrors(ErrorTypeNoConnection) return fmt.Errorf("no active connection for device %s", deviceID) } + // Count events sent + s.metrics.IncrementEventsSent(string(event.Type)) + return nil } @@ -111,13 +124,15 @@ 
func (s *Service) Handler(deviceID string, c *fiber.Ctx) error { for { select { case event := <-conn.channel: - if err := s.writeToStream(w, fmt.Sprintf("event: %s\ndata: %s", event.name, utils.UnsafeString(event.data))); err != nil { - s.logger.Warn("Failed to write event data", - zap.String("device_id", deviceID), - zap.String("connection_id", conn.id), - zap.Error(err)) - return - } + // Capture the error outside the closure: a return inside it would only exit the closure, not this loop + var writeErr error + s.metrics.ObserveEventDeliveryLatency(func() { + writeErr = s.writeToStream(w, fmt.Sprintf("event: %s\ndata: %s", event.name, utils.UnsafeString(event.data))) + }) + if writeErr != nil { + s.logger.Warn("Failed to write event data", zap.String("device_id", deviceID), zap.String("connection_id", conn.id), zap.Error(writeErr)) + return + } // Conditionally handle ticker events case <-func() <-chan time.Time { if ticker != nil { @@ -133,6 +148,8 @@ func (s *Service) Handler(deviceID string, c *fiber.Ctx) error { zap.Error(err)) return } + // Count keepalives sent + s.metrics.IncrementKeepalivesSent() case <-conn.closeSignal: return } @@ -144,6 +161,12 @@ func (s *Service) Handler(deviceID string, c *fiber.Ctx) error { func (s *Service) writeToStream(w *bufio.Writer, data string) error { if _, err := fmt.Fprintf(w, "%s\n\n", data); err != nil { + s.metrics.IncrementConnectionErrors(ErrorTypeWriteFailure) return err } - return w.Flush() + // Buffered writes usually surface network errors only on Flush, so count those failures too + if err := w.Flush(); err != nil { + s.metrics.IncrementConnectionErrors(ErrorTypeWriteFailure) + return err + } + return nil @@ -167,6 +190,9 @@ func (s *Service) registerConnection(deviceID string) *sseConnection { s.connections[deviceID] = append(s.connections[deviceID], conn) + // Increment active connections metric + s.metrics.IncrementActiveConnections() + s.logger.Info("Registering SSE connection", zap.String("device_id", deviceID), zap.String("connection_id", connID)) return conn @@ -186,6 +212,9 @@ func (s *Service) removeConnection(deviceID, connID string) { } } + // Decrement active connections metric + s.metrics.DecrementActiveConnections() + if len(s.connections[deviceID]) == 0 { delete(s.connections, deviceID) }