[events] introduce events module as a proxy between the app and push/sse

This commit is contained in:
Aleksandr Soloshenko 2025-07-22 11:43:21 +07:00 committed by Aleksandr
parent d86a0e4234
commit 4bcfc35c87
23 changed files with 767 additions and 214 deletions

View File

@ -0,0 +1,351 @@
{
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": "-- Grafana --",
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"target": {
"limit": 100,
"matchAny": false,
"tags": [],
"type": "dashboard"
},
"type": "dashboard"
}
]
},
"description": "Dashboard for monitoring event notification metrics",
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": 12,
"links": [],
"panels": [
{
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": 0
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 0
},
"id": 2,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "12.2.0-16711121739",
"targets": [
{
"expr": "sum by (event) (rate(sms_events_enqueued_total[5m]))",
"legendFormat": "{{event}}",
"refId": "A"
}
],
"title": "Enqueued Events (per event type)",
"type": "timeseries"
},
{
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "normal"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": 0
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 0
},
"id": 3,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "12.2.0-16711121739",
"targets": [
{
"expr": "sum by (delivery_type) (rate(sms_events_sent_total[5m]))",
"legendFormat": "{{delivery_type}}",
"refId": "A"
}
],
"title": "Sent Events (by delivery type)",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
}
},
"mappings": []
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 8
},
"id": 4,
"options": {
"legend": {
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"pieType": "pie",
"reduceOptions": {
"calcs": [
"last"
],
"fields": "",
"values": false
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "12.2.0-16711121739",
"targets": [
{
"editorMode": "code",
"exemplar": false,
"expr": "sum by (delivery_type, reason) (sms_events_failed_total)",
"format": "heatmap",
"instant": true,
"legendFormat": "{{delivery_type}} - {{reason}}",
"range": false,
"refId": "A"
}
],
"title": "Failure Analysis (by delivery type and reason)",
"type": "piechart"
},
{
"datasource": {
"type": "prometheus",
"uid": "edqp0a73uh2bka"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
},
"mappings": [],
"max": 100,
"min": 0,
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "red",
"value": 0
},
{
"color": "green",
"value": 90
}
]
},
"unit": "percent"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 8
},
"id": 5,
"options": {
"minVizHeight": 75,
"minVizWidth": 75,
"orientation": "auto",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"fields": "",
"values": false
},
"showThresholdLabels": false,
"showThresholdMarkers": true,
"sizing": "auto"
},
"pluginVersion": "12.2.0-16711121739",
"targets": [
{
"editorMode": "code",
"expr": "100 * (sum(sms_events_sent_total) / (sum(sms_events_sent_total) + sum(sms_events_failed_total)))",
"format": "table",
"instant": true,
"refId": "A"
}
],
"title": "Delivery Success Rate",
"type": "gauge"
}
],
"preload": false,
"refresh": "auto",
"schemaVersion": 41,
"tags": [
"push",
"sms",
"sse"
],
"templating": {
"list": []
},
"time": {
"from": "now-24h",
"to": "now"
},
"timepicker": {},
"timezone": "",
"title": "Event Notifications",
"uid": "9886a96d-9329-4110-b80b-3a6aa8216520",
"version": 5
}

View File

