From 41ceff48680bc5a9e9383eb3e7b19a4b06c86535 Mon Sep 17 00:00:00 2001 From: Aleksandr Soloshenko Date: Wed, 10 Sep 2025 17:41:33 +0700 Subject: [PATCH] [online] add online module --- deployments/grafana/dashboards/online.json | 781 ++++++++++++++++++ .../prometheus/alerts/online-alerts.yml | 38 + internal/sms-gateway/app.go | 2 + internal/sms-gateway/cache/factory.go | 2 + internal/sms-gateway/modules/auth/service.go | 12 +- .../sms-gateway/modules/devices/repository.go | 16 +- .../sms-gateway/modules/devices/service.go | 18 +- internal/sms-gateway/online/metrics.go | 122 +++ internal/sms-gateway/online/module.go | 36 + internal/sms-gateway/online/service.go | 128 +++ pkg/cache/memory_bench_test.go | 1 + pkg/cache/memory_concurrency_test.go | 1 + pkg/cache/memory_edge_test.go | 4 +- pkg/cache/memory_profile_test.go | 1 + 14 files changed, 1152 insertions(+), 10 deletions(-) create mode 100644 deployments/grafana/dashboards/online.json create mode 100644 deployments/prometheus/alerts/online-alerts.yml create mode 100644 internal/sms-gateway/online/metrics.go create mode 100644 internal/sms-gateway/online/module.go create mode 100644 internal/sms-gateway/online/service.go diff --git a/deployments/grafana/dashboards/online.json b/deployments/grafana/dashboards/online.json new file mode 100644 index 0000000..80e7b72 --- /dev/null +++ b/deployments/grafana/dashboards/online.json @@ -0,0 +1,781 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": "-- Grafana --", + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": 19, + "links": [], + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "edqp0a73uh2bka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 7, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.1.1", + "targets": [ + { + "disableTextWrap": false, + "editorMode": "builder", + "expr": "sum by(instance) (sms_online_batch_size{instance=~\"$instance\"})", + "fullMetaSearch": false, + "includeNullMetadata": true, + "legendFormat": "__auto", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Batch Size", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edqp0a73uh2bka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 10 + } + ] + }, + "unit": "ops" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 8 + }, + "id": 1, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.1.1", + "targets": [ + { + "datasource": "Prometheus", + "editorMode": "code", + "expr": "sum by(status) (rate(sms_online_status_set_total{instance=~\"$instance\"}[$__rate_interval]))", + "legendFormat": "{{status}}", + "range": true, + "refId": "A" + } + ], + "title": "Online Status Updates", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edqp0a73uh2bka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "ops" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 8 + }, + "id": 2, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "12.1.1", + "targets": [ + { + "datasource": "Prometheus", + "editorMode": "code", + "expr": "sum by (operation, status) (rate(sms_online_cache_operations_total{instance=~\"$instance\", operation=~\"$operation\"}[$__rate_interval]))", + "legendFormat": "{{operation}} - {{status}}", + "range": true, + "refId": "A" + } + ], + "title": "Cache Operations (rate)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edqp0a73uh2bka" + }, + "fieldConfig": { + "defaults": { + "color": { + "fixedColor": "green", + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "dashed+area" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 0.3 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 16 + }, + "id": 6, + "options": { + "legend": { + "calcs": [ + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true, + "sortBy": "Name", + "sortDesc": false + }, + "tooltip": { + "hideZeros": false, + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "12.1.1", + "targets": [ + { + "datasource": "Prometheus", + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum(rate(sms_online_cache_latency_seconds_bucket{instance=~\"$instance\"}[$__rate_interval])) by (le, instance))", + "legendFormat": "{{instance}} p95", + "range": true, + "refId": "A" + }, + { + "datasource": "Prometheus", + "editorMode": "code", + "expr": "histogram_quantile(0.50, sum(rate(sms_online_cache_latency_seconds_bucket{instance=~\"$instance\"}[$__rate_interval])) by (le, instance))", + "hide": false, + "legendFormat": "{{instance}} p50", + "range": true, + "refId": "B" + } + ], + "title": "Cache Latency", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edqp0a73uh2bka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "dashed+area" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "orange", + "value": 0.3 + }, + { + "color": "red", + "value": 0.5 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 16 + }, + "id": 4, + "options": { + "legend": { + "calcs": [ + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true, + "sortBy": "Name", + "sortDesc": false + }, + "tooltip": { + "hideZeros": false, + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "12.1.1", + "targets": [ + { + "datasource": "Prometheus", + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum(rate(sms_online_persistence_latency_seconds_bucket{instance=~\"$instance\"}[$__rate_interval])) by (le, instance))", + "legendFormat": "{{instance}} p95", + "range": true, + "refId": "A" + }, + { + "datasource": "Prometheus", + "editorMode": "code", + "expr": "histogram_quantile(0.50, sum(rate(sms_online_persistence_latency_seconds_bucket{instance=~\"$instance\"}[$__rate_interval])) by (le, instance))", + "hide": false, + "legendFormat": "{{instance}} p50", + "range": true, + "refId": "B" + } + ], + "title": "Persistence Latency", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edqp0a73uh2bka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 0 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 24 + }, + "id": 5, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "12.1.1", + "targets": [ + { + "datasource": "Prometheus", + "editorMode": "code", + "expr": "increase(sms_online_status_set_total{status=\"error\", instance=~\"$instance\"}[5m])", + "legendFormat": "Errors", + "range": true, + "refId": "A" + } + ], + "title": "Online Status Errors (5m increase)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edqp0a73uh2bka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 0 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 24 + }, + "id": 3, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.1.1", + "targets": [ + { + "datasource": "Prometheus", + "editorMode": "code", + "expr": "increase(sms_online_persistence_errors_total{instance=~\"$instance\"}[5m])", + "legendFormat": "Errors", + "range": true, + "refId": "A" + } + ], + "title": "Persistence Errors (5m increase)", + "type": "timeseries" + } + ], + "preload": false, + "refresh": "auto", + "schemaVersion": 41, + "tags": [ + "online", + "sms" + ], + "templating": { + "list": [ + { + "allValue": ".*", + "allowCustomValue": false, + "current": { + "text": "All", + "value": "$__all" + }, + "datasource": { + "type": "prometheus", + "uid": "edqp0a73uh2bka" + }, + "definition": "label_values(sms_online_status_set_total,instance)", + "includeAll": true, + "name": "instance", + "options": [], + "query": { + "qryType": 1, + "query": "label_values(sms_online_status_set_total,instance)", + "refId": "PrometheusVariableQueryEditor-VariableQuery" + }, + "refresh": 1, + "type": "query" + }, + { + "allValue": ".*", + "allowCustomValue": false, + "current": { + "text": "All", + "value": "$__all" + }, + "definition": "label_values(sms_online_cache_operations_total,operation)", + "includeAll": true, + "name": "operation", + "options": [], + "query": { + "qryType": 1, + "query": "label_values(sms_online_cache_operations_total,operation)", + "refId": "PrometheusVariableQueryEditor-VariableQuery" + }, + "refresh": 1, + "regex": "", + "type": "query" + } + ] + }, + "time": { + "from": "now-24h", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "SMS Online Metrics", + "uid": "online_metrics", + "version": 10 +} \ No newline at end of file diff --git a/deployments/prometheus/alerts/online-alerts.yml b/deployments/prometheus/alerts/online-alerts.yml new file mode 100644 index 0000000..bf2cb92 --- /dev/null +++ b/deployments/prometheus/alerts/online-alerts.yml @@ -0,0 +1,38 @@ +groups: + - name: online_metrics_alerts + rules: + - alert: OnlineStatusErrors + expr: sum(increase(sms_online_status_set_total{status="error"}[5m])) > 10 + for: 5m + labels: + severity: warning + annotations: + summary: "High number of online status errors" + description: "The number of online status errors has exceeded 10 in the last 5 minutes." + + - alert: CacheOperationErrors + expr: sum by (operation) (increase(sms_online_cache_operations_total{status="error"}[5m])) > 5 + for: 5m + labels: + severity: warning + annotations: + summary: "High number of cache operation errors" + description: "Cache errors for operation={{ $labels.operation }} exceeded 5 in 5m." + + - alert: PersistenceErrors + expr: sum(increase(sms_online_persistence_errors_total[5m])) > 0 + for: 5m + labels: + severity: critical + annotations: + summary: "Persistence errors detected" + description: "Persistence errors have been detected in the online module." + + - alert: HighPersistenceLatency + expr: histogram_quantile(0.95, sum(rate(sms_online_persistence_latency_seconds_bucket[5m])) by (le)) > 0.5 + for: 5m + labels: + severity: warning + annotations: + summary: "High persistence latency" + description: "The 95th percentile persistence latency has exceeded 0.5 seconds." diff --git a/internal/sms-gateway/app.go b/internal/sms-gateway/app.go index 7bd549e..d378dda 100644 --- a/internal/sms-gateway/app.go +++ b/internal/sms-gateway/app.go @@ -19,6 +19,7 @@ import ( "github.com/android-sms-gateway/server/internal/sms-gateway/modules/settings" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/sse" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/webhooks" + "github.com/android-sms-gateway/server/internal/sms-gateway/online" "github.com/android-sms-gateway/server/internal/sms-gateway/openapi" "github.com/capcom6/go-infra-fx/cli" "github.com/capcom6/go-infra-fx/db" @@ -53,6 +54,7 @@ var Module = fx.Module( metrics.Module, cleaner.Module, sse.Module, + online.Module(), ) func Run() { diff --git a/internal/sms-gateway/cache/factory.go b/internal/sms-gateway/cache/factory.go index 657371a..2f0a536 100644 --- a/internal/sms-gateway/cache/factory.go +++ b/internal/sms-gateway/cache/factory.go @@ -8,6 +8,8 @@ import ( "github.com/android-sms-gateway/server/pkg/cache" ) +type Cache = cache.Cache + type Factory interface { New(name string) (cache.Cache, error) } diff --git a/internal/sms-gateway/modules/auth/service.go b/internal/sms-gateway/modules/auth/service.go index 0b4357e..fdbd839 100644 --- a/internal/sms-gateway/modules/auth/service.go +++ b/internal/sms-gateway/modules/auth/service.go @@ -11,6 +11,7 @@ import ( "github.com/android-sms-gateway/server/internal/sms-gateway/models" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/devices" + "github.com/android-sms-gateway/server/internal/sms-gateway/online" "github.com/android-sms-gateway/server/pkg/crypto" "github.com/capcom6/go-helpers/cache" "github.com/jaevor/go-nanoid" @@ -30,6 +31,7 @@ type Params struct { Users *repository DevicesSvc *devices.Service + OnlineSvc online.Service Logger *zap.Logger } @@ -42,6 +44,7 @@ type Service struct { usersCache *cache.Cache[models.User] devicesSvc *devices.Service + onlineSvc online.Service logger *zap.Logger @@ -55,7 +58,8 @@ func New(params Params) *Service { config: params.Config, users: params.Users, devicesSvc: params.DevicesSvc, - logger: params.Logger.Named("Service"), + onlineSvc: params.OnlineSvc, + logger: params.Logger, idgen: idgen, codesCache: cache.New[string](cache.Config{}), @@ -140,9 +144,9 @@ func (s *Service) AuthorizeDevice(token string) (models.Device, error) { } go func(id string) { - if err := s.devicesSvc.UpdateLastSeen(id); err != nil { - s.logger.Error("can't update last seen", zap.Error(err)) - } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + s.onlineSvc.SetOnline(ctx, id) }(device.ID) device.LastSeen = time.Now() diff --git a/internal/sms-gateway/modules/devices/repository.go b/internal/sms-gateway/modules/devices/repository.go index f56e933..07c6160 100644 --- a/internal/sms-gateway/modules/devices/repository.go +++ b/internal/sms-gateway/modules/devices/repository.go @@ -71,8 +71,20 @@ func (r *repository) UpdatePushToken(id, token string) error { return r.db.Model(&models.Device{}).Where("id", id).Update("push_token", token).Error } -func (r *repository) UpdateLastSeen(id string) error { - return r.db.Model(&models.Device{}).Where("id", id).Update("last_seen", time.Now()).Error +func (r *repository) SetLastSeen(ctx context.Context, id string, lastSeen time.Time) error { + if lastSeen.IsZero() { + return nil // ignore zero timestamps + } + res := r.db.WithContext(ctx). + Model(&models.Device{}). + Where("id = ? AND last_seen < ?", id, lastSeen). + Update("last_seen", lastSeen) + if res.Error != nil { + return res.Error + } + + // RowsAffected==0 => not found or stale timestamp; treat as no-op. + return nil } func (r *repository) Remove(filter ...SelectFilter) error { diff --git a/internal/sms-gateway/modules/devices/service.go b/internal/sms-gateway/modules/devices/service.go index 92c0f44..78c68ff 100644 --- a/internal/sms-gateway/modules/devices/service.go +++ b/internal/sms-gateway/modules/devices/service.go @@ -100,8 +100,22 @@ func (s *Service) UpdatePushToken(deviceId string, token string) error { return s.devices.UpdatePushToken(deviceId, token) } -func (s *Service) UpdateLastSeen(deviceId string) error { - return s.devices.UpdateLastSeen(deviceId) +func (s *Service) SetLastSeen(ctx context.Context, batch map[string]time.Time) error { + if len(batch) == 0 { + return nil + } + + for deviceId, lastSeen := range batch { + if ctx.Err() != nil { + break + } + + if err := s.devices.SetLastSeen(ctx, deviceId, lastSeen); err != nil { + s.logger.Error("can't set last seen", zap.String("device_id", deviceId), zap.Error(err)) + } + } + + return ctx.Err() } // Remove removes devices for a specific user that match the provided filters. diff --git a/internal/sms-gateway/online/metrics.go b/internal/sms-gateway/online/metrics.go new file mode 100644 index 0000000..cc556f2 --- /dev/null +++ b/internal/sms-gateway/online/metrics.go @@ -0,0 +1,122 @@ +package online + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// Metric constants +const ( + metricStatusSetTotal = "status_set_total" + metricCacheOperations = "cache_operations_total" + metricCacheLatency = "cache_latency_seconds" + metricPersistenceLatency = "persistence_latency_seconds" + metricPersistenceErrors = "persistence_errors_total" + metricBatchSize = "batch_size" + + labelOperation = "operation" + labelStatus = "status" + + operationSet = "set" + operationDrain = "drain" + + statusSuccess = "success" + statusError = "error" +) + +// metrics contains all Prometheus metrics for the online module +type metrics struct { + statusSetCounter *prometheus.CounterVec + cacheOperations *prometheus.CounterVec + cacheLatency prometheus.Histogram + persistenceLatency prometheus.Histogram + persistenceErrors prometheus.Counter + batchSize prometheus.Gauge +} + +// newMetrics creates and initializes all online metrics +func newMetrics() *metrics { + return &metrics{ + statusSetCounter: promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "sms", + Subsystem: "online", + Name: metricStatusSetTotal, + Help: "Total number of online status updates", + }, []string{labelStatus}), + + cacheOperations: promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "sms", + Subsystem: "online", + Name: metricCacheOperations, + Help: "Total cache operations by type", + }, []string{labelOperation, labelStatus}), + + cacheLatency: promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: "sms", + Subsystem: "online", + Name: metricCacheLatency, + Help: "Cache operation latency in seconds", + Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1}, + }), + + persistenceLatency: promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: "sms", + Subsystem: "online", + Name: metricPersistenceLatency, + Help: "Persistence operation latency in seconds", + Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1}, + }), + + persistenceErrors: promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "sms", + Subsystem: "online", + Name: metricPersistenceErrors, + Help: "Total persistence errors by type", + }), + + batchSize: promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "sms", + Subsystem: "online", + Name: metricBatchSize, + Help: "Current batch size", + }), + } +} + +// IncrementStatusSet increments the status set counter +func (m *metrics) IncrementStatusSet(success bool) { + status := statusSuccess + if !success { + status = statusError + } + m.statusSetCounter.WithLabelValues(status).Inc() +} + +// IncrementCacheOperation increments cache operation counter +func (m *metrics) IncrementCacheOperation(operation, status string) { + m.cacheOperations.WithLabelValues(operation, status).Inc() +} + +// ObserveCacheLatency observes cache operation latency +func (m *metrics) ObserveCacheLatency(f func()) { + timer := prometheus.NewTimer(m.cacheLatency) + f() + timer.ObserveDuration() +} + +// ObservePersistenceLatency observes persistence operation latency +func (m *metrics) ObservePersistenceLatency(f func()) { + timer := prometheus.NewTimer(m.persistenceLatency) + f() + timer.ObserveDuration() +} + +// IncrementPersistenceError increments persistence error counter +func (m *metrics) IncrementPersistenceError() { + m.persistenceErrors.Inc() +} + +// SetBatchSize sets the current batch size +func (m *metrics) SetBatchSize(size int) { + m.batchSize.Set(float64(size)) +} diff --git a/internal/sms-gateway/online/module.go b/internal/sms-gateway/online/module.go new file mode 100644 index 0000000..43d5fef --- /dev/null +++ b/internal/sms-gateway/online/module.go @@ -0,0 +1,36 @@ +package online + +import ( + "context" + + "github.com/android-sms-gateway/server/internal/sms-gateway/cache" + "go.uber.org/fx" + "go.uber.org/zap" +) + +func Module() fx.Option { + return fx.Module( + "online", + fx.Decorate(func(log *zap.Logger) *zap.Logger { + return log.Named("online") + }), + fx.Provide(func(factory cache.Factory) (cache.Cache, error) { + return factory.New("online") + }, fx.Private), + fx.Provide(newMetrics), + fx.Provide(New), + fx.Invoke(func(lc fx.Lifecycle, svc Service) { + ctx, cancel := context.WithCancel(context.Background()) + lc.Append(fx.Hook{ + OnStart: func(_ context.Context) error { + go svc.Run(ctx) + return nil + }, + OnStop: func(_ context.Context) error { + cancel() + return nil + }, + }) + }), + ) +} diff --git a/internal/sms-gateway/online/service.go b/internal/sms-gateway/online/service.go new file mode 100644 index 0000000..721e179 --- /dev/null +++ b/internal/sms-gateway/online/service.go @@ -0,0 +1,128 @@ +package online + +import ( + "context" + "fmt" + "time" + + "github.com/android-sms-gateway/server/internal/sms-gateway/modules/devices" + "github.com/android-sms-gateway/server/pkg/cache" + "github.com/capcom6/go-helpers/maps" + "go.uber.org/zap" +) + +type Service interface { + Run(ctx context.Context) + SetOnline(ctx context.Context, deviceID string) +} + +type service struct { + devicesSvc *devices.Service + + cache cache.Cache + + logger *zap.Logger + metrics *metrics +} + +func New(devicesSvc *devices.Service, cache cache.Cache, logger *zap.Logger, metrics *metrics) Service { + return &service{ + devicesSvc: devicesSvc, + + cache: cache, + + logger: logger, + metrics: metrics, + } +} + +func (s *service) Run(ctx context.Context) { + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + s.logger.Debug("Persisting online status") + if err := s.persist(ctx); err != nil { + s.logger.Error("Can't persist online status", zap.Error(err)) + } + } + } +} + +func (s *service) SetOnline(ctx context.Context, deviceID string) { + dt := time.Now().UTC().Format(time.RFC3339) + + s.logger.Debug("Setting online status", zap.String("device_id", deviceID), zap.String("last_seen", dt)) + + var err error + s.metrics.ObserveCacheLatency(func() { + if err = s.cache.Set(ctx, deviceID, dt); err != nil { + s.metrics.IncrementCacheOperation(operationSet, statusError) + s.logger.Error("Can't set online status", zap.String("device_id", deviceID), zap.Error(err)) + s.metrics.IncrementStatusSet(false) + } + }) + + if err != nil { + return + } + + s.metrics.IncrementCacheOperation(operationSet, statusSuccess) + s.logger.Debug("Online status set", zap.String("device_id", deviceID)) + s.metrics.IncrementStatusSet(true) +} + +func (s *service) persist(ctx context.Context) error { + var drainErr, persistErr error + + s.metrics.ObservePersistenceLatency(func() { + items, err := s.cache.Drain(ctx) + if err != nil { + drainErr = fmt.Errorf("can't drain cache: %w", err) + s.metrics.IncrementCacheOperation(operationDrain, statusError) + return + } + s.metrics.IncrementCacheOperation(operationDrain, statusSuccess) + s.metrics.SetBatchSize(len(items)) + + if len(items) == 0 { + s.logger.Debug("No online statuses to persist") + return + } + s.logger.Debug("Drained cache", zap.Int("count", len(items))) + + timestamps := maps.MapValues(items, func(v string) time.Time { + t, err := time.Parse(time.RFC3339, v) + if err != nil { + s.logger.Warn("Can't parse last seen", zap.String("last_seen", v), zap.Error(err)) + return time.Now().UTC() + } + + return t + }) + + s.logger.Debug("Parsed last seen timestamps", zap.Int("count", len(timestamps))) + + if err := s.devicesSvc.SetLastSeen(ctx, timestamps); err != nil { + persistErr = fmt.Errorf("can't set last seen: %w", err) + s.metrics.IncrementPersistenceError() + return + } + + s.logger.Info("Set last seen", zap.Int("count", len(timestamps))) + }) + + if drainErr != nil { + return drainErr + } + + if persistErr != nil { + return persistErr + } + + return nil +} diff --git a/pkg/cache/memory_bench_test.go b/pkg/cache/memory_bench_test.go index 70ce9c1..6f51eb5 100644 --- a/pkg/cache/memory_bench_test.go +++ b/pkg/cache/memory_bench_test.go @@ -1,3 +1,4 @@ +//nolint:errcheck package cache_test import ( diff --git a/pkg/cache/memory_concurrency_test.go b/pkg/cache/memory_concurrency_test.go index 18e3d44..c9253e9 100644 --- a/pkg/cache/memory_concurrency_test.go +++ b/pkg/cache/memory_concurrency_test.go @@ -1,3 +1,4 @@ +//nolint:errcheck package cache_test import ( diff --git a/pkg/cache/memory_edge_test.go b/pkg/cache/memory_edge_test.go index 516e84e..1134f6a 100644 --- a/pkg/cache/memory_edge_test.go +++ b/pkg/cache/memory_edge_test.go @@ -66,12 +66,12 @@ func TestMemoryCache_NilContext(t *testing.T) { key := "nil-context-key" value := "nil-context-value" - err := cache.Set(nil, key, value) + err := cache.Set(nil, key, value) //nolint:staticcheck if err != nil { t.Fatalf("Set with nil context failed: %v", err) } - retrieved, err := cache.Get(nil, key) + retrieved, err := cache.Get(nil, key) //nolint:staticcheck if err != nil { t.Fatalf("Get with nil context failed: %v", err) } diff --git a/pkg/cache/memory_profile_test.go b/pkg/cache/memory_profile_test.go index 5236d96..5214ac4 100644 --- a/pkg/cache/memory_profile_test.go +++ b/pkg/cache/memory_profile_test.go @@ -1,3 +1,4 @@ +//nolint:errcheck package cache_test import (