[push] add send retries

This commit is contained in:
Aleksandr Soloshenko 2025-05-11 10:32:38 +07:00 committed by Aleksandr
parent 7f35b0114b
commit 4e6b4e7f28
8 changed files with 160 additions and 39 deletions

View File

@ -68,12 +68,12 @@ func (h *upstreamHandler) postPush(c *fiber.Ctx) error {
return err
}
event := push.Event{
Event: anys.ZeroDefault(v.Event, smsgateway.PushMessageEnqueued),
Data: v.Data,
}
event := push.NewEvent(
anys.ZeroDefault(v.Event, smsgateway.PushMessageEnqueued),
v.Data,
)
if err := h.pushSvc.Enqueue(v.Token, &event); err != nil {
if err := h.pushSvc.Enqueue(v.Token, event); err != nil {
h.Logger.Error("Can't push message", zap.Error(err))
}
}

View File

@ -0,0 +1,22 @@
package push
import "time"
const (
maxRetries = 3
blacklistTimeout = 15 * time.Minute
)
type RetryOutcome string
const (
RetryOutcomeRetried RetryOutcome = "retried"
RetryOutcomeMaxAttempts RetryOutcome = "max_attempts"
)
type BlacklistOperation string
const (
BlacklistOperationAdded BlacklistOperation = "added"
BlacklistOperationSkipped BlacklistOperation = "skipped"
)

View File

@ -7,22 +7,30 @@ import (
)
type Event struct {
Event smsgateway.PushEventType
Data map[string]string
event smsgateway.PushEventType
data map[string]string
}
func (e *Event) Event() smsgateway.PushEventType {
return e.event
}
func (e *Event) Data() map[string]string {
return e.data
}
func (e *Event) Map() map[string]string {
json, _ := json.Marshal(e.Data)
json, _ := json.Marshal(e.data)
return map[string]string{
"event": string(e.Event),
"event": string(e.event),
"data": string(json),
}
}
func NewEvent(event smsgateway.PushEventType, data map[string]string) *Event {
return &Event{
Event: event,
Data: data,
event: event,
data: data,
}
}

View File