@ -0,0 +1,11 @@
groups:
- name: events-alerts
rules:
- alert: LowEventDeliverySuccessRate
expr: 100 * (sum(rate(sms_events_sent_total[5m])) / (sum(rate(sms_events_sent_total[5m])) + sum(rate(sms_events_failed_total[5m]))) < 90
for: 10m
labels:
severity: critical
annotations:
summary: "Event delivery success rate below 90%"
description: "Event delivery success rate is at {{ $value }}%"

View File

@ -10,6 +10,7 @@ import (
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/cleaner"
appdb "github.com/android-sms-gateway/server/internal/sms-gateway/modules/db"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/devices"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/events"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/health"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/messages"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/metrics"
@ -39,6 +40,7 @@ var Module = fx.Module(
auth.Module,
push.Module,
db.Module,
events.Module,
messages.Module,
health.Module,
webhooks.Module,

View File

@ -68,10 +68,10 @@ func (h *upstreamHandler) postPush(c *fiber.Ctx) error {
return err
}
event := push.NewEvent(
anys.ZeroDefault(v.Event, smsgateway.PushMessageEnqueued),
v.Data,
)
event := push.Event{
Type: anys.ZeroDefault(v.Event, smsgateway.PushMessageEnqueued),
Data: v.Data,
}
if err := h.pushSvc.Enqueue(v.Token, event); err != nil {
h.Logger.Error("Can't push message", zap.Error(err))

View File

@ -0,0 +1,29 @@
package events
import (
"time"
"github.com/android-sms-gateway/client-go/smsgateway"
)
func NewMessageEnqueuedEvent() *Event {
return NewEvent(smsgateway.PushMessageEnqueued, nil)
}
func NewWebhooksUpdatedEvent() *Event {
return NewEvent(smsgateway.PushWebhooksUpdated, nil)
}
func NewMessagesExportRequestedEvent(since, until time.Time) *Event {
return NewEvent(
smsgateway.PushMessagesExportRequested,
map[string]string{
"since": since.Format(time.RFC3339),
"until": until.Format(time.RFC3339),
},
)
}
func NewSettingsUpdatedEvent() *Event {
return NewEvent(smsgateway.PushSettingsUpdated, nil)
}

View File

@ -0,0 +1,70 @@
package events
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// Metric constants
const (
MetricEnqueuedTotal = "enqueued_total"
MetricSentTotal = "sent_total"
MetricFailedTotal = "failed_total"
LabelEvent = "event"
LabelDeliveryType = "delivery_type"
LabelReason = "reason"
DeliveryTypePush = "push"
DeliveryTypeSSE = "sse"
DeliveryTypeUnknown = "unknown"
FailureReasonQueueFull = "queue_full"
FailureReasonProviderFailed = "provider_failed"
)
// metrics contains all Prometheus metrics for the events module
type metrics struct {
enqueuedCounter *prometheus.CounterVec
sentCounter *prometheus.CounterVec
failedCounter *prometheus.CounterVec
}
// newMetrics creates and initializes all events metrics
func newMetrics() *metrics {
return &metrics{
enqueuedCounter: promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "sms",
Subsystem: "events",
Name: MetricEnqueuedTotal,
Help: "Total number of events enqueued",
}, []string{LabelEvent}),
sentCounter: promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "sms",
Subsystem: "events",
Name: MetricSentTotal,
Help: "Total number of events sent",
}, []string{LabelEvent, LabelDeliveryType}),
failedCounter: promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "sms",
Subsystem: "events",
Name: MetricFailedTotal,
Help: "Total number of failed notifications",
}, []string{LabelEvent, LabelDeliveryType, LabelReason}),
}
}
// IncrementEnqueued increments the enqueued counter for the given event type
func (m *metrics) IncrementEnqueued(eventType string) {
m.enqueuedCounter.WithLabelValues(eventType).Inc()
}
// IncrementSent increments the sent counter for the given event type and delivery type
func (m *metrics) IncrementSent(eventType string, deliveryType string) {
m.sentCounter.WithLabelValues(eventType, deliveryType).Inc()
}
// IncrementFailed increments the failed counter for the given event type, delivery type, and reason
func (m *metrics) IncrementFailed(eventType string, deliveryType string, reason string) {
m.failedCounter.WithLabelValues(eventType, deliveryType, reason).Inc()
}

View File

@ -0,0 +1,30 @@
package events
import (
"context"
"go.uber.org/fx"
"go.uber.org/zap"
)
var Module = fx.Module(
"events",
fx.Decorate(func(log *zap.Logger) *zap.Logger {
return log.Named("events")
}),
fx.Provide(newMetrics, fx.Private),
fx.Provide(NewService),
fx.Invoke(func(lc fx.Lifecycle, svc *Service) {
ctx, cancel := context.WithCancel(context.Background())
lc.Append(fx.Hook{
OnStart: func(_ context.Context) error {
go svc.Run(ctx)
return nil
},
OnStop: func(_ context.Context) error {
cancel()
return nil
},
})
}),
)

