Added: hashing interval from config

This commit is contained in:
Aleksandr Soloshenko 2023-12-14 23:37:41 +07:00
parent 1e958591cb
commit 8ab1a6a7c2
5 changed files with 35 additions and 3 deletions

View File

@ -16,3 +16,6 @@ fcm:
{
...
}
tasks:
hashing:
interval_seconds: 15

View File

@ -4,6 +4,7 @@ type Config struct {
HTTP HTTP `yaml:"http"`
Database Database `yaml:"database"`
FCM FCMConfig `yaml:"fcm"`
Tasks Tasks `yaml:"tasks"`
}
type HTTP struct {
@ -24,6 +25,14 @@ type FCMConfig struct {
CredentialsJSON string `yaml:"credentials_json"`
}
type Tasks struct {
Hashing HashingTask `yaml:"hashing"`
}
type HashingTask struct {
IntervalSeconds uint16 `yaml:"interval_seconds"`
}
var defaultConfig = Config{
HTTP: HTTP{
Listen: ":3000",
@ -40,4 +49,9 @@ var defaultConfig = Config{
FCM: FCMConfig{
CredentialsJSON: "",
},
Tasks: Tasks{
Hashing: HashingTask{
IntervalSeconds: uint16(15 * 60),
},
},
}

View File

@ -1,10 +1,13 @@
package config
import (
"time"
"github.com/capcom6/sms-gateway/internal/infra/config"
"github.com/capcom6/sms-gateway/internal/infra/db"
"github.com/capcom6/sms-gateway/internal/infra/http"
"github.com/capcom6/sms-gateway/internal/sms-gateway/services"
"github.com/capcom6/sms-gateway/internal/sms-gateway/tasks"
"go.uber.org/fx"
"go.uber.org/zap"
)
@ -41,4 +44,9 @@ var Module = fx.Module(
CredentialsJSON: cfg.FCM.CredentialsJSON,
}
}),
fx.Provide(func(cfg Config) tasks.HashingTaskConfig {
return tasks.HashingTaskConfig{
Interval: time.Duration(cfg.Tasks.Hashing.IntervalSeconds) * time.Second,
}
}),
)

View File

@ -72,7 +72,7 @@ func (r *MessagesRepository) UpdateState(message *models.Message) error {
func (r *MessagesRepository) HashProcessed() error {
return r.db.Transaction(func(tx *gorm.DB) error {
hasLock := sql.NullBool{}
lockRow := tx.Raw("SELECT GET_LOCK('?', 1)", HashingLockName).Row()
lockRow := tx.Raw("SELECT GET_LOCK(?, 1)", HashingLockName).Row()
err := lockRow.Scan(&hasLock)
if err != nil {
return err
@ -81,7 +81,7 @@ func (r *MessagesRepository) HashProcessed() error {
if !hasLock.Valid || !hasLock.Bool {
return errors.New("failed to acquire lock")
}
defer tx.Exec("SELECT RELEASE_LOCK('?')", HashingLockName)
defer tx.Exec("SELECT RELEASE_LOCK(?)", HashingLockName)
err = tx.Model(&models.MessageRecipient{}).
Where("message_id IN (?)", tx.Model(&models.Message{}).Select("id").Where("is_hashed = ? AND state <> ?", false, models.MessageStatePending)).

View File

@ -9,15 +9,21 @@ import (
"go.uber.org/zap"
)
type HashingTaskConfig struct {
Interval time.Duration
}
type HashingTaskParams struct {
fx.In
MessagesSvc *services.MessagesService
Config HashingTaskConfig
Logger *zap.Logger
}
type HashingTask struct {
MessagesSvc *services.MessagesService
Config HashingTaskConfig
Logger *zap.Logger
}
@ -28,7 +34,7 @@ func (t *HashingTask) Run(ctx context.Context) {
case <-ctx.Done():
t.Logger.Info("Stopping hashing task...")
return
case <-time.After(15 * time.Minute):
case <-time.After(t.Config.Interval):
t.Logger.Debug("Hashing messages...")
if err := t.MessagesSvc.HashProcessed(); err != nil {
t.Logger.Error("Failed to hash processed messages", zap.Error(err))
@ -40,6 +46,7 @@ func (t *HashingTask) Run(ctx context.Context) {
func NewHashingTask(params HashingTaskParams) *HashingTask {
return &HashingTask{
MessagesSvc: params.MessagesSvc,
Config: params.Config,
Logger: params.Logger,
}
}