@ -2,7 +2,6 @@ package fcm
import (
"context"
"errors"
"fmt"
"sync"
@ -53,8 +52,8 @@ func (c *Client) Open(ctx context.Context) error {
return nil
}
func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) error {
errs := make([]error, 0, len(messages))
func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) (map[string]error, error) {
errs := make(map[string]error, len(messages))
for address, payload := range messages {
_, err := c.client.Send(ctx, &messaging.Message{
Data: payload.Map(),
@ -65,11 +64,11 @@ func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) err
})
if err != nil {
errs = append(errs, fmt.Errorf("can't send message to %s: %w", address, err))
errs[address] = fmt.Errorf("can't send message to %s: %w", address, err)
}
}
return errors.Join(errs...)
return errs, nil
}
func (c *Client) Close(ctx context.Context) error {

View File

@ -7,6 +7,7 @@ import (
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/domain"
"github.com/capcom6/go-helpers/cache"
"github.com/capcom6/go-helpers/maps"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@ -38,9 +39,12 @@ type Service struct {
client client
cache *cache.Cache[domain.Event]
cache *cache.Cache[eventWrapper]
blacklist *cache.Cache[struct{}]
enqueuedCounter *prometheus.CounterVec
enqueuedCounter *prometheus.CounterVec
retriesCounter *prometheus.CounterVec
blacklistCounter *prometheus.CounterVec
logger *zap.Logger
}
@ -60,12 +64,34 @@ func New(params Params) *Service {
Help: "Total number of messages enqueued",
}, []string{"event"})
retriesCounter := promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "sms",
Subsystem: "push",
Name: "retries_total",
Help: "Total retry attempts",
}, []string{"outcome"})
blacklistCounter := promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "sms",
Subsystem: "push",
Name: "blacklist_total",
Help: "Blacklist operations",
}, []string{"operation"})
return &Service{
config: params.Config,
client: params.Client,
cache: cache.New[domain.Event](cache.Config{}),
enqueuedCounter: enqueuedCounter,
logger: params.Logger,
config: params.Config,
client: params.Client,
cache: cache.New[eventWrapper](cache.Config{}),
blacklist: cache.New[struct{}](cache.Config{
TTL: blacklistTimeout,
}),
enqueuedCounter: enqueuedCounter,
retriesCounter: retriesCounter,
blacklistCounter: blacklistCounter,
logger: params.Logger,
}
}
@ -86,11 +112,23 @@ func (s *Service) Run(ctx context.Context) {
// Enqueue adds the data to the cache and immediately sends all messages if the debounce is 0.
func (s *Service) Enqueue(token string, event *domain.Event) error {
if err := s.cache.Set(token, *event); err != nil {
if _, err := s.blacklist.Get(token); err == nil {
s.blacklistCounter.WithLabelValues(string(BlacklistOperationSkipped)).Inc()
s.logger.Debug("Skipping blacklisted token", zap.String("token", token))
return nil
}
wrapper := eventWrapper{
token: token,
event: event,
retries: 0,
}
if err := s.cache.Set(token, wrapper); err != nil {
return fmt.Errorf("can't add message to cache: %w", err)
}
s.enqueuedCounter.WithLabelValues(string(event.Event)).Inc()
s.enqueuedCounter.WithLabelValues(string(event.Event())).Inc()
return nil
}
@ -102,10 +140,48 @@ func (s *Service) sendAll(ctx context.Context) {
return
}
s.logger.Info("Sending messages", zap.Int("count", len(targets)))
messages := maps.MapValues(targets, func(w eventWrapper) domain.Event {
return *w.event
})
s.logger.Info("Sending messages", zap.Int("count", len(messages)))
ctx, cancel := context.WithTimeout(ctx, s.config.Timeout)
if err := s.client.Send(ctx, targets); err != nil {
s.logger.Error("Can't send messages", zap.Error(err))
defer cancel()
errs, err := s.client.Send(ctx, messages)
if len(errs) == 0 && err == nil {
s.logger.Info("Messages sent successfully", zap.Int("count", len(messages)))
return
}
if err != nil {
s.logger.Error("Can't send messages", zap.Error(err))
return
}
for token, sendErr := range errs {
s.logger.Error("Can't send message", zap.Error(sendErr), zap.String("token", token))
wrapper := targets[token]
wrapper.retries++
if wrapper.retries >= maxRetries {
if err := s.blacklist.Set(token, struct{}{}); err != nil {
s.logger.Warn("Can't add to blacklist", zap.String("token", token), zap.Error(err))
}
s.blacklistCounter.WithLabelValues(string(BlacklistOperationAdded)).Inc()
s.retriesCounter.WithLabelValues(string(RetryOutcomeMaxAttempts)).Inc()
s.logger.Warn("Retries exceeded, blacklisting token",
zap.String("token", token),
zap.Duration("ttl", blacklistTimeout))
continue
}
if setErr := s.cache.SetOrFail(token, wrapper); setErr != nil {
s.logger.Info("Can't set message to cache", zap.Error(setErr))
}
s.retriesCounter.WithLabelValues(string(RetryOutcomeRetried)).Inc()
}
cancel()
}

View File

@ -11,6 +11,8 @@ import (
type Mode string
type Event = domain.Event
var NewEvent = domain.NewEvent
const (
ModeFCM Mode = "fcm"
ModeUpstream Mode = "upstream"
@ -18,10 +20,16 @@ const (
type client interface {
Open(ctx context.Context) error
Send(ctx context.Context, messages map[string]domain.Event) error
Send(ctx context.Context, messages map[string]domain.Event) (map[string]error, error)
Close(ctx context.Context) error
}
type eventWrapper struct {
token string
event *domain.Event
retries int
}
func NewMessageEnqueuedEvent() *domain.Event {
return domain.NewEvent(smsgateway.PushMessageEnqueued, nil)
}

View File

@ -11,6 +11,7 @@ import (
"github.com/android-sms-gateway/client-go/smsgateway"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/domain"
"github.com/capcom6/go-helpers/maps"
)
const BASE_URL = "https://api.sms-gate.app/upstream/v1"
@ -41,25 +42,26 @@ func (c *Client) Open(ctx context.Context) error {
return nil
}
func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) error {
func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) (map[string]error, error) {
payload := make(smsgateway.UpstreamPushRequest, 0, len(messages))
for address, data := range messages {
payload = append(payload, smsgateway.PushNotification{
Token: address,
Event: data.Event,
Data: data.Data,
Event: data.Event(),
Data: data.Data(),
})
}
payloadBytes, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("can't marshal payload: %w", err)
return nil, fmt.Errorf("can't marshal payload: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, BASE_URL+"/push", bytes.NewReader(payloadBytes))
if err != nil {
return fmt.Errorf("can't create request: %w", err)
return nil, fmt.Errorf("can't create request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
@ -67,7 +69,7 @@ func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) err
resp, err := c.client.Do(req)
if err != nil {
return fmt.Errorf("can't send request: %w", err)
return c.mapErrors(messages, fmt.Errorf("can't send request: %w", err)), nil
}
defer func() {
@ -76,10 +78,16 @@ func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) err
}()
if resp.StatusCode >= 400 {
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
return c.mapErrors(messages, fmt.Errorf("unexpected status code: %d", resp.StatusCode)), nil
}
return nil
return nil, nil
}
func (c *Client) mapErrors(messages map[string]domain.Event, err error) map[string]error {
return maps.MapValues(messages, func(e domain.Event) error {
return err
})
}
func (c *Client) Close(ctx context.Context) error {

View File

@ -21,7 +21,7 @@ Authorization: Basic {{credentials}}
"{{phone}}"
],
"withDeliveryReport": true,
"priority": 128,
"priority": 127,
"simNumber": {{$randomInt 1 2}}
}