diff --git a/.github/workflows/close-issues.yml b/.github/workflows/close-issues.yml index f3ce2cd..1b3bb5b 100644 --- a/.github/workflows/close-issues.yml +++ b/.github/workflows/close-issues.yml @@ -1,4 +1,4 @@ -name: Close inactive issues +name: Close inactive issues and PRs on: schedule: - cron: "30 1 * * *" @@ -8,14 +8,18 @@ jobs: runs-on: ubuntu-latest permissions: issues: write + pull-requests: write steps: - - uses: actions/stale@v9 + - uses: actions/stale@v10 with: - days-before-issue-stale: 7 - days-before-issue-close: 7 - days-before-pr-close: -1 - days-before-pr-stale: -1 + days-before-stale: 7 + days-before-close: 7 + exempt-all-assignees: true + stale-issue-message: "This issue is stale because it has been open for 7 days with no activity." close-issue-message: "This issue was closed because it has been inactive for 7 days since being marked as stale." stale-issue-label: "stale" - exempt-all-assignees: true + + stale-pr-message: "This PR is stale because it has been open for 7 days with no activity." + close-pr-message: "This PR was closed because it has been inactive for 7 days since being marked as stale." + stale-pr-label: "stale" diff --git a/.github/workflows/docker-publish.yml b/.github/workflows/docker-publish.yml index 7f663ed..9380682 100644 --- a/.github/workflows/docker-publish.yml +++ b/.github/workflows/docker-publish.yml @@ -7,28 +7,6 @@ permissions: contents: read jobs: - test: - name: Test - runs-on: ubuntu-latest - steps: - # step 1: checkout repository code - - name: Checkout code into workspace directory - uses: actions/checkout@v4 - - # step 2: set up go - - name: Set up Go - uses: actions/setup-go@v5 - with: - go-version: stable - - # step 3: install dependencies - - name: Install all Go dependencies - run: go mod download - - # step 4: run test - - name: Run coverage - run: go test -race -coverprofile=coverage.out -covermode=atomic ./... - e2e: name: E2E runs-on: ubuntu-latest @@ -65,7 +43,6 @@ jobs: contents: read packages: write needs: - - test - e2e if: github.actor != 'dependabot[bot]' uses: ./.github/workflows/docker-build.yml diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml new file mode 100644 index 0000000..18fb253 --- /dev/null +++ b/.github/workflows/go.yml @@ -0,0 +1,118 @@ +name: Go + +permissions: + contents: read + +on: + workflow_dispatch: + push: + branches: [master] + paths: + - "**.go" + - "go.mod" + - "go.sum" + pull_request: + branches: [master] + paths: + - "**.go" + - "go.mod" + - "go.sum" + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + golangci: + name: Lint + runs-on: ubuntu-latest + steps: + # step 1: checkout repository code + - name: Checkout code into workspace directory + uses: actions/checkout@v4 + + # step 2: set up go + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: stable + + # step 3: run golangci-lint + - name: Run golangci-lint + uses: golangci/golangci-lint-action@v8 + with: + version: latest + args: --timeout=5m + + test: + name: Test + runs-on: ubuntu-latest + steps: + # step 1: checkout repository code + - name: Checkout code into workspace directory + uses: actions/checkout@v4 + + # step 2: set up go + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: stable + + # step 3: install dependencies + - name: Install all Go dependencies + run: go mod download + + # step 4: run test + - name: Run coverage + run: go test -race -shuffle=on -covermode=atomic -coverpkg=./... -coverprofile=coverage.out ./... 
+ + # step 5: upload coverage + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v5 + with: + token: ${{ secrets.CODECOV_TOKEN }} + + benchmark: + name: Benchmark + runs-on: ubuntu-latest + permissions: + contents: read + pull-requests: write + steps: + # step 1: checkout repository code + - name: Checkout code into workspace directory + uses: actions/checkout@v4 + + # step 2: set up go + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: stable + + # step 3: install dependencies + - name: Install all Go dependencies + run: go mod download + + # step 4: run benchmark + - name: Run benchmarks + run: go test -bench=. -benchmem ./... | tee benchmark.txt + + # step 5: download previous benchmark result from cache (if exists) + - name: Download previous benchmark data + uses: actions/cache@v4 + with: + path: ./cache + key: ${{ runner.os }}-benchmark + + # step 6: upload benchmark + - name: Upload benchmark results + uses: benchmark-action/github-action-benchmark@v1 + with: + # What benchmark tool the benchmark.txt came from + tool: "go" + # Where the output from the benchmark tool is stored + output-file-path: benchmark.txt + # Where the previous data file is stored + external-data-json-path: ./cache/benchmark-data.json + # Workflow will fail when an alert happens + fail-on-alert: true diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml deleted file mode 100644 index d289adb..0000000 --- a/.github/workflows/golangci-lint.yml +++ /dev/null @@ -1,52 +0,0 @@ -name: golangci-lint -on: - pull_request: - -permissions: - contents: read - # Optional: allow read access to pull request. Use with `only-new-issues` option. - # pull-requests: read - -jobs: - golangci: - name: lint - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - uses: actions/setup-go@v5 - with: - go-version: stable - cache: false - - name: golangci-lint - uses: golangci/golangci-lint-action@v3 - with: - # Require: The version of golangci-lint to use. - # When `install-mode` is `binary` (default) the value can be v1.2 or v1.2.3 or `latest` to use the latest version. - # When `install-mode` is `goinstall` the value can be v1.2.3, `latest`, or the hash of a commit. - version: latest - - # Optional: working directory, useful for monorepos - # working-directory: somedir - - # Optional: golangci-lint command line arguments. - # - # Note: By default, the `.golangci.yml` file should be at the root of the repository. - # The location of the configuration file can be changed by using `--config=` - # args: --timeout=30m --config=/my/path/.golangci.yml --issues-exit-code=0 - args: --timeout=5m - - # Optional: show only new issues if it's a pull request. The default value is `false`. - # only-new-issues: true - - # Optional: if set to true, then all caching functionality will be completely disabled, - # takes precedence over all other caching options. - # skip-cache: true - - # Optional: if set to true, then the action won't cache or restore ~/go/pkg. - # skip-pkg-cache: true - - # Optional: if set to true, then the action won't cache or restore ~/.cache/go-build. - # skip-build-cache: true - - # Optional: The mode to install golangci-lint. It can be 'binary' or 'goinstall'. 
- # install-mode: "goinstall" diff --git a/build/package/Dockerfile b/build/package/Dockerfile index 2cc5d4d..f516510 100644 --- a/build/package/Dockerfile +++ b/build/package/Dockerfile @@ -1,5 +1,5 @@ # Building the binary of the App -FROM golang:1.23-alpine AS build +FROM golang:1.24-alpine AS build ARG APP ARG APP_VERSION=1.0.0 diff --git a/build/package/Dockerfile.dev b/build/package/Dockerfile.dev index 9e0eebb..e56d9c0 100644 --- a/build/package/Dockerfile.dev +++ b/build/package/Dockerfile.dev @@ -1,4 +1,4 @@ -FROM golang:1.23-alpine +FROM golang:1.24-alpine ENV GO111MODULE="on" ENV GOOS="linux" diff --git a/configs/config.example.yml b/configs/config.example.yml index 0402413..f7ae528 100644 --- a/configs/config.example.yml +++ b/configs/config.example.yml @@ -24,6 +24,8 @@ fcm: # firebase cloud messaging config credentials_json: "{}" # firebase credentials json (for public mode only) [FCM__CREDENTIALS_JSON] timeout_seconds: 1 # push notification send timeout [FCM__TIMEOUT_SECONDS] debounce_seconds: 5 # push notification debounce (>= 5s) [FCM__DEBOUNCE_SECONDS] +cache: # cache config + url: memory:// # cache url (memory:// or redis://) [CACHE__URL] tasks: # tasks config hashing: # hashing task (hashes processed messages for privacy purposes) interval_seconds: 15 # hashing interval in seconds [TASKS__HASHING__INTERVAL_SECONDS] diff --git a/deployments/grafana/dashboards/online.json b/deployments/grafana/dashboards/online.json new file mode 100644 index 0000000..80e7b72 --- /dev/null +++ b/deployments/grafana/dashboards/online.json @@ -0,0 +1,781 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": "-- Grafana --", + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": 19, + "links": [], + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "edqp0a73uh2bka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 7, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.1.1", + "targets": [ + { + "disableTextWrap": false, + "editorMode": "builder", + "expr": "sum by(instance) (sms_online_batch_size{instance=~\"$instance\"})", + "fullMetaSearch": false, + "includeNullMetadata": true, + "legendFormat": "__auto", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Batch Size", + "type": "timeseries" + }, + 
{ + "datasource": { + "type": "prometheus", + "uid": "edqp0a73uh2bka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 10 + } + ] + }, + "unit": "ops" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 8 + }, + "id": 1, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.1.1", + "targets": [ + { + "datasource": "Prometheus", + "editorMode": "code", + "expr": "sum by(status) (rate(sms_online_status_set_total{instance=~\"$instance\"}[$__rate_interval]))", + "legendFormat": "{{status}}", + "range": true, + "refId": "A" + } + ], + "title": "Online Status Updates", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edqp0a73uh2bka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "ops" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 8 + }, + "id": 2, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "12.1.1", + "targets": [ + { + "datasource": "Prometheus", + "editorMode": "code", + "expr": "sum by (operation, status) (rate(sms_online_cache_operations_total{instance=~\"$instance\", operation=~\"$operation\"}[$__rate_interval]))", + "legendFormat": "{{operation}} - {{status}}", + "range": true, + "refId": "A" + } + ], + "title": "Cache Operations (rate)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edqp0a73uh2bka" + }, + "fieldConfig": { + "defaults": { + "color": { + "fixedColor": "green", + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + 
"axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "dashed+area" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 0.3 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 16 + }, + "id": 6, + "options": { + "legend": { + "calcs": [ + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true, + "sortBy": "Name", + "sortDesc": false + }, + "tooltip": { + "hideZeros": false, + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "12.1.1", + "targets": [ + { + "datasource": "Prometheus", + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum(rate(sms_online_cache_latency_seconds_bucket{instance=~\"$instance\"}[$__rate_interval])) by (le, instance))", + "legendFormat": "{{instance}} p95", + "range": true, + "refId": "A" + }, + { + "datasource": "Prometheus", + "editorMode": "code", + "expr": "histogram_quantile(0.50, sum(rate(sms_online_cache_latency_seconds_bucket{instance=~\"$instance\"}[$__rate_interval])) by (le, instance))", + "hide": false, + "legendFormat": "{{instance}} p50", + "range": true, + "refId": "B" + } + ], + "title": "Cache Latency", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edqp0a73uh2bka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "dashed+area" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "orange", + "value": 0.3 + }, + { + "color": "red", + "value": 0.5 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 16 + }, + "id": 4, + "options": { + "legend": { + "calcs": [ + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true, + "sortBy": "Name", + "sortDesc": false + }, + "tooltip": { + "hideZeros": false, + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "12.1.1", + "targets": [ + { + "datasource": "Prometheus", + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum(rate(sms_online_persistence_latency_seconds_bucket{instance=~\"$instance\"}[$__rate_interval])) by (le, instance))", + "legendFormat": "{{instance}} p95", + "range": true, + "refId": "A" + }, + { + "datasource": "Prometheus", + "editorMode": "code", + "expr": "histogram_quantile(0.50, 
sum(rate(sms_online_persistence_latency_seconds_bucket{instance=~\"$instance\"}[$__rate_interval])) by (le, instance))", + "hide": false, + "legendFormat": "{{instance}} p50", + "range": true, + "refId": "B" + } + ], + "title": "Persistence Latency", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edqp0a73uh2bka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 0 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 24 + }, + "id": 5, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "12.1.1", + "targets": [ + { + "datasource": "Prometheus", + "editorMode": "code", + "expr": "increase(sms_online_status_set_total{status=\"error\", instance=~\"$instance\"}[5m])", + "legendFormat": "Errors", + "range": true, + "refId": "A" + } + ], + "title": "Online Status Errors (5m increase)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edqp0a73uh2bka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 0 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 24 + }, + "id": 3, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.1.1", + "targets": [ + { + "datasource": "Prometheus", + "editorMode": "code", + "expr": "increase(sms_online_persistence_errors_total{instance=~\"$instance\"}[5m])", + "legendFormat": "Errors", + "range": true, + "refId": "A" + } + ], + "title": "Persistence Errors (5m increase)", + "type": "timeseries" + } + ], + "preload": false, + "refresh": "auto", + "schemaVersion": 41, + "tags": [ + "online", + "sms" + ], + 
"templating": { + "list": [ + { + "allValue": ".*", + "allowCustomValue": false, + "current": { + "text": "All", + "value": "$__all" + }, + "datasource": { + "type": "prometheus", + "uid": "edqp0a73uh2bka" + }, + "definition": "label_values(sms_online_status_set_total,instance)", + "includeAll": true, + "name": "instance", + "options": [], + "query": { + "qryType": 1, + "query": "label_values(sms_online_status_set_total,instance)", + "refId": "PrometheusVariableQueryEditor-VariableQuery" + }, + "refresh": 1, + "type": "query" + }, + { + "allValue": ".*", + "allowCustomValue": false, + "current": { + "text": "All", + "value": "$__all" + }, + "definition": "label_values(sms_online_cache_operations_total,operation)", + "includeAll": true, + "name": "operation", + "options": [], + "query": { + "qryType": 1, + "query": "label_values(sms_online_cache_operations_total,operation)", + "refId": "PrometheusVariableQueryEditor-VariableQuery" + }, + "refresh": 1, + "regex": "", + "type": "query" + } + ] + }, + "time": { + "from": "now-24h", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "SMS Online Metrics", + "uid": "online_metrics", + "version": 10 +} \ No newline at end of file diff --git a/deployments/prometheus/alerts/online-alerts.yml b/deployments/prometheus/alerts/online-alerts.yml new file mode 100644 index 0000000..bf2cb92 --- /dev/null +++ b/deployments/prometheus/alerts/online-alerts.yml @@ -0,0 +1,38 @@ +groups: + - name: online_metrics_alerts + rules: + - alert: OnlineStatusErrors + expr: sum(increase(sms_online_status_set_total{status="error"}[5m])) > 10 + for: 5m + labels: + severity: warning + annotations: + summary: "High number of online status errors" + description: "The number of online status errors has exceeded 10 in the last 5 minutes." + + - alert: CacheOperationErrors + expr: sum by (operation) (increase(sms_online_cache_operations_total{status="error"}[5m])) > 5 + for: 5m + labels: + severity: warning + annotations: + summary: "High number of cache operation errors" + description: "Cache errors for operation={{ $labels.operation }} exceeded 5 in 5m." + + - alert: PersistenceErrors + expr: sum(increase(sms_online_persistence_errors_total[5m])) > 0 + for: 5m + labels: + severity: critical + annotations: + summary: "Persistence errors detected" + description: "Persistence errors have been detected in the online module." + + - alert: HighPersistenceLatency + expr: histogram_quantile(0.95, sum(rate(sms_online_persistence_latency_seconds_bucket[5m])) by (le)) > 0.5 + for: 5m + labels: + severity: warning + annotations: + summary: "High persistence latency" + description: "The 95th percentile persistence latency has exceeded 0.5 seconds." 
diff --git a/go.mod b/go.mod index fb04753..59112e7 100644 --- a/go.mod +++ b/go.mod @@ -1,17 +1,16 @@ module github.com/android-sms-gateway/server -go 1.23.0 - -toolchain go1.23.2 +go 1.24.1 require ( firebase.google.com/go/v4 v4.12.1 github.com/android-sms-gateway/client-go v1.9.5 + github.com/android-sms-gateway/core v1.0.1 github.com/ansrivas/fiberprometheus/v2 v2.6.1 github.com/capcom6/go-helpers v0.3.0 github.com/capcom6/go-infra-fx v0.4.0 github.com/go-playground/assert/v2 v2.2.0 - github.com/go-playground/validator/v10 v10.16.0 + github.com/go-playground/validator/v10 v10.26.0 github.com/go-sql-driver/mysql v1.7.1 github.com/gofiber/fiber/v2 v2.52.9 github.com/gofiber/swagger v1.1.1 @@ -19,9 +18,10 @@ require ( github.com/jaevor/go-nanoid v1.3.0 github.com/nyaruka/phonenumbers v1.4.0 github.com/prometheus/client_golang v1.19.1 + github.com/redis/go-redis/v9 v9.9.0 github.com/swaggo/swag v1.16.6 - go.uber.org/fx v1.20.1 - go.uber.org/zap v1.26.0 + go.uber.org/fx v1.24.0 + go.uber.org/zap v1.27.0 golang.org/x/crypto v0.37.0 golang.org/x/exp v0.0.0-20240525044651-4c93da0ed11d google.golang.org/api v0.148.0 @@ -42,7 +42,8 @@ require ( github.com/andybalholm/brotli v1.1.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/gabriel-vasile/mimetype v1.4.3 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/gabriel-vasile/mimetype v1.4.8 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.19.6 // indirect github.com/go-openapi/spec v0.20.4 // indirect @@ -50,7 +51,7 @@ require ( github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/gofiber/adaptor/v2 v2.2.1 // indirect - github.com/gofiber/contrib/fiberzap/v2 v2.1.2 // indirect + github.com/gofiber/contrib/fiberzap/v2 v2.1.6 // indirect github.com/golang-jwt/jwt/v4 v4.5.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.3 // indirect @@ -67,7 +68,7 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/kelseyhightower/envconfig v1.4.0 // indirect github.com/klauspost/compress v1.17.9 // indirect - github.com/leodido/go-urn v1.2.4 // indirect + github.com/leodido/go-urn v1.4.0 // indirect github.com/mailru/easyjson v0.7.6 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect @@ -84,11 +85,10 @@ require ( github.com/swaggo/files/v2 v2.0.2 // indirect github.com/tinylib/msgp v1.2.5 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect - github.com/valyala/fasthttp v1.55.0 // indirect + github.com/valyala/fasthttp v1.56.0 // indirect github.com/valyala/tcplisten v1.0.0 // indirect go.opencensus.io v0.24.0 // indirect - go.uber.org/atomic v1.11.0 // indirect - go.uber.org/dig v1.17.1 // indirect + go.uber.org/dig v1.19.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/mod v0.18.0 // indirect golang.org/x/net v0.39.0 // indirect diff --git a/go.sum b/go.sum index a0a9829..2738cf6 100644 --- a/go.sum +++ b/go.sum @@ -32,37 +32,23 @@ github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tN github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M= 
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= -github.com/android-sms-gateway/client-go v1.9.3 h1:Ur2zS5P76UUTQVKJVqPfTdvY7oYN/JBLh8ZEGViRL3o= -github.com/android-sms-gateway/client-go v1.9.3/go.mod h1:DQsReciU1xcaVW3T5Z2bqslNdsAwCFCtghawmA6g6L4= -github.com/android-sms-gateway/client-go v1.9.5-0.20250823005321-9f448350966b h1:50u5sKrJlT4Ah2Ma0NqO+hsNmHwCOZyn0LpUzVfomaE= -github.com/android-sms-gateway/client-go v1.9.5-0.20250823005321-9f448350966b/go.mod h1:DQsReciU1xcaVW3T5Z2bqslNdsAwCFCtghawmA6g6L4= github.com/android-sms-gateway/client-go v1.9.5 h1:fHrE1Pi3rKUdPVMmI9evKW0iyjB5bMIhFRxyq1wVQ+o= github.com/android-sms-gateway/client-go v1.9.5/go.mod h1:DQsReciU1xcaVW3T5Z2bqslNdsAwCFCtghawmA6g6L4= +github.com/android-sms-gateway/core v1.0.1 h1:7QyqyW3UQSQmEXQuUgXjZwHSnOd65DTxHUyhXQi6gpc= +github.com/android-sms-gateway/core v1.0.1/go.mod h1:HXczGDCKxTeuiwadPElczCx/y3Y6Wamc5kl5nFp5rVM= github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM= github.com/ansrivas/fiberprometheus/v2 v2.6.1/go.mod h1:MloIKvy4yN6hVqlRpJ/jDiR244YnWJaQC0FIqS8A+MY= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= -github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= -github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/capcom6/go-helpers v0.3.0 h1:ae18fLfluoPubiB2V+j4cIpfZaTuK4acS2entamaDkE= github.com/capcom6/go-helpers v0.3.0/go.mod h1:WDqc7HZNqHxUTisArkYIBZtqUfJBVyPWeQI+FMwEzAw= -github.com/capcom6/go-infra-fx v0.2.3 h1:ZSlBfz8qRaNVMtTBtJ4fLN89472CNimpJwy3kfBgGf8= -github.com/capcom6/go-infra-fx v0.2.3/go.mod h1:KHApbB6bwF7WQNIXW6ZdC4YG+d+ciwxvsnRpbOJa/Ys= -github.com/capcom6/go-infra-fx v0.2.5-0.20250828235240-ca43b2b295cd h1:2DXDbc0rCcIlcBNlf6Hz5wMzSVraLfZBr3zROJPdNWA= -github.com/capcom6/go-infra-fx v0.2.5-0.20250828235240-ca43b2b295cd/go.mod h1:SpfKffK04JzQ18XNikHbKg3EhOdnUmItDfBFzmE0/9g= -github.com/capcom6/go-infra-fx v0.2.5-0.20250829024248-89ef05730dad h1:wKMDHNDCCxv7Y3rAeBSvmDfJd01/cO2bMIcGsrWTiO4= -github.com/capcom6/go-infra-fx v0.2.5-0.20250829024248-89ef05730dad/go.mod h1:SpfKffK04JzQ18XNikHbKg3EhOdnUmItDfBFzmE0/9g= -github.com/capcom6/go-infra-fx v0.2.5-0.20250829074435-d2b471163a3a h1:ii/6SfX7KgsJnd55Hui7//QU4W++K6YLmEONJ0frRUs= -github.com/capcom6/go-infra-fx v0.2.5-0.20250829074435-d2b471163a3a/go.mod h1:SpfKffK04JzQ18XNikHbKg3EhOdnUmItDfBFzmE0/9g= -github.com/capcom6/go-infra-fx v0.2.5-0.20250829133704-1120ad345221 h1:3mPEGqCtn/9onOx9sxQjMgnYe0ULtsO73RfWxbj2qEs= -github.com/capcom6/go-infra-fx v0.2.5-0.20250829133704-1120ad345221/go.mod h1:SpfKffK04JzQ18XNikHbKg3EhOdnUmItDfBFzmE0/9g= -github.com/capcom6/go-infra-fx v0.3.0 h1:JEemwoslSj5zSdWp/OQoMV66tXVIrb6IMAPHQfLa1UI= 
-github.com/capcom6/go-infra-fx v0.3.0/go.mod h1:SpfKffK04JzQ18XNikHbKg3EhOdnUmItDfBFzmE0/9g= -github.com/capcom6/go-infra-fx v0.3.1-0.20250904104539-6f3f2b187981 h1:t7+vENpDYt8S+21G3y4bBIDyBByPQEsZz7Nq9yQ0zoE= -github.com/capcom6/go-infra-fx v0.3.1-0.20250904104539-6f3f2b187981/go.mod h1:SpfKffK04JzQ18XNikHbKg3EhOdnUmItDfBFzmE0/9g= github.com/capcom6/go-infra-fx v0.4.0 h1:ijyEO6rOzLDLB8YvmqzcaIBw46ehDdx2GGzzR1m+VIQ= github.com/capcom6/go-infra-fx v0.4.0/go.mod h1:SpfKffK04JzQ18XNikHbKg3EhOdnUmItDfBFzmE0/9g= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= @@ -78,6 +64,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/docker/cli v24.0.7+incompatible h1:wa/nIwYFW7BVTGa7SWPVyyXU9lgORqUb1xfI36MSkFg= github.com/docker/cli v24.0.7+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= github.com/docker/docker v24.0.7+incompatible h1:Wo6l37AuwP3JaMnZa226lzVXGA3F9Ig1seQen0cKYlM= @@ -96,8 +84,8 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0= -github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk= +github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM= +github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8= github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw= github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw= github.com/go-faster/errors v0.6.1 h1:nNIPOBkprlKzkThvS/0YaX8Zs9KewLCOSFQS5BU06FI= @@ -118,15 +106,15 @@ github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/o github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= -github.com/go-playground/validator/v10 v10.16.0 h1:x+plE831WK4vaKHO/jpgUGsvLKIqRRkz6M78GuJAfGE= -github.com/go-playground/validator/v10 v10.16.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU= +github.com/go-playground/validator/v10 v10.26.0 h1:SP05Nqhjcvz81uJaRfEV0YBSSSGMc/iMaVtFbr3Sw2k= +github.com/go-playground/validator/v10 v10.26.0/go.mod h1:I5QpIEbmr8On7W0TktmJAumgzX4CA1XNl4ZmDuVHKKo= github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= 
github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/gofiber/adaptor/v2 v2.2.1 h1:givE7iViQWlsTR4Jh7tB4iXzrlKBgiraB/yTdHs9Lv4= github.com/gofiber/adaptor/v2 v2.2.1/go.mod h1:AhR16dEqs25W2FY/l8gSj1b51Azg5dtPDmm+pruNOrc= -github.com/gofiber/contrib/fiberzap/v2 v2.1.2 h1:7Z1BqS1sYK9e9jTwqPcWx9qQt46PI8oeswgAp6YNZC4= -github.com/gofiber/contrib/fiberzap/v2 v2.1.2/go.mod h1:ulCCQOdDYABGsOQfbndASmCsCN86hsC96iKoOTNYfy8= +github.com/gofiber/contrib/fiberzap/v2 v2.1.6 h1:8aMBaO7jAB4w9o2uGC1S3ieKPxg8vfJ7t1aipq2pudg= +github.com/gofiber/contrib/fiberzap/v2 v2.1.6/go.mod h1:sGrPV2XzRrI6aJQOmORr5rdk4vXLR630Oc/REtMmCYs= github.com/gofiber/fiber/v2 v2.52.9 h1:YjKl5DOiyP3j0mO61u3NTmK7or8GzzWzCFzkboyP5cw= github.com/gofiber/fiber/v2 v2.52.9/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw= github.com/gofiber/swagger v1.1.1 h1:FZVhVQQ9s1ZKLHL/O0loLh49bYB5l1HEAgxDlcTtkRA= @@ -213,8 +201,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q= -github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4= +github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= +github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA= @@ -267,6 +255,8 @@ github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/redis/go-redis/v9 v9.9.0 h1:URbPQ4xVQSQhZ27WMQVmZSo3uT3pL+4IdHVcYq2nVfM= +github.com/redis/go-redis/v9 v9.9.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= @@ -291,9 +281,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod 
h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/swaggo/files/v2 v2.0.2 h1:Bq4tgS/yxLB/3nwOMcul5oLEUKa877Ykgz3CJMVbQKU= github.com/swaggo/files/v2 v2.0.2/go.mod h1:TVqetIzZsO9OhHX1Am9sRf9LdrFZqoK49N37KON/jr0= github.com/swaggo/swag v1.16.6 h1:qBNcx53ZaX+M5dxVyTrgQ0PJ/ACK+NzhwcbieTt+9yI= @@ -302,8 +291,8 @@ github.com/tinylib/msgp v1.2.5 h1:WeQg1whrXRFiZusidTQqzETkRpGjFjcIhW6uqWH09po= github.com/tinylib/msgp v1.2.5/go.mod h1:ykjzy2wzgrlvpDCRc4LA8UXy6D8bzMSuAF3WD57Gok0= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.55.0 h1:Zkefzgt6a7+bVKHnu/YaYSOPfNYNisSVBo/unVCf8k8= -github.com/valyala/fasthttp v1.55.0/go.mod h1:NkY9JtkrpPKmgwV3HTaS2HWaJss9RSIsRVfcxxoHiOM= +github.com/valyala/fasthttp v1.56.0 h1:bEZdJev/6LCBlpdORfrLu/WOZXXxvrUQSiyniuaoW8U= +github.com/valyala/fasthttp v1.56.0/go.mod h1:sReBt3XZVnudxuLOx4J/fMrJVorWRiWY2koQKgABiVI= github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= github.com/vertica/vertica-sql-go v1.3.3 h1:fL+FKEAEy5ONmsvya2WH5T8bhkvY27y/Ik3ReR2T+Qw= @@ -327,22 +316,20 @@ go.opentelemetry.io/otel v1.20.0/go.mod h1:oUIGj3D77RwJdM6PPZImDpSZGDvkD9fhesHny go.opentelemetry.io/otel/trace v1.20.0 h1:+yxVAPZPbQhbC3OfAkeIVTky6iTFpcr4SiY9om7mXSQ= go.opentelemetry.io/otel/trace v1.20.0/go.mod h1:HJSK7F/hA5RlzpZ0zKDCHCDHm556LCDtKaAo6JmBFUU= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= -go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/dig v1.17.1 h1:Tga8Lz8PcYNsWsyHMZ1Vm0OQOUaJNDyvPImgbAu9YSc= -go.uber.org/dig v1.17.1/go.mod h1:Us0rSJiThwCv2GteUN0Q7OKvU7n5J4dxZ9JKUXozFdE= -go.uber.org/fx v1.20.1 h1:zVwVQGS8zYvhh9Xxcu4w1M6ESyeMzebzj2NbSayZ4Mk= -go.uber.org/fx v1.20.1/go.mod h1:iSYNbHf2y55acNCwCXKx7LbWb5WG1Bnue5RDXz1OREg= +go.uber.org/dig v1.19.0 h1:BACLhebsYdpQ7IROQ1AGPjrXcP5dF80U3gKoFzbaq/4= +go.uber.org/dig v1.19.0/go.mod h1:Us0rSJiThwCv2GteUN0Q7OKvU7n5J4dxZ9JKUXozFdE= +go.uber.org/fx v1.24.0 h1:wE8mruvpg2kiiL1Vqd0CC+tr0/24XIB10Iwp2lLWzkg= +go.uber.org/fx v1.24.0/go.mod h1:AmDeGyS+ZARGKM4tlH4FY2Jr63VjbEDJHtqXTGP5hbo= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= -go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= -go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= -go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= -go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto 
v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/internal/config/config.go b/internal/config/config.go index 96c21a1..05d91ea 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -14,6 +14,7 @@ type Config struct { FCM FCMConfig `yaml:"fcm"` // firebase cloud messaging config Tasks Tasks `yaml:"tasks"` // tasks config SSE SSE `yaml:"sse"` // server-sent events config + Cache Cache `yaml:"cache"` // cache (memory or redis) config } type Gateway struct { @@ -70,6 +71,10 @@ type SSE struct { KeepAlivePeriodSeconds uint16 `yaml:"keep_alive_period_seconds" envconfig:"SSE__KEEP_ALIVE_PERIOD_SECONDS"` // keep alive period in seconds, 0 for no keep alive } +type Cache struct { + URL string `yaml:"url" envconfig:"CACHE__URL"` +} + var defaultConfig = Config{ Gateway: Gateway{Mode: GatewayModePublic}, HTTP: HTTP{ @@ -95,4 +100,7 @@ var defaultConfig = Config{ SSE: SSE{ KeepAlivePeriodSeconds: 15, }, + Cache: Cache{ + URL: "memory://", + }, } diff --git a/internal/config/module.go b/internal/config/module.go index e854ac7..30ee1f6 100644 --- a/internal/config/module.go +++ b/internal/config/module.go @@ -4,6 +4,7 @@ import ( "strings" "time" + "github.com/android-sms-gateway/server/internal/sms-gateway/cache" "github.com/android-sms-gateway/server/internal/sms-gateway/handlers" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/auth" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/devices" @@ -115,4 +116,9 @@ var Module = fx.Module( sse.WithKeepAlivePeriod(time.Duration(cfg.SSE.KeepAlivePeriodSeconds) * time.Second), ) }), + fx.Provide(func(cfg Config) cache.Config { + return cache.Config{ + URL: cfg.Cache.URL, + } + }), ) diff --git a/internal/sms-gateway/app.go b/internal/sms-gateway/app.go index 0c997bf..d378dda 100644 --- a/internal/sms-gateway/app.go +++ b/internal/sms-gateway/app.go @@ -5,6 +5,7 @@ import ( "sync" appconfig "github.com/android-sms-gateway/server/internal/config" + "github.com/android-sms-gateway/server/internal/sms-gateway/cache" "github.com/android-sms-gateway/server/internal/sms-gateway/handlers" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/auth" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/cleaner" @@ -18,6 +19,7 @@ import ( "github.com/android-sms-gateway/server/internal/sms-gateway/modules/settings" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/sse" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/webhooks" + "github.com/android-sms-gateway/server/internal/sms-gateway/online" "github.com/android-sms-gateway/server/internal/sms-gateway/openapi" "github.com/capcom6/go-infra-fx/cli" "github.com/capcom6/go-infra-fx/db" @@ -42,6 +44,7 @@ var Module = fx.Module( auth.Module, push.Module, db.Module, + cache.Module(), events.Module, messages.Module, health.Module, @@ -51,6 +54,7 @@ var Module = fx.Module( metrics.Module, cleaner.Module, sse.Module, + online.Module(), ) func Run() { diff --git a/internal/sms-gateway/cache/config.go b/internal/sms-gateway/cache/config.go new file mode 100644 index 0000000..fa29957 --- /dev/null +++ b/internal/sms-gateway/cache/config.go @@ -0,0 +1,6 @@ +package cache + +// Config controls the cache backend via a URL (e.g., "memory://", 
"redis://..."). +type Config struct { + URL string +} diff --git a/internal/sms-gateway/cache/factory.go b/internal/sms-gateway/cache/factory.go new file mode 100644 index 0000000..7689519 --- /dev/null +++ b/internal/sms-gateway/cache/factory.go @@ -0,0 +1,60 @@ +package cache + +import ( + "fmt" + "net/url" + + "github.com/android-sms-gateway/core/redis" + "github.com/android-sms-gateway/server/pkg/cache" +) + +const ( + keyPrefix = "sms-gateway:" +) + +type Cache = cache.Cache + +type Factory interface { + New(name string) (Cache, error) +} + +type factory struct { + new func(name string) (Cache, error) +} + +func NewFactory(config Config) (Factory, error) { + if config.URL == "" { + config.URL = "memory://" + } + + u, err := url.Parse(config.URL) + if err != nil { + return nil, fmt.Errorf("can't parse url: %w", err) + } + + switch u.Scheme { + case "memory": + return &factory{ + new: func(name string) (Cache, error) { + return cache.NewMemory(0), nil + }, + }, nil + case "redis": + client, err := redis.New(redis.Config{URL: config.URL}) + if err != nil { + return nil, fmt.Errorf("can't create redis client: %w", err) + } + return &factory{ + new: func(name string) (Cache, error) { + return cache.NewRedis(client, name, 0), nil + }, + }, nil + default: + return nil, fmt.Errorf("invalid scheme: %s", u.Scheme) + } +} + +// New implements Factory. +func (f *factory) New(name string) (Cache, error) { + return f.new(keyPrefix + name) +} diff --git a/internal/sms-gateway/cache/module.go b/internal/sms-gateway/cache/module.go new file mode 100644 index 0000000..cfb4daf --- /dev/null +++ b/internal/sms-gateway/cache/module.go @@ -0,0 +1,16 @@ +package cache + +import ( + "go.uber.org/fx" + "go.uber.org/zap" +) + +func Module() fx.Option { + return fx.Module( + "cache", + fx.Decorate(func(log *zap.Logger) *zap.Logger { + return log.Named("cache") + }), + fx.Provide(NewFactory), + ) +} diff --git a/internal/sms-gateway/models/migrations/mysql/20250918231606_add_last_seen_index.sql b/internal/sms-gateway/models/migrations/mysql/20250918231606_add_last_seen_index.sql new file mode 100644 index 0000000..a04d310 --- /dev/null +++ b/internal/sms-gateway/models/migrations/mysql/20250918231606_add_last_seen_index.sql @@ -0,0 +1,9 @@ +-- +goose Up +-- +goose StatementBegin +CREATE INDEX `idx_devices_last_seen` ON `devices`(`last_seen`); +-- +goose StatementEnd +--- +-- +goose Down +-- +goose StatementBegin +DROP INDEX `idx_devices_last_seen` ON `devices`; +-- +goose StatementEnd \ No newline at end of file diff --git a/internal/sms-gateway/models/models.go b/internal/sms-gateway/models/models.go index f99e9ab..058963e 100644 --- a/internal/sms-gateway/models/models.go +++ b/internal/sms-gateway/models/models.go @@ -28,7 +28,7 @@ type Device struct { AuthToken string `gorm:"not null;uniqueIndex;type:char(21)"` PushToken *string `gorm:"type:varchar(256)"` - LastSeen time.Time `gorm:"not null;autocreatetime:false;default:CURRENT_TIMESTAMP(3)"` + LastSeen time.Time `gorm:"not null;autocreatetime:false;default:CURRENT_TIMESTAMP(3);index:idx_devices_last_seen"` UserID string `gorm:"not null;type:varchar(32)"` diff --git a/internal/sms-gateway/modules/auth/service.go b/internal/sms-gateway/modules/auth/service.go index 0b4357e..fdbd839 100644 --- a/internal/sms-gateway/modules/auth/service.go +++ b/internal/sms-gateway/modules/auth/service.go @@ -11,6 +11,7 @@ import ( "github.com/android-sms-gateway/server/internal/sms-gateway/models" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/devices" + 
"github.com/android-sms-gateway/server/internal/sms-gateway/online" "github.com/android-sms-gateway/server/pkg/crypto" "github.com/capcom6/go-helpers/cache" "github.com/jaevor/go-nanoid" @@ -30,6 +31,7 @@ type Params struct { Users *repository DevicesSvc *devices.Service + OnlineSvc online.Service Logger *zap.Logger } @@ -42,6 +44,7 @@ type Service struct { usersCache *cache.Cache[models.User] devicesSvc *devices.Service + onlineSvc online.Service logger *zap.Logger @@ -55,7 +58,8 @@ func New(params Params) *Service { config: params.Config, users: params.Users, devicesSvc: params.DevicesSvc, - logger: params.Logger.Named("Service"), + onlineSvc: params.OnlineSvc, + logger: params.Logger, idgen: idgen, codesCache: cache.New[string](cache.Config{}), @@ -140,9 +144,9 @@ func (s *Service) AuthorizeDevice(token string) (models.Device, error) { } go func(id string) { - if err := s.devicesSvc.UpdateLastSeen(id); err != nil { - s.logger.Error("can't update last seen", zap.Error(err)) - } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + s.onlineSvc.SetOnline(ctx, id) }(device.ID) device.LastSeen = time.Now() diff --git a/internal/sms-gateway/modules/devices/repository.go b/internal/sms-gateway/modules/devices/repository.go index f56e933..8ef2739 100644 --- a/internal/sms-gateway/modules/devices/repository.go +++ b/internal/sms-gateway/modules/devices/repository.go @@ -68,11 +68,23 @@ func (r *repository) Insert(device *models.Device) error { } func (r *repository) UpdatePushToken(id, token string) error { - return r.db.Model(&models.Device{}).Where("id", id).Update("push_token", token).Error + return r.db.Model(&models.Device{}).Where("id = ?", id).Update("push_token", token).Error } -func (r *repository) UpdateLastSeen(id string) error { - return r.db.Model(&models.Device{}).Where("id", id).Update("last_seen", time.Now()).Error +func (r *repository) SetLastSeen(ctx context.Context, id string, lastSeen time.Time) error { + if lastSeen.IsZero() { + return nil // ignore zero timestamps + } + res := r.db.WithContext(ctx). + Model(&models.Device{}). + Where("id = ? AND last_seen < ?", id, lastSeen). + UpdateColumn("last_seen", lastSeen) + if res.Error != nil { + return res.Error + } + + // RowsAffected==0 => not found or stale timestamp; treat as no-op. + return nil } func (r *repository) Remove(filter ...SelectFilter) error { @@ -87,7 +99,7 @@ func (r *repository) Remove(filter ...SelectFilter) error { func (r *repository) removeUnused(ctx context.Context, since time.Time) (int64, error) { res := r.db. WithContext(ctx). - Where("updated_at < ?", since). + Where("last_seen < ?", since). 
Delete(&models.Device{}) return res.RowsAffected, res.Error diff --git a/internal/sms-gateway/modules/devices/service.go b/internal/sms-gateway/modules/devices/service.go index 92c0f44..f676f84 100644 --- a/internal/sms-gateway/modules/devices/service.go +++ b/internal/sms-gateway/modules/devices/service.go @@ -4,6 +4,7 @@ import ( "context" "crypto/sha256" "encoding/hex" + "errors" "fmt" "time" @@ -100,8 +101,26 @@ func (s *Service) UpdatePushToken(deviceId string, token string) error { return s.devices.UpdatePushToken(deviceId, token) } -func (s *Service) UpdateLastSeen(deviceId string) error { - return s.devices.UpdateLastSeen(deviceId) +func (s *Service) SetLastSeen(ctx context.Context, batch map[string]time.Time) error { + if len(batch) == 0 { + return nil + } + + var multiErr error + for deviceID, lastSeen := range batch { + if err := ctx.Err(); err != nil { + return errors.Join(err, multiErr) + } + if err := s.devices.SetLastSeen(ctx, deviceID, lastSeen); err != nil { + multiErr = errors.Join(multiErr, fmt.Errorf("device %s: %w", deviceID, err)) + s.logger.Error("can't set last seen", + zap.String("device_id", deviceID), + zap.Time("last_seen", lastSeen), + zap.Error(err), + ) + } + } + return multiErr } // Remove removes devices for a specific user that match the provided filters. @@ -114,8 +133,15 @@ func (s *Service) Remove(userID string, filter ...SelectFilter) error { return err } - if err := s.tokensCache.Delete(device.AuthToken); err != nil { - s.logger.Error("can't invalidate token cache", zap.Error(err)) + hash := sha256.Sum256([]byte(device.AuthToken)) + cacheKey := hex.EncodeToString(hash[:]) + + if err := s.tokensCache.Delete(cacheKey); err != nil { + s.logger.Error("can't invalidate token cache", + zap.String("device_id", device.ID), + zap.String("cache_key", cacheKey), + zap.Error(err), + ) } return s.devices.Remove(filter...) 
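Note on SetLastSeen: the conditional UPDATE in the repository ("WHERE id = ? AND last_seen < ?") makes last_seen writes monotonic, so a persistence batch that drains late can never move the timestamp backwards, and replaying the same batch is a no-op. A minimal standalone Go sketch of that guard, illustrative only and not part of this change:

    package main

    import (
        "fmt"
        "time"
    )

    // applyLastSeen mirrors "UPDATE devices SET last_seen = ? WHERE id = ?
    // AND last_seen < ?": the stored timestamp only ever advances.
    func applyLastSeen(stored, incoming time.Time) time.Time {
        if incoming.IsZero() || !stored.Before(incoming) {
            return stored // zero or stale value: no-op, like RowsAffected == 0
        }
        return incoming
    }

    func main() {
        now := time.Now()
        stale := now.Add(-time.Minute) // e.g. a delayed persistence batch
        fmt.Println(applyLastSeen(now, stale).Equal(now)) // true: no regression
    }
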
diff --git a/internal/sms-gateway/online/metrics.go b/internal/sms-gateway/online/metrics.go new file mode 100644 index 0000000..cc556f2 --- /dev/null +++ b/internal/sms-gateway/online/metrics.go @@ -0,0 +1,122 @@ +package online + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// Metric constants +const ( + metricStatusSetTotal = "status_set_total" + metricCacheOperations = "cache_operations_total" + metricCacheLatency = "cache_latency_seconds" + metricPersistenceLatency = "persistence_latency_seconds" + metricPersistenceErrors = "persistence_errors_total" + metricBatchSize = "batch_size" + + labelOperation = "operation" + labelStatus = "status" + + operationSet = "set" + operationDrain = "drain" + + statusSuccess = "success" + statusError = "error" +) + +// metrics contains all Prometheus metrics for the online module +type metrics struct { + statusSetCounter *prometheus.CounterVec + cacheOperations *prometheus.CounterVec + cacheLatency prometheus.Histogram + persistenceLatency prometheus.Histogram + persistenceErrors prometheus.Counter + batchSize prometheus.Gauge +} + +// newMetrics creates and initializes all online metrics +func newMetrics() *metrics { + return &metrics{ + statusSetCounter: promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "sms", + Subsystem: "online", + Name: metricStatusSetTotal, + Help: "Total number of online status updates", + }, []string{labelStatus}), + + cacheOperations: promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "sms", + Subsystem: "online", + Name: metricCacheOperations, + Help: "Total cache operations by type", + }, []string{labelOperation, labelStatus}), + + cacheLatency: promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: "sms", + Subsystem: "online", + Name: metricCacheLatency, + Help: "Cache operation latency in seconds", + Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1}, + }), + + persistenceLatency: promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: "sms", + Subsystem: "online", + Name: metricPersistenceLatency, + Help: "Persistence operation latency in seconds", + Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1}, + }), + + persistenceErrors: promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "sms", + Subsystem: "online", + Name: metricPersistenceErrors, + Help: "Total persistence errors by type", + }), + + batchSize: promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "sms", + Subsystem: "online", + Name: metricBatchSize, + Help: "Current batch size", + }), + } +} + +// IncrementStatusSet increments the status set counter +func (m *metrics) IncrementStatusSet(success bool) { + status := statusSuccess + if !success { + status = statusError + } + m.statusSetCounter.WithLabelValues(status).Inc() +} + +// IncrementCacheOperation increments cache operation counter +func (m *metrics) IncrementCacheOperation(operation, status string) { + m.cacheOperations.WithLabelValues(operation, status).Inc() +} + +// ObserveCacheLatency observes cache operation latency +func (m *metrics) ObserveCacheLatency(f func()) { + timer := prometheus.NewTimer(m.cacheLatency) + f() + timer.ObserveDuration() +} + +// ObservePersistenceLatency observes persistence operation latency +func (m *metrics) ObservePersistenceLatency(f func()) { + timer := prometheus.NewTimer(m.persistenceLatency) + f() + timer.ObserveDuration() +} + +// IncrementPersistenceError increments persistence error counter +func (m *metrics) 
IncrementPersistenceError() { + m.persistenceErrors.Inc() +} + +// SetBatchSize sets the current batch size +func (m *metrics) SetBatchSize(size int) { + m.batchSize.Set(float64(size)) +} diff --git a/internal/sms-gateway/online/module.go b/internal/sms-gateway/online/module.go new file mode 100644 index 0000000..43d5fef --- /dev/null +++ b/internal/sms-gateway/online/module.go @@ -0,0 +1,36 @@ +package online + +import ( + "context" + + "github.com/android-sms-gateway/server/internal/sms-gateway/cache" + "go.uber.org/fx" + "go.uber.org/zap" +) + +func Module() fx.Option { + return fx.Module( + "online", + fx.Decorate(func(log *zap.Logger) *zap.Logger { + return log.Named("online") + }), + fx.Provide(func(factory cache.Factory) (cache.Cache, error) { + return factory.New("online") + }, fx.Private), + fx.Provide(newMetrics), + fx.Provide(New), + fx.Invoke(func(lc fx.Lifecycle, svc Service) { + ctx, cancel := context.WithCancel(context.Background()) + lc.Append(fx.Hook{ + OnStart: func(_ context.Context) error { + go svc.Run(ctx) + return nil + }, + OnStop: func(_ context.Context) error { + cancel() + return nil + }, + }) + }), + ) +} diff --git a/internal/sms-gateway/online/service.go b/internal/sms-gateway/online/service.go new file mode 100644 index 0000000..721e179 --- /dev/null +++ b/internal/sms-gateway/online/service.go @@ -0,0 +1,128 @@ +package online + +import ( + "context" + "fmt" + "time" + + "github.com/android-sms-gateway/server/internal/sms-gateway/modules/devices" + "github.com/android-sms-gateway/server/pkg/cache" + "github.com/capcom6/go-helpers/maps" + "go.uber.org/zap" +) + +type Service interface { + Run(ctx context.Context) + SetOnline(ctx context.Context, deviceID string) +} + +type service struct { + devicesSvc *devices.Service + + cache cache.Cache + + logger *zap.Logger + metrics *metrics +} + +func New(devicesSvc *devices.Service, cache cache.Cache, logger *zap.Logger, metrics *metrics) Service { + return &service{ + devicesSvc: devicesSvc, + + cache: cache, + + logger: logger, + metrics: metrics, + } +} + +func (s *service) Run(ctx context.Context) { + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + s.logger.Debug("Persisting online status") + if err := s.persist(ctx); err != nil { + s.logger.Error("Can't persist online status", zap.Error(err)) + } + } + } +} + +func (s *service) SetOnline(ctx context.Context, deviceID string) { + dt := time.Now().UTC().Format(time.RFC3339) + + s.logger.Debug("Setting online status", zap.String("device_id", deviceID), zap.String("last_seen", dt)) + + var err error + s.metrics.ObserveCacheLatency(func() { + if err = s.cache.Set(ctx, deviceID, dt); err != nil { + s.metrics.IncrementCacheOperation(operationSet, statusError) + s.logger.Error("Can't set online status", zap.String("device_id", deviceID), zap.Error(err)) + s.metrics.IncrementStatusSet(false) + } + }) + + if err != nil { + return + } + + s.metrics.IncrementCacheOperation(operationSet, statusSuccess) + s.logger.Debug("Online status set", zap.String("device_id", deviceID)) + s.metrics.IncrementStatusSet(true) +} + +func (s *service) persist(ctx context.Context) error { + var drainErr, persistErr error + + s.metrics.ObservePersistenceLatency(func() { + items, err := s.cache.Drain(ctx) + if err != nil { + drainErr = fmt.Errorf("can't drain cache: %w", err) + s.metrics.IncrementCacheOperation(operationDrain, statusError) + return + } + s.metrics.IncrementCacheOperation(operationDrain, 
statusSuccess) + s.metrics.SetBatchSize(len(items)) + + if len(items) == 0 { + s.logger.Debug("No online statuses to persist") + return + } + s.logger.Debug("Drained cache", zap.Int("count", len(items))) + + timestamps := maps.MapValues(items, func(v string) time.Time { + t, err := time.Parse(time.RFC3339, v) + if err != nil { + s.logger.Warn("Can't parse last seen", zap.String("last_seen", v), zap.Error(err)) + return time.Now().UTC() + } + + return t + }) + + s.logger.Debug("Parsed last seen timestamps", zap.Int("count", len(timestamps))) + + if err := s.devicesSvc.SetLastSeen(ctx, timestamps); err != nil { + persistErr = fmt.Errorf("can't set last seen: %w", err) + s.metrics.IncrementPersistenceError() + return + } + + s.logger.Info("Set last seen", zap.Int("count", len(timestamps))) + }) + + if drainErr != nil { + return drainErr + } + + if persistErr != nil { + return persistErr + } + + return nil +} diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go new file mode 100644 index 0000000..db90b3d --- /dev/null +++ b/pkg/cache/cache.go @@ -0,0 +1,36 @@ +package cache + +import "context" + +type Cache interface { + // Set sets the value for the given key in the cache. + Set(ctx context.Context, key string, value string, opts ...Option) error + + // SetOrFail is like Set, but returns ErrKeyExists if the key already exists. + SetOrFail(ctx context.Context, key string, value string, opts ...Option) error + + // Get gets the value for the given key from the cache. + // + // If the key is not found, it returns ErrKeyNotFound. + // If the key has expired, it returns ErrKeyExpired. + // Otherwise, it returns the value and nil. + Get(ctx context.Context, key string) (string, error) + + // GetAndDelete is like Get, but also deletes the key from the cache. + GetAndDelete(ctx context.Context, key string) (string, error) + + // Delete removes the item associated with the given key from the cache. + // If the key does not exist, it performs no action and returns nil. + // The operation is safe for concurrent use. + Delete(ctx context.Context, key string) error + + // Cleanup removes all expired items from the cache. + // The operation is safe for concurrent use. + Cleanup(ctx context.Context) error + + // Drain returns a map of all the non-expired items in the cache. + // The returned map is a snapshot of the cache at the time of the call. + // The cache is cleared after the call. + // The operation is safe for concurrent use. + Drain(ctx context.Context) (map[string]string, error) +} diff --git a/pkg/cache/errors.go b/pkg/cache/errors.go new file mode 100644 index 0000000..4d5568f --- /dev/null +++ b/pkg/cache/errors.go @@ -0,0 +1,12 @@ +package cache + +import "errors" + +var ( + // ErrKeyNotFound indicates no value exists for the given key. + ErrKeyNotFound = errors.New("key not found") + // ErrKeyExpired indicates a value exists but has expired. + ErrKeyExpired = errors.New("key expired") + // ErrKeyExists indicates a conflicting set when the key already exists. 
+ ErrKeyExists = errors.New("key already exists") +) diff --git a/pkg/cache/memory.go b/pkg/cache/memory.go new file mode 100644 index 0000000..6a1a18c --- /dev/null +++ b/pkg/cache/memory.go @@ -0,0 +1,170 @@ +package cache + +import ( + "context" + "sync" + "time" +) + +type memoryCache struct { + items map[string]*memoryItem + ttl time.Duration + + mux sync.RWMutex +} + +func NewMemory(ttl time.Duration) Cache { + return &memoryCache{ + items: make(map[string]*memoryItem), + ttl: ttl, + + mux: sync.RWMutex{}, + } +} + +type memoryItem struct { + value string + validUntil time.Time +} + +func newItem(value string, opts options) *memoryItem { + item := &memoryItem{ + value: value, + validUntil: opts.validUntil, + } + + return item +} + +func (i *memoryItem) isExpired(now time.Time) bool { + return !i.validUntil.IsZero() && now.After(i.validUntil) +} + +// Cleanup implements Cache. +func (m *memoryCache) Cleanup(_ context.Context) error { + m.cleanup(func() {}) + + return nil +} + +// Delete implements Cache. +func (m *memoryCache) Delete(_ context.Context, key string) error { + m.mux.Lock() + delete(m.items, key) + m.mux.Unlock() + + return nil +} + +// Drain implements Cache. +func (m *memoryCache) Drain(_ context.Context) (map[string]string, error) { + var cpy map[string]*memoryItem + + m.cleanup(func() { + cpy = m.items + m.items = make(map[string]*memoryItem) + }) + + items := make(map[string]string, len(cpy)) + for key, item := range cpy { + items[key] = item.value + } + + return items, nil +} + +// Get implements Cache. +func (m *memoryCache) Get(_ context.Context, key string) (string, error) { + return m.getValue(func() (*memoryItem, bool) { + m.mux.RLock() + item, ok := m.items[key] + m.mux.RUnlock() + + return item, ok + }) +} + +// GetAndDelete implements Cache. +func (m *memoryCache) GetAndDelete(_ context.Context, key string) (string, error) { + return m.getValue(func() (*memoryItem, bool) { + m.mux.Lock() + item, ok := m.items[key] + delete(m.items, key) + m.mux.Unlock() + + return item, ok + }) +} + +// Set implements Cache. +func (m *memoryCache) Set(_ context.Context, key string, value string, opts ...Option) error { + m.mux.Lock() + m.items[key] = m.newItem(value, opts...) + m.mux.Unlock() + + return nil +} + +// SetOrFail implements Cache. +func (m *memoryCache) SetOrFail(_ context.Context, key string, value string, opts ...Option) error { + m.mux.Lock() + defer m.mux.Unlock() + + if item, ok := m.items[key]; ok { + if !item.isExpired(time.Now()) { + return ErrKeyExists + } + } + + m.items[key] = m.newItem(value, opts...) + return nil +} + +func (m *memoryCache) newItem(value string, opts ...Option) *memoryItem { + o := options{ + validUntil: time.Time{}, + } + if m.ttl > 0 { + o.validUntil = time.Now().Add(m.ttl) + } + o.apply(opts...) 
+ + return newItem(value, o) +} + +func (m *memoryCache) getItem(getter func() (*memoryItem, bool)) (*memoryItem, error) { + item, ok := getter() + + if !ok { + return nil, ErrKeyNotFound + } + + if item.isExpired(time.Now()) { + return nil, ErrKeyExpired + } + + return item, nil +} + +func (m *memoryCache) getValue(getter func() (*memoryItem, bool)) (string, error) { + item, err := m.getItem(getter) + if err != nil { + return "", err + } + + return item.value, nil +} + +func (m *memoryCache) cleanup(cb func()) { + t := time.Now() + + m.mux.Lock() + for key, item := range m.items { + if item.isExpired(t) { + delete(m.items, key) + } + } + + cb() + m.mux.Unlock() +} diff --git a/pkg/cache/memory_bench_test.go b/pkg/cache/memory_bench_test.go new file mode 100644 index 0000000..a8764a0 --- /dev/null +++ b/pkg/cache/memory_bench_test.go @@ -0,0 +1,470 @@ +//nolint:errcheck +package cache_test + +import ( + "context" + "fmt" + "math/rand" + "strconv" + "testing" + "time" + + "github.com/android-sms-gateway/server/pkg/cache" +) + +// BenchmarkMemoryCache_Set measures the performance of Set operations +func BenchmarkMemoryCache_Set(b *testing.B) { + cache := cache.NewMemory(0) + ctx := context.Background() + key := "benchmark-key" + value := "benchmark-value" + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + cache.Set(ctx, key, value) + } + }) +} + +// BenchmarkMemoryCache_Get measures the performance of Get operations +func BenchmarkMemoryCache_Get(b *testing.B) { + cache := cache.NewMemory(0) + ctx := context.Background() + key := "benchmark-key" + value := "benchmark-value" + + // Pre-populate the cache + cache.Set(ctx, key, value) + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + cache.Get(ctx, key) + } + }) +} + +// BenchmarkMemoryCache_SetAndGet measures the performance of Set followed by Get +func BenchmarkMemoryCache_SetAndGet(b *testing.B) { + cache := cache.NewMemory(0) + ctx := context.Background() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + key := "key-" + strconv.Itoa(i) + value := "value-" + strconv.Itoa(i) + i++ + + cache.Set(ctx, key, value) + cache.Get(ctx, key) + } + }) +} + +// BenchmarkMemoryCache_SetOrFail measures the performance of SetOrFail operations +func BenchmarkMemoryCache_SetOrFail(b *testing.B) { + cache := cache.NewMemory(0) + ctx := context.Background() + key := "benchmark-key" + value := "benchmark-value" + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + cache.SetOrFail(ctx, key, value) + } + }) +} + +// BenchmarkMemoryCache_GetAndDelete measures the performance of GetAndDelete operations +func BenchmarkMemoryCache_GetAndDelete(b *testing.B) { + cache := cache.NewMemory(0) + ctx := context.Background() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + key := "key-" + strconv.Itoa(i) + value := "value-" + strconv.Itoa(i) + i++ + + cache.Set(ctx, key, value) + cache.GetAndDelete(ctx, key) + } + }) +} + +// BenchmarkMemoryCache_Delete measures the performance of Delete operations +func BenchmarkMemoryCache_Delete(b *testing.B) { + cache := cache.NewMemory(0) + ctx := context.Background() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + key := "key-" + strconv.Itoa(i) + value := "value-" + strconv.Itoa(i) + i++ + + cache.Set(ctx, key, value) + cache.Delete(ctx, key) + } + }) +} + +// BenchmarkMemoryCache_Cleanup measures the performance of Cleanup operations +func 
BenchmarkMemoryCache_Cleanup(b *testing.B) { + cache := cache.NewMemory(0) + ctx := context.Background() + + // Pre-populate cache with many items + for i := 0; i < 1000; i++ { + key := "item-" + strconv.Itoa(i) + value := "value-" + strconv.Itoa(i) + cache.Set(ctx, key, value) + } + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + cache.Cleanup(ctx) + } + }) +} + +// BenchmarkMemoryCache_Drain measures the performance of Drain operations +func BenchmarkMemoryCache_Drain(b *testing.B) { + cache := cache.NewMemory(0) + ctx := context.Background() + + // Pre-populate cache with many items + for i := 0; i < 1000; i++ { + key := "item-" + strconv.Itoa(i) + value := "value-" + strconv.Itoa(i) + cache.Set(ctx, key, value) + } + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + cache.Drain(ctx) + } + }) +} + +// BenchmarkMemoryCache_ConcurrentReads measures performance with different numbers of concurrent readers +func BenchmarkMemoryCache_ConcurrentReads(b *testing.B) { + cache := cache.NewMemory(0) + ctx := context.Background() + key := "benchmark-key" + value := "benchmark-value" + + // Pre-populate the cache + cache.Set(ctx, key, value) + + benchmarks := []struct { + name string + goroutines int + }{ + {"1 Reader", 1}, + {"4 Readers", 4}, + {"16 Readers", 16}, + {"64 Readers", 64}, + } + + for _, bm := range benchmarks { + b.Run(bm.name, func(b *testing.B) { + b.SetParallelism(bm.goroutines) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + cache.Get(ctx, key) + } + }) + }) + } +} + +// BenchmarkMemoryCache_ConcurrentWrites measures performance with different numbers of concurrent writers +func BenchmarkMemoryCache_ConcurrentWrites(b *testing.B) { + cache := cache.NewMemory(0) + ctx := context.Background() + + benchmarks := []struct { + name string + goroutines int + }{ + {"1 Writer", 1}, + {"4 Writers", 4}, + {"16 Writers", 16}, + {"64 Writers", 64}, + } + + for _, bm := range benchmarks { + b.Run(bm.name, func(b *testing.B) { + b.SetParallelism(bm.goroutines) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + key := "key-" + strconv.Itoa(i) + value := "value-" + strconv.Itoa(i) + i++ + + cache.Set(ctx, key, value) + } + }) + }) + } +} + +// BenchmarkMemoryCache_MixedWorkload measures performance with mixed read/write operations +func BenchmarkMemoryCache_MixedWorkload(b *testing.B) { + cache := cache.NewMemory(0) + ctx := context.Background() + + benchmarks := []struct { + name string + readRatio float64 + goroutines int + }{ + {"Read-Heavy 90/10", 0.9, 16}, + {"Balanced 50/50", 0.5, 16}, + {"Write-Heavy 10/90", 0.1, 16}, + } + + for _, bm := range benchmarks { + b.Run(bm.name, func(b *testing.B) { + b.SetParallelism(bm.goroutines) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + i := 0 + + for pb.Next() { + if r.Float64() < bm.readRatio { + // Read operation + key := "key-" + strconv.Itoa(i%100) // Reuse keys to simulate working set + cache.Get(ctx, key) + } else { + // Write operation + key := "key-" + strconv.Itoa(i%100) + value := "value-" + strconv.Itoa(i) + i++ + + cache.Set(ctx, key, value) + } + } + }) + }) + } +} + +// BenchmarkMemoryCache_Scaling measures how performance scales with increasing load +func BenchmarkMemoryCache_Scaling(b *testing.B) { + cache := cache.NewMemory(0) + ctx := context.Background() + + benchmarks := []struct { + name string + operationsPerGoroutine int + goroutines int + }{ + 
{"Small Load", 10, 1}, + {"Medium Load", 100, 10}, + {"Large Load", 1000, 100}, + {"Very Large Load", 10000, 1000}, + } + + for _, bm := range benchmarks { + b.Run(bm.name, func(b *testing.B) { + // Pre-populate cache + for i := 0; i < bm.operationsPerGoroutine*bm.goroutines; i++ { + key := "key-" + strconv.Itoa(i) + value := "value-" + strconv.Itoa(i) + cache.Set(ctx, key, value) + } + + b.SetParallelism(bm.goroutines) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + localI := 0 + for pb.Next() { + // Simulate random access + key := "key-" + strconv.Itoa(localI%(bm.operationsPerGoroutine*bm.goroutines)) + cache.Get(ctx, key) + localI++ + } + }) + }) + } +} + +// BenchmarkMemoryCache_TTLOverhead measures the performance impact of TTL operations +func BenchmarkMemoryCache_TTLOverhead(b *testing.B) { + c := cache.NewMemory(0) + ctx := context.Background() + key := "benchmark-key" + value := "benchmark-value" + ttl := time.Hour + + benchmarks := []struct { + name string + withTTL bool + }{ + {"Without TTL", false}, + {"With TTL", true}, + } + + for _, bm := range benchmarks { + b.Run(bm.name, func(b *testing.B) { + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + if bm.withTTL { + c.Set(ctx, key, value, cache.WithTTL(ttl)) + } else { + c.Set(ctx, key, value) + } + } + }) + }) + } +} + +// BenchmarkMemoryCache_LargeValues measures performance with large values +func BenchmarkMemoryCache_LargeValues(b *testing.B) { + cache := cache.NewMemory(0) + ctx := context.Background() + key := "benchmark-key" + + sizes := []struct { + name string + size int + }{ + {"1KB", 1 * 1024}, + {"10KB", 10 * 1024}, + {"100KB", 100 * 1024}, + {"1MB", 1024 * 1024}, + } + + for _, size := range sizes { + b.Run(size.name, func(b *testing.B) { + value := make([]byte, size.size) + for i := range value { + value[i] = byte(i % 256) + } + valueStr := string(value) + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + if err := cache.Set(ctx, key, valueStr); err != nil { + b.Fatal(err) + } + if _, err := cache.Get(ctx, key); err != nil { + b.Fatal(err) + } + } + }) + }) + } +} + +// BenchmarkMemoryCache_MemoryGrowth measures memory allocation patterns +func BenchmarkMemoryCache_MemoryGrowth(b *testing.B) { + cache := cache.NewMemory(0) + ctx := context.Background() + b.ReportAllocs() + + sizes := []int{100, 1000, 10000, 100000} + + for _, size := range sizes { + b.Run(fmt.Sprintf("%d_items", size), func(b *testing.B) { + b.ResetTimer() + + for b.Loop() { + // Clear cache + cache.Drain(ctx) + + // Add new items + for j := range size { + key := "key-" + strconv.Itoa(j) + value := "value-" + strconv.Itoa(j) + cache.Set(ctx, key, value) + } + } + }) + } +} + +// BenchmarkMemoryCache_RandomAccess measures performance with random key access patterns +func BenchmarkMemoryCache_RandomAccess(b *testing.B) { + cache := cache.NewMemory(0) + ctx := context.Background() + const numKeys = 1000 + + // Pre-populate cache with many keys + for i := 0; i < numKeys; i++ { + key := "key-" + strconv.Itoa(i) + value := "value-" + strconv.Itoa(i) + cache.Set(ctx, key, value) + } + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + + for pb.Next() { + key := "key-" + strconv.Itoa(r.Intn(numKeys)) + cache.Get(ctx, key) + } + }) +} + +// BenchmarkMemoryCache_HotKey measures performance with a frequently accessed key +func BenchmarkMemoryCache_HotKey(b *testing.B) { + cache := cache.NewMemory(0) + ctx := context.Background() + hotKey 
:= "hot-key" + value := "hot-value" + + // Pre-populate the hot key + cache.Set(ctx, hotKey, value) + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + cache.Get(ctx, hotKey) + } + }) +} + +// BenchmarkMemoryCache_ColdKey measures performance with rarely accessed keys +func BenchmarkMemoryCache_ColdKey(b *testing.B) { + cache := cache.NewMemory(0) + ctx := context.Background() + const numKeys = 10000 + + // Pre-populate cache with many keys + for i := 0; i < numKeys; i++ { + key := "key-" + strconv.Itoa(i) + value := "value-" + strconv.Itoa(i) + cache.Set(ctx, key, value) + } + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + + for pb.Next() { + key := "key-" + strconv.Itoa(r.Intn(numKeys)) + cache.Get(ctx, key) + } + }) +} diff --git a/pkg/cache/memory_concurrency_test.go b/pkg/cache/memory_concurrency_test.go new file mode 100644 index 0000000..c9253e9 --- /dev/null +++ b/pkg/cache/memory_concurrency_test.go @@ -0,0 +1,437 @@ +//nolint:errcheck +package cache_test + +import ( + "context" + "strconv" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/android-sms-gateway/server/pkg/cache" +) + +func TestMemoryCache_ConcurrentReads(t *testing.T) { + cache := cache.NewMemory(0) + + ctx := context.Background() + key := "test-key" + value := "test-value" + + // Set initial value + err := cache.Set(ctx, key, value) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + + const numGoroutines = 100 + var wg sync.WaitGroup + + // Launch multiple concurrent reads + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + retrieved, err := cache.Get(ctx, key) + if err != nil { + t.Errorf("Get failed: %v", err) + return + } + + if retrieved != value { + t.Errorf("Expected %s, got %s", value, retrieved) + } + }() + } + + wg.Wait() +} + +func TestMemoryCache_ConcurrentWrites(t *testing.T) { + cache := cache.NewMemory(0) + + ctx := context.Background() + const numKeys = 100 + const numGoroutines = 10 + var wg sync.WaitGroup + + // Launch multiple concurrent writes + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func(goroutineID int) { + defer wg.Done() + + for j := 0; j < numKeys/numGoroutines; j++ { + key := "key-" + strconv.Itoa(goroutineID) + "-" + strconv.Itoa(j) + value := "value-" + strconv.Itoa(goroutineID) + "-" + strconv.Itoa(j) + + err := cache.Set(ctx, key, value) + if err != nil { + t.Errorf("Set failed for key %s: %v", key, err) + } + } + }(i) + } + + wg.Wait() + + // Verify all keys were set correctly + for i := 0; i < numGoroutines; i++ { + for j := 0; j < numKeys/numGoroutines; j++ { + key := "key-" + strconv.Itoa(i) + "-" + strconv.Itoa(j) + expectedValue := "value-" + strconv.Itoa(i) + "-" + strconv.Itoa(j) + + retrieved, err := cache.Get(ctx, key) + if err != nil { + t.Errorf("Get failed for key %s: %v", key, err) + continue + } + + if retrieved != expectedValue { + t.Errorf("Expected %s, got %s for key %s", expectedValue, retrieved, key) + } + } + } +} + +func TestMemoryCache_ConcurrentReadWrite(t *testing.T) { + c := cache.NewMemory(0) + + ctx := context.Background() + const numOperations = 1000 + const numReaders = 5 + const numWriters = 2 + var wg sync.WaitGroup + var readCount, writeCount atomic.Int64 + + // Launch concurrent readers + for range numReaders { + wg.Add(1) + go func() { + defer wg.Done() + + for range numOperations / numReaders { + key := "shared-key" + _, err := c.Get(ctx, key) + if err != nil && err != cache.ErrKeyNotFound { + 
t.Errorf("Get failed: %v", err) + } else if err == nil { + readCount.Add(1) + } + } + }() + } + + // Launch concurrent writers + for range numWriters { + wg.Add(1) + go func() { + defer wg.Done() + + for j := range numOperations / numWriters { + key := "shared-key" + value := "value-" + strconv.Itoa(j) + + err := c.Set(ctx, key, value) + if err != nil { + t.Errorf("Set failed: %v", err) + } else { + writeCount.Add(1) + } + } + }() + } + + wg.Wait() + t.Logf("Completed %d successful reads and %d writes", readCount.Load(), writeCount.Load()) +} + +func TestMemoryCache_ConcurrentSetAndGetAndDelete(t *testing.T) { + cache := cache.NewMemory(0) + + ctx := context.Background() + const numOperations = 500 + const numGoroutines = 10 + var wg sync.WaitGroup + + // Launch goroutines that perform Set, Get, and Delete operations + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func(goroutineID int) { + defer wg.Done() + + for j := 0; j < numOperations/numGoroutines; j++ { + key := "key-" + strconv.Itoa(goroutineID) + "-" + strconv.Itoa(j) + value := "value-" + strconv.Itoa(goroutineID) + "-" + strconv.Itoa(j) + + // Set + err := cache.Set(ctx, key, value) + if err != nil { + t.Errorf("Set failed: %v", err) + continue + } + + // Get + retrieved, err := cache.Get(ctx, key) + if err != nil { + t.Errorf("Get failed: %v", err) + continue + } + + if retrieved != value { + t.Errorf("Expected %s, got %s", value, retrieved) + } + + // Delete + err = cache.Delete(ctx, key) + if err != nil { + t.Errorf("Delete failed: %v", err) + } + } + }(i) + } + + wg.Wait() +} + +func TestMemoryCache_ConcurrentSetOrFail(t *testing.T) { + c := cache.NewMemory(0) + + ctx := context.Background() + const numGoroutines = 10 + const attemptsPerGoroutine = 100 + var wg sync.WaitGroup + + // Launch goroutines that try to SetOrFail the same key + key := "contentious-key" + value := "initial-value" + + var successCount atomic.Int32 + var existsCount atomic.Int32 + + for range numGoroutines { + wg.Add(1) + go func() { + defer wg.Done() + + for range attemptsPerGoroutine { + err := c.SetOrFail(ctx, key, value) + switch err { + case nil: + successCount.Add(1) + case cache.ErrKeyExists: + existsCount.Add(1) + default: + t.Errorf("SetOrFail failed: %v", err) + } + } + }() + } + + wg.Wait() + + // Only one goroutine should succeed, all others should get ErrKeyExists + if c := successCount.Load(); c != 1 { + t.Errorf("Expected 1 successful SetOrFail, got %d", c) + } + + expectedExistsCount := (numGoroutines * attemptsPerGoroutine) - 1 + if c := int(existsCount.Load()); c != expectedExistsCount { + t.Errorf("Expected %d ErrKeyExists, got %d", expectedExistsCount, c) + } +} + +func TestMemoryCache_ConcurrentDrain(t *testing.T) { + c := cache.NewMemory(0) + + ctx := context.Background() + const numItems = 100 + const numGoroutines = 5 + var wg sync.WaitGroup + var drainResults sync.Map + + // Pre-populate cache with items + for i := range numItems { + key := "item-" + strconv.Itoa(i) + value := "value-" + strconv.Itoa(i) + + err := c.Set(ctx, key, value) + if err != nil { + t.Fatalf("Set failed for item %d: %v", i, err) + } + } + + // Launch concurrent drain operations + for i := range numGoroutines { + wg.Add(1) + go func(id int) { + defer wg.Done() + + items, err := c.Drain(ctx) + if err != nil { + t.Errorf("Drain failed: %v", err) + } + drainResults.Store(id, items) + }(i) + } + + wg.Wait() + + // Verify that items were drained (at least one goroutine should have gotten items) + totalDrained := 0 + drainResults.Range(func(key, value 
any) bool { + items := value.(map[string]string) + totalDrained += len(items) + return true + }) + + if totalDrained != numItems { + t.Errorf("Expected %d total items drained, got %d", numItems, totalDrained) + } + + // Cache should be empty after all drain operations + for i := range numItems { + key := "item-" + strconv.Itoa(i) + _, err := c.Get(ctx, key) + if err != cache.ErrKeyNotFound { + t.Errorf("Expected ErrKeyNotFound for key %s after drain, got %v", key, err) + } + } +} + +func TestMemoryCache_ConcurrentCleanup(t *testing.T) { + c := cache.NewMemory(0) + + ctx := context.Background() + const numItems = 50 + const numGoroutines = 5 + var wg sync.WaitGroup + + // Pre-populate cache with items that will expire quickly + for i := range numItems { + key := "item-" + strconv.Itoa(i) + value := "value-" + strconv.Itoa(i) + + err := c.Set(ctx, key, value, cache.WithTTL(10*time.Millisecond)) + if err != nil { + t.Fatalf("Set failed for item %d: %v", i, err) + } + } + + // Wait for items to expire before launching cleanup operations + time.Sleep(15 * time.Millisecond) + + // Launch concurrent cleanup operations + for range numGoroutines { + wg.Add(1) + go func() { + defer wg.Done() + + err := c.Cleanup(ctx) + if err != nil { + t.Errorf("Cleanup failed: %v", err) + } + }() + } + + wg.Wait() + + // All items should be expired and removed + for i := range numItems { + key := "item-" + strconv.Itoa(i) + _, err := c.Get(ctx, key) + if err != cache.ErrKeyNotFound { + t.Errorf("Expected ErrKeyNotFound for key %s, got %v", key, err) + } + } +} + +func TestMemoryCache_ConcurrentGetAndDelete(t *testing.T) { + c := cache.NewMemory(0) + + ctx := context.Background() + const numGoroutines = 10 + const attemptsPerGoroutine = 50 + var wg sync.WaitGroup + + // Pre-populate cache with items + for i := 0; i < numGoroutines*attemptsPerGoroutine; i++ { + key := "item-" + strconv.Itoa(i) + value := "value-" + strconv.Itoa(i) + + err := c.Set(ctx, key, value) + if err != nil { + t.Fatalf("Set failed for item %d: %v", i, err) + } + } + + // Launch goroutines that perform GetAndDelete operations + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func(goroutineID int) { + defer wg.Done() + + for j := 0; j < attemptsPerGoroutine; j++ { + key := "item-" + strconv.Itoa(goroutineID*attemptsPerGoroutine+j) + + _, err := c.GetAndDelete(ctx, key) + if err != nil && err != cache.ErrKeyNotFound { + t.Errorf("GetAndDelete failed: %v", err) + } + } + }(i) + } + + wg.Wait() + + // All items should be deleted + for i := 0; i < numGoroutines*attemptsPerGoroutine; i++ { + key := "item-" + strconv.Itoa(i) + _, err := c.Get(ctx, key) + if err != cache.ErrKeyNotFound { + t.Errorf("Expected ErrKeyNotFound for key %s after GetAndDelete, got %v", key, err) + } + } +} + +func TestMemoryCache_RaceConditionDetection(t *testing.T) { + // This test is specifically designed to detect race conditions + // by running many operations concurrently with the race detector enabled + + cache := cache.NewMemory(0) + + ctx := context.Background() + const duration = 2 * time.Second + const numGoroutines = 20 + var wg sync.WaitGroup + + start := time.Now() + + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func(goroutineID int) { + defer wg.Done() + + for time.Since(start) < duration { + key := "race-key-" + strconv.Itoa(goroutineID) + value := "race-value-" + strconv.Itoa(goroutineID) + "-" + strconv.FormatInt(time.Now().UnixNano(), 10) + + // Randomly choose operation + switch time.Now().UnixNano() % 4 { + case 0: + cache.Set(ctx, key, 
value) + case 1: + cache.Get(ctx, key) + case 2: + cache.Delete(ctx, key) + case 3: + cache.GetAndDelete(ctx, key) + } + } + }(i) + } + + wg.Wait() +} diff --git a/pkg/cache/memory_edge_test.go b/pkg/cache/memory_edge_test.go new file mode 100644 index 0000000..8fb9cba --- /dev/null +++ b/pkg/cache/memory_edge_test.go @@ -0,0 +1,391 @@ +package cache_test + +import ( + "context" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/android-sms-gateway/server/pkg/cache" +) + +func TestMemoryCache_ZeroTTL(t *testing.T) { + // Test cache with zero TTL (no expiration) + cache := cache.NewMemory(0) + ctx := context.Background() + + key := "zero-ttl-key" + value := "zero-ttl-value" + + err := cache.Set(ctx, key, value) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + + // Wait some time to ensure no expiration + time.Sleep(100 * time.Millisecond) + + retrieved, err := cache.Get(ctx, key) + if err != nil { + t.Fatalf("Get failed: %v", err) + } + + if retrieved != value { + t.Errorf("Expected %s, got %s", value, retrieved) + } +} + +func TestMemoryCache_ImmediateExpiration(t *testing.T) { + // Test c with very short TTL + c := cache.NewMemory(0) + ctx := context.Background() + + key := "expiring-key" + value := "expiring-value" + ttl := 1 * time.Millisecond + + err := c.Set(ctx, key, value, cache.WithTTL(ttl)) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + + // Wait for expiration + time.Sleep(2 * ttl) + + _, err = c.Get(ctx, key) + if err != cache.ErrKeyExpired { + t.Errorf("Expected ErrKeyExpired, got %v", err) + } +} + +func TestMemoryCache_NilContext(t *testing.T) { + // Test cache operations with nil context + cache := cache.NewMemory(0) + key := "nil-context-key" + value := "nil-context-value" + + err := cache.Set(nil, key, value) //nolint:staticcheck + if err != nil { + t.Fatalf("Set with nil context failed: %v", err) + } + + retrieved, err := cache.Get(nil, key) //nolint:staticcheck + if err != nil { + t.Fatalf("Get with nil context failed: %v", err) + } + + if retrieved != value { + t.Errorf("Expected %s, got %s", value, retrieved) + } +} + +func TestMemoryCache_EmptyKey(t *testing.T) { + // Test cache operations with empty key + cache := cache.NewMemory(0) + ctx := context.Background() + key := "" + value := "empty-key-value" + + err := cache.Set(ctx, key, value) + if err != nil { + t.Fatalf("Set with empty key failed: %v", err) + } + + retrieved, err := cache.Get(ctx, key) + if err != nil { + t.Fatalf("Get with empty key failed: %v", err) + } + + if retrieved != value { + t.Errorf("Expected %s, got %s", value, retrieved) + } +} + +func TestMemoryCache_OverwriteWithDifferentTTL(t *testing.T) { + // Test overwriting a key with different TTL + c := cache.NewMemory(0) + ctx := context.Background() + key := "ttl-key" + value1 := "value1" + value2 := "value2" + + // Set with short TTL + err := c.Set(ctx, key, value1, cache.WithTTL(100*time.Millisecond)) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + + // Overwrite with longer TTL + err = c.Set(ctx, key, value2, cache.WithTTL(1*time.Second)) + if err != nil { + t.Fatalf("Set overwrite failed: %v", err) + } + + retrieved, err := c.Get(ctx, key) + if err != nil { + t.Fatalf("Get failed: %v", err) + } + + if retrieved != value2 { + t.Errorf("Expected %s, got %s", value2, retrieved) + } + + // Wait for short TTL to expire but not long TTL + time.Sleep(200 * time.Millisecond) + + retrieved, err = c.Get(ctx, key) + if err != nil { + t.Fatalf("Get after partial wait failed: %v", err) + } + + if retrieved 
!= value2 { + t.Errorf("Expected %s after partial wait, got %s", value2, retrieved) + } +} + +func TestMemoryCache_MixedTTLScenarios(t *testing.T) { + // Test various TTL scenarios + c := cache.NewMemory(0) + ctx := context.Background() + + // Set multiple keys with different TTLs + keys := map[string]time.Duration{ + "no-ttl": 0, + "short-ttl": 50 * time.Millisecond, + "medium-ttl": 200 * time.Millisecond, + "long-ttl": 500 * time.Millisecond, + } + + for key, ttl := range keys { + value := "value-" + key + var err error + if ttl > 0 { + err = c.Set(ctx, key, value, cache.WithTTL(ttl)) + } else { + err = c.Set(ctx, key, value) + } + if err != nil { + t.Fatalf("Set %s failed: %v", key, err) + } + } + + // Verify all keys are present initially + for key := range keys { + _, err := c.Get(ctx, key) + if err != nil { + t.Fatalf("Get %s failed: %v", key, err) + } + } + + // Wait for short TTL to expire + time.Sleep(100 * time.Millisecond) + + // Short TTL key should be expired, others should still be there + _, err := c.Get(ctx, "short-ttl") + if err != cache.ErrKeyExpired { + t.Errorf("Expected ErrKeyExpired for short-ttl, got %v", err) + } + + for key := range keys { + if key == "short-ttl" { + continue + } + _, err := c.Get(ctx, key) + if err != nil { + t.Errorf("Get %s failed: %v", key, err) + } + } + + // Wait for medium TTL to expire + time.Sleep(150 * time.Millisecond) + + // Medium TTL key should be expired, others should still be there + _, err = c.Get(ctx, "medium-ttl") + if err != cache.ErrKeyExpired { + t.Errorf("Expected ErrKeyExpired for medium-ttl, got %v", err) + } + + for key := range keys { + if key == "short-ttl" || key == "medium-ttl" { + continue + } + _, err := c.Get(ctx, key) + if err != nil { + t.Errorf("Get %s failed: %v", key, err) + } + } +} + +func TestMemoryCache_RapidOperations(t *testing.T) { + // Test rapid c operations + c := cache.NewMemory(0) + ctx := context.Background() + + const numOperations = 1000 + const duration = 100 * time.Millisecond + + start := time.Now() + opsCompleted := 0 + + for i := range numOperations { + // Alternate between set and get + if i%2 == 0 { + key := "rapid-key-" + strconv.Itoa(i) + value := "rapid-value-" + strconv.Itoa(i) + err := c.Set(ctx, key, value) + if err != nil { + t.Errorf("Set failed: %v", err) + } + } else { + key := "rapid-key-" + strconv.Itoa(i-1) + _, err := c.Get(ctx, key) + if err != nil { + t.Errorf("Get failed: %v", err) + } + } + opsCompleted++ + } + + durationTaken := time.Since(start) + t.Logf("Completed %d operations in %v (%.2f ops/ms)", opsCompleted, durationTaken, float64(opsCompleted)/float64(durationTaken.Milliseconds())) + + // Verify operations completed within reasonable time + if durationTaken > 2*duration { + t.Errorf("Operations took too long: %v", durationTaken) + } +} + +func TestMemoryCache_CleanupOnEmptyCache(t *testing.T) { + // Test cleanup operation on empty cache + cache := cache.NewMemory(0) + ctx := context.Background() + + err := cache.Cleanup(ctx) + if err != nil { + t.Fatalf("Cleanup failed: %v", err) + } + + // Should still work normally after cleanup + key := "post-cleanup-key" + value := "post-cleanup-value" + + err = cache.Set(ctx, key, value) + if err != nil { + t.Fatalf("Set after cleanup failed: %v", err) + } + + retrieved, err := cache.Get(ctx, key) + if err != nil { + t.Fatalf("Get after cleanup failed: %v", err) + } + + if retrieved != value { + t.Errorf("Expected %s, got %s", value, retrieved) + } +} + +func TestMemoryCache_DrainWithExpiredItems(t *testing.T) { + // Test 
drain operation with mix of expired and non-expired items + c := cache.NewMemory(0) + ctx := context.Background() + + // Set non-expired item + err := c.Set(ctx, "valid-key", "valid-value") + if err != nil { + t.Fatalf("Set valid key failed: %v", err) + } + + // Set expired item + err = c.Set(ctx, "expired-key", "expired-value", cache.WithTTL(1*time.Millisecond)) + if err != nil { + t.Fatalf("Set expired key failed: %v", err) + } + + // Wait for expiration + time.Sleep(10 * time.Millisecond) + + // Drain should only return non-expired items + items, err := c.Drain(ctx) + if err != nil { + t.Fatalf("Drain failed: %v", err) + } + + if len(items) != 1 { + t.Errorf("Expected 1 item in drain result, got %d", len(items)) + } + + if items["valid-key"] != "valid-value" { + t.Errorf("Expected valid-value, got %s", items["valid-key"]) + } + + // Verify expired item is gone (should be completely removed, not just expired) + _, err = c.Get(ctx, "expired-key") + if err != cache.ErrKeyNotFound { + t.Errorf("Expected ErrKeyNotFound, got %v", err) + } +} + +func TestMemoryCache_ExtremeKeyLength(t *testing.T) { + // Test with very long keys + cache := cache.NewMemory(0) + ctx := context.Background() + + // Create a very long key (1KB) + longKey := strings.Repeat("a", 1024) + value := "extreme-key-value" + + err := cache.Set(ctx, longKey, value) + if err != nil { + t.Fatalf("Set with long key failed: %v", err) + } + + retrieved, err := cache.Get(ctx, longKey) + if err != nil { + t.Fatalf("Get with long key failed: %v", err) + } + + if retrieved != value { + t.Errorf("Expected %s, got %s", value, retrieved) + } +} + +func TestMemoryCache_RaceConditionWithExpiration(t *testing.T) { + // Test race conditions between expiration and access + c := cache.NewMemory(0) + ctx := context.Background() + + key := "race-expire-key" + value := "race-expire-value" + ttl := 10 * time.Millisecond + + // Set item with short TTL + err := c.Set(ctx, key, value, cache.WithTTL(ttl)) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + + const numGoroutines = 50 + var wg sync.WaitGroup + + // Launch goroutines that try to access the key while it's expiring + for i := range numGoroutines { + wg.Add(1) + go func(id int) { + defer wg.Done() + + // Wait for the item to be close to expiration with some jitter + jitter := time.Duration(id%3) * time.Millisecond + time.Sleep(ttl - 2*time.Millisecond + jitter) + + // Try to get the item + _, err := c.Get(ctx, key) + if err != nil && err != cache.ErrKeyExpired && err != cache.ErrKeyNotFound { + t.Errorf("Get failed: %v", err) + } + }(i) + } + + wg.Wait() +} diff --git a/pkg/cache/memory_profile_test.go b/pkg/cache/memory_profile_test.go new file mode 100644 index 0000000..c7c0493 --- /dev/null +++ b/pkg/cache/memory_profile_test.go @@ -0,0 +1,301 @@ +//nolint:errcheck +package cache_test + +import ( + "context" + "runtime" + "strconv" + "testing" + + "github.com/android-sms-gateway/server/pkg/cache" +) + +func TestMemoryCache_MemoryAllocationPattern(t *testing.T) { + // This test analyzes memory allocation patterns during cache operations + cache := cache.NewMemory(0) + ctx := context.Background() + + // Force GC and get baseline memory + runtime.GC() + var m1, m2 runtime.MemStats + runtime.ReadMemStats(&m1) + + // Perform cache operations that trigger allocations + const numItems = 1000 + for i := range numItems { + key := "profile-key-" + strconv.Itoa(i) + value := "profile-value-" + strconv.Itoa(i) + + err := cache.Set(ctx, key, value) + if err != nil { + t.Errorf("Set failed: %v", 
err) + } + + // Get the value to trigger read path + _, err = cache.Get(ctx, key) + if err != nil { + t.Errorf("Get failed: %v", err) + } + } + + // Force GC again and measure memory + runtime.GC() + runtime.ReadMemStats(&m2) + + // Calculate memory growth + allocDiff := m2.TotalAlloc - m1.TotalAlloc + allocPerItem := float64(allocDiff) / float64(numItems) + + t.Logf("Memory allocation stats:") + t.Logf(" Total allocated: %d bytes", m2.TotalAlloc) + t.Logf(" Allocation difference: %d bytes", allocDiff) + t.Logf(" Allocations per item: %.2f bytes", allocPerItem) + t.Logf(" Heap objects: %d", m2.HeapObjects) + t.Logf(" GC cycles: %d", m2.NumGC) + + // Reasonable bounds for memory allocation (these are approximate) + // Higher threshold due to both Set and Get operations + if allocPerItem > 300 { + t.Errorf("Expected less than 300 bytes per item, got %.2f bytes", allocPerItem) + } +} + +func TestMemoryCache_MemoryCleanup(t *testing.T) { + // This test verifies that memory is properly cleaned up after cache operations + cache := cache.NewMemory(0) + ctx := context.Background() + + // Force GC and get baseline memory + runtime.GC() + var m1, m2 runtime.MemStats + runtime.ReadMemStats(&m1) + + // Add many items to cache + const numItems = 5000 + for i := range numItems { + key := "cleanup-key-" + strconv.Itoa(i) + value := "cleanup-value-" + strconv.Itoa(i) + + err := cache.Set(ctx, key, value) + if err != nil { + t.Errorf("Set failed: %v", err) + } + } + + // Drain the cache to clear all items + _, err := cache.Drain(ctx) + if err != nil { + t.Errorf("Drain failed: %v", err) + } + + // Force GC and measure memory after cleanup + runtime.GC() + runtime.ReadMemStats(&m2) + + // Calculate memory reduction + allocDiff := m2.TotalAlloc - m1.TotalAlloc + + t.Logf("Memory cleanup stats:") + t.Logf(" Total allocated: %d bytes", m2.TotalAlloc) + t.Logf(" Allocation difference: %d bytes", allocDiff) + t.Logf(" Heap objects: %d", m2.HeapObjects) + t.Logf(" GC cycles: %d", m2.NumGC) + + // Memory should not grow significantly after cleanup + // Allow some growth for overhead, but it should be reasonable + if allocDiff > 2*1024*1024 { // 2MB + t.Errorf("Expected less than 2MB memory growth after cleanup, got %d bytes", allocDiff) + } +} + +func TestMemoryCache_MemoryPressure(t *testing.T) { + // This test simulates memory pressure scenarios + ctx := context.Background() + + // Force GC and get baseline memory + runtime.GC() + var m1, m2 runtime.MemStats + runtime.ReadMemStats(&m1) + + // Simulate memory pressure by creating and destroying many cache instances + const numCaches = 100 + const itemsPerCache = 50 + + for i := 0; i < numCaches; i++ { + // Create a new cache + tempCache := cache.NewMemory(0) + + // Add items to cache + for j := 0; j < itemsPerCache; j++ { + key := "pressure-key-" + strconv.Itoa(i) + "-" + strconv.Itoa(j) + value := "pressure-value-" + strconv.Itoa(i) + "-" + strconv.Itoa(j) + + err := tempCache.Set(ctx, key, value) + if err != nil { + t.Errorf("Set failed: %v", err) + } + } + + // Drain the cache + _, err := tempCache.Drain(ctx) + if err != nil { + t.Errorf("Drain failed: %v", err) + } + } + + // Force GC and measure memory after pressure test + runtime.GC() + runtime.ReadMemStats(&m2) + + // Calculate memory growth + allocDiff := m2.TotalAlloc - m1.TotalAlloc + + t.Logf("Memory pressure stats:") + t.Logf(" Total allocated: %d bytes", m2.TotalAlloc) + t.Logf(" Allocation difference: %d bytes", allocDiff) + t.Logf(" Heap objects: %d", m2.HeapObjects) + t.Logf(" GC cycles: %d", 
m2.NumGC) + + // Memory growth should be reasonable even under pressure + // Allow some growth for overhead, but it should be proportional + // Higher threshold due to cache creation/destruction overhead + expectedMaxGrowth := uint64(numCaches * itemsPerCache * 300) // 300 bytes per item estimate + if allocDiff > expectedMaxGrowth { + t.Errorf("Expected less than %d bytes memory growth under pressure, got %d bytes", expectedMaxGrowth, allocDiff) + } +} + +func TestMemoryCache_GCStress(t *testing.T) { + // This test verifies cache behavior under frequent GC cycles + c := cache.NewMemory(0) + ctx := context.Background() + + // Add items to cache + const numItems = 1000 + for i := range numItems { + key := "gc-key-" + strconv.Itoa(i) + value := "gc-value-" + strconv.Itoa(i) + + err := c.Set(ctx, key, value) + if err != nil { + t.Errorf("Set failed: %v", err) + } + } + + // Perform frequent GC operations and verify cache still works + const numGCs = 10 + for range numGCs { + // Force GC + runtime.GC() + + // Verify cache operations still work + for j := range 100 { + key := "gc-key-" + strconv.Itoa(j) + _, err := c.Get(ctx, key) + if err != nil { + t.Errorf("Get failed during GC stress test: %v", err) + } + } + } + + // Verify all items are still accessible + for i := range numItems { + key := "gc-key-" + strconv.Itoa(i) + value := "gc-value-" + strconv.Itoa(i) + + retrieved, err := c.Get(ctx, key) + if err != nil { + t.Errorf("Get failed after GC stress: %v", err) + } + + if retrieved != value { + t.Errorf("Value mismatch after GC stress: expected %s, got %s", value, retrieved) + } + } +} + +func TestMemoryCache_MemoryLeakDetection(t *testing.T) { + // This test helps detect memory leaks by creating and destroying many caches + ctx := context.Background() + + // Force GC and get baseline memory + runtime.GC() + var m1, m2 runtime.MemStats + runtime.ReadMemStats(&m1) + + // Create and destroy many cache instances + const numCaches = 1000 + for i := range numCaches { + // Create a new cache + tempCache := cache.NewMemory(0) + + // Add some items + for j := range 10 { + key := "leak-key-" + strconv.Itoa(i) + "-" + strconv.Itoa(j) + value := "leak-value-" + strconv.Itoa(i) + "-" + strconv.Itoa(j) + + err := tempCache.Set(ctx, key, value) + if err != nil { + t.Errorf("Set failed: %v", err) + } + } + + // Clear the cache + tempCache.Drain(ctx) + + // Help GC by clearing reference + tempCache = nil + } + + // Force GC and measure memory + runtime.GC() + runtime.ReadMemStats(&m2) + + // Calculate memory growth + // Convert to int64 to avoid unsigned wrap-around when memory decreases + heapDiff := int64(m2.HeapAlloc) - int64(m1.HeapAlloc) + + t.Logf("Memory leak detection stats:") + t.Logf(" Initial heap: %d bytes", m1.HeapAlloc) + t.Logf(" Final heap: %d bytes", m2.HeapAlloc) + t.Logf(" Heap delta: %d bytes", heapDiff) + t.Logf(" Heap objects: %d", m2.HeapObjects) + t.Logf(" GC cycles: %d", m2.NumGC) + + // Only report as leak if memory increased beyond threshold + if heapDiff > 1*1024*1024 { // 1MB threshold for leak detection + t.Errorf("Potential memory leak detected: %d bytes retained after cleanup", heapDiff) + } else if heapDiff < 0 { + t.Logf("Memory reduced by %d bytes after cleanup", -heapDiff) + } +} + +func BenchmarkMemoryCache_MemoryUsage(b *testing.B) { + // This benchmark tracks memory usage patterns + cache := cache.NewMemory(0) + ctx := context.Background() + + b.ReportAllocs() + runtime.GC() + var m1, m2 runtime.MemStats + runtime.ReadMemStats(&m1) + b.ResetTimer() + 
b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + key := "bench-key-" + strconv.Itoa(i) + value := "bench-value-" + strconv.Itoa(i) + + // Set and get + cache.Set(ctx, key, value) + cache.Get(ctx, key) + + // Delete + cache.Delete(ctx, key) + + i++ + } + }) + runtime.ReadMemStats(&m2) + b.Logf("TotalAlloc per op: %.2f bytes/op", float64(m2.TotalAlloc-m1.TotalAlloc)/float64(b.N)) +} diff --git a/pkg/cache/memory_test.go b/pkg/cache/memory_test.go new file mode 100644 index 0000000..d8d9e7f --- /dev/null +++ b/pkg/cache/memory_test.go @@ -0,0 +1,491 @@ +package cache_test + +import ( + "context" + "testing" + "time" + + "github.com/android-sms-gateway/server/pkg/cache" +) + +func TestMemoryCache_SetAndGet(t *testing.T) { + cache := cache.NewMemory(0) // No TTL for basic tests + + ctx := context.Background() + key := "test-key" + value := "test-value" + + // Test setting a value + err := cache.Set(ctx, key, value) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + + // Test getting the value + retrieved, err := cache.Get(ctx, key) + if err != nil { + t.Fatalf("Get failed: %v", err) + } + + if retrieved != value { + t.Errorf("Expected %s, got %s", value, retrieved) + } +} + +func TestMemoryCache_SetAndGetWithTTL(t *testing.T) { + c := cache.NewMemory(0) // No default TTL + + ctx := context.Background() + key := "test-key" + value := "test-value" + ttl := 2 * time.Hour + + // Test setting a value with TTL + err := c.Set(ctx, key, value, cache.WithTTL(ttl)) + if err != nil { + t.Fatalf("Set with TTL failed: %v", err) + } + + // Test getting the value + retrieved, err := c.Get(ctx, key) + if err != nil { + t.Fatalf("Get failed: %v", err) + } + + if retrieved != value { + t.Errorf("Expected %s, got %s", value, retrieved) + } +} + +func TestMemoryCache_SetAndGetWithValidUntil(t *testing.T) { + c := cache.NewMemory(0) // No default TTL + + ctx := context.Background() + key := "test-key" + value := "test-value" + validUntil := time.Now().Add(2 * time.Hour) + + // Test setting a value with validUntil + err := c.Set(ctx, key, value, cache.WithValidUntil(validUntil)) + if err != nil { + t.Fatalf("Set with validUntil failed: %v", err) + } + + // Test getting the value + retrieved, err := c.Get(ctx, key) + if err != nil { + t.Fatalf("Get failed: %v", err) + } + + if retrieved != value { + t.Errorf("Expected %s, got %s", value, retrieved) + } +} + +func TestMemoryCache_SetAndGetWithDefaultTTL(t *testing.T) { + defaultTTL := 1 * time.Hour + cache := cache.NewMemory(defaultTTL) // With default TTL + + ctx := context.Background() + key := "test-key" + value := "test-value" + + // Test setting a value without explicit TTL (should use default) + err := cache.Set(ctx, key, value) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + + // Test getting the value + retrieved, err := cache.Get(ctx, key) + if err != nil { + t.Fatalf("Get failed: %v", err) + } + + if retrieved != value { + t.Errorf("Expected %s, got %s", value, retrieved) + } +} + +func TestMemoryCache_GetNotFound(t *testing.T) { + c := cache.NewMemory(0) + + ctx := context.Background() + key := "non-existent-key" + + _, err := c.Get(ctx, key) + if err != cache.ErrKeyNotFound { + t.Errorf("Expected ErrKeyNotFound, got %v", err) + } +} + +func TestMemoryCache_SetOrFailNewKey(t *testing.T) { + cache := cache.NewMemory(0) + + ctx := context.Background() + key := "test-key" + value := "test-value" + + // Test SetOrFail with new key + err := cache.SetOrFail(ctx, key, value) + if err != nil { + t.Fatalf("SetOrFail failed: %v", 
err) + } + + // Verify the value was set + retrieved, err := cache.Get(ctx, key) + if err != nil { + t.Fatalf("Get failed: %v", err) + } + + if retrieved != value { + t.Errorf("Expected %s, got %s", value, retrieved) + } +} + +func TestMemoryCache_SetOrFailExistingKey(t *testing.T) { + c := cache.NewMemory(0) + + ctx := context.Background() + key := "test-key" + value1 := "value1" + value2 := "value2" + + // Set initial value + err := c.Set(ctx, key, value1) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + + // Try SetOrFail with existing key + err = c.SetOrFail(ctx, key, value2) + if err != cache.ErrKeyExists { + t.Errorf("Expected ErrKeyExists, got %v", err) + } + + // Verify original value is still there + retrieved, err := c.Get(ctx, key) + if err != nil { + t.Fatalf("Get failed: %v", err) + } + + if retrieved != value1 { + t.Errorf("Expected %s, got %s", value1, retrieved) + } +} + +func TestMemoryCache_Delete(t *testing.T) { + c := cache.NewMemory(0) + + ctx := context.Background() + key := "test-key" + value := "test-value" + + // Set a value + err := c.Set(ctx, key, value) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + + // Delete the key + err = c.Delete(ctx, key) + if err != nil { + t.Fatalf("Delete failed: %v", err) + } + + // Verify the key is gone + _, err = c.Get(ctx, key) + if err != cache.ErrKeyNotFound { + t.Errorf("Expected ErrKeyNotFound after delete, got %v", err) + } +} + +func TestMemoryCache_DeleteNonExistent(t *testing.T) { + cache := cache.NewMemory(0) + + ctx := context.Background() + key := "non-existent-key" + + // Delete non-existent key should not fail + err := cache.Delete(ctx, key) + if err != nil { + t.Errorf("Delete of non-existent key failed: %v", err) + } +} + +func TestMemoryCache_GetAndDelete(t *testing.T) { + c := cache.NewMemory(0) + + ctx := context.Background() + key := "test-key" + value := "test-value" + + // Set a value + err := c.Set(ctx, key, value) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + + // Get and delete the key + retrieved, err := c.GetAndDelete(ctx, key) + if err != nil { + t.Fatalf("GetAndDelete failed: %v", err) + } + + if retrieved != value { + t.Errorf("Expected %s, got %s", value, retrieved) + } + + // Verify the key is gone + _, err = c.Get(ctx, key) + if err != cache.ErrKeyNotFound { + t.Errorf("Expected ErrKeyNotFound after GetAndDelete, got %v", err) + } +} + +func TestMemoryCache_GetAndDeleteNonExistent(t *testing.T) { + c := cache.NewMemory(0) + + ctx := context.Background() + key := "non-existent-key" + + // GetAndDelete non-existent key should return ErrKeyNotFound + _, err := c.GetAndDelete(ctx, key) + if err != cache.ErrKeyNotFound { + t.Errorf("Expected ErrKeyNotFound, got %v", err) + } +} + +func TestMemoryCache_Drain(t *testing.T) { + c := cache.NewMemory(0) + + ctx := context.Background() + items := map[string]string{ + "key1": "value1", + "key2": "value2", + "key3": "value3", + } + + // Set multiple values + for key, value := range items { + err := c.Set(ctx, key, value) + if err != nil { + t.Fatalf("Set failed for %s: %v", key, err) + } + } + + // Drain the cache + drained, err := c.Drain(ctx) + if err != nil { + t.Fatalf("Drain failed: %v", err) + } + + // Verify all items are drained + if len(drained) != len(items) { + t.Errorf("Expected %d items, got %d", len(items), len(drained)) + } + + for key, expectedValue := range items { + actualValue, ok := drained[key] + if !ok { + t.Errorf("Expected key %s in drained items", key) + continue + } + if actualValue != expectedValue { + 
t.Errorf("Expected %s, got %s for key %s", expectedValue, actualValue, key) + } + } + + // Verify cache is now empty + for key := range items { + _, err := c.Get(ctx, key) + if err != cache.ErrKeyNotFound { + t.Errorf("Expected ErrKeyNotFound for key %s after drain, got %v", key, err) + } + } +} + +func TestMemoryCache_DrainEmpty(t *testing.T) { + cache := cache.NewMemory(0) + + ctx := context.Background() + + // Drain empty cache + drained, err := cache.Drain(ctx) + if err != nil { + t.Fatalf("Drain failed: %v", err) + } + + if len(drained) != 0 { + t.Errorf("Expected 0 items from empty cache, got %d", len(drained)) + } +} + +func TestMemoryCache_Cleanup(t *testing.T) { + c := cache.NewMemory(0) // No default TTL + + ctx := context.Background() + key := "test-key" + value := "test-value" + shortTTL := 100 * time.Millisecond + + // Set a value with short TTL + err := c.Set(ctx, key, value, cache.WithTTL(shortTTL)) + if err != nil { + t.Fatalf("Set with TTL failed: %v", err) + } + + // Verify the value is there initially + _, err = c.Get(ctx, key) + if err != nil { + t.Fatalf("Get failed: %v", err) + } + + // Wait for the item to expire + time.Sleep(2 * shortTTL) + + // Run cleanup + err = c.Cleanup(ctx) + if err != nil { + t.Fatalf("Cleanup failed: %v", err) + } + + // Verify the expired item is gone + _, err = c.Get(ctx, key) + if err != cache.ErrKeyNotFound { + t.Errorf("Expected ErrKeyNotFound after cleanup, got %v", err) + } +} + +func TestMemoryCache_CleanupNoExpired(t *testing.T) { + cache := cache.NewMemory(0) + + ctx := context.Background() + key := "test-key" + value := "test-value" + + // Set a value without TTL + err := cache.Set(ctx, key, value) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + + // Run cleanup on cache with no expired items + err = cache.Cleanup(ctx) + if err != nil { + t.Fatalf("Cleanup failed: %v", err) + } + + // Verify the value is still there + retrieved, err := cache.Get(ctx, key) + if err != nil { + t.Fatalf("Get failed: %v", err) + } + + if retrieved != value { + t.Errorf("Expected %s, got %s", value, retrieved) + } +} + +func TestMemoryCache_Overwrite(t *testing.T) { + cache := cache.NewMemory(0) + + ctx := context.Background() + key := "test-key" + value1 := "value1" + value2 := "value2" + + // Set initial value + err := cache.Set(ctx, key, value1) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + + // Overwrite with new value + err = cache.Set(ctx, key, value2) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + + // Verify the new value is there + retrieved, err := cache.Get(ctx, key) + if err != nil { + t.Fatalf("Get failed: %v", err) + } + + if retrieved != value2 { + t.Errorf("Expected %s, got %s", value2, retrieved) + } +} + +func TestMemoryCache_EmptyValue(t *testing.T) { + cache := cache.NewMemory(0) + + ctx := context.Background() + key := "test-key" + value := "" + + // Set empty value + err := cache.Set(ctx, key, value) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + + // Get the empty value + retrieved, err := cache.Get(ctx, key) + if err != nil { + t.Fatalf("Get failed: %v", err) + } + + if retrieved != value { + t.Errorf("Expected empty string, got %s", retrieved) + } +} + +func TestMemoryCache_SpecialCharacters(t *testing.T) { + cache := cache.NewMemory(0) + + ctx := context.Background() + key := "test:key/with@special#chars" + value := "value with special chars: !@#$%^&*()" + + // Set value with special characters + err := cache.Set(ctx, key, value) + if err != nil { + t.Fatalf("Set failed: %v", err) + 
} + + // Get the value + retrieved, err := cache.Get(ctx, key) + if err != nil { + t.Fatalf("Get failed: %v", err) + } + + if retrieved != value { + t.Errorf("Expected %s, got %s", value, retrieved) + } +} + +func TestMemoryCache_LargeValue(t *testing.T) { + cache := cache.NewMemory(0) + + ctx := context.Background() + key := "large-key" + value := string(make([]byte, 1024*1024)) // 1MB value + + // Set large value + err := cache.Set(ctx, key, value) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + + // Get the large value + retrieved, err := cache.Get(ctx, key) + if err != nil { + t.Fatalf("Get failed: %v", err) + } + + if retrieved != value { + t.Errorf("Large value mismatch") + } +} diff --git a/pkg/cache/options.go b/pkg/cache/options.go new file mode 100644 index 0000000..e6d0b7d --- /dev/null +++ b/pkg/cache/options.go @@ -0,0 +1,40 @@ +package cache + +import "time" + +// Option configures per-item cache behavior (e.g., expiry). +type Option func(*options) + +type options struct { + validUntil time.Time +} + +func (o *options) apply(opts ...Option) *options { + for _, opt := range opts { + opt(o) + } + + return o +} + +// WithTTL is an Option that sets the TTL (time to live) for an item, i.e. the +// item will expire after the given duration from the time of insertion. +// A non-positive TTL clears any expiry, so the item never expires. +func WithTTL(ttl time.Duration) Option { + return func(o *options) { + if ttl <= 0 { + o.validUntil = time.Time{} + return + } + + o.validUntil = time.Now().Add(ttl) + } +} + +// WithValidUntil is an Option that sets the valid until time for an item, i.e. +// the item will expire at the given time. +func WithValidUntil(validUntil time.Time) Option { + return func(o *options) { + o.validUntil = validUntil + } +} diff --git a/pkg/cache/redis.go b/pkg/cache/redis.go new file mode 100644 index 0000000..9ea2f3a --- /dev/null +++ b/pkg/cache/redis.go @@ -0,0 +1,177 @@ +package cache + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/redis/go-redis/v9" +) + +const ( + redisCacheKey = "cache" + + // getAndDeleteScript atomically gets and deletes a hash field + getAndDeleteScript = ` +local value = redis.call('HGET', KEYS[1], ARGV[1]) +if value then + redis.call('HDEL', KEYS[1], ARGV[1]) + return value +else + return false +end +` + + // hgetallAndDeleteScript atomically reads all hash fields and removes the hash + hgetallAndDeleteScript = ` +local items = redis.call('HGETALL', KEYS[1]) +if #items > 0 then + local ok = pcall(redis.call, 'UNLINK', KEYS[1]) + if not ok then redis.call('DEL', KEYS[1]) end +end +return items +` +) + +type redisCache struct { + client *redis.Client + + key string + + ttl time.Duration +} + +func NewRedis(client *redis.Client, prefix string, ttl time.Duration) Cache { + if prefix != "" && !strings.HasSuffix(prefix, ":") { + prefix += ":" + } + + return &redisCache{ + client: client, + + key: prefix + redisCacheKey, + + ttl: ttl, + } +} + +// Cleanup implements Cache. +// It is a no-op: Redis expires hash fields on its own (see HExpireAt in Set), +// so there is nothing to clean up manually. +func (r *redisCache) Cleanup(_ context.Context) error { + return nil +} + +// Delete implements Cache. +func (r *redisCache) Delete(ctx context.Context, key string) error { + if err := r.client.HDel(ctx, r.key, key).Err(); err != nil { + return fmt.Errorf("can't delete cache item: %w", err) + } + + return nil +} + +// Drain implements Cache. 
+func (r *redisCache) Drain(ctx context.Context) (map[string]string, error) { + res, err := r.client.Eval(ctx, hgetallAndDeleteScript, []string{r.key}).Result() + if err != nil { + return nil, fmt.Errorf("can't drain cache: %w", err) + } + + arr, ok := res.([]any) + if !ok || len(arr) == 0 { + return map[string]string{}, nil + } + + out := make(map[string]string, len(arr)/2) + for i := 0; i+1 < len(arr); i += 2 { + f, _ := arr[i].(string) + v, _ := arr[i+1].(string) + out[f] = v + } + + return out, nil +} + +// Get implements Cache. +func (r *redisCache) Get(ctx context.Context, key string) (string, error) { + val, err := r.client.HGet(ctx, r.key, key).Result() + if err != nil { + if err == redis.Nil { + return "", ErrKeyNotFound + } + + return "", fmt.Errorf("can't get cache item: %w", err) + } + + return val, nil +} + +// GetAndDelete implements Cache. +func (r *redisCache) GetAndDelete(ctx context.Context, key string) (string, error) { + result, err := r.client.Eval(ctx, getAndDeleteScript, []string{r.key}, key).Result() + if err != nil { + if err == redis.Nil { + // The script returns Lua `false` for a missing field, which the + // client surfaces as redis.Nil rather than a string reply. + return "", ErrKeyNotFound + } + + return "", fmt.Errorf("can't get cache item: %w", err) + } + + if value, ok := result.(string); ok { + return value, nil + } + + return "", ErrKeyNotFound +} + +// Set implements Cache. +func (r *redisCache) Set(ctx context.Context, key string, value string, opts ...Option) error { + options := new(options) + if r.ttl > 0 { + options.validUntil = time.Now().Add(r.ttl) + } + options.apply(opts...) + + _, err := r.client.Pipelined(ctx, func(p redis.Pipeliner) error { + p.HSet(ctx, r.key, key, value) + if !options.validUntil.IsZero() { + p.HExpireAt(ctx, r.key, options.validUntil, key) + } + return nil + }) + if err != nil { + return fmt.Errorf("can't set cache item: %w", err) + } + + return nil +} + +// SetOrFail implements Cache. +func (r *redisCache) SetOrFail(ctx context.Context, key string, value string, opts ...Option) error { + val, err := r.client.HSetNX(ctx, r.key, key, value).Result() + if err != nil { + return fmt.Errorf("can't set cache item: %w", err) + } + + if !val { + return ErrKeyExists + } + + options := new(options) + if r.ttl > 0 { + options.validUntil = time.Now().Add(r.ttl) + } + options.apply(opts...) + + if !options.validUntil.IsZero() { + if err := r.client.HExpireAt(ctx, r.key, options.validUntil, key).Err(); err != nil { + return fmt.Errorf("can't set cache item ttl: %w", err) + } + } + + return nil +}
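
A minimal usage sketch of the new pkg/cache API with the in-memory backend, showing the per-item expiry options and the distinct sentinel errors the interface documents (key names and durations here are illustrative):

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"

        "github.com/android-sms-gateway/server/pkg/cache"
    )

    func main() {
        ctx := context.Background()
        c := cache.NewMemory(0) // zero means no default TTL

        // Relative and absolute expiry are two views of the same validUntil field.
        _ = c.Set(ctx, "device-1", "online", cache.WithTTL(50*time.Millisecond))
        _ = c.Set(ctx, "device-2", "online", cache.WithValidUntil(time.Now().Add(time.Hour)))

        time.Sleep(100 * time.Millisecond)

        // An expired key is reported differently from a missing one.
        if _, err := c.Get(ctx, "device-1"); errors.Is(err, cache.ErrKeyExpired) {
            fmt.Println("device-1 expired")
        }
        if _, err := c.Get(ctx, "unknown"); errors.Is(err, cache.ErrKeyNotFound) {
            fmt.Println("unknown key was never set")
        }
    }

The asymmetry is an in-memory detail: expired items linger and report ErrKeyExpired until Cleanup or Drain removes them, while the Redis backend drops expired fields server-side and reports them as not found.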
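
SetOrFail provides first-writer-wins semantics (the concurrency test above asserts exactly one winner), which makes it usable as a lightweight idempotency guard. A sketch, with the job key purely illustrative:

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"

        "github.com/android-sms-gateway/server/pkg/cache"
    )

    func main() {
        ctx := context.Background()
        c := cache.NewMemory(0)

        for attempt := 1; attempt <= 3; attempt++ {
            err := c.SetOrFail(ctx, "job:refresh", "claimed", cache.WithTTL(time.Minute))
            if errors.Is(err, cache.ErrKeyExists) {
                fmt.Println("attempt", attempt, "- already claimed, skipping")
                continue
            }
            if err != nil {
                fmt.Println("attempt", attempt, "- unexpected error:", err)
                continue
            }
            fmt.Println("attempt", attempt, "- claimed, doing the work once")
        }
    }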
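
The online service stores last-seen timestamps as RFC3339 strings and batches them to the devices service once a minute via Drain. Two properties fall out of that encoding: granularity is one second (time.RFC3339 has no fractional seconds), and unparseable values degrade to the current time. A sketch of the drain-and-parse step, using a plain loop in place of the maps.MapValues helper the service uses:

    package main

    import (
        "context"
        "fmt"
        "time"

        "github.com/android-sms-gateway/server/pkg/cache"
    )

    func main() {
        ctx := context.Background()
        c := cache.NewMemory(time.Hour)

        _ = c.Set(ctx, "device-1", time.Now().UTC().Format(time.RFC3339))
        _ = c.Set(ctx, "device-2", "not-a-timestamp")

        items, err := c.Drain(ctx) // snapshot and clear in one step
        if err != nil {
            panic(err)
        }

        lastSeen := make(map[string]time.Time, len(items))
        for id, raw := range items {
            t, err := time.Parse(time.RFC3339, raw)
            if err != nil {
                t = time.Now().UTC() // the service falls back the same way
            }
            lastSeen[id] = t
        }
        fmt.Println(lastSeen) // this map is what persist hands to devicesSvc.SetLastSeen
    }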
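
module.go ties Service.Run to the fx lifecycle by deriving a cancelable context in OnStart and canceling it in OnStop, so the background loop outlives the start hook but not the application. The same pattern in self-contained form (the ticker interval is shortened just for the demo):

    package main

    import (
        "context"
        "fmt"
        "time"

        "go.uber.org/fx"
    )

    // runLoop stands in for Service.Run: tick until the context is canceled.
    func runLoop(ctx context.Context) {
        ticker := time.NewTicker(200 * time.Millisecond)
        defer ticker.Stop()
        for {
            select {
            case <-ctx.Done():
                fmt.Println("loop stopped")
                return
            case <-ticker.C:
                fmt.Println("tick")
            }
        }
    }

    func main() {
        fx.New(
            fx.Invoke(func(lc fx.Lifecycle) {
                ctx, cancel := context.WithCancel(context.Background())
                lc.Append(fx.Hook{
                    OnStart: func(context.Context) error { go runLoop(ctx); return nil },
                    OnStop:  func(context.Context) error { cancel(); return nil },
                })
            }),
        ).Run()
    }

Passing the OnStart context to the goroutine would not work here: fx scopes that context to the startup phase, so the loop would stop almost immediately. Hence the separate Background-derived context.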
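
The ObserveCacheLatency and ObservePersistenceLatency helpers wrap the standard prometheus.Timer idiom behind a callback so call sites cannot forget to stop the timer. For reference, the callback form is equivalent to this sketch (metric name and buckets are illustrative):

    package main

    import (
        "time"

        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promauto"
    )

    var opLatency = promauto.NewHistogram(prometheus.HistogramOpts{
        Name:    "example_op_latency_seconds",
        Help:    "Example operation latency in seconds",
        Buckets: []float64{0.005, 0.01, 0.05, 0.1, 0.5, 1},
    })

    // timed mirrors ObserveCacheLatency: run f and record its duration.
    func timed(f func()) {
        timer := prometheus.NewTimer(opLatency)
        defer timer.ObserveDuration()
        f()
    }

    func main() {
        timed(func() { time.Sleep(10 * time.Millisecond) })
    }

One caveat carried over from newMetrics: because promauto registers with the default registry, constructing the metrics twice in one process (for example, in tests) panics on duplicate registration.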
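
The Redis backend keeps every item as a field of a single hash (prefix plus "cache") and leans on two server-side features: per-field expiry via HExpireAt, which requires a Redis version with the HEXPIRE command family (introduced in Redis 7.4, worth checking against the deployment target), and Lua scripts so that get-and-delete and drain are atomic, where a plain HGETALL followed by DEL would lose writes landing between the two commands. A construction sketch, with the address and prefix illustrative:

    package main

    import (
        "context"
        "fmt"
        "time"

        "github.com/redis/go-redis/v9"

        "github.com/android-sms-gateway/server/pkg/cache"
    )

    func main() {
        rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
        defer rdb.Close()

        // Items live in the hash "online:cache" and default to a 1h TTL.
        c := cache.NewRedis(rdb, "online", time.Hour)

        ctx := context.Background()
        if err := c.Set(ctx, "device-1", time.Now().UTC().Format(time.RFC3339)); err != nil {
            fmt.Println("set failed:", err) // e.g., per-field TTL unsupported by the server
            return
        }

        items, err := c.Drain(ctx)
        fmt.Println(items, err)
    }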