From 4e6b4e7f28886ed5553fc6c272ea8a43c7634794 Mon Sep 17 00:00:00 2001 From: Aleksandr Soloshenko Date: Sun, 11 May 2025 10:32:38 +0700 Subject: [PATCH] [push] add send retries --- internal/sms-gateway/handlers/upstream.go | 10 +- internal/sms-gateway/modules/push/consts.go | 22 ++++ .../sms-gateway/modules/push/domain/events.go | 20 ++-- .../sms-gateway/modules/push/fcm/client.go | 9 +- internal/sms-gateway/modules/push/service.go | 102 +++++++++++++++--- internal/sms-gateway/modules/push/types.go | 10 +- .../modules/push/upstream/client.go | 24 +++-- pkg/swagger/docs/requests.http | 2 +- 8 files changed, 160 insertions(+), 39 deletions(-) create mode 100644 internal/sms-gateway/modules/push/consts.go diff --git a/internal/sms-gateway/handlers/upstream.go b/internal/sms-gateway/handlers/upstream.go index 0060b83..903a467 100644 --- a/internal/sms-gateway/handlers/upstream.go +++ b/internal/sms-gateway/handlers/upstream.go @@ -68,12 +68,12 @@ func (h *upstreamHandler) postPush(c *fiber.Ctx) error { return err } - event := push.Event{ - Event: anys.ZeroDefault(v.Event, smsgateway.PushMessageEnqueued), - Data: v.Data, - } + event := push.NewEvent( + anys.ZeroDefault(v.Event, smsgateway.PushMessageEnqueued), + v.Data, + ) - if err := h.pushSvc.Enqueue(v.Token, &event); err != nil { + if err := h.pushSvc.Enqueue(v.Token, event); err != nil { h.Logger.Error("Can't push message", zap.Error(err)) } } diff --git a/internal/sms-gateway/modules/push/consts.go b/internal/sms-gateway/modules/push/consts.go new file mode 100644 index 0000000..768c352 --- /dev/null +++ b/internal/sms-gateway/modules/push/consts.go @@ -0,0 +1,22 @@ +package push + +import "time" + +const ( + maxRetries = 3 + blacklistTimeout = 15 * time.Minute +) + +type RetryOutcome string + +const ( + RetryOutcomeRetried RetryOutcome = "retried" + RetryOutcomeMaxAttempts RetryOutcome = "max_attempts" +) + +type BlacklistOperation string + +const ( + BlacklistOperationAdded BlacklistOperation = "added" + BlacklistOperationSkipped BlacklistOperation = "skipped" +) diff --git a/internal/sms-gateway/modules/push/domain/events.go b/internal/sms-gateway/modules/push/domain/events.go index a264a44..25f579d 100644 --- a/internal/sms-gateway/modules/push/domain/events.go +++ b/internal/sms-gateway/modules/push/domain/events.go @@ -7,22 +7,30 @@ import ( ) type Event struct { - Event smsgateway.PushEventType - Data map[string]string + event smsgateway.PushEventType + data map[string]string +} + +func (e *Event) Event() smsgateway.PushEventType { + return e.event +} + +func (e *Event) Data() map[string]string { + return e.data } func (e *Event) Map() map[string]string { - json, _ := json.Marshal(e.Data) + json, _ := json.Marshal(e.data) return map[string]string{ - "event": string(e.Event), + "event": string(e.event), "data": string(json), } } func NewEvent(event smsgateway.PushEventType, data map[string]string) *Event { return &Event{ - Event: event, - Data: data, + event: event, + data: data, } } diff --git a/internal/sms-gateway/modules/push/fcm/client.go b/internal/sms-gateway/modules/push/fcm/client.go index 930b379..d06e2d2 100644 --- a/internal/sms-gateway/modules/push/fcm/client.go +++ b/internal/sms-gateway/modules/push/fcm/client.go @@ -2,7 +2,6 @@ package fcm import ( "context" - "errors" "fmt" "sync" @@ -53,8 +52,8 @@ func (c *Client) Open(ctx context.Context) error { return nil } -func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) error { - errs := make([]error, 0, len(messages)) +func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) (map[string]error, error) { + errs := make(map[string]error, len(messages)) for address, payload := range messages { _, err := c.client.Send(ctx, &messaging.Message{ Data: payload.Map(), @@ -65,11 +64,11 @@ func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) err }) if err != nil { - errs = append(errs, fmt.Errorf("can't send message to %s: %w", address, err)) + errs[address] = fmt.Errorf("can't send message to %s: %w", address, err) } } - return errors.Join(errs...) + return errs, nil } func (c *Client) Close(ctx context.Context) error { diff --git a/internal/sms-gateway/modules/push/service.go b/internal/sms-gateway/modules/push/service.go index ca08523..1bfbc40 100644 --- a/internal/sms-gateway/modules/push/service.go +++ b/internal/sms-gateway/modules/push/service.go @@ -7,6 +7,7 @@ import ( "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/domain" "github.com/capcom6/go-helpers/cache" + "github.com/capcom6/go-helpers/maps" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -38,9 +39,12 @@ type Service struct { client client - cache *cache.Cache[domain.Event] + cache *cache.Cache[eventWrapper] + blacklist *cache.Cache[struct{}] - enqueuedCounter *prometheus.CounterVec + enqueuedCounter *prometheus.CounterVec + retriesCounter *prometheus.CounterVec + blacklistCounter *prometheus.CounterVec logger *zap.Logger } @@ -60,12 +64,34 @@ func New(params Params) *Service { Help: "Total number of messages enqueued", }, []string{"event"}) + retriesCounter := promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "sms", + Subsystem: "push", + Name: "retries_total", + Help: "Total retry attempts", + }, []string{"outcome"}) + + blacklistCounter := promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "sms", + Subsystem: "push", + Name: "blacklist_total", + Help: "Blacklist operations", + }, []string{"operation"}) + return &Service{ - config: params.Config, - client: params.Client, - cache: cache.New[domain.Event](cache.Config{}), - enqueuedCounter: enqueuedCounter, - logger: params.Logger, + config: params.Config, + client: params.Client, + + cache: cache.New[eventWrapper](cache.Config{}), + blacklist: cache.New[struct{}](cache.Config{ + TTL: blacklistTimeout, + }), + + enqueuedCounter: enqueuedCounter, + retriesCounter: retriesCounter, + blacklistCounter: blacklistCounter, + + logger: params.Logger, } } @@ -86,11 +112,23 @@ func (s *Service) Run(ctx context.Context) { // Enqueue adds the data to the cache and immediately sends all messages if the debounce is 0. func (s *Service) Enqueue(token string, event *domain.Event) error { - if err := s.cache.Set(token, *event); err != nil { + if _, err := s.blacklist.Get(token); err == nil { + s.blacklistCounter.WithLabelValues(string(BlacklistOperationSkipped)).Inc() + s.logger.Debug("Skipping blacklisted token", zap.String("token", token)) + return nil + } + + wrapper := eventWrapper{ + token: token, + event: event, + retries: 0, + } + + if err := s.cache.Set(token, wrapper); err != nil { return fmt.Errorf("can't add message to cache: %w", err) } - s.enqueuedCounter.WithLabelValues(string(event.Event)).Inc() + s.enqueuedCounter.WithLabelValues(string(event.Event())).Inc() return nil } @@ -102,10 +140,48 @@ func (s *Service) sendAll(ctx context.Context) { return } - s.logger.Info("Sending messages", zap.Int("count", len(targets))) + messages := maps.MapValues(targets, func(w eventWrapper) domain.Event { + return *w.event + }) + + s.logger.Info("Sending messages", zap.Int("count", len(messages))) ctx, cancel := context.WithTimeout(ctx, s.config.Timeout) - if err := s.client.Send(ctx, targets); err != nil { - s.logger.Error("Can't send messages", zap.Error(err)) + defer cancel() + + errs, err := s.client.Send(ctx, messages) + if len(errs) == 0 && err == nil { + s.logger.Info("Messages sent successfully", zap.Int("count", len(messages))) + return + } + + if err != nil { + s.logger.Error("Can't send messages", zap.Error(err)) + return + } + + for token, sendErr := range errs { + s.logger.Error("Can't send message", zap.Error(sendErr), zap.String("token", token)) + + wrapper := targets[token] + wrapper.retries++ + + if wrapper.retries >= maxRetries { + if err := s.blacklist.Set(token, struct{}{}); err != nil { + s.logger.Warn("Can't add to blacklist", zap.String("token", token), zap.Error(err)) + } + + s.blacklistCounter.WithLabelValues(string(BlacklistOperationAdded)).Inc() + s.retriesCounter.WithLabelValues(string(RetryOutcomeMaxAttempts)).Inc() + s.logger.Warn("Retries exceeded, blacklisting token", + zap.String("token", token), + zap.Duration("ttl", blacklistTimeout)) + continue + } + + if setErr := s.cache.SetOrFail(token, wrapper); setErr != nil { + s.logger.Info("Can't set message to cache", zap.Error(setErr)) + } + + s.retriesCounter.WithLabelValues(string(RetryOutcomeRetried)).Inc() } - cancel() } diff --git a/internal/sms-gateway/modules/push/types.go b/internal/sms-gateway/modules/push/types.go index 8b85e3f..2ebacd4 100644 --- a/internal/sms-gateway/modules/push/types.go +++ b/internal/sms-gateway/modules/push/types.go @@ -11,6 +11,8 @@ import ( type Mode string type Event = domain.Event +var NewEvent = domain.NewEvent + const ( ModeFCM Mode = "fcm" ModeUpstream Mode = "upstream" @@ -18,10 +20,16 @@ const ( type client interface { Open(ctx context.Context) error - Send(ctx context.Context, messages map[string]domain.Event) error + Send(ctx context.Context, messages map[string]domain.Event) (map[string]error, error) Close(ctx context.Context) error } +type eventWrapper struct { + token string + event *domain.Event + retries int +} + func NewMessageEnqueuedEvent() *domain.Event { return domain.NewEvent(smsgateway.PushMessageEnqueued, nil) } diff --git a/internal/sms-gateway/modules/push/upstream/client.go b/internal/sms-gateway/modules/push/upstream/client.go index 72c79a3..41f462b 100644 --- a/internal/sms-gateway/modules/push/upstream/client.go +++ b/internal/sms-gateway/modules/push/upstream/client.go @@ -11,6 +11,7 @@ import ( "github.com/android-sms-gateway/client-go/smsgateway" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/domain" + "github.com/capcom6/go-helpers/maps" ) const BASE_URL = "https://api.sms-gate.app/upstream/v1" @@ -41,25 +42,26 @@ func (c *Client) Open(ctx context.Context) error { return nil } -func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) error { +func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) (map[string]error, error) { payload := make(smsgateway.UpstreamPushRequest, 0, len(messages)) for address, data := range messages { payload = append(payload, smsgateway.PushNotification{ Token: address, - Event: data.Event, - Data: data.Data, + Event: data.Event(), + Data: data.Data(), }) } payloadBytes, err := json.Marshal(payload) + if err != nil { - return fmt.Errorf("can't marshal payload: %w", err) + return nil, fmt.Errorf("can't marshal payload: %w", err) } req, err := http.NewRequestWithContext(ctx, http.MethodPost, BASE_URL+"/push", bytes.NewReader(payloadBytes)) if err != nil { - return fmt.Errorf("can't create request: %w", err) + return nil, fmt.Errorf("can't create request: %w", err) } req.Header.Set("Content-Type", "application/json") @@ -67,7 +69,7 @@ func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) err resp, err := c.client.Do(req) if err != nil { - return fmt.Errorf("can't send request: %w", err) + return c.mapErrors(messages, fmt.Errorf("can't send request: %w", err)), nil } defer func() { @@ -76,10 +78,16 @@ func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) err }() if resp.StatusCode >= 400 { - return fmt.Errorf("unexpected status code: %d", resp.StatusCode) + return c.mapErrors(messages, fmt.Errorf("unexpected status code: %d", resp.StatusCode)), nil } - return nil + return nil, nil +} + +func (c *Client) mapErrors(messages map[string]domain.Event, err error) map[string]error { + return maps.MapValues(messages, func(e domain.Event) error { + return err + }) } func (c *Client) Close(ctx context.Context) error { diff --git a/pkg/swagger/docs/requests.http b/pkg/swagger/docs/requests.http index 7d99959..fb6963a 100644 --- a/pkg/swagger/docs/requests.http +++ b/pkg/swagger/docs/requests.http @@ -21,7 +21,7 @@ Authorization: Basic {{credentials}} "{{phone}}" ], "withDeliveryReport": true, - "priority": 128, + "priority": 127, "simNumber": {{$randomInt 1 2}} }