diff --git a/api/requests.http b/api/requests.http index 061c1bc..7d4c9d8 100644 --- a/api/requests.http +++ b/api/requests.http @@ -73,3 +73,13 @@ Content-Type: application/json ] } ] + +### +POST {{baseUrl}}/api/upstream/v1/push HTTP/1.1 +Content-Type: application/json + +[ + { + "token": "eTxx88nfSla87gZuJcW5mS:APA91bHGxVgSqqRtxwFHD1q9em5Oa6xSP4gO_OZRrqOoP1wjf_7UMfXKsc4uws6rWkqn73jYCc1owyATB1v61mqak4ntpqtmRkNtTey7NQXa0Wz3uQZBWY-Ecbn2rWG2VJRihOzXRId-" + } +] \ No newline at end of file diff --git a/internal/config/module.go b/internal/config/module.go index 9bb11fd..d6da29f 100644 --- a/internal/config/module.go +++ b/internal/config/module.go @@ -6,6 +6,7 @@ import ( "github.com/capcom6/go-infra-fx/config" "github.com/capcom6/go-infra-fx/db" "github.com/capcom6/go-infra-fx/http" + "github.com/capcom6/sms-gateway/internal/sms-gateway/handlers" "github.com/capcom6/sms-gateway/internal/sms-gateway/modules/auth" "github.com/capcom6/sms-gateway/internal/sms-gateway/modules/push" "github.com/capcom6/sms-gateway/internal/sms-gateway/tasks" @@ -54,11 +55,6 @@ var Module = fx.Module( Debounce: time.Duration(cfg.FCM.DebounceSeconds) * time.Second, Timeout: time.Duration(cfg.FCM.TimeoutSeconds) * time.Second, } - // return services.PushServiceConfig{ - // CredentialsJSON: cfg.FCM.CredentialsJSON, - // Debounce: time.Duration(cfg.FCM.DebounceSeconds) * time.Second, - // Timeout: time.Duration(cfg.FCM.TimeoutSeconds) * time.Second, - // } }), fx.Provide(func(cfg Config) tasks.HashingTaskConfig { return tasks.HashingTaskConfig{ @@ -71,4 +67,9 @@ var Module = fx.Module( PrivateToken: cfg.Gateway.PrivateToken, } }), + fx.Provide(func(cfg Config) handlers.Config { + return handlers.Config{ + GatewayMode: handlers.GatewayMode(cfg.Gateway.Mode), + } + }), ) diff --git a/internal/sms-gateway/handlers/config.go b/internal/sms-gateway/handlers/config.go new file mode 100644 index 0000000..e63f842 --- /dev/null +++ b/internal/sms-gateway/handlers/config.go @@ -0,0 +1,12 @@ +package handlers + +type GatewayMode string + +const ( + GatewayModePrivate GatewayMode = "private" + GatewayModePublic GatewayMode = "public" +) + +type Config struct { + GatewayMode GatewayMode +} diff --git a/internal/sms-gateway/handlers/module.go b/internal/sms-gateway/handlers/module.go index 34683c4..64906bc 100644 --- a/internal/sms-gateway/handlers/module.go +++ b/internal/sms-gateway/handlers/module.go @@ -15,5 +15,6 @@ var Module = fx.Module( http.AsRootHandler(newRootHandler), http.AsApiHandler(newThirdPartyHandler), http.AsApiHandler(newMobileHandler), + http.AsApiHandler(newUpstreamHandler), ), ) diff --git a/internal/sms-gateway/handlers/upstream.go b/internal/sms-gateway/handlers/upstream.go new file mode 100644 index 0000000..190886c --- /dev/null +++ b/internal/sms-gateway/handlers/upstream.go @@ -0,0 +1,77 @@ +package handlers + +import ( + "time" + + "github.com/capcom6/sms-gateway/internal/sms-gateway/modules/push" + "github.com/capcom6/sms-gateway/pkg/smsgateway" + "github.com/go-playground/validator/v10" + "github.com/gofiber/fiber/v2" + "github.com/gofiber/fiber/v2/middleware/limiter" + "go.uber.org/fx" + "go.uber.org/zap" +) + +type upstreamHandler struct { + Handler + + config Config + pushSvc *push.Service +} + +type upstreamHandlerParams struct { + fx.In + + Config Config + PushSvc *push.Service + + Logger *zap.Logger + Validator *validator.Validate +} + +func newUpstreamHandler(params upstreamHandlerParams) *upstreamHandler { + return &upstreamHandler{ + Handler: Handler{Logger: params.Logger, Validator: params.Validator}, + config: params.Config, + pushSvc: params.PushSvc, + } +} + +func (h *upstreamHandler) postPush(c *fiber.Ctx) error { + req := smsgateway.UpstreamPushRequest{} + + if err := c.BodyParser(&req); err != nil { + return fiber.NewError(fiber.StatusBadRequest, err.Error()) + } + + if len(req) == 0 { + return fiber.NewError(fiber.StatusBadRequest, "Empty request") + } + + for _, v := range req { + if err := h.validateStruct(v); err != nil { + return err + } + + if err := h.pushSvc.Enqueue(c.Context(), v.Token, map[string]string{}); err != nil { + h.Logger.Error("Can't push message", zap.Error(err)) + } + } + + return c.SendStatus(fiber.StatusAccepted) +} + +func (h *upstreamHandler) Register(router fiber.Router) { + // register only in public mode + if h.config.GatewayMode != GatewayModePublic { + return + } + + router = router.Group("/upstream/v1") + + router.Post("/push", limiter.New(limiter.Config{ + Max: 5, + Expiration: 60 * time.Second, + LimiterMiddleware: limiter.SlidingWindow{}, + }), h.postPush) +} diff --git a/internal/sms-gateway/modules/auth/service.go b/internal/sms-gateway/modules/auth/service.go index 37f3179..67e35ef 100644 --- a/internal/sms-gateway/modules/auth/service.go +++ b/internal/sms-gateway/modules/auth/service.go @@ -16,6 +16,17 @@ type Config struct { PrivateToken string } +type Params struct { + fx.In + + Config Config + + Users *repositories.UsersRepository + Devices *repositories.DevicesRepository + + Logger *zap.Logger +} + type Service struct { config Config @@ -27,6 +38,18 @@ type Service struct { idgen func() string } +func New(params Params) *Service { + idgen, _ := nanoid.Standard(21) + + return &Service{ + config: params.Config, + users: params.Users, + devices: params.Devices, + logger: params.Logger.Named("Service"), + idgen: idgen, + } +} + func (s *Service) RegisterUser(login, password string) (models.User, error) { user := models.User{ ID: login, @@ -93,26 +116,3 @@ func (s *Service) AuthorizeUser(username, password string) (models.User, error) return user, crypto.CompareBCryptHash(user.PasswordHash, password) } - -type Params struct { - fx.In - - Config Config - - Users *repositories.UsersRepository - Devices *repositories.DevicesRepository - - Logger *zap.Logger -} - -func New(params Params) *Service { - idgen, _ := nanoid.Standard(21) - - return &Service{ - config: params.Config, - users: params.Users, - devices: params.Devices, - logger: params.Logger.Named("Service"), - idgen: idgen, - } -} diff --git a/internal/sms-gateway/modules/push/fcm/client.go b/internal/sms-gateway/modules/push/fcm/client.go index b31effc..729947e 100644 --- a/internal/sms-gateway/modules/push/fcm/client.go +++ b/internal/sms-gateway/modules/push/fcm/client.go @@ -2,6 +2,7 @@ package fcm import ( "context" + "errors" "fmt" "sync" @@ -51,16 +52,23 @@ func (c *Client) Open(ctx context.Context) error { return nil } -func (c *Client) Send(ctx context.Context, address string, payload map[string]string) error { - _, err := c.client.Send(ctx, &messaging.Message{ - Data: payload, - Android: &messaging.AndroidConfig{ - Priority: "high", - }, - Token: address, - }) +func (c *Client) Send(ctx context.Context, messages map[string]map[string]string) error { + errs := make([]error, 0, len(messages)) + for address, payload := range messages { + _, err := c.client.Send(ctx, &messaging.Message{ + Data: payload, + Android: &messaging.AndroidConfig{ + Priority: "high", + }, + Token: address, + }) - return err + if err != nil { + errs = append(errs, fmt.Errorf("can't send message to %s: %w", address, err)) + } + } + + return errors.Join(errs...) } func (c *Client) Close(ctx context.Context) error { diff --git a/internal/sms-gateway/modules/push/module.go b/internal/sms-gateway/modules/push/module.go index 170ac30..9a0b917 100644 --- a/internal/sms-gateway/modules/push/module.go +++ b/internal/sms-gateway/modules/push/module.go @@ -2,8 +2,10 @@ package push import ( "context" + "errors" "github.com/capcom6/sms-gateway/internal/sms-gateway/modules/push/fcm" + "github.com/capcom6/sms-gateway/internal/sms-gateway/modules/push/upstream" "go.uber.org/fx" "go.uber.org/zap" ) @@ -14,22 +16,29 @@ var Module = fx.Module( return log.Named("push") }), fx.Provide( - func(cfg Config, lc fx.Lifecycle) (client, error) { - client, err := fcm.New(cfg.ClientOptions) + func(cfg Config, lc fx.Lifecycle) (c client, err error) { + if cfg.Mode == ModeFCM { + c, err = fcm.New(cfg.ClientOptions) + } else if cfg.Mode == ModeUpstream { + c, err = upstream.New(cfg.ClientOptions) + } else { + return nil, errors.New("invalid push mode") + } + if err != nil { return nil, err } lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { - return client.Open(ctx) + return c.Open(ctx) }, OnStop: func(ctx context.Context) error { - return client.Close(ctx) + return c.Close(ctx) }, }) - return client, nil + return c, nil }, ), fx.Provide( diff --git a/internal/sms-gateway/modules/push/service.go b/internal/sms-gateway/modules/push/service.go index 7ca5e85..d69cc0f 100644 --- a/internal/sms-gateway/modules/push/service.go +++ b/internal/sms-gateway/modules/push/service.go @@ -84,16 +84,9 @@ func (s *Service) sendAll(ctx context.Context) { } s.logger.Info("Sending messages", zap.Int("count", len(targets))) - for token, data := range targets { - singleCtx, cancel := context.WithTimeout(ctx, s.config.Timeout) - if err := s.sendSingle(singleCtx, token, data); err != nil { - s.logger.Error("Can't send message", zap.String("token", token), zap.Error(err)) - } - cancel() + ctx, cancel := context.WithTimeout(ctx, s.config.Timeout) + if err := s.client.Send(ctx, targets); err != nil { + s.logger.Error("Can't send messages", zap.Error(err)) } -} - -// sendSingle sends a single message to the specified token -func (s *Service) sendSingle(ctx context.Context, token string, data map[string]string) error { - return s.client.Send(ctx, token, data) + cancel() } diff --git a/internal/sms-gateway/modules/push/types.go b/internal/sms-gateway/modules/push/types.go index 4f8581c..a072dcd 100644 --- a/internal/sms-gateway/modules/push/types.go +++ b/internal/sms-gateway/modules/push/types.go @@ -11,6 +11,6 @@ const ( type client interface { Open(ctx context.Context) error - Send(ctx context.Context, address string, payload map[string]string) error + Send(ctx context.Context, messages map[string]map[string]string) error Close(ctx context.Context) error } diff --git a/internal/sms-gateway/modules/push/upstream/client.go b/internal/sms-gateway/modules/push/upstream/client.go new file mode 100644 index 0000000..b22a48e --- /dev/null +++ b/internal/sms-gateway/modules/push/upstream/client.go @@ -0,0 +1,88 @@ +package upstream + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "sync" + + "github.com/capcom6/sms-gateway/pkg/smsgateway" +) + +const BASE_URL = "https://sms.capcom.me/api/upstream/v1" + +type Client struct { + options map[string]string + + client *http.Client + mux sync.Mutex +} + +func New(options map[string]string) (*Client, error) { + return &Client{ + options: options, + }, nil +} + +func (c *Client) Open(ctx context.Context) error { + c.mux.Lock() + defer c.mux.Unlock() + + if c.client != nil { + return nil + } + + c.client = &http.Client{} + + return nil +} + +func (c *Client) Send(ctx context.Context, messages map[string]map[string]string) error { + payload := make(smsgateway.UpstreamPushRequest, 0, len(messages)) + + for address := range messages { + payload = append(payload, smsgateway.PushNotification{ + Token: address, + }) + } + + payloadBytes, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("can't marshal payload: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, BASE_URL+"/push", bytes.NewReader(payloadBytes)) + if err != nil { + return fmt.Errorf("can't create request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + + resp, err := c.client.Do(req) + if err != nil { + return fmt.Errorf("can't send request: %w", err) + } + + defer func() { + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + }() + + if resp.StatusCode >= 400 { + return fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + return nil +} + +func (c *Client) Close(ctx context.Context) error { + c.mux.Lock() + defer c.mux.Unlock() + + c.client = nil + + return nil +} diff --git a/pkg/smsgateway/domain.go b/pkg/smsgateway/domain.go index 062b612..0202fdf 100644 --- a/pkg/smsgateway/domain.go +++ b/pkg/smsgateway/domain.go @@ -60,3 +60,7 @@ type RecipientState struct { State ProcessState `json:"state" validate:"required" example:"Pending"` // Состояние Error *string `json:"error,omitempty" example:"timeout"` // Ошибка } + +type PushNotification struct { + Token string `json:"token" validate:"required" example:"PyDmBQZZXYmyxMwED8Fzy"` +} diff --git a/pkg/smsgateway/requests.go b/pkg/smsgateway/requests.go index ca56a70..381ff22 100644 --- a/pkg/smsgateway/requests.go +++ b/pkg/smsgateway/requests.go @@ -11,3 +11,5 @@ type MobileUpdateRequest struct { Id string `json:"id" example:"QslD_GefqiYV6RQXdkM6V"` // Идентификатор, если есть PushToken string `json:"pushToken" validate:"omitempty,max=256" example:"gHz-T6NezDlOfllr7F-Be"` // Токен для отправки PUSH-уведомлений } + +type UpstreamPushRequest = []PushNotification