From 4bcfc35c87e45ce2d5843a76666f62fbc73159ba Mon Sep 17 00:00:00 2001 From: Aleksandr Soloshenko Date: Tue, 22 Jul 2025 11:43:21 +0700 Subject: [PATCH] [events] introduce events module as a proxy between the app and push/sse --- deployments/grafana/dashboards/events.json | 351 ++++++++++++++++++ .../prometheus/alerts/events-alerts.yml | 11 + internal/sms-gateway/app.go | 2 + internal/sms-gateway/handlers/upstream.go | 8 +- internal/sms-gateway/modules/events/events.go | 29 ++ .../sms-gateway/modules/events/metrics.go | 70 ++++ internal/sms-gateway/modules/events/module.go | 30 ++ .../sms-gateway/modules/events/service.go | 116 ++++++ internal/sms-gateway/modules/events/types.go | 23 ++ .../sms-gateway/modules/messages/errors.go | 7 + .../sms-gateway/modules/messages/service.go | 29 +- .../sms-gateway/modules/push/domain/events.go | 36 -- .../sms-gateway/modules/push/fcm/client.go | 14 +- .../sms-gateway/modules/push/fcm/utils.go | 20 + internal/sms-gateway/modules/push/service.go | 64 +--- internal/sms-gateway/modules/push/types.go | 35 +- .../sms-gateway/modules/push/types/types.go | 10 + .../modules/push/upstream/client.go | 12 +- .../sms-gateway/modules/settings/service.go | 30 +- internal/sms-gateway/modules/sse/service.go | 55 +-- internal/sms-gateway/modules/sse/types.go | 8 + .../sms-gateway/modules/webhooks/service.go | 19 +- pkg/swagger/docs/mobile.http | 2 +- 23 files changed, 767 insertions(+), 214 deletions(-) create mode 100644 deployments/grafana/dashboards/events.json create mode 100644 deployments/prometheus/alerts/events-alerts.yml create mode 100644 internal/sms-gateway/modules/events/events.go create mode 100644 internal/sms-gateway/modules/events/metrics.go create mode 100644 internal/sms-gateway/modules/events/module.go create mode 100644 internal/sms-gateway/modules/events/service.go create mode 100644 internal/sms-gateway/modules/events/types.go create mode 100644 internal/sms-gateway/modules/messages/errors.go delete mode 100644 internal/sms-gateway/modules/push/domain/events.go create mode 100644 internal/sms-gateway/modules/push/fcm/utils.go create mode 100644 internal/sms-gateway/modules/push/types/types.go create mode 100644 internal/sms-gateway/modules/sse/types.go diff --git a/deployments/grafana/dashboards/events.json b/deployments/grafana/dashboards/events.json new file mode 100644 index 0000000..55e0a8b --- /dev/null +++ b/deployments/grafana/dashboards/events.json @@ -0,0 +1,351 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": "-- Grafana --", + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "target": { + "limit": 100, + "matchAny": false, + "tags": [], + "type": "dashboard" + }, + "type": "dashboard" + } + ] + }, + "description": "Dashboard for monitoring event notification metrics", + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": 12, + "links": [], + "panels": [ + { + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 0 + }, + "id": 2, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.2.0-16711121739", + "targets": [ + { + "expr": "sum by (event) (rate(sms_events_enqueued_total[5m]))", + "legendFormat": "{{event}}", + "refId": "A" + } + ], + "title": "Enqueued Events (per event type)", + "type": "timeseries" + }, + { + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "normal" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 0 + }, + "id": 3, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.2.0-16711121739", + "targets": [ + { + "expr": "sum by (delivery_type) (rate(sms_events_sent_total[5m]))", + "legendFormat": "{{delivery_type}}", + "refId": "A" + } + ], + "title": "Sent Events (by delivery type)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + } + }, + "mappings": [] + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 8 + }, + "id": 4, + "options": { + "legend": { + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "pieType": "pie", + "reduceOptions": { + "calcs": [ + "last" + ], + "fields": "", + "values": false + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.2.0-16711121739", + "targets": [ + { + "editorMode": "code", + "exemplar": false, + "expr": "sum by (delivery_type, reason) (sms_events_failed_total)", + "format": "heatmap", + "instant": true, + "legendFormat": "{{delivery_type}} - {{reason}}", + "range": false, + "refId": "A" + } + ], + "title": "Failure Analysis (by delivery type and reason)", + "type": "piechart" + }, + { + "datasource": { + "type": "prometheus", + "uid": "edqp0a73uh2bka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "max": 100, + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": 0 + }, + { + "color": "green", + "value": 90 + } + ] + }, + "unit": "percent" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 8 + }, + "id": 5, + "options": { + "minVizHeight": 75, + "minVizWidth": 75, + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showThresholdLabels": false, + "showThresholdMarkers": true, + "sizing": "auto" + }, + "pluginVersion": "12.2.0-16711121739", + "targets": [ + { + "editorMode": "code", + "expr": "100 * (sum(sms_events_sent_total) / (sum(sms_events_sent_total) + sum(sms_events_failed_total)))", + "format": "table", + "instant": true, + "refId": "A" + } + ], + "title": "Delivery Success Rate", + "type": "gauge" + } + ], + "preload": false, + "refresh": "auto", + "schemaVersion": 41, + "tags": [ + "push", + "sms", + "sse" + ], + "templating": { + "list": [] + }, + "time": { + "from": "now-24h", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "Event Notifications", + "uid": "9886a96d-9329-4110-b80b-3a6aa8216520", + "version": 5 +} \ No newline at end of file diff --git a/deployments/prometheus/alerts/events-alerts.yml b/deployments/prometheus/alerts/events-alerts.yml new file mode 100644 index 0000000..1a3f9b3 --- /dev/null +++ b/deployments/prometheus/alerts/events-alerts.yml @@ -0,0 +1,11 @@ +groups: + - name: events-alerts + rules: + - alert: LowEventDeliverySuccessRate + expr: 100 * (sum(rate(sms_events_sent_total[5m])) / (sum(rate(sms_events_sent_total[5m])) + sum(rate(sms_events_failed_total[5m]))) < 90 + for: 10m + labels: + severity: critical + annotations: + summary: "Event delivery success rate below 90%" + description: "Event delivery success rate is at {{ $value }}%" diff --git a/internal/sms-gateway/app.go b/internal/sms-gateway/app.go index 42f81d8..a653d26 100644 --- a/internal/sms-gateway/app.go +++ b/internal/sms-gateway/app.go @@ -10,6 +10,7 @@ import ( "github.com/android-sms-gateway/server/internal/sms-gateway/modules/cleaner" appdb "github.com/android-sms-gateway/server/internal/sms-gateway/modules/db" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/devices" + "github.com/android-sms-gateway/server/internal/sms-gateway/modules/events" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/health" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/messages" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/metrics" @@ -39,6 +40,7 @@ var Module = fx.Module( auth.Module, push.Module, db.Module, + events.Module, messages.Module, health.Module, webhooks.Module, diff --git a/internal/sms-gateway/handlers/upstream.go b/internal/sms-gateway/handlers/upstream.go index 903a467..871692a 100644 --- a/internal/sms-gateway/handlers/upstream.go +++ b/internal/sms-gateway/handlers/upstream.go @@ -68,10 +68,10 @@ func (h *upstreamHandler) postPush(c *fiber.Ctx) error { return err } - event := push.NewEvent( - anys.ZeroDefault(v.Event, smsgateway.PushMessageEnqueued), - v.Data, - ) + event := push.Event{ + Type: anys.ZeroDefault(v.Event, smsgateway.PushMessageEnqueued), + Data: v.Data, + } if err := h.pushSvc.Enqueue(v.Token, event); err != nil { h.Logger.Error("Can't push message", zap.Error(err)) diff --git a/internal/sms-gateway/modules/events/events.go b/internal/sms-gateway/modules/events/events.go new file mode 100644 index 0000000..89c9094 --- /dev/null +++ b/internal/sms-gateway/modules/events/events.go @@ -0,0 +1,29 @@ +package events + +import ( + "time" + + "github.com/android-sms-gateway/client-go/smsgateway" +) + +func NewMessageEnqueuedEvent() *Event { + return NewEvent(smsgateway.PushMessageEnqueued, nil) +} + +func NewWebhooksUpdatedEvent() *Event { + return NewEvent(smsgateway.PushWebhooksUpdated, nil) +} + +func NewMessagesExportRequestedEvent(since, until time.Time) *Event { + return NewEvent( + smsgateway.PushMessagesExportRequested, + map[string]string{ + "since": since.Format(time.RFC3339), + "until": until.Format(time.RFC3339), + }, + ) +} + +func NewSettingsUpdatedEvent() *Event { + return NewEvent(smsgateway.PushSettingsUpdated, nil) +} diff --git a/internal/sms-gateway/modules/events/metrics.go b/internal/sms-gateway/modules/events/metrics.go new file mode 100644 index 0000000..13afdc8 --- /dev/null +++ b/internal/sms-gateway/modules/events/metrics.go @@ -0,0 +1,70 @@ +package events + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// Metric constants +const ( + MetricEnqueuedTotal = "enqueued_total" + MetricSentTotal = "sent_total" + MetricFailedTotal = "failed_total" + + LabelEvent = "event" + LabelDeliveryType = "delivery_type" + LabelReason = "reason" + + DeliveryTypePush = "push" + DeliveryTypeSSE = "sse" + DeliveryTypeUnknown = "unknown" + + FailureReasonQueueFull = "queue_full" + FailureReasonProviderFailed = "provider_failed" +) + +// metrics contains all Prometheus metrics for the events module +type metrics struct { + enqueuedCounter *prometheus.CounterVec + sentCounter *prometheus.CounterVec + failedCounter *prometheus.CounterVec +} + +// newMetrics creates and initializes all events metrics +func newMetrics() *metrics { + return &metrics{ + enqueuedCounter: promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "sms", + Subsystem: "events", + Name: MetricEnqueuedTotal, + Help: "Total number of events enqueued", + }, []string{LabelEvent}), + sentCounter: promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "sms", + Subsystem: "events", + Name: MetricSentTotal, + Help: "Total number of events sent", + }, []string{LabelEvent, LabelDeliveryType}), + failedCounter: promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "sms", + Subsystem: "events", + Name: MetricFailedTotal, + Help: "Total number of failed notifications", + }, []string{LabelEvent, LabelDeliveryType, LabelReason}), + } +} + +// IncrementEnqueued increments the enqueued counter for the given event type +func (m *metrics) IncrementEnqueued(eventType string) { + m.enqueuedCounter.WithLabelValues(eventType).Inc() +} + +// IncrementSent increments the sent counter for the given event type and delivery type +func (m *metrics) IncrementSent(eventType string, deliveryType string) { + m.sentCounter.WithLabelValues(eventType, deliveryType).Inc() +} + +// IncrementFailed increments the failed counter for the given event type, delivery type, and reason +func (m *metrics) IncrementFailed(eventType string, deliveryType string, reason string) { + m.failedCounter.WithLabelValues(eventType, deliveryType, reason).Inc() +} diff --git a/internal/sms-gateway/modules/events/module.go b/internal/sms-gateway/modules/events/module.go new file mode 100644 index 0000000..3b6ba7e --- /dev/null +++ b/internal/sms-gateway/modules/events/module.go @@ -0,0 +1,30 @@ +package events + +import ( + "context" + + "go.uber.org/fx" + "go.uber.org/zap" +) + +var Module = fx.Module( + "events", + fx.Decorate(func(log *zap.Logger) *zap.Logger { + return log.Named("events") + }), + fx.Provide(newMetrics, fx.Private), + fx.Provide(NewService), + fx.Invoke(func(lc fx.Lifecycle, svc *Service) { + ctx, cancel := context.WithCancel(context.Background()) + lc.Append(fx.Hook{ + OnStart: func(_ context.Context) error { + go svc.Run(ctx) + return nil + }, + OnStop: func(_ context.Context) error { + cancel() + return nil + }, + }) + }), +) diff --git a/internal/sms-gateway/modules/events/service.go b/internal/sms-gateway/modules/events/service.go new file mode 100644 index 0000000..202b6bb --- /dev/null +++ b/internal/sms-gateway/modules/events/service.go @@ -0,0 +1,116 @@ +package events + +import ( + "context" + "fmt" + + "github.com/android-sms-gateway/server/internal/sms-gateway/modules/devices" + "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push" + "github.com/android-sms-gateway/server/internal/sms-gateway/modules/sse" + "go.uber.org/zap" +) + +type Service struct { + deviceSvc *devices.Service + + sseSvc *sse.Service + pushSvc *push.Service + + queue chan eventWrapper + + metrics *metrics + + logger *zap.Logger +} + +func NewService(devicesSvc *devices.Service, sseSvc *sse.Service, pushSvc *push.Service, metrics *metrics, logger *zap.Logger) *Service { + return &Service{ + deviceSvc: devicesSvc, + sseSvc: sseSvc, + pushSvc: pushSvc, + + metrics: metrics, + + queue: make(chan eventWrapper, 128), + + logger: logger, + } +} + +func (s *Service) Notify(userID string, deviceID *string, event *Event) error { + wrapper := eventWrapper{ + UserID: userID, + DeviceID: deviceID, + Event: event, + } + + select { + case s.queue <- wrapper: + // Successfully enqueued + s.metrics.IncrementEnqueued(string(event.eventType)) + default: + s.metrics.IncrementFailed(string(event.eventType), DeliveryTypeUnknown, FailureReasonQueueFull) + return fmt.Errorf("event queue is full") + } + + return nil +} + +func (s *Service) Run(ctx context.Context) { + for { + select { + case wrapper := <-s.queue: + s.processEvent(wrapper) + case <-ctx.Done(): + s.logger.Info("Event service stopped") + return + } + } +} + +func (s *Service) processEvent(wrapper eventWrapper) { + // Load devices from database + filters := []devices.SelectFilter{} + if wrapper.DeviceID != nil { + filters = append(filters, devices.WithID(*wrapper.DeviceID)) + } + + devices, err := s.deviceSvc.Select(wrapper.UserID, filters...) + if err != nil { + s.logger.Error("Failed to select devices", zap.String("user_id", wrapper.UserID), zap.Error(err)) + return + } + + if len(devices) == 0 { + s.logger.Info("No devices found for user", zap.String("user_id", wrapper.UserID)) + return + } + + // Process each device + for _, device := range devices { + if device.PushToken != nil && *device.PushToken != "" { + // Device has push token, use push service + if err := s.pushSvc.Enqueue(*device.PushToken, push.Event{ + Type: wrapper.Event.eventType, + Data: wrapper.Event.data, + }); err != nil { + s.logger.Error("Failed to enqueue push notification", zap.String("user_id", wrapper.UserID), zap.String("device_id", device.ID), zap.Error(err)) + s.metrics.IncrementFailed(string(wrapper.Event.eventType), DeliveryTypePush, FailureReasonProviderFailed) + } else { + s.metrics.IncrementSent(string(wrapper.Event.eventType), DeliveryTypePush) + } + continue + } + + // No push token, use SSE service + if err := s.sseSvc.Send(device.ID, sse.Event{ + Type: wrapper.Event.eventType, + Data: wrapper.Event.data, + }); err != nil { + s.logger.Error("Failed to send SSE notification", zap.String("user_id", wrapper.UserID), zap.String("device_id", device.ID), zap.Error(err)) + s.metrics.IncrementFailed(string(wrapper.Event.eventType), DeliveryTypeSSE, FailureReasonProviderFailed) + } else { + s.metrics.IncrementSent(string(wrapper.Event.eventType), DeliveryTypeSSE) + } + } +} diff --git a/internal/sms-gateway/modules/events/types.go b/internal/sms-gateway/modules/events/types.go new file mode 100644 index 0000000..76755e1 --- /dev/null +++ b/internal/sms-gateway/modules/events/types.go @@ -0,0 +1,23 @@ +package events + +import ( + "github.com/android-sms-gateway/client-go/smsgateway" +) + +type Event struct { + eventType smsgateway.PushEventType + data map[string]string +} + +func NewEvent(eventType smsgateway.PushEventType, data map[string]string) *Event { + return &Event{ + eventType: eventType, + data: data, + } +} + +type eventWrapper struct { + UserID string + DeviceID *string + Event *Event +} diff --git a/internal/sms-gateway/modules/messages/errors.go b/internal/sms-gateway/modules/messages/errors.go new file mode 100644 index 0000000..195139d --- /dev/null +++ b/internal/sms-gateway/modules/messages/errors.go @@ -0,0 +1,7 @@ +package messages + +type ErrValidation string + +func (e ErrValidation) Error() string { + return string(e) +} diff --git a/internal/sms-gateway/modules/messages/service.go b/internal/sms-gateway/modules/messages/service.go index 9ed3f0b..994370c 100644 --- a/internal/sms-gateway/modules/messages/service.go +++ b/internal/sms-gateway/modules/messages/service.go @@ -11,7 +11,7 @@ import ( "github.com/android-sms-gateway/client-go/smsgateway" "github.com/android-sms-gateway/server/internal/sms-gateway/models" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/db" - "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push" + "github.com/android-sms-gateway/server/internal/sms-gateway/modules/events" "github.com/capcom6/go-helpers/anys" "github.com/capcom6/go-helpers/slices" "github.com/nyaruka/phonenumbers" @@ -26,12 +26,6 @@ const ( ErrorTTLExpired = "TTL expired" ) -type ErrValidation string - -func (e ErrValidation) Error() string { - return string(e) -} - type EnqueueOptions struct { SkipPhoneValidation bool } @@ -46,8 +40,9 @@ type ServiceParams struct { Messages *repository HashingTask *HashingTask - PushSvc *push.Service - Logger *zap.Logger + EventsSvc *events.Service + + Logger *zap.Logger } type Service struct { @@ -56,8 +51,9 @@ type Service struct { messages *repository hashingTask *HashingTask - pushSvc *push.Service - logger *zap.Logger + eventsSvc *events.Service + + logger *zap.Logger messagesCounter *prometheus.CounterVec @@ -78,8 +74,9 @@ func NewService(params ServiceParams) *Service { messages: params.Messages, hashingTask: params.HashingTask, - pushSvc: params.PushSvc, - logger: params.Logger.Named("Service"), + eventsSvc: params.EventsSvc, + + logger: params.Logger.Named("Service"), messagesCounter: messagesCounter, @@ -223,7 +220,7 @@ func (s *Service) Enqueue(device models.Device, message MessageIn, opts EnqueueO s.messagesCounter.WithLabelValues(string(state.State)).Inc() go func(userID, deviceID string) { - if err := s.pushSvc.Notify(userID, &deviceID, push.NewMessageEnqueuedEvent()); err != nil { + if err := s.eventsSvc.Notify(userID, &deviceID, events.NewMessageEnqueuedEvent()); err != nil { s.logger.Error("can't notify device", zap.Error(err), zap.String("user_id", userID), zap.String("device_id", deviceID)) } }(device.UserID, device.ID) @@ -236,9 +233,9 @@ func (s *Service) ExportInbox(device models.Device, since, until time.Time) erro return errors.New("no push token") } - event := push.NewMessagesExportRequestedEvent(since, until) + event := events.NewMessagesExportRequestedEvent(since, until) - return s.pushSvc.Notify(device.UserID, &device.ID, event) + return s.eventsSvc.Notify(device.UserID, &device.ID, event) } func (s *Service) Clean(ctx context.Context) error { diff --git a/internal/sms-gateway/modules/push/domain/events.go b/internal/sms-gateway/modules/push/domain/events.go deleted file mode 100644 index 25f579d..0000000 --- a/internal/sms-gateway/modules/push/domain/events.go +++ /dev/null @@ -1,36 +0,0 @@ -package domain - -import ( - "encoding/json" - - "github.com/android-sms-gateway/client-go/smsgateway" -) - -type Event struct { - event smsgateway.PushEventType - data map[string]string -} - -func (e *Event) Event() smsgateway.PushEventType { - return e.event -} - -func (e *Event) Data() map[string]string { - return e.data -} - -func (e *Event) Map() map[string]string { - json, _ := json.Marshal(e.data) - - return map[string]string{ - "event": string(e.event), - "data": string(json), - } -} - -func NewEvent(event smsgateway.PushEventType, data map[string]string) *Event { - return &Event{ - event: event, - data: data, - } -} diff --git a/internal/sms-gateway/modules/push/fcm/client.go b/internal/sms-gateway/modules/push/fcm/client.go index d06e2d2..3f3e294 100644 --- a/internal/sms-gateway/modules/push/fcm/client.go +++ b/internal/sms-gateway/modules/push/fcm/client.go @@ -7,7 +7,7 @@ import ( firebase "firebase.google.com/go/v4" "firebase.google.com/go/v4/messaging" - "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/domain" + "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/types" "google.golang.org/api/option" ) @@ -52,11 +52,17 @@ func (c *Client) Open(ctx context.Context) error { return nil } -func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) (map[string]error, error) { +func (c *Client) Send(ctx context.Context, messages map[string]types.Event) (map[string]error, error) { errs := make(map[string]error, len(messages)) for address, payload := range messages { - _, err := c.client.Send(ctx, &messaging.Message{ - Data: payload.Map(), + eventMap, err := eventToMap(payload) + if err != nil { + errs[address] = fmt.Errorf("can't marshal event: %w", err) + continue + } + + _, err = c.client.Send(ctx, &messaging.Message{ + Data: eventMap, Android: &messaging.AndroidConfig{ Priority: "high", }, diff --git a/internal/sms-gateway/modules/push/fcm/utils.go b/internal/sms-gateway/modules/push/fcm/utils.go new file mode 100644 index 0000000..d817950 --- /dev/null +++ b/internal/sms-gateway/modules/push/fcm/utils.go @@ -0,0 +1,20 @@ +package fcm + +import ( + "encoding/json" + "fmt" + + "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/types" +) + +func eventToMap(event types.Event) (map[string]string, error) { + json, err := json.Marshal(event.Data) + if err != nil { + return nil, fmt.Errorf("can't marshal event data: %w", err) + } + + return map[string]string{ + "event": string(event.Type), + "data": string(json), + }, nil +} diff --git a/internal/sms-gateway/modules/push/service.go b/internal/sms-gateway/modules/push/service.go index c8d4a5b..69a120a 100644 --- a/internal/sms-gateway/modules/push/service.go +++ b/internal/sms-gateway/modules/push/service.go @@ -2,12 +2,10 @@ package push import ( "context" - "errors" "fmt" "time" - "github.com/android-sms-gateway/server/internal/sms-gateway/modules/devices" - "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/domain" + "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/types" "github.com/capcom6/go-helpers/cache" "github.com/capcom6/go-helpers/maps" @@ -33,8 +31,6 @@ type Params struct { Client client - DevicesSvc *devices.Service - Logger *zap.Logger } @@ -43,8 +39,6 @@ type Service struct { client client - devicesSvc *devices.Service - cache *cache.Cache[eventWrapper] blacklist *cache.Cache[struct{}] @@ -88,8 +82,6 @@ func New(params Params) *Service { config: params.Config, client: params.Client, - devicesSvc: params.DevicesSvc, - cache: cache.New[eventWrapper](cache.Config{}), blacklist: cache.New[struct{}](cache.Config{ TTL: blacklistTimeout, @@ -119,7 +111,7 @@ func (s *Service) Run(ctx context.Context) { } // Enqueue adds the data to the cache and immediately sends all messages if the debounce is 0. -func (s *Service) Enqueue(token string, event *domain.Event) error { +func (s *Service) Enqueue(token string, event types.Event) error { if _, err := s.blacklist.Get(token); err == nil { s.blacklistCounter.WithLabelValues(string(BlacklistOperationSkipped)).Inc() s.logger.Debug("Skipping blacklisted token", zap.String("token", token)) @@ -128,7 +120,7 @@ func (s *Service) Enqueue(token string, event *domain.Event) error { wrapper := eventWrapper{ token: token, - event: event, + event: &event, retries: 0, } @@ -136,57 +128,11 @@ func (s *Service) Enqueue(token string, event *domain.Event) error { return fmt.Errorf("can't add message to cache: %w", err) } - s.enqueuedCounter.WithLabelValues(string(event.Event())).Inc() + s.enqueuedCounter.WithLabelValues(string(event.Type)).Inc() return nil } -func (s *Service) Notify(userID string, deviceID *string, event *domain.Event) error { - logFields := []zap.Field{ - zap.String("user_id", userID), - } - if deviceID != nil { - logFields = append(logFields, zap.String("device_id", *deviceID)) - } - - s.logger.Info("Notifying devices", logFields...) - - var filters []devices.SelectFilter - if deviceID != nil { - filters = []devices.SelectFilter{devices.WithID(*deviceID)} - } - - devices, err := s.devicesSvc.Select(userID, filters...) - if err != nil { - return fmt.Errorf("failed to select devices: %w", err) - } - - if len(devices) == 0 { - s.logger.Info("No devices found", logFields...) - return nil - } - - errs := make([]error, 0, len(devices)) - notifiedCount := 0 - for _, device := range devices { - if device.PushToken == nil { - s.logger.Info("Device has no push token", zap.String("user_id", userID), zap.String("device_id", device.ID)) - continue - } - - if err := s.Enqueue(*device.PushToken, event); err != nil { - s.logger.Error("Failed to send push notification", zap.String("user_id", userID), zap.String("device_id", device.ID), zap.Error(err)) - errs = append(errs, err) - } else { - notifiedCount++ - } - } - - s.logger.Info("Notified devices", append(logFields, zap.Int("count", notifiedCount), zap.Int("total", len(devices)))...) - - return errors.Join(errs...) -} - // sendAll sends messages to all targets from the cache after initializing the service. func (s *Service) sendAll(ctx context.Context) { targets := s.cache.Drain() @@ -194,7 +140,7 @@ func (s *Service) sendAll(ctx context.Context) { return } - messages := maps.MapValues(targets, func(w eventWrapper) domain.Event { + messages := maps.MapValues(targets, func(w eventWrapper) types.Event { return *w.event }) diff --git a/internal/sms-gateway/modules/push/types.go b/internal/sms-gateway/modules/push/types.go index a0800eb..85f9a45 100644 --- a/internal/sms-gateway/modules/push/types.go +++ b/internal/sms-gateway/modules/push/types.go @@ -2,52 +2,27 @@ package push import ( "context" - "time" - "github.com/android-sms-gateway/client-go/smsgateway" - "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/domain" + "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/types" ) type Mode string -type Event = domain.Event - -var NewEvent = domain.NewEvent const ( ModeFCM Mode = "fcm" ModeUpstream Mode = "upstream" ) +type Event = types.Event + type client interface { Open(ctx context.Context) error - Send(ctx context.Context, messages map[string]domain.Event) (map[string]error, error) + Send(ctx context.Context, messages map[string]types.Event) (map[string]error, error) Close(ctx context.Context) error } type eventWrapper struct { token string - event *domain.Event + event *types.Event retries int } - -func NewMessageEnqueuedEvent() *domain.Event { - return domain.NewEvent(smsgateway.PushMessageEnqueued, nil) -} - -func NewWebhooksUpdatedEvent() *domain.Event { - return domain.NewEvent(smsgateway.PushWebhooksUpdated, nil) -} - -func NewMessagesExportRequestedEvent(since, until time.Time) *domain.Event { - return domain.NewEvent( - smsgateway.PushMessagesExportRequested, - map[string]string{ - "since": since.Format(time.RFC3339), - "until": until.Format(time.RFC3339), - }, - ) -} - -func NewSettingsUpdatedEvent() *domain.Event { - return domain.NewEvent(smsgateway.PushSettingsUpdated, nil) -} diff --git a/internal/sms-gateway/modules/push/types/types.go b/internal/sms-gateway/modules/push/types/types.go new file mode 100644 index 0000000..9057d96 --- /dev/null +++ b/internal/sms-gateway/modules/push/types/types.go @@ -0,0 +1,10 @@ +package types + +import ( + "github.com/android-sms-gateway/client-go/smsgateway" +) + +type Event struct { + Type smsgateway.PushEventType + Data map[string]string +} diff --git a/internal/sms-gateway/modules/push/upstream/client.go b/internal/sms-gateway/modules/push/upstream/client.go index 41f462b..8793508 100644 --- a/internal/sms-gateway/modules/push/upstream/client.go +++ b/internal/sms-gateway/modules/push/upstream/client.go @@ -10,7 +10,7 @@ import ( "sync" "github.com/android-sms-gateway/client-go/smsgateway" - "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/domain" + "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/types" "github.com/capcom6/go-helpers/maps" ) @@ -42,14 +42,14 @@ func (c *Client) Open(ctx context.Context) error { return nil } -func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) (map[string]error, error) { +func (c *Client) Send(ctx context.Context, messages map[string]types.Event) (map[string]error, error) { payload := make(smsgateway.UpstreamPushRequest, 0, len(messages)) for address, data := range messages { payload = append(payload, smsgateway.PushNotification{ Token: address, - Event: data.Event(), - Data: data.Data(), + Event: data.Type, + Data: data.Data, }) } @@ -84,8 +84,8 @@ func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) (ma return nil, nil } -func (c *Client) mapErrors(messages map[string]domain.Event, err error) map[string]error { - return maps.MapValues(messages, func(e domain.Event) error { +func (c *Client) mapErrors(messages map[string]types.Event, err error) map[string]error { + return maps.MapValues(messages, func(e types.Event) error { return err }) } diff --git a/internal/sms-gateway/modules/settings/service.go b/internal/sms-gateway/modules/settings/service.go index ef409e8..1df4830 100644 --- a/internal/sms-gateway/modules/settings/service.go +++ b/internal/sms-gateway/modules/settings/service.go @@ -1,7 +1,7 @@ package settings import ( - "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push" + "github.com/android-sms-gateway/server/internal/sms-gateway/modules/events" "go.uber.org/fx" "go.uber.org/zap" ) @@ -11,17 +11,27 @@ type ServiceParams struct { Repository *repository - Logger *zap.Logger + EventsSvc *events.Service - PushSvc *push.Service + Logger *zap.Logger } type Service struct { settings *repository - logger *zap.Logger + eventsSvc *events.Service - pushSvc *push.Service + logger *zap.Logger +} + +func NewService(params ServiceParams) *Service { + return &Service{ + settings: params.Repository, + + eventsSvc: params.EventsSvc, + + logger: params.Logger.Named("service"), + } } func (s *Service) GetSettings(userID string, public bool) (map[string]any, error) { @@ -78,16 +88,8 @@ func (s *Service) ReplaceSettings(userID string, settings map[string]any) (map[s // notifyDevices asynchronously notifies all the user's devices. func (s *Service) notifyDevices(userID string) { go func(userID string) { - if err := s.pushSvc.Notify(userID, nil, push.NewSettingsUpdatedEvent()); err != nil { + if err := s.eventsSvc.Notify(userID, nil, events.NewSettingsUpdatedEvent()); err != nil { s.logger.Error("can't notify devices", zap.Error(err)) } }(userID) } - -func NewService(params ServiceParams) *Service { - return &Service{ - settings: params.Repository, - logger: params.Logger.Named("service"), - pushSvc: params.PushSvc, - } -} diff --git a/internal/sms-gateway/modules/sse/service.go b/internal/sms-gateway/modules/sse/service.go index b344ddb..7110f9b 100644 --- a/internal/sms-gateway/modules/sse/service.go +++ b/internal/sms-gateway/modules/sse/service.go @@ -8,7 +8,6 @@ import ( "sync" "time" - "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/domain" "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/utils" "go.uber.org/zap" @@ -38,46 +37,30 @@ func NewService(config Config, logger *zap.Logger) *Service { } } -func (s *Service) Send(ctx context.Context, messages map[string]domain.Event) (map[string]error, error) { - errs := make(map[string]error) +func (s *Service) Send(deviceID string, event Event) error { s.mu.RLock() defer s.mu.RUnlock() - for deviceId, event := range messages { - conn, exists := s.connections[deviceId] - if !exists { - errs[deviceId] = fmt.Errorf("client not connected") - s.logger.Debug("Client not connected", zap.String("client_id", deviceId)) - continue - } - - data, err := json.Marshal(event.Map()) - if err != nil { - errs[deviceId] = fmt.Errorf("can't marshal payload: %w", err) - s.logger.Error("Failed to marshal event for client", - zap.String("client_id", deviceId), - zap.Any("event", event), - zap.Error(err)) - continue - } - - select { - case conn.channel <- data: - // Message sent successfully - case <-ctx.Done(): - errs[deviceId] = ctx.Err() - s.logger.Warn("Failed to send event to client", - zap.String("client_id", deviceId), - zap.Error(ctx.Err())) - case <-conn.closeSignal: - errs[deviceId] = fmt.Errorf("connection closed") - s.logger.Warn("Failed to send event to client", - zap.String("client_id", deviceId), - zap.Error(fmt.Errorf("connection closed"))) - } + conn, exists := s.connections[deviceID] + if !exists { + return fmt.Errorf("no connection for device %s", deviceID) } - return errs, nil + data, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("can't marshal event: %w", err) + } + + select { + case conn.channel <- data: + // Message sent successfully + case <-conn.closeSignal: + return fmt.Errorf("connection closed") + default: + return fmt.Errorf("connection buffer full") + } + + return nil } func (s *Service) Close(_ context.Context) error { diff --git a/internal/sms-gateway/modules/sse/types.go b/internal/sms-gateway/modules/sse/types.go new file mode 100644 index 0000000..471658f --- /dev/null +++ b/internal/sms-gateway/modules/sse/types.go @@ -0,0 +1,8 @@ +package sse + +import "github.com/android-sms-gateway/client-go/smsgateway" + +type Event struct { + Type smsgateway.PushEventType `json:"event"` + Data map[string]string `json:"data"` +} diff --git a/internal/sms-gateway/modules/webhooks/service.go b/internal/sms-gateway/modules/webhooks/service.go index ec55643..0e902bd 100644 --- a/internal/sms-gateway/modules/webhooks/service.go +++ b/internal/sms-gateway/modules/webhooks/service.go @@ -6,7 +6,7 @@ import ( "github.com/android-sms-gateway/client-go/smsgateway" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/db" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/devices" - "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push" + "github.com/android-sms-gateway/server/internal/sms-gateway/modules/events" "github.com/capcom6/go-helpers/slices" "go.uber.org/fx" "go.uber.org/zap" @@ -20,7 +20,7 @@ type ServiceParams struct { Webhooks *Repository DevicesSvc *devices.Service - PushSvc *push.Service + EventsSvc *events.Service Logger *zap.Logger } @@ -31,18 +31,21 @@ type Service struct { webhooks *Repository devicesSvc *devices.Service - pushSvc *push.Service + eventsSvc *events.Service logger *zap.Logger } func NewService(params ServiceParams) *Service { return &Service{ - idgen: params.IDGen, - webhooks: params.Webhooks, + idgen: params.IDGen, + + webhooks: params.Webhooks, + devicesSvc: params.DevicesSvc, - pushSvc: params.PushSvc, - logger: params.Logger, + eventsSvc: params.EventsSvc, + + logger: params.Logger, } } @@ -119,7 +122,7 @@ func (s *Service) Delete(userID string, filters ...SelectFilter) error { // notifyDevices asynchronously notifies all the user's devices. func (s *Service) notifyDevices(userID string, deviceID *string) { go func(userID string, deviceID *string) { - if err := s.pushSvc.Notify(userID, deviceID, push.NewWebhooksUpdatedEvent()); err != nil { + if err := s.eventsSvc.Notify(userID, deviceID, events.NewWebhooksUpdatedEvent()); err != nil { s.logger.Error("can't notify devices", zap.Error(err)) } }(userID, deviceID) diff --git a/pkg/swagger/docs/mobile.http b/pkg/swagger/docs/mobile.http index 9d8abf3..1662290 100644 --- a/pkg/swagger/docs/mobile.http +++ b/pkg/swagger/docs/mobile.http @@ -14,7 +14,7 @@ Authorization: Bearer {{mobileToken}} ### POST {{baseUrl}}/device HTTP/1.1 # Authorization: Bearer 123456789 -# Authorization: Basic {{credentials}} +Authorization: Basic {{credentials}} # Authorization: Code 065379 Content-Type: application/json