[api/upstream] add upstream push endpoint

[push] send to upstream in private mode
This commit is contained in:
Aleksandr Soloshenko 2024-03-04 23:46:59 +07:00
parent 5b0c768051
commit 9c33f1af00
13 changed files with 259 additions and 54 deletions

View File

@ -73,3 +73,13 @@ Content-Type: application/json
]
}
]
###
POST {{baseUrl}}/api/upstream/v1/push HTTP/1.1
Content-Type: application/json
[
{
"token": "eTxx88nfSla87gZuJcW5mS:APA91bHGxVgSqqRtxwFHD1q9em5Oa6xSP4gO_OZRrqOoP1wjf_7UMfXKsc4uws6rWkqn73jYCc1owyATB1v61mqak4ntpqtmRkNtTey7NQXa0Wz3uQZBWY-Ecbn2rWG2VJRihOzXRId-"
}
]

View File

@ -6,6 +6,7 @@ import (
"github.com/capcom6/go-infra-fx/config"
"github.com/capcom6/go-infra-fx/db"
"github.com/capcom6/go-infra-fx/http"
"github.com/capcom6/sms-gateway/internal/sms-gateway/handlers"
"github.com/capcom6/sms-gateway/internal/sms-gateway/modules/auth"
"github.com/capcom6/sms-gateway/internal/sms-gateway/modules/push"
"github.com/capcom6/sms-gateway/internal/sms-gateway/tasks"
@ -54,11 +55,6 @@ var Module = fx.Module(
Debounce: time.Duration(cfg.FCM.DebounceSeconds) * time.Second,
Timeout: time.Duration(cfg.FCM.TimeoutSeconds) * time.Second,
}
// return services.PushServiceConfig{
// CredentialsJSON: cfg.FCM.CredentialsJSON,
// Debounce: time.Duration(cfg.FCM.DebounceSeconds) * time.Second,
// Timeout: time.Duration(cfg.FCM.TimeoutSeconds) * time.Second,
// }
}),
fx.Provide(func(cfg Config) tasks.HashingTaskConfig {
return tasks.HashingTaskConfig{
@ -71,4 +67,9 @@ var Module = fx.Module(
PrivateToken: cfg.Gateway.PrivateToken,
}
}),
fx.Provide(func(cfg Config) handlers.Config {
return handlers.Config{
GatewayMode: handlers.GatewayMode(cfg.Gateway.Mode),
}
}),
)

View File

@ -0,0 +1,12 @@
package handlers
type GatewayMode string
const (
GatewayModePrivate GatewayMode = "private"
GatewayModePublic GatewayMode = "public"
)
type Config struct {
GatewayMode GatewayMode
}

View File

@ -15,5 +15,6 @@ var Module = fx.Module(
http.AsRootHandler(newRootHandler),
http.AsApiHandler(newThirdPartyHandler),
http.AsApiHandler(newMobileHandler),
http.AsApiHandler(newUpstreamHandler),
),
)

View File

