[push] unify broadcast notifications

This commit is contained in:
Aleksandr Soloshenko 2025-05-22 21:00:19 +07:00 committed by Aleksandr
parent dfa12a46f5
commit dfe1341ec8
2 changed files with 59 additions and 39 deletions

View File

@ -2,9 +2,11 @@ package push
import (
"context"
"errors"
"fmt"
"time"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/devices"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/domain"
"github.com/capcom6/go-helpers/cache"
"github.com/capcom6/go-helpers/maps"
@ -31,6 +33,8 @@ type Params struct {
Client client
DevicesSvc *devices.Service
Logger *zap.Logger
}
@ -39,6 +43,8 @@ type Service struct {
client client
devicesSvc *devices.Service
cache *cache.Cache[eventWrapper]
blacklist *cache.Cache[struct{}]
@ -82,6 +88,8 @@ func New(params Params) *Service {
config: params.Config,
client: params.Client,
devicesSvc: params.DevicesSvc,
cache: cache.New[eventWrapper](cache.Config{}),
blacklist: cache.New[struct{}](cache.Config{
TTL: blacklistTimeout,
@ -133,6 +141,50 @@ func (s *Service) Enqueue(token string, event *domain.Event) error {
return nil
}
func (s *Service) Notify(userID string, deviceID *string, event *domain.Event) error {
logFields := []zap.Field{
zap.String("user_id", userID),
}
if deviceID != nil {
logFields = append(logFields, zap.String("device_id", *deviceID))
}
s.logger.Info("Notifying devices", logFields...)
var filters []devices.SelectFilter
if deviceID != nil {
filters = []devices.SelectFilter{devices.WithID(*deviceID)}
}
devices, err := s.devicesSvc.Select(userID, filters...)
if err != nil {
s.logger.Error("Failed to select devices", append(logFields, zap.Error(err))...)
return fmt.Errorf("failed to select devices: %w", err)
}
if len(devices) == 0 {
s.logger.Info("No devices found", logFields...)
return nil
}
errs := make([]error, 0, len(devices))
for _, device := range devices {
if device.PushToken == nil {
s.logger.Info("Device has no push token", zap.String("user_id", userID), zap.String("device_id", device.ID))
continue
}
if err := s.Enqueue(*device.PushToken, event); err != nil {
s.logger.Error("Failed to send push notification", zap.String("user_id", userID), zap.String("device_id", device.ID), zap.Error(err))
errs = append(errs, err)
}
}
s.logger.Info("Notified devices", append(logFields, zap.Int("count", len(devices)))...)
return errors.Join(errs...)
}
// sendAll sends messages to all targets from the cache after initializing the service.
func (s *Service) sendAll(ctx context.Context) {
targets := s.cache.Drain()

View File

@ -98,7 +98,7 @@ func (s *Service) Replace(userID string, webhook smsgateway.Webhook) error {
return fmt.Errorf("can't replace webhook: %w", err)
}
go s.notifyDevices(userID, webhook.DeviceID)
s.notifyDevices(userID, webhook.DeviceID)
return nil
}
@ -111,48 +111,16 @@ func (s *Service) Delete(userID string, filters ...SelectFilter) error {
return fmt.Errorf("can't delete webhooks: %w", err)
}
go s.notifyDevices(userID, nil)
s.notifyDevices(userID, nil)
return nil
}
// notifyDevices sends a push notification to all devices associated with the given user.
// notifyDevices asynchronously notifies all the user's devices.
func (s *Service) notifyDevices(userID string, deviceID *string) {
logFields := []zap.Field{
zap.String("user_id", userID),
}
if deviceID != nil {
logFields = append(logFields, zap.String("device_id", *deviceID))
}
s.logger.Info("Notifying devices", logFields...)
var filters []devices.SelectFilter
if deviceID != nil {
filters = []devices.SelectFilter{devices.WithID(*deviceID)}
}
devices, err := s.devicesSvc.Select(userID, filters...)
if err != nil {
s.logger.Error("Failed to select devices", append(logFields, zap.Error(err))...)
return
}
if len(devices) == 0 {
s.logger.Info("No devices found", logFields...)
return
}
for _, device := range devices {
if device.PushToken == nil {
s.logger.Info("Device has no push token", zap.String("user_id", userID), zap.String("device_id", device.ID))
continue
go func(userID string, deviceID *string) {
if err := s.pushSvc.Notify(userID, deviceID, push.NewWebhooksUpdatedEvent()); err != nil {
s.logger.Error("can't notify devices", zap.Error(err))
}
if err := s.pushSvc.Enqueue(*device.PushToken, push.NewWebhooksUpdatedEvent()); err != nil {
s.logger.Error("Failed to send push notification", zap.String("user_id", userID), zap.String("device_id", device.ID), zap.Error(err))
}
}
s.logger.Info("Notified devices", append(logFields, zap.Int("count", len(devices)))...)
}(userID, deviceID)
}