View File

@ -0,0 +1,116 @@
package events
import (
"context"
"fmt"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/devices"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/sse"
"go.uber.org/zap"
)
type Service struct {
deviceSvc *devices.Service
sseSvc *sse.Service
pushSvc *push.Service
queue chan eventWrapper
metrics *metrics
logger *zap.Logger
}
func NewService(devicesSvc *devices.Service, sseSvc *sse.Service, pushSvc *push.Service, metrics *metrics, logger *zap.Logger) *Service {
return &Service{
deviceSvc: devicesSvc,
sseSvc: sseSvc,
pushSvc: pushSvc,
metrics: metrics,
queue: make(chan eventWrapper, 128),
logger: logger,
}
}
func (s *Service) Notify(userID string, deviceID *string, event *Event) error {
wrapper := eventWrapper{
UserID: userID,
DeviceID: deviceID,
Event: event,
}
select {
case s.queue <- wrapper:
// Successfully enqueued
s.metrics.IncrementEnqueued(string(event.eventType))
default:
s.metrics.IncrementFailed(string(event.eventType), DeliveryTypeUnknown, FailureReasonQueueFull)
return fmt.Errorf("event queue is full")
}
return nil
}
func (s *Service) Run(ctx context.Context) {
for {
select {
case wrapper := <-s.queue:
s.processEvent(wrapper)
case <-ctx.Done():
s.logger.Info("Event service stopped")
return
}
}
}
func (s *Service) processEvent(wrapper eventWrapper) {
// Load devices from database
filters := []devices.SelectFilter{}
if wrapper.DeviceID != nil {
filters = append(filters, devices.WithID(*wrapper.DeviceID))
}
devices, err := s.deviceSvc.Select(wrapper.UserID, filters...)
if err != nil {
s.logger.Error("Failed to select devices", zap.String("user_id", wrapper.UserID), zap.Error(err))
return
}
if len(devices) == 0 {
s.logger.Info("No devices found for user", zap.String("user_id", wrapper.UserID))
return
}
// Process each device
for _, device := range devices {
if device.PushToken != nil && *device.PushToken != "" {
// Device has push token, use push service
if err := s.pushSvc.Enqueue(*device.PushToken, push.Event{
Type: wrapper.Event.eventType,
Data: wrapper.Event.data,
}); err != nil {
s.logger.Error("Failed to enqueue push notification", zap.String("user_id", wrapper.UserID), zap.String("device_id", device.ID), zap.Error(err))
s.metrics.IncrementFailed(string(wrapper.Event.eventType), DeliveryTypePush, FailureReasonProviderFailed)
} else {
s.metrics.IncrementSent(string(wrapper.Event.eventType), DeliveryTypePush)
}
continue
}
// No push token, use SSE service
if err := s.sseSvc.Send(device.ID, sse.Event{
Type: wrapper.Event.eventType,
Data: wrapper.Event.data,
}); err != nil {
s.logger.Error("Failed to send SSE notification", zap.String("user_id", wrapper.UserID), zap.String("device_id", device.ID), zap.Error(err))
s.metrics.IncrementFailed(string(wrapper.Event.eventType), DeliveryTypeSSE, FailureReasonProviderFailed)
} else {
s.metrics.IncrementSent(string(wrapper.Event.eventType), DeliveryTypeSSE)
}
}
}

View File

@ -0,0 +1,23 @@
package events
import (
"github.com/android-sms-gateway/client-go/smsgateway"
)
type Event struct {
eventType smsgateway.PushEventType
data map[string]string
}
func NewEvent(eventType smsgateway.PushEventType, data map[string]string) *Event {
return &Event{
eventType: eventType,
data: data,
}
}
type eventWrapper struct {
UserID string
DeviceID *string
Event *Event
}

View File

@ -0,0 +1,7 @@
package messages
type ErrValidation string
func (e ErrValidation) Error() string {
return string(e)
}

View File