@ -0,0 +1,77 @@
package handlers
import (
"time"
"github.com/capcom6/sms-gateway/internal/sms-gateway/modules/push"
"github.com/capcom6/sms-gateway/pkg/smsgateway"
"github.com/go-playground/validator/v10"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/limiter"
"go.uber.org/fx"
"go.uber.org/zap"
)
type upstreamHandler struct {
Handler
config Config
pushSvc *push.Service
}
type upstreamHandlerParams struct {
fx.In
Config Config
PushSvc *push.Service
Logger *zap.Logger
Validator *validator.Validate
}
func newUpstreamHandler(params upstreamHandlerParams) *upstreamHandler {
return &upstreamHandler{
Handler: Handler{Logger: params.Logger, Validator: params.Validator},
config: params.Config,
pushSvc: params.PushSvc,
}
}
func (h *upstreamHandler) postPush(c *fiber.Ctx) error {
req := smsgateway.UpstreamPushRequest{}
if err := c.BodyParser(&req); err != nil {
return fiber.NewError(fiber.StatusBadRequest, err.Error())
}
if len(req) == 0 {
return fiber.NewError(fiber.StatusBadRequest, "Empty request")
}
for _, v := range req {
if err := h.validateStruct(v); err != nil {
return err
}
if err := h.pushSvc.Enqueue(c.Context(), v.Token, map[string]string{}); err != nil {
h.Logger.Error("Can't push message", zap.Error(err))
}
}
return c.SendStatus(fiber.StatusAccepted)
}
func (h *upstreamHandler) Register(router fiber.Router) {
// register only in public mode
if h.config.GatewayMode != GatewayModePublic {
return
}
router = router.Group("/upstream/v1")
router.Post("/push", limiter.New(limiter.Config{
Max: 5,
Expiration: 60 * time.Second,
LimiterMiddleware: limiter.SlidingWindow{},
}), h.postPush)
}

View File

