Merge pull request #30 from capcom6/feature/push-debounce

Added: push notifications debounce
This commit is contained in:
capcom6 2024-02-06 19:49:44 +07:00 committed by GitHub
commit cc41bfe5c5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 246 additions and 31 deletions

View File

@ -33,10 +33,10 @@ Content-Type: application/json
Authorization: Basic {{credentials}}
{
"message": "17wc9/ZRf1l84LHkEK3hgA==.aH1XrMHAeMyF4PeiavV3dk8o2fP0nSo92IqseLQfg14=",
"message": "$aes-256-cbc/pbkdf2-sha1$i=75000$pb+tpPcF0nabV46wDeDMig==$ucdVkMrRYLQ0LAeoXQsWhrD36I9nnop8rRIh3dNmBhvg7Wc4Cwu3h9Petvp1dN3x",
"ttl": 600,
"phoneNumbers": [
"xkQeXzSDFj2xP6JBUMK0pA==.PfUHEa9QZv8h7JnUoBlmWw=="
"$aes-256-cbc/pbkdf2-sha1$i=75000$ZWdiSMvGWJo/jixYGk+s+w==$tfP6sEeC4r/ux/JAglincQ=="
],
"simNumber": 1,
"withDeliveryReport": true,

View File

@ -97,10 +97,16 @@
}
],
"responses": {
"201": {
"202": {
"description": "Сообщение поставлено в очередь",
"schema": {
"$ref": "#/definitions/smsgateway.MessageState"
},
"headers": {
"Location": {
"type": "string",
"description": "URL для получения состояния сообщения"
}
}
},
"400": {
@ -340,6 +346,11 @@
"maxLength": 36,
"example": "PyDmBQZZXYmyxMwED8Fzy"
},
"isEncrypted": {
"description": "Зашифровано",
"type": "boolean",
"example": true
},
"message": {
"description": "Текст сообщения",
"type": "string",
@ -390,6 +401,11 @@
"maxLength": 36,
"example": "PyDmBQZZXYmyxMwED8Fzy"
},
"isEncrypted": {
"description": "Зашифровано",
"type": "boolean",
"example": false
},
"isHashed": {
"description": "Хэшировано",
"type": "boolean",
@ -511,6 +527,7 @@
"phoneNumber": {
"description": "Номер телефона или первые 16 символов SHA256",
"type": "string",
"maxLength": 128,
"minLength": 10,
"example": "79990001234"
},

View File

@ -19,6 +19,10 @@ definitions:
example: PyDmBQZZXYmyxMwED8Fzy
maxLength: 36
type: string
isEncrypted:
description: Зашифровано
example: true
type: boolean
message:
description: Текст сообщения
example: Hello World!
@ -58,6 +62,10 @@ definitions:
example: PyDmBQZZXYmyxMwED8Fzy
maxLength: 36
type: string
isEncrypted:
description: Зашифровано
example: false
type: boolean
isHashed:
description: Хэшировано
example: false
@ -150,6 +158,7 @@ definitions:
phoneNumber:
description: Номер телефона или первые 16 символов SHA256
example: "79990001234"
maxLength: 128
minLength: 10
type: string
state:
@ -219,8 +228,12 @@ paths:
produces:
- application/json
responses:
"201":
"202":
description: Сообщение поставлено в очередь
headers:
Location:
description: URL для получения состояния сообщения
type: string
schema:
$ref: '#/definitions/smsgateway.MessageState'
"400":

View File

@ -14,6 +14,8 @@ fcm:
{
...
}
timeout_seconds: 1
debounce_seconds: 1
tasks:
hashing:
interval_seconds: 15

View File

@ -23,6 +23,8 @@ type Database struct {
type FCMConfig struct {
CredentialsJSON string `yaml:"credentials_json"`
DebounceSeconds uint16 `yaml:"debounce_seconds"`
TimeoutSeconds uint16 `yaml:"timeout_seconds"`
}
type Tasks struct {

View File

@ -42,6 +42,8 @@ var Module = fx.Module(
fx.Provide(func(cfg Config) services.PushServiceConfig {
return services.PushServiceConfig{
CredentialsJSON: cfg.FCM.CredentialsJSON,
Debounce: time.Duration(cfg.FCM.DebounceSeconds) * time.Second,
Timeout: time.Duration(cfg.FCM.TimeoutSeconds) * time.Second,
}
}),
fx.Provide(func(cfg Config) tasks.HashingTaskConfig {

View File

@ -11,7 +11,6 @@ import (
"github.com/capcom6/go-infra-fx/validator"
appconfig "github.com/capcom6/sms-gateway/internal/config"
"github.com/capcom6/sms-gateway/internal/sms-gateway/handlers"
"github.com/capcom6/sms-gateway/internal/sms-gateway/models"
"github.com/capcom6/sms-gateway/internal/sms-gateway/repositories"
"github.com/capcom6/sms-gateway/internal/sms-gateway/services"
"github.com/capcom6/sms-gateway/internal/sms-gateway/tasks"
@ -30,7 +29,6 @@ var Module = fx.Module(
handlers.Module,
services.Module,
repositories.Module,
models.Module,
db.Module,
tasks.Module,
)
@ -57,6 +55,7 @@ type StartParams struct {
Server *http.Server
HashingTask *tasks.HashingTask
PushService *services.PushService
}
func Start(p StartParams) error {
@ -65,12 +64,17 @@ func Start(p StartParams) error {
p.LC.Append(fx.Hook{
OnStart: func(_ context.Context) error {
wg.Add(1)
go func() {
defer wg.Done()
p.HashingTask.Run(ctx)
}()
wg.Add(1)
go func() {
defer wg.Done()
p.PushService.Run(ctx)
}()
wg.Add(1)
go func() {
defer wg.Done()

View File

@ -13,6 +13,10 @@ import (
"go.uber.org/zap"
)
const (
route3rdPartyGetMessage = "3rdparty.get.message"
)
type thirdPartyHandler struct {
Handler
@ -27,10 +31,11 @@ type thirdPartyHandler struct {
// @Accept json
// @Produce json
// @Param request body smsgateway.Message true "Сообщение"
// @Success 201 {object} smsgateway.MessageState "Сообщение поставлено в очередь"
// @Success 202 {object} smsgateway.MessageState "Сообщение поставлено в очередь"
// @Failure 401 {object} smsgateway.ErrorResponse "Ошибка авторизации"
// @Failure 400 {object} smsgateway.ErrorResponse "Некорректный запрос"
// @Failure 500 {object} smsgateway.ErrorResponse "Внутренняя ошибка сервера"
// @Header 202 {string} Location "URL для получения состояния сообщения"
// @Router /3rdparty/v1/message [post]
//
// Поставить сообщение в очередь
@ -55,7 +60,16 @@ func (h *thirdPartyHandler) postMessage(user models.User, c *fiber.Ctx) error {
return err
}
return c.Status(fiber.StatusCreated).JSON(state)
location, err := c.GetRouteURL(route3rdPartyGetMessage, fiber.Map{
"id": state.ID,
})
if err != nil {
h.Logger.Error("Failed to get route URL", zap.String("route", route3rdPartyGetMessage), zap.Error(err))
} else {
c.Location(location)
}
return c.Status(fiber.StatusAccepted).JSON(state)
}
// @Summary Получить состояние сообщения
@ -111,7 +125,7 @@ func (h *thirdPartyHandler) Register(router fiber.Router) {
}))
router.Post("/message", h.authorize(h.postMessage))
router.Get("/message/:id", h.authorize(h.getMessage))
router.Get("/message/:id", h.authorize(h.getMessage)).Name(route3rdPartyGetMessage)
}
func newThirdPartyHandler(logger *zap.Logger, validator *validator.Validate, authSvc *services.AuthService, messagesSvc *services.MessagesService) *thirdPartyHandler {

View File

@ -3,10 +3,14 @@ package handlers
import (
"github.com/capcom6/go-infra-fx/http"
"go.uber.org/fx"
"go.uber.org/zap"
)
var Module = fx.Module(
"handlers",
fx.Decorate(func(log *zap.Logger) *zap.Logger {
return log.Named("handlers")
}),
fx.Provide(
http.AsRootHandler(newRootHandler),
http.AsApiHandler(newThirdPartyHandler),

View File

@ -2,11 +2,6 @@ package models
import (
"github.com/capcom6/go-infra-fx/db"
"go.uber.org/fx"
)
var Module = fx.Module(
"models",
)
func init() {

View File

@ -1,9 +1,15 @@
package repositories
import "go.uber.org/fx"
import (
"go.uber.org/fx"
"go.uber.org/zap"
)
var Module = fx.Module(
"repositories",
fx.Decorate(func(log *zap.Logger) *zap.Logger {
return log.Named("repositories")
}),
fx.Provide(
NewDevicesRepository,
NewMessagesRepository,

View File

@ -4,7 +4,6 @@ import (
"context"
"crypto/sha256"
"fmt"
"log"
"time"
"github.com/capcom6/sms-gateway/internal/sms-gateway/models"
@ -14,6 +13,8 @@ import (
"github.com/capcom6/sms-gateway/pkg/types"
"github.com/jaevor/go-nanoid"
"github.com/nyaruka/phonenumbers"
"go.uber.org/fx"
"go.uber.org/zap"
)
const (
@ -26,19 +27,29 @@ func (e ErrValidation) Error() string {
return string(e)
}
type MessagesServiceParams struct {
fx.In
Messages *repositories.MessagesRepository
PushSvc *PushService
Logger *zap.Logger
}
type MessagesService struct {
Messages *repositories.MessagesRepository
PushSvc *PushService
Logger *zap.Logger
idgen func() string
}
func NewMessagesService(pushSvc *PushService, messages *repositories.MessagesRepository) *MessagesService {
func NewMessagesService(params MessagesServiceParams) *MessagesService {
idgen, _ := nanoid.Standard(21)
return &MessagesService{
Messages: messages,
PushSvc: pushSvc,
Messages: params.Messages,
PushSvc: params.PushSvc,
Logger: params.Logger,
idgen: idgen,
}
}
@ -168,8 +179,8 @@ func (s *MessagesService) Enqeue(device models.Device, message smsgateway.Messag
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := s.PushSvc.Send(ctx, token, map[string]string{}); err != nil {
log.Printf("failed to send push to %s: %v", *device.PushToken, err)
if err := s.PushSvc.Enqueue(ctx, token, map[string]string{}); err != nil {
s.Logger.Error("Can't enqueue message", zap.String("token", token), zap.Error(err))
}
}(*device.PushToken)

View File

@ -1,9 +1,15 @@
package services
import "go.uber.org/fx"
import (
"go.uber.org/fx"
"go.uber.org/zap"
)
var Module = fx.Module(
"services",
fx.Decorate(func(log *zap.Logger) *zap.Logger {
return log.Named("services")
}),
fx.Provide(
NewAuthService,
NewMessagesService,

View File

@ -3,30 +3,54 @@ package services
import (
"context"
"sync"
"time"
firebase "firebase.google.com/go/v4"
"firebase.google.com/go/v4/messaging"
"github.com/capcom6/sms-gateway/pkg/types/cache"
"go.uber.org/fx"
"go.uber.org/zap"
"google.golang.org/api/option"
)
type PushServiceParams struct {
fx.In
Config PushServiceConfig
Logger *zap.Logger
}
type PushService struct {
CredentialsJSON string
Config PushServiceConfig
Logger *zap.Logger
client *messaging.Client
mux sync.Mutex
cache *cache.Cache[map[string]string]
}
type PushServiceConfig struct {
CredentialsJSON string
Debounce time.Duration
Timeout time.Duration
}
func NewPushService(config PushServiceConfig) *PushService {
// NewPushService creates a new PushService.
func NewPushService(params PushServiceParams) *PushService {
if params.Config.Timeout == 0 {
params.Config.Timeout = time.Second
}
return &PushService{
CredentialsJSON: config.CredentialsJSON,
Config: params.Config,
Logger: params.Logger,
cache: cache.New[map[string]string](),
}
}
// init
// init initializes the FCM client.
func (s *PushService) init(ctx context.Context) (err error) {
s.mux.Lock()
defer s.mux.Unlock()
@ -35,7 +59,7 @@ func (s *PushService) init(ctx context.Context) (err error) {
return
}
opt := option.WithCredentialsJSON([]byte(s.CredentialsJSON))
opt := option.WithCredentialsJSON([]byte(s.Config.CredentialsJSON))
var app *firebase.App
app, err = firebase.NewApp(ctx, nil, opt)
@ -49,12 +73,30 @@ func (s *PushService) init(ctx context.Context) (err error) {
return
}
// send
func (s *PushService) Send(ctx context.Context, token string, data map[string]string) error {
// sendAll sends messages to all targets from the cache after initializing the service.
func (s *PushService) sendAll(ctx context.Context) {
if err := s.init(ctx); err != nil {
return err
s.Logger.Error("Can't init push service", zap.Error(err))
return
}
targets := s.cache.Drain()
if len(targets) == 0 {
return
}
s.Logger.Info("Sending messages", zap.Int("count", len(targets)))
for token, data := range targets {
singleCtx, cancel := context.WithTimeout(ctx, s.Config.Timeout)
if err := s.sendSingle(singleCtx, token, data); err != nil {
s.Logger.Error("Can't send message", zap.String("token", token), zap.Error(err))
}
cancel()
}
}
// sendSingle sends a single message to the specified token
func (s *PushService) sendSingle(ctx context.Context, token string, data map[string]string) error {
_, err := s.client.Send(ctx, &messaging.Message{
Data: data,
Android: &messaging.AndroidConfig{
@ -65,3 +107,33 @@ func (s *PushService) Send(ctx context.Context, token string, data map[string]st
return err
}
// Run runs the service with the provided context if a debounce is set.
func (s *PushService) Run(ctx context.Context) {
if s.Config.Debounce == 0 {
return
}
ticker := time.NewTicker(s.Config.Debounce)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.sendAll(ctx)
}
}
}
// Enqueue adds the data to the cache and immediately sends all messages if the debounce is 0.
func (s *PushService) Enqueue(ctx context.Context, token string, data map[string]string) error {
s.cache.Set(token, data)
if s.Config.Debounce == 0 {
s.sendAll(ctx)
}
return nil
}

32
pkg/types/cache/cache.go vendored Normal file
View File

@ -0,0 +1,32 @@
package cache
import "sync"
type Cache[T any] struct {
storage map[string]T
mux sync.RWMutex
}
func New[T any]() *Cache[T] {
return &Cache[T]{
storage: make(map[string]T),
}
}
func (c *Cache[T]) Set(key string, value T) {
c.mux.Lock()
defer c.mux.Unlock()
c.storage[key] = value
}
func (c *Cache[T]) Drain() map[string]T {
c.mux.Lock()
defer c.mux.Unlock()
storage := c.storage
c.storage = make(map[string]T)
return storage
}

35
pkg/types/cache/cache_test.go vendored Normal file
View File

@ -0,0 +1,35 @@
package cache
import (
"testing"
)
func TestCacheSetAndGet(t *testing.T) {
cache := New[string]()
key := "myKey"
value := "myValue"
cache.Set(key, value)
if cache.storage[key] != value {
t.Errorf("Set failed: expected value %v, got %v", value, cache.storage[key])
}
}
func TestCacheDrain(t *testing.T) {
cache := New[string]()
cache.Set("key1", "value1")
cache.Set("key2", "value2")
drained := cache.Drain()
if len(drained) != 2 || drained["key1"] != "value1" || drained["key2"] != "value2" {
t.Errorf("Drain failed: expected map[key1:value1 key2:value2], got %v", drained)
}
if len(cache.storage) != 0 {
t.Errorf("Drain failed: expected empty cache storage, got %v", cache.storage)
}
}