diff --git a/api/requests.http b/api/requests.http index ade3f74..b0f3a0a 100644 --- a/api/requests.http +++ b/api/requests.http @@ -33,10 +33,10 @@ Content-Type: application/json Authorization: Basic {{credentials}} { - "message": "17wc9/ZRf1l84LHkEK3hgA==.aH1XrMHAeMyF4PeiavV3dk8o2fP0nSo92IqseLQfg14=", + "message": "$aes-256-cbc/pbkdf2-sha1$i=75000$pb+tpPcF0nabV46wDeDMig==$ucdVkMrRYLQ0LAeoXQsWhrD36I9nnop8rRIh3dNmBhvg7Wc4Cwu3h9Petvp1dN3x", "ttl": 600, "phoneNumbers": [ - "xkQeXzSDFj2xP6JBUMK0pA==.PfUHEa9QZv8h7JnUoBlmWw==" + "$aes-256-cbc/pbkdf2-sha1$i=75000$ZWdiSMvGWJo/jixYGk+s+w==$tfP6sEeC4r/ux/JAglincQ==" ], "simNumber": 1, "withDeliveryReport": true, diff --git a/api/swagger.json b/api/swagger.json index 8258851..bb35043 100644 --- a/api/swagger.json +++ b/api/swagger.json @@ -97,10 +97,16 @@ } ], "responses": { - "201": { + "202": { "description": "Сообщение поставлено в очередь", "schema": { "$ref": "#/definitions/smsgateway.MessageState" + }, + "headers": { + "Location": { + "type": "string", + "description": "URL для получения состояния сообщения" + } } }, "400": { @@ -340,6 +346,11 @@ "maxLength": 36, "example": "PyDmBQZZXYmyxMwED8Fzy" }, + "isEncrypted": { + "description": "Зашифровано", + "type": "boolean", + "example": true + }, "message": { "description": "Текст сообщения", "type": "string", @@ -390,6 +401,11 @@ "maxLength": 36, "example": "PyDmBQZZXYmyxMwED8Fzy" }, + "isEncrypted": { + "description": "Зашифровано", + "type": "boolean", + "example": false + }, "isHashed": { "description": "Хэшировано", "type": "boolean", @@ -511,6 +527,7 @@ "phoneNumber": { "description": "Номер телефона или первые 16 символов SHA256", "type": "string", + "maxLength": 128, "minLength": 10, "example": "79990001234" }, diff --git a/api/swagger.yaml b/api/swagger.yaml index 8a0a0c2..589b454 100644 --- a/api/swagger.yaml +++ b/api/swagger.yaml @@ -19,6 +19,10 @@ definitions: example: PyDmBQZZXYmyxMwED8Fzy maxLength: 36 type: string + isEncrypted: + description: Зашифровано + example: true + type: boolean message: description: Текст сообщения example: Hello World! @@ -58,6 +62,10 @@ definitions: example: PyDmBQZZXYmyxMwED8Fzy maxLength: 36 type: string + isEncrypted: + description: Зашифровано + example: false + type: boolean isHashed: description: Хэшировано example: false @@ -150,6 +158,7 @@ definitions: phoneNumber: description: Номер телефона или первые 16 символов SHA256 example: "79990001234" + maxLength: 128 minLength: 10 type: string state: @@ -219,8 +228,12 @@ paths: produces: - application/json responses: - "201": + "202": description: Сообщение поставлено в очередь + headers: + Location: + description: URL для получения состояния сообщения + type: string schema: $ref: '#/definitions/smsgateway.MessageState' "400": diff --git a/configs/config.example.yml b/configs/config.example.yml index 0ad3f4b..f9a9a49 100644 --- a/configs/config.example.yml +++ b/configs/config.example.yml @@ -14,6 +14,8 @@ fcm: { ... } + timeout_seconds: 1 + debounce_seconds: 1 tasks: hashing: interval_seconds: 15 diff --git a/internal/config/config.go b/internal/config/config.go index a43a78d..353c49c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -23,6 +23,8 @@ type Database struct { type FCMConfig struct { CredentialsJSON string `yaml:"credentials_json"` + DebounceSeconds uint16 `yaml:"debounce_seconds"` + TimeoutSeconds uint16 `yaml:"timeout_seconds"` } type Tasks struct { diff --git a/internal/config/module.go b/internal/config/module.go index 0a2b288..2fee934 100644 --- a/internal/config/module.go +++ b/internal/config/module.go @@ -42,6 +42,8 @@ var Module = fx.Module( fx.Provide(func(cfg Config) services.PushServiceConfig { return services.PushServiceConfig{ CredentialsJSON: cfg.FCM.CredentialsJSON, + Debounce: time.Duration(cfg.FCM.DebounceSeconds) * time.Second, + Timeout: time.Duration(cfg.FCM.TimeoutSeconds) * time.Second, } }), fx.Provide(func(cfg Config) tasks.HashingTaskConfig { diff --git a/internal/sms-gateway/app.go b/internal/sms-gateway/app.go index a396e17..70344fc 100644 --- a/internal/sms-gateway/app.go +++ b/internal/sms-gateway/app.go @@ -11,7 +11,6 @@ import ( "github.com/capcom6/go-infra-fx/validator" appconfig "github.com/capcom6/sms-gateway/internal/config" "github.com/capcom6/sms-gateway/internal/sms-gateway/handlers" - "github.com/capcom6/sms-gateway/internal/sms-gateway/models" "github.com/capcom6/sms-gateway/internal/sms-gateway/repositories" "github.com/capcom6/sms-gateway/internal/sms-gateway/services" "github.com/capcom6/sms-gateway/internal/sms-gateway/tasks" @@ -30,7 +29,6 @@ var Module = fx.Module( handlers.Module, services.Module, repositories.Module, - models.Module, db.Module, tasks.Module, ) @@ -57,6 +55,7 @@ type StartParams struct { Server *http.Server HashingTask *tasks.HashingTask + PushService *services.PushService } func Start(p StartParams) error { @@ -65,12 +64,17 @@ func Start(p StartParams) error { p.LC.Append(fx.Hook{ OnStart: func(_ context.Context) error { wg.Add(1) - go func() { defer wg.Done() p.HashingTask.Run(ctx) }() + wg.Add(1) + go func() { + defer wg.Done() + p.PushService.Run(ctx) + }() + wg.Add(1) go func() { defer wg.Done() diff --git a/internal/sms-gateway/handlers/3rdparty.go b/internal/sms-gateway/handlers/3rdparty.go index 67b73bf..1273cd9 100644 --- a/internal/sms-gateway/handlers/3rdparty.go +++ b/internal/sms-gateway/handlers/3rdparty.go @@ -13,6 +13,10 @@ import ( "go.uber.org/zap" ) +const ( + route3rdPartyGetMessage = "3rdparty.get.message" +) + type thirdPartyHandler struct { Handler @@ -27,10 +31,11 @@ type thirdPartyHandler struct { // @Accept json // @Produce json // @Param request body smsgateway.Message true "Сообщение" -// @Success 201 {object} smsgateway.MessageState "Сообщение поставлено в очередь" +// @Success 202 {object} smsgateway.MessageState "Сообщение поставлено в очередь" // @Failure 401 {object} smsgateway.ErrorResponse "Ошибка авторизации" // @Failure 400 {object} smsgateway.ErrorResponse "Некорректный запрос" // @Failure 500 {object} smsgateway.ErrorResponse "Внутренняя ошибка сервера" +// @Header 202 {string} Location "URL для получения состояния сообщения" // @Router /3rdparty/v1/message [post] // // Поставить сообщение в очередь @@ -55,7 +60,16 @@ func (h *thirdPartyHandler) postMessage(user models.User, c *fiber.Ctx) error { return err } - return c.Status(fiber.StatusCreated).JSON(state) + location, err := c.GetRouteURL(route3rdPartyGetMessage, fiber.Map{ + "id": state.ID, + }) + if err != nil { + h.Logger.Error("Failed to get route URL", zap.String("route", route3rdPartyGetMessage), zap.Error(err)) + } else { + c.Location(location) + } + + return c.Status(fiber.StatusAccepted).JSON(state) } // @Summary Получить состояние сообщения @@ -111,7 +125,7 @@ func (h *thirdPartyHandler) Register(router fiber.Router) { })) router.Post("/message", h.authorize(h.postMessage)) - router.Get("/message/:id", h.authorize(h.getMessage)) + router.Get("/message/:id", h.authorize(h.getMessage)).Name(route3rdPartyGetMessage) } func newThirdPartyHandler(logger *zap.Logger, validator *validator.Validate, authSvc *services.AuthService, messagesSvc *services.MessagesService) *thirdPartyHandler { diff --git a/internal/sms-gateway/handlers/module.go b/internal/sms-gateway/handlers/module.go index f6eea1e..34683c4 100644 --- a/internal/sms-gateway/handlers/module.go +++ b/internal/sms-gateway/handlers/module.go @@ -3,10 +3,14 @@ package handlers import ( "github.com/capcom6/go-infra-fx/http" "go.uber.org/fx" + "go.uber.org/zap" ) var Module = fx.Module( "handlers", + fx.Decorate(func(log *zap.Logger) *zap.Logger { + return log.Named("handlers") + }), fx.Provide( http.AsRootHandler(newRootHandler), http.AsApiHandler(newThirdPartyHandler), diff --git a/internal/sms-gateway/models/module.go b/internal/sms-gateway/models/module.go index 6167608..de6510f 100644 --- a/internal/sms-gateway/models/module.go +++ b/internal/sms-gateway/models/module.go @@ -2,11 +2,6 @@ package models import ( "github.com/capcom6/go-infra-fx/db" - "go.uber.org/fx" -) - -var Module = fx.Module( - "models", ) func init() { diff --git a/internal/sms-gateway/repositories/module.go b/internal/sms-gateway/repositories/module.go index 4b53fe9..89b1343 100644 --- a/internal/sms-gateway/repositories/module.go +++ b/internal/sms-gateway/repositories/module.go @@ -1,9 +1,15 @@ package repositories -import "go.uber.org/fx" +import ( + "go.uber.org/fx" + "go.uber.org/zap" +) var Module = fx.Module( "repositories", + fx.Decorate(func(log *zap.Logger) *zap.Logger { + return log.Named("repositories") + }), fx.Provide( NewDevicesRepository, NewMessagesRepository, diff --git a/internal/sms-gateway/services/messages.go b/internal/sms-gateway/services/messages.go index 08a8fe9..ac46252 100644 --- a/internal/sms-gateway/services/messages.go +++ b/internal/sms-gateway/services/messages.go @@ -4,7 +4,6 @@ import ( "context" "crypto/sha256" "fmt" - "log" "time" "github.com/capcom6/sms-gateway/internal/sms-gateway/models" @@ -14,6 +13,8 @@ import ( "github.com/capcom6/sms-gateway/pkg/types" "github.com/jaevor/go-nanoid" "github.com/nyaruka/phonenumbers" + "go.uber.org/fx" + "go.uber.org/zap" ) const ( @@ -26,19 +27,29 @@ func (e ErrValidation) Error() string { return string(e) } +type MessagesServiceParams struct { + fx.In + + Messages *repositories.MessagesRepository + PushSvc *PushService + Logger *zap.Logger +} + type MessagesService struct { Messages *repositories.MessagesRepository PushSvc *PushService + Logger *zap.Logger idgen func() string } -func NewMessagesService(pushSvc *PushService, messages *repositories.MessagesRepository) *MessagesService { +func NewMessagesService(params MessagesServiceParams) *MessagesService { idgen, _ := nanoid.Standard(21) return &MessagesService{ - Messages: messages, - PushSvc: pushSvc, + Messages: params.Messages, + PushSvc: params.PushSvc, + Logger: params.Logger, idgen: idgen, } } @@ -168,8 +179,8 @@ func (s *MessagesService) Enqeue(device models.Device, message smsgateway.Messag ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - if err := s.PushSvc.Send(ctx, token, map[string]string{}); err != nil { - log.Printf("failed to send push to %s: %v", *device.PushToken, err) + if err := s.PushSvc.Enqueue(ctx, token, map[string]string{}); err != nil { + s.Logger.Error("Can't enqueue message", zap.String("token", token), zap.Error(err)) } }(*device.PushToken) diff --git a/internal/sms-gateway/services/module.go b/internal/sms-gateway/services/module.go index 6484802..0f1f471 100644 --- a/internal/sms-gateway/services/module.go +++ b/internal/sms-gateway/services/module.go @@ -1,9 +1,15 @@ package services -import "go.uber.org/fx" +import ( + "go.uber.org/fx" + "go.uber.org/zap" +) var Module = fx.Module( "services", + fx.Decorate(func(log *zap.Logger) *zap.Logger { + return log.Named("services") + }), fx.Provide( NewAuthService, NewMessagesService, diff --git a/internal/sms-gateway/services/push.go b/internal/sms-gateway/services/push.go index 64eb2b3..4e58441 100644 --- a/internal/sms-gateway/services/push.go +++ b/internal/sms-gateway/services/push.go @@ -3,30 +3,54 @@ package services import ( "context" "sync" + "time" firebase "firebase.google.com/go/v4" "firebase.google.com/go/v4/messaging" + "github.com/capcom6/sms-gateway/pkg/types/cache" + "go.uber.org/fx" + "go.uber.org/zap" "google.golang.org/api/option" ) +type PushServiceParams struct { + fx.In + + Config PushServiceConfig + Logger *zap.Logger +} + type PushService struct { - CredentialsJSON string + Config PushServiceConfig + + Logger *zap.Logger client *messaging.Client mux sync.Mutex + + cache *cache.Cache[map[string]string] } type PushServiceConfig struct { CredentialsJSON string + Debounce time.Duration + Timeout time.Duration } -func NewPushService(config PushServiceConfig) *PushService { +// NewPushService creates a new PushService. +func NewPushService(params PushServiceParams) *PushService { + if params.Config.Timeout == 0 { + params.Config.Timeout = time.Second + } + return &PushService{ - CredentialsJSON: config.CredentialsJSON, + Config: params.Config, + Logger: params.Logger, + cache: cache.New[map[string]string](), } } -// init +// init initializes the FCM client. func (s *PushService) init(ctx context.Context) (err error) { s.mux.Lock() defer s.mux.Unlock() @@ -35,7 +59,7 @@ func (s *PushService) init(ctx context.Context) (err error) { return } - opt := option.WithCredentialsJSON([]byte(s.CredentialsJSON)) + opt := option.WithCredentialsJSON([]byte(s.Config.CredentialsJSON)) var app *firebase.App app, err = firebase.NewApp(ctx, nil, opt) @@ -49,12 +73,30 @@ func (s *PushService) init(ctx context.Context) (err error) { return } -// send -func (s *PushService) Send(ctx context.Context, token string, data map[string]string) error { +// sendAll sends messages to all targets from the cache after initializing the service. +func (s *PushService) sendAll(ctx context.Context) { if err := s.init(ctx); err != nil { - return err + s.Logger.Error("Can't init push service", zap.Error(err)) + return } + targets := s.cache.Drain() + if len(targets) == 0 { + return + } + + s.Logger.Info("Sending messages", zap.Int("count", len(targets))) + for token, data := range targets { + singleCtx, cancel := context.WithTimeout(ctx, s.Config.Timeout) + if err := s.sendSingle(singleCtx, token, data); err != nil { + s.Logger.Error("Can't send message", zap.String("token", token), zap.Error(err)) + } + cancel() + } +} + +// sendSingle sends a single message to the specified token +func (s *PushService) sendSingle(ctx context.Context, token string, data map[string]string) error { _, err := s.client.Send(ctx, &messaging.Message{ Data: data, Android: &messaging.AndroidConfig{ @@ -65,3 +107,33 @@ func (s *PushService) Send(ctx context.Context, token string, data map[string]st return err } + +// Run runs the service with the provided context if a debounce is set. +func (s *PushService) Run(ctx context.Context) { + if s.Config.Debounce == 0 { + return + } + + ticker := time.NewTicker(s.Config.Debounce) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + s.sendAll(ctx) + } + } +} + +// Enqueue adds the data to the cache and immediately sends all messages if the debounce is 0. +func (s *PushService) Enqueue(ctx context.Context, token string, data map[string]string) error { + s.cache.Set(token, data) + + if s.Config.Debounce == 0 { + s.sendAll(ctx) + } + + return nil +} diff --git a/pkg/types/cache/cache.go b/pkg/types/cache/cache.go new file mode 100644 index 0000000..416394c --- /dev/null +++ b/pkg/types/cache/cache.go @@ -0,0 +1,32 @@ +package cache + +import "sync" + +type Cache[T any] struct { + storage map[string]T + mux sync.RWMutex +} + +func New[T any]() *Cache[T] { + return &Cache[T]{ + storage: make(map[string]T), + } +} + +func (c *Cache[T]) Set(key string, value T) { + c.mux.Lock() + defer c.mux.Unlock() + + c.storage[key] = value +} + +func (c *Cache[T]) Drain() map[string]T { + c.mux.Lock() + defer c.mux.Unlock() + + storage := c.storage + + c.storage = make(map[string]T) + + return storage +} diff --git a/pkg/types/cache/cache_test.go b/pkg/types/cache/cache_test.go new file mode 100644 index 0000000..3029046 --- /dev/null +++ b/pkg/types/cache/cache_test.go @@ -0,0 +1,35 @@ +package cache + +import ( + "testing" +) + +func TestCacheSetAndGet(t *testing.T) { + cache := New[string]() + + key := "myKey" + value := "myValue" + + cache.Set(key, value) + + if cache.storage[key] != value { + t.Errorf("Set failed: expected value %v, got %v", value, cache.storage[key]) + } +} + +func TestCacheDrain(t *testing.T) { + cache := New[string]() + + cache.Set("key1", "value1") + cache.Set("key2", "value2") + + drained := cache.Drain() + + if len(drained) != 2 || drained["key1"] != "value1" || drained["key2"] != "value2" { + t.Errorf("Drain failed: expected map[key1:value1 key2:value2], got %v", drained) + } + + if len(cache.storage) != 0 { + t.Errorf("Drain failed: expected empty cache storage, got %v", cache.storage) + } +}