@ -16,6 +16,17 @@ type Config struct {
PrivateToken string
}
type Params struct {
fx.In
Config Config
Users *repositories.UsersRepository
Devices *repositories.DevicesRepository
Logger *zap.Logger
}
type Service struct {
config Config
@ -27,6 +38,18 @@ type Service struct {
idgen func() string
}
func New(params Params) *Service {
idgen, _ := nanoid.Standard(21)
return &Service{
config: params.Config,
users: params.Users,
devices: params.Devices,
logger: params.Logger.Named("Service"),
idgen: idgen,
}
}
func (s *Service) RegisterUser(login, password string) (models.User, error) {
user := models.User{
ID: login,
@ -93,26 +116,3 @@ func (s *Service) AuthorizeUser(username, password string) (models.User, error)
return user, crypto.CompareBCryptHash(user.PasswordHash, password)
}
type Params struct {
fx.In
Config Config
Users *repositories.UsersRepository
Devices *repositories.DevicesRepository
Logger *zap.Logger
}
func New(params Params) *Service {
idgen, _ := nanoid.Standard(21)
return &Service{
config: params.Config,
users: params.Users,
devices: params.Devices,
logger: params.Logger.Named("Service"),
idgen: idgen,
}
}

View File

@ -2,6 +2,7 @@ package fcm
import (
"context"
"errors"
"fmt"
"sync"
@ -51,16 +52,23 @@ func (c *Client) Open(ctx context.Context) error {
return nil
}
func (c *Client) Send(ctx context.Context, address string, payload map[string]string) error {
_, err := c.client.Send(ctx, &messaging.Message{
Data: payload,
Android: &messaging.AndroidConfig{
Priority: "high",
},
Token: address,
})
func (c *Client) Send(ctx context.Context, messages map[string]map[string]string) error {
errs := make([]error, 0, len(messages))
for address, payload := range messages {
_, err := c.client.Send(ctx, &messaging.Message{
Data: payload,
Android: &messaging.AndroidConfig{
Priority: "high",
},
Token: address,
})
return err
if err != nil {
errs = append(errs, fmt.Errorf("can't send message to %s: %w", address, err))
}
}
return errors.Join(errs...)
}
func (c *Client) Close(ctx context.Context) error {

View File

@ -2,8 +2,10 @@ package push
import (
"context"
"errors"
"github.com/capcom6/sms-gateway/internal/sms-gateway/modules/push/fcm"
"github.com/capcom6/sms-gateway/internal/sms-gateway/modules/push/upstream"
"go.uber.org/fx"
"go.uber.org/zap"
)
@ -14,22 +16,29 @@ var Module = fx.Module(
return log.Named("push")
}),
fx.Provide(
func(cfg Config, lc fx.Lifecycle) (client, error) {
client, err := fcm.New(cfg.ClientOptions)
func(cfg Config, lc fx.Lifecycle) (c client, err error) {
if cfg.Mode == ModeFCM {
c, err = fcm.New(cfg.ClientOptions)
} else if cfg.Mode == ModeUpstream {
c, err = upstream.New(cfg.ClientOptions)
} else {
return nil, errors.New("invalid push mode")
}
if err != nil {
return nil, err
}
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return client.Open(ctx)
return c.Open(ctx)
},
OnStop: func(ctx context.Context) error {
return client.Close(ctx)
return c.Close(ctx)
},
})
return client, nil
return c, nil
},
),
fx.Provide(

View File

@ -84,16 +84,9 @@ func (s *Service) sendAll(ctx context.Context) {
}
s.logger.Info("Sending messages", zap.Int("count", len(targets)))
for token, data := range targets {
singleCtx, cancel := context.WithTimeout(ctx, s.config.Timeout)
if err := s.sendSingle(singleCtx, token, data); err != nil {
s.logger.Error("Can't send message", zap.String("token", token), zap.Error(err))
}
cancel()
ctx, cancel := context.WithTimeout(ctx, s.config.Timeout)
if err := s.client.Send(ctx, targets); err != nil {
s.logger.Error("Can't send messages", zap.Error(err))
}
}
// sendSingle sends a single message to the specified token
func (s *Service) sendSingle(ctx context.Context, token string, data map[string]string) error {
return s.client.Send(ctx, token, data)
cancel()
}

View File

@ -11,6 +11,6 @@ const (
type client interface {
Open(ctx context.Context) error
Send(ctx context.Context, address string, payload map[string]string) error
Send(ctx context.Context, messages map[string]map[string]string) error
Close(ctx context.Context) error
}

View File

@ -0,0 +1,88 @@
package upstream
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"sync"
"github.com/capcom6/sms-gateway/pkg/smsgateway"
)
const BASE_URL = "https://sms.capcom.me/api/upstream/v1"
type Client struct {
options map[string]string
client *http.Client
mux sync.Mutex
}
func New(options map[string]string) (*Client, error) {
return &Client{
options: options,
}, nil
}
func (c *Client) Open(ctx context.Context) error {
c.mux.Lock()
defer c.mux.Unlock()
if c.client != nil {
return nil
}
c.client = &http.Client{}
return nil
}
func (c *Client) Send(ctx context.Context, messages map[string]map[string]string) error {
payload := make(smsgateway.UpstreamPushRequest, 0, len(messages))
for address := range messages {
payload = append(payload, smsgateway.PushNotification{
Token: address,
})
}
payloadBytes, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("can't marshal payload: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, BASE_URL+"/push", bytes.NewReader(payloadBytes))
if err != nil {
return fmt.Errorf("can't create request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := c.client.Do(req)
if err != nil {
return fmt.Errorf("can't send request: %w", err)
}
defer func() {
_, _ = io.Copy(io.Discard, resp.Body)
_ = resp.Body.Close()
}()
if resp.StatusCode >= 400 {
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
return nil
}
func (c *Client) Close(ctx context.Context) error {
c.mux.Lock()
defer c.mux.Unlock()
c.client = nil
return nil
}

View File

@ -60,3 +60,7 @@ type RecipientState struct {
State ProcessState `json:"state" validate:"required" example:"Pending"` // Состояние
Error *string `json:"error,omitempty" example:"timeout"` // Ошибка
}
type PushNotification struct {
Token string `json:"token" validate:"required" example:"PyDmBQZZXYmyxMwED8Fzy"`
}

View File

@ -11,3 +11,5 @@ type MobileUpdateRequest struct {
Id string `json:"id" example:"QslD_GefqiYV6RQXdkM6V"` // Идентификатор, если есть
PushToken string `json:"pushToken" validate:"omitempty,max=256" example:"gHz-T6NezDlOfllr7F-Be"` // Токен для отправки PUSH-уведомлений
}
type UpstreamPushRequest = []PushNotification