diff --git a/api/requests.http b/api/requests.http index ac28024..2d922d2 100644 --- a/api/requests.http +++ b/api/requests.http @@ -42,12 +42,12 @@ Content-Type: application/json [ { - "id": "8GHr-tUz6JBRFpTUXDV4w", - "state": "Pending", + "id": "-rnEaUz7KObDdokPrzKpM", + "state": "Delivered", "recipients": [ { - "phoneNumber": "79990001234", - "state": "Pending" + "phoneNumber": "{{phone}}", + "state": "Delivered" } ] } diff --git a/configs/config.example.yml b/configs/config.example.yml index afcc0a4..bf5e2cb 100644 --- a/configs/config.example.yml +++ b/configs/config.example.yml @@ -16,3 +16,6 @@ fcm: { ... } +tasks: + hashing: + interval_seconds: 15 diff --git a/internal/config/config.go b/internal/config/config.go index d820e58..a43a78d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -4,6 +4,7 @@ type Config struct { HTTP HTTP `yaml:"http"` Database Database `yaml:"database"` FCM FCMConfig `yaml:"fcm"` + Tasks Tasks `yaml:"tasks"` } type HTTP struct { @@ -24,6 +25,14 @@ type FCMConfig struct { CredentialsJSON string `yaml:"credentials_json"` } +type Tasks struct { + Hashing HashingTask `yaml:"hashing"` +} + +type HashingTask struct { + IntervalSeconds uint16 `yaml:"interval_seconds"` +} + var defaultConfig = Config{ HTTP: HTTP{ Listen: ":3000", @@ -40,4 +49,9 @@ var defaultConfig = Config{ FCM: FCMConfig{ CredentialsJSON: "", }, + Tasks: Tasks{ + Hashing: HashingTask{ + IntervalSeconds: uint16(15 * 60), + }, + }, } diff --git a/internal/config/module.go b/internal/config/module.go index c9da284..442f409 100644 --- a/internal/config/module.go +++ b/internal/config/module.go @@ -1,10 +1,13 @@ package config import ( + "time" + "github.com/capcom6/sms-gateway/internal/infra/config" "github.com/capcom6/sms-gateway/internal/infra/db" "github.com/capcom6/sms-gateway/internal/infra/http" "github.com/capcom6/sms-gateway/internal/sms-gateway/services" + "github.com/capcom6/sms-gateway/internal/sms-gateway/tasks" "go.uber.org/fx" "go.uber.org/zap" ) @@ -41,4 +44,9 @@ var Module = fx.Module( CredentialsJSON: cfg.FCM.CredentialsJSON, } }), + fx.Provide(func(cfg Config) tasks.HashingTaskConfig { + return tasks.HashingTaskConfig{ + Interval: time.Duration(cfg.Tasks.Hashing.IntervalSeconds) * time.Second, + } + }), ) diff --git a/internal/sms-gateway/app.go b/internal/sms-gateway/app.go index 15585b8..c00cdbd 100644 --- a/internal/sms-gateway/app.go +++ b/internal/sms-gateway/app.go @@ -11,6 +11,7 @@ import ( "github.com/capcom6/sms-gateway/internal/sms-gateway/models" "github.com/capcom6/sms-gateway/internal/sms-gateway/repositories" "github.com/capcom6/sms-gateway/internal/sms-gateway/services" + "github.com/capcom6/sms-gateway/internal/sms-gateway/tasks" "go.uber.org/fx" "go.uber.org/fx/fxevent" "go.uber.org/zap" @@ -28,6 +29,7 @@ var Module = fx.Module( repositories.Module, models.Module, db.Module, + tasks.Module, ) func Run() { diff --git a/internal/sms-gateway/models/migrations/mysql/20231208173729_hashed_messages.sql b/internal/sms-gateway/models/migrations/mysql/20231208173729_hashed_messages.sql new file mode 100644 index 0000000..3e88dcc --- /dev/null +++ b/internal/sms-gateway/models/migrations/mysql/20231208173729_hashed_messages.sql @@ -0,0 +1,13 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE `messages` +ADD `is_hashed` tinyint(1) unsigned NOT NULL DEFAULT false; +-- +goose StatementEnd +-- +goose StatementBegin +CREATE INDEX `idx_messages_is_hashed` USING HASH ON `messages` (`is_hashed`); +-- +goose StatementEnd +--- +-- +goose Down +-- +goose StatementBegin +ALTER TABLE `messages` DROP `is_hashed`; +-- +goose StatementEnd \ No newline at end of file diff --git a/internal/sms-gateway/models/models.go b/internal/sms-gateway/models/models.go index b3b86b3..180394c 100644 --- a/internal/sms-gateway/models/models.go +++ b/internal/sms-gateway/models/models.go @@ -51,6 +51,8 @@ type Message struct { SimNumber *uint8 `gorm:"type:tinyint(1) unsigned"` WithDeliveryReport bool `gorm:"not null;type:tinyint(1) unsigned"` + IsHashed bool `gorm:"not null;type:tinyint(1) unsigned;default:0"` + Device Device `gorm:"foreignKey:DeviceID;constraint:OnDelete:CASCADE"` Recipients []MessageRecipient `gorm:"foreignKey:MessageID;constraint:OnDelete:CASCADE"` diff --git a/internal/sms-gateway/repositories/messages.go b/internal/sms-gateway/repositories/messages.go index 3d0b159..7163988 100644 --- a/internal/sms-gateway/repositories/messages.go +++ b/internal/sms-gateway/repositories/messages.go @@ -1,13 +1,16 @@ package repositories import ( + "database/sql" + "errors" + "github.com/capcom6/sms-gateway/internal/sms-gateway/models" "gorm.io/gorm" ) -var ( - ErrMessageNotFound = gorm.ErrRecordNotFound -) +const HashingLockName = "36444143-1ace-4dbf-891c-cc505911497e" + +var ErrMessageNotFound = gorm.ErrRecordNotFound type MessagesRepository struct { db *gorm.DB @@ -66,6 +69,35 @@ func (r *MessagesRepository) UpdateState(message *models.Message) error { }) } +func (r *MessagesRepository) HashProcessed() error { + return r.db.Transaction(func(tx *gorm.DB) error { + hasLock := sql.NullBool{} + lockRow := tx.Raw("SELECT GET_LOCK(?, 1)", HashingLockName).Row() + err := lockRow.Scan(&hasLock) + if err != nil { + return err + } + + if !hasLock.Valid || !hasLock.Bool { + return errors.New("failed to acquire lock") + } + defer tx.Exec("SELECT RELEASE_LOCK(?)", HashingLockName) + + err = tx.Model(&models.MessageRecipient{}). + Where("message_id IN (?)", tx.Model(&models.Message{}).Select("id").Where("is_hashed = ? AND state <> ?", false, models.MessageStatePending)). + Update("phone_number", gorm.Expr("LEFT(SHA2(phone_number, 256), 16)")). + Error + if err != nil { + return err + } + + return tx.Model(&models.Message{}). + Where("is_hashed = ? AND state <> ?", false, models.MessageStatePending). + Updates(map[string]interface{}{"is_hashed": true, "message": gorm.Expr("SHA2(message, 256)")}). + Error + }) +} + func NewMessagesRepository(db *gorm.DB) *MessagesRepository { return &MessagesRepository{ db: db, diff --git a/internal/sms-gateway/services/messages.go b/internal/sms-gateway/services/messages.go index 48fffc6..3b80237 100644 --- a/internal/sms-gateway/services/messages.go +++ b/internal/sms-gateway/services/messages.go @@ -2,6 +2,7 @@ package services import ( "context" + "crypto/sha256" "errors" "fmt" "log" @@ -85,7 +86,7 @@ func (s *MessagesService) UpdateState(deviceID string, message smsgateway.Messag } existing.State = models.MessageState(message.State) - existing.Recipients = s.recipientsStateToModel(message.Recipients) + existing.Recipients = s.recipientsStateToModel(message.Recipients, existing.IsHashed) return s.Messages.UpdateState(&existing) } @@ -163,6 +164,10 @@ func (s *MessagesService) Enqeue(device models.Device, message smsgateway.Messag return state, nil } +func (s *MessagesService) HashProcessed() error { + return s.Messages.HashProcessed() +} + func (s *MessagesService) filterTimeouted(messages []models.Message) []models.Message { result := make([]models.Message, 0, len(messages)) for _, v := range messages { @@ -202,12 +207,13 @@ func (s *MessagesService) recipientsToModel(input []string) []models.MessageReci return output } -func (s *MessagesService) recipientsStateToModel(input []smsgateway.RecipientState) []models.MessageRecipient { +func (s *MessagesService) recipientsStateToModel(input []smsgateway.RecipientState, hash bool) []models.MessageRecipient { output := make([]models.MessageRecipient, len(input)) for i, v := range input { phoneNumber := v.PhoneNumber if len(phoneNumber) > 0 && phoneNumber[0] != '+' { + // compatibility with Android app before 1.1.1 phoneNumber = "+" + phoneNumber } @@ -215,6 +221,10 @@ func (s *MessagesService) recipientsStateToModel(input []smsgateway.RecipientSta v.State = smsgateway.MessageStateProcessed } + if hash { + phoneNumber = fmt.Sprintf("%x", sha256.Sum256([]byte(phoneNumber)))[:16] + } + output[i] = models.MessageRecipient{ PhoneNumber: phoneNumber, State: models.MessageState(v.State), diff --git a/internal/sms-gateway/services/messages_test.go b/internal/sms-gateway/services/messages_test.go index 7714f72..b2aa266 100644 --- a/internal/sms-gateway/services/messages_test.go +++ b/internal/sms-gateway/services/messages_test.go @@ -11,6 +11,7 @@ import ( func TestMessagesService_recipientsStateToModel(t *testing.T) { type args struct { input []smsgateway.RecipientState + hash bool } tests := []struct { name string @@ -56,6 +57,26 @@ func TestMessagesService_recipientsStateToModel(t *testing.T) { }, }, }, + { + name: "With hashing", + s: &MessagesService{}, + args: args{ + input: []smsgateway.RecipientState{ + { + PhoneNumber: "+79990001234", + State: "", + }, + }, + hash: true, + }, + want: []models.MessageRecipient{ + { + MessageID: 0, + PhoneNumber: "62d17792b45c5307", + State: "", + }, + }, + }, { name: "Empty phone", s: &MessagesService{}, @@ -78,7 +99,7 @@ func TestMessagesService_recipientsStateToModel(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := tt.s.recipientsStateToModel(tt.args.input); !reflect.DeepEqual(got, tt.want) { + if got := tt.s.recipientsStateToModel(tt.args.input, tt.args.hash); !reflect.DeepEqual(got, tt.want) { t.Errorf("MessagesService.recipientsStateToModel() = %v, want %v", got, tt.want) } }) diff --git a/internal/sms-gateway/tasks/hashing.go b/internal/sms-gateway/tasks/hashing.go new file mode 100644 index 0000000..0ebe5d5 --- /dev/null +++ b/internal/sms-gateway/tasks/hashing.go @@ -0,0 +1,52 @@ +package tasks + +import ( + "context" + "time" + + "github.com/capcom6/sms-gateway/internal/sms-gateway/services" + "go.uber.org/fx" + "go.uber.org/zap" +) + +type HashingTaskConfig struct { + Interval time.Duration +} + +type HashingTaskParams struct { + fx.In + + MessagesSvc *services.MessagesService + Config HashingTaskConfig + Logger *zap.Logger +} + +type HashingTask struct { + MessagesSvc *services.MessagesService + Config HashingTaskConfig + Logger *zap.Logger +} + +func (t *HashingTask) Run(ctx context.Context) { + t.Logger.Info("Starting hashing task...") + for { + select { + case <-ctx.Done(): + t.Logger.Info("Stopping hashing task...") + return + case <-time.After(t.Config.Interval): + t.Logger.Debug("Hashing messages...") + if err := t.MessagesSvc.HashProcessed(); err != nil { + t.Logger.Error("Failed to hash processed messages", zap.Error(err)) + } + } + } +} + +func NewHashingTask(params HashingTaskParams) *HashingTask { + return &HashingTask{ + MessagesSvc: params.MessagesSvc, + Config: params.Config, + Logger: params.Logger, + } +} diff --git a/internal/sms-gateway/tasks/module.go b/internal/sms-gateway/tasks/module.go new file mode 100644 index 0000000..c0df981 --- /dev/null +++ b/internal/sms-gateway/tasks/module.go @@ -0,0 +1,37 @@ +package tasks + +import ( + "context" + + "go.uber.org/fx" + "go.uber.org/zap" +) + +var Module = fx.Module( + "tasks", + fx.Decorate(func(log *zap.Logger) *zap.Logger { + return log.Named("tasks") + }), + fx.Provide( + NewHashingTask, + fx.Private, + ), + fx.Invoke( + func(lc fx.Lifecycle, task *HashingTask) error { + ctx, cancel := context.WithCancel(context.Background()) + + lc.Append(fx.Hook{ + OnStart: func(_ context.Context) error { + go task.Run(ctx) + return nil + }, + OnStop: func(_ context.Context) error { + cancel() + return nil + }, + }) + + return nil + }, + ), +)