@ -11,7 +11,7 @@ import (
"github.com/android-sms-gateway/client-go/smsgateway"
"github.com/android-sms-gateway/server/internal/sms-gateway/models"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/db"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/events"
"github.com/capcom6/go-helpers/anys"
"github.com/capcom6/go-helpers/slices"
"github.com/nyaruka/phonenumbers"
@ -26,12 +26,6 @@ const (
ErrorTTLExpired = "TTL expired"
)
type ErrValidation string
func (e ErrValidation) Error() string {
return string(e)
}
type EnqueueOptions struct {
SkipPhoneValidation bool
}
@ -46,8 +40,9 @@ type ServiceParams struct {
Messages *repository
HashingTask *HashingTask
PushSvc *push.Service
Logger *zap.Logger
EventsSvc *events.Service
Logger *zap.Logger
}
type Service struct {
@ -56,8 +51,9 @@ type Service struct {
messages *repository
hashingTask *HashingTask
pushSvc *push.Service
logger *zap.Logger
eventsSvc *events.Service
logger *zap.Logger
messagesCounter *prometheus.CounterVec
@ -78,8 +74,9 @@ func NewService(params ServiceParams) *Service {
messages: params.Messages,
hashingTask: params.HashingTask,
pushSvc: params.PushSvc,
logger: params.Logger.Named("Service"),
eventsSvc: params.EventsSvc,
logger: params.Logger.Named("Service"),
messagesCounter: messagesCounter,
@ -223,7 +220,7 @@ func (s *Service) Enqueue(device models.Device, message MessageIn, opts EnqueueO
s.messagesCounter.WithLabelValues(string(state.State)).Inc()
go func(userID, deviceID string) {
if err := s.pushSvc.Notify(userID, &deviceID, push.NewMessageEnqueuedEvent()); err != nil {
if err := s.eventsSvc.Notify(userID, &deviceID, events.NewMessageEnqueuedEvent()); err != nil {
s.logger.Error("can't notify device", zap.Error(err), zap.String("user_id", userID), zap.String("device_id", deviceID))
}
}(device.UserID, device.ID)
@ -236,9 +233,9 @@ func (s *Service) ExportInbox(device models.Device, since, until time.Time) erro
return errors.New("no push token")
}
event := push.NewMessagesExportRequestedEvent(since, until)
event := events.NewMessagesExportRequestedEvent(since, until)
return s.pushSvc.Notify(device.UserID, &device.ID, event)
return s.eventsSvc.Notify(device.UserID, &device.ID, event)
}
func (s *Service) Clean(ctx context.Context) error {

View File

@ -1,36 +0,0 @@
package domain
import (
"encoding/json"
"github.com/android-sms-gateway/client-go/smsgateway"
)
type Event struct {
event smsgateway.PushEventType
data map[string]string
}
func (e *Event) Event() smsgateway.PushEventType {
return e.event
}
func (e *Event) Data() map[string]string {
return e.data
}
func (e *Event) Map() map[string]string {
json, _ := json.Marshal(e.data)
return map[string]string{
"event": string(e.event),
"data": string(json),
}
}
func NewEvent(event smsgateway.PushEventType, data map[string]string) *Event {
return &Event{
event: event,
data: data,
}
}

View File

@ -7,7 +7,7 @@ import (
firebase "firebase.google.com/go/v4"
"firebase.google.com/go/v4/messaging"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/domain"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/types"
"google.golang.org/api/option"
)
@ -52,11 +52,17 @@ func (c *Client) Open(ctx context.Context) error {
return nil
}
func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) (map[string]error, error) {
func (c *Client) Send(ctx context.Context, messages map[string]types.Event) (map[string]error, error) {
errs := make(map[string]error, len(messages))
for address, payload := range messages {
_, err := c.client.Send(ctx, &messaging.Message{
Data: payload.Map(),
eventMap, err := eventToMap(payload)
if err != nil {
errs[address] = fmt.Errorf("can't marshal event: %w", err)
continue
}
_, err = c.client.Send(ctx, &messaging.Message{
Data: eventMap,
Android: &messaging.AndroidConfig{
Priority: "high",
},

View File

@ -0,0 +1,20 @@
package fcm
import (
"encoding/json"
"fmt"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/types"
)
func eventToMap(event types.Event) (map[string]string, error) {
json, err := json.Marshal(event.Data)
if err != nil {
return nil, fmt.Errorf("can't marshal event data: %w", err)
}
return map[string]string{
"event": string(event.Type),
"data": string(json),
}, nil
}

View File

@ -2,12 +2,10 @@ package push
import (
"context"
"errors"
"fmt"
"time"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/devices"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/domain"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/types"
"github.com/capcom6/go-helpers/cache"
"github.com/capcom6/go-helpers/maps"
@ -33,8 +31,6 @@ type Params struct {
Client client
DevicesSvc *devices.Service
Logger *zap.Logger
}
@ -43,8 +39,6 @@ type Service struct {
client client
devicesSvc *devices.Service
cache *cache.Cache[eventWrapper]
blacklist *cache.Cache[struct{}]
@ -88,8 +82,6 @@ func New(params Params) *Service {
config: params.Config,
client: params.Client,
devicesSvc: params.DevicesSvc,
cache: cache.New[eventWrapper](cache.Config{}),
blacklist: cache.New[struct{}](cache.Config{
TTL: blacklistTimeout,
@ -119,7 +111,7 @@ func (s *Service) Run(ctx context.Context) {
}
// Enqueue adds the data to the cache and immediately sends all messages if the debounce is 0.
func (s *Service) Enqueue(token string, event *domain.Event) error {
func (s *Service) Enqueue(token string, event types.Event) error {
if _, err := s.blacklist.Get(token); err == nil {
s.blacklistCounter.WithLabelValues(string(BlacklistOperationSkipped)).Inc()
s.logger.Debug("Skipping blacklisted token", zap.String("token", token))
@ -128,7 +120,7 @@ func (s *Service) Enqueue(token string, event *domain.Event) error {
wrapper := eventWrapper{
token: token,
event: event,
event: &event,
retries: 0,
}
@ -136,57 +128,11 @@ func (s *Service) Enqueue(token string, event *domain.Event) error {
return fmt.Errorf("can't add message to cache: %w", err)
}
s.enqueuedCounter.WithLabelValues(string(event.Event())).Inc()
s.enqueuedCounter.WithLabelValues(string(event.Type)).Inc()
return nil
}
func (s *Service) Notify(userID string, deviceID *string, event *domain.Event) error {
logFields := []zap.Field{
zap.String("user_id", userID),
}
if deviceID != nil {
logFields = append(logFields, zap.String("device_id", *deviceID))
}
s.logger.Info("Notifying devices", logFields...)
var filters []devices.SelectFilter
if deviceID != nil {
filters = []devices.SelectFilter{devices.WithID(*deviceID)}
}
devices, err := s.devicesSvc.Select(userID, filters...)
if err != nil {
return fmt.Errorf("failed to select devices: %w", err)
}
if len(devices) == 0 {
s.logger.Info("No devices found", logFields...)
return nil
}
errs := make([]error, 0, len(devices))
notifiedCount := 0
for _, device := range devices {
if device.PushToken == nil {
s.logger.Info("Device has no push token", zap.String("user_id", userID), zap.String("device_id", device.ID))
continue
}
if err := s.Enqueue(*device.PushToken, event); err != nil {
s.logger.Error("Failed to send push notification", zap.String("user_id", userID), zap.String("device_id", device.ID), zap.Error(err))
errs = append(errs, err)
} else {
notifiedCount++
}
}
s.logger.Info("Notified devices", append(logFields, zap.Int("count", notifiedCount), zap.Int("total", len(devices)))...)
return errors.Join(errs...)
}
// sendAll sends messages to all targets from the cache after initializing the service.
func (s *Service) sendAll(ctx context.Context) {
targets := s.cache.Drain()
@ -194,7 +140,7 @@ func (s *Service) sendAll(ctx context.Context) {
return
}
messages := maps.MapValues(targets, func(w eventWrapper) domain.Event {
messages := maps.MapValues(targets, func(w eventWrapper) types.Event {
return *w.event
})

View File

@ -2,52 +2,27 @@ package push
import (
"context"
"time"
"github.com/android-sms-gateway/client-go/smsgateway"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/domain"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/types"
)
type Mode string
type Event = domain.Event
var NewEvent = domain.NewEvent
const (
ModeFCM Mode = "fcm"
ModeUpstream Mode = "upstream"
)
type Event = types.Event
type client interface {
Open(ctx context.Context) error
Send(ctx context.Context, messages map[string]domain.Event) (map[string]error, error)
Send(ctx context.Context, messages map[string]types.Event) (map[string]error, error)
Close(ctx context.Context) error
}
type eventWrapper struct {
token string
event *domain.Event
event *types.Event
retries int
}
func NewMessageEnqueuedEvent() *domain.Event {
return domain.NewEvent(smsgateway.PushMessageEnqueued, nil)
}
func NewWebhooksUpdatedEvent() *domain.Event {
return domain.NewEvent(smsgateway.PushWebhooksUpdated, nil)
}
func NewMessagesExportRequestedEvent(since, until time.Time) *domain.Event {
return domain.NewEvent(
smsgateway.PushMessagesExportRequested,
map[string]string{
"since": since.Format(time.RFC3339),
"until": until.Format(time.RFC3339),
},
)
}
func NewSettingsUpdatedEvent() *domain.Event {
return domain.NewEvent(smsgateway.PushSettingsUpdated, nil)
}

View File

@ -0,0 +1,10 @@
package types
import (
"github.com/android-sms-gateway/client-go/smsgateway"
)
type Event struct {
Type smsgateway.PushEventType
Data map[string]string
}

View File

@ -10,7 +10,7 @@ import (
"sync"
"github.com/android-sms-gateway/client-go/smsgateway"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/domain"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/types"
"github.com/capcom6/go-helpers/maps"
)
@ -42,14 +42,14 @@ func (c *Client) Open(ctx context.Context) error {
return nil
}
func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) (map[string]error, error) {
func (c *Client) Send(ctx context.Context, messages map[string]types.Event) (map[string]error, error) {
payload := make(smsgateway.UpstreamPushRequest, 0, len(messages))
for address, data := range messages {
payload = append(payload, smsgateway.PushNotification{
Token: address,
Event: data.Event(),
Data: data.Data(),
Event: data.Type,
Data: data.Data,
})
}
@ -84,8 +84,8 @@ func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) (ma
return nil, nil
}
func (c *Client) mapErrors(messages map[string]domain.Event, err error) map[string]error {
return maps.MapValues(messages, func(e domain.Event) error {
func (c *Client) mapErrors(messages map[string]types.Event, err error) map[string]error {
return maps.MapValues(messages, func(e types.Event) error {
return err
})
}

View File

@ -1,7 +1,7 @@
package settings
import (
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/events"
"go.uber.org/fx"
"go.uber.org/zap"
)
@ -11,17 +11,27 @@ type ServiceParams struct {
Repository *repository
Logger *zap.Logger
EventsSvc *events.Service
PushSvc *push.Service
Logger *zap.Logger
}
type Service struct {
settings *repository
logger *zap.Logger
eventsSvc *events.Service
pushSvc *push.Service
logger *zap.Logger
}
func NewService(params ServiceParams) *Service {
return &Service{
settings: params.Repository,
eventsSvc: params.EventsSvc,
logger: params.Logger.Named("service"),
}
}
func (s *Service) GetSettings(userID string, public bool) (map[string]any, error) {
@ -78,16 +88,8 @@ func (s *Service) ReplaceSettings(userID string, settings map[string]any) (map[s
// notifyDevices asynchronously notifies all the user's devices.
func (s *Service) notifyDevices(userID string) {
go func(userID string) {
if err := s.pushSvc.Notify(userID, nil, push.NewSettingsUpdatedEvent()); err != nil {
if err := s.eventsSvc.Notify(userID, nil, events.NewSettingsUpdatedEvent()); err != nil {
s.logger.Error("can't notify devices", zap.Error(err))
}
}(userID)
}
func NewService(params ServiceParams) *Service {
return &Service{
settings: params.Repository,
logger: params.Logger.Named("service"),
pushSvc: params.PushSvc,
}
}

View File

@ -8,7 +8,6 @@ import (
"sync"
"time"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/domain"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/utils"
"go.uber.org/zap"
@ -38,46 +37,30 @@ func NewService(config Config, logger *zap.Logger) *Service {
}
}
func (s *Service) Send(ctx context.Context, messages map[string]domain.Event) (map[string]error, error) {
errs := make(map[string]error)
func (s *Service) Send(deviceID string, event Event) error {
s.mu.RLock()
defer s.mu.RUnlock()
for deviceId, event := range messages {
conn, exists := s.connections[deviceId]
if !exists {
errs[deviceId] = fmt.Errorf("client not connected")
s.logger.Debug("Client not connected", zap.String("client_id", deviceId))
continue
}
data, err := json.Marshal(event.Map())
if err != nil {
errs[deviceId] = fmt.Errorf("can't marshal payload: %w", err)
s.logger.Error("Failed to marshal event for client",
zap.String("client_id", deviceId),
zap.Any("event", event),
zap.Error(err))
continue
}
select {
case conn.channel <- data:
// Message sent successfully
case <-ctx.Done():
errs[deviceId] = ctx.Err()
s.logger.Warn("Failed to send event to client",
zap.String("client_id", deviceId),
zap.Error(ctx.Err()))
case <-conn.closeSignal:
errs[deviceId] = fmt.Errorf("connection closed")
s.logger.Warn("Failed to send event to client",
zap.String("client_id", deviceId),
zap.Error(fmt.Errorf("connection closed")))
}
conn, exists := s.connections[deviceID]
if !exists {
return fmt.Errorf("no connection for device %s", deviceID)
}
return errs, nil
data, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("can't marshal event: %w", err)
}
select {
case conn.channel <- data:
// Message sent successfully
case <-conn.closeSignal:
return fmt.Errorf("connection closed")
default:
return fmt.Errorf("connection buffer full")
}
return nil
}
func (s *Service) Close(_ context.Context) error {

View File

@ -0,0 +1,8 @@
package sse
import "github.com/android-sms-gateway/client-go/smsgateway"
type Event struct {
Type smsgateway.PushEventType `json:"event"`
Data map[string]string `json:"data"`
}

View File

@ -6,7 +6,7 @@ import (
"github.com/android-sms-gateway/client-go/smsgateway"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/db"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/devices"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/events"
"github.com/capcom6/go-helpers/slices"
"go.uber.org/fx"
"go.uber.org/zap"
@ -20,7 +20,7 @@ type ServiceParams struct {
Webhooks *Repository
DevicesSvc *devices.Service
PushSvc *push.Service
EventsSvc *events.Service
Logger *zap.Logger
}
@ -31,18 +31,21 @@ type Service struct {
webhooks *Repository
devicesSvc *devices.Service
pushSvc *push.Service
eventsSvc *events.Service
logger *zap.Logger
}
func NewService(params ServiceParams) *Service {
return &Service{
idgen: params.IDGen,
webhooks: params.Webhooks,
idgen: params.IDGen,
webhooks: params.Webhooks,
devicesSvc: params.DevicesSvc,
pushSvc: params.PushSvc,
logger: params.Logger,
eventsSvc: params.EventsSvc,
logger: params.Logger,
}
}
@ -119,7 +122,7 @@ func (s *Service) Delete(userID string, filters ...SelectFilter) error {
// notifyDevices asynchronously notifies all the user's devices.
func (s *Service) notifyDevices(userID string, deviceID *string) {
go func(userID string, deviceID *string) {
if err := s.pushSvc.Notify(userID, deviceID, push.NewWebhooksUpdatedEvent()); err != nil {
if err := s.eventsSvc.Notify(userID, deviceID, events.NewWebhooksUpdatedEvent()); err != nil {
s.logger.Error("can't notify devices", zap.Error(err))
}
}(userID, deviceID)

View File

@ -14,7 +14,7 @@ Authorization: Bearer {{mobileToken}}
###
POST {{baseUrl}}/device HTTP/1.1
# Authorization: Bearer 123456789
# Authorization: Basic {{credentials}}
Authorization: Basic {{credentials}}
# Authorization: Code 065379
Content-Type: application/json