mirror of
https://github.com/makayabou/asg-server.git
synced 2026-05-02 17:43:36 +02:00
[cache] optimize Redis implementation
This commit is contained in:
parent
a74d41f078
commit
6fad3c1e2d
10
internal/sms-gateway/cache/factory.go
vendored
10
internal/sms-gateway/cache/factory.go
vendored
@ -15,11 +15,11 @@ const (
|
||||
type Cache = cache.Cache
|
||||
|
||||
type Factory interface {
|
||||
New(name string) (cache.Cache, error)
|
||||
New(name string) (Cache, error)
|
||||
}
|
||||
|
||||
type factory struct {
|
||||
new func(name string) (cache.Cache, error)
|
||||
new func(name string) (Cache, error)
|
||||
}
|
||||
|
||||
func NewFactory(config Config) (Factory, error) {
|
||||
@ -35,7 +35,7 @@ func NewFactory(config Config) (Factory, error) {
|
||||
switch u.Scheme {
|
||||
case "memory":
|
||||
return &factory{
|
||||
new: func(name string) (cache.Cache, error) {
|
||||
new: func(name string) (Cache, error) {
|
||||
return cache.NewMemory(0), nil
|
||||
},
|
||||
}, nil
|
||||
@ -45,7 +45,7 @@ func NewFactory(config Config) (Factory, error) {
|
||||
return nil, fmt.Errorf("can't create redis client: %w", err)
|
||||
}
|
||||
return &factory{
|
||||
new: func(name string) (cache.Cache, error) {
|
||||
new: func(name string) (Cache, error) {
|
||||
return cache.NewRedis(client, name, 0), nil
|
||||
},
|
||||
}, nil
|
||||
@ -55,6 +55,6 @@ func NewFactory(config Config) (Factory, error) {
|
||||
}
|
||||
|
||||
// New implements Factory.
|
||||
func (f *factory) New(name string) (cache.Cache, error) {
|
||||
func (f *factory) New(name string) (Cache, error) {
|
||||
return f.new(keyPrefix + name)
|
||||
}
|
||||
|
||||
18
pkg/cache/memory_bench_test.go
vendored
18
pkg/cache/memory_bench_test.go
vendored
@ -179,6 +179,7 @@ func BenchmarkMemoryCache_ConcurrentReads(b *testing.B) {
|
||||
|
||||
for _, bm := range benchmarks {
|
||||
b.Run(bm.name, func(b *testing.B) {
|
||||
b.SetParallelism(bm.goroutines)
|
||||
b.ResetTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
@ -206,6 +207,7 @@ func BenchmarkMemoryCache_ConcurrentWrites(b *testing.B) {
|
||||
|
||||
for _, bm := range benchmarks {
|
||||
b.Run(bm.name, func(b *testing.B) {
|
||||
b.SetParallelism(bm.goroutines)
|
||||
b.ResetTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
i := 0
|
||||
@ -238,6 +240,7 @@ func BenchmarkMemoryCache_MixedWorkload(b *testing.B) {
|
||||
|
||||
for _, bm := range benchmarks {
|
||||
b.Run(bm.name, func(b *testing.B) {
|
||||
b.SetParallelism(bm.goroutines)
|
||||
b.ResetTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
@ -287,6 +290,7 @@ func BenchmarkMemoryCache_Scaling(b *testing.B) {
|
||||
cache.Set(ctx, key, value)
|
||||
}
|
||||
|
||||
b.SetParallelism(bm.goroutines)
|
||||
b.ResetTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
localI := 0
|
||||
@ -355,12 +359,17 @@ func BenchmarkMemoryCache_LargeValues(b *testing.B) {
|
||||
for i := range value {
|
||||
value[i] = byte(i % 256)
|
||||
}
|
||||
valueStr := string(value)
|
||||
|
||||
b.ResetTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
cache.Set(ctx, key, string(value))
|
||||
cache.Get(ctx, key)
|
||||
if err := cache.Set(ctx, key, valueStr); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
if _, err := cache.Get(ctx, key); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
@ -371,6 +380,7 @@ func BenchmarkMemoryCache_LargeValues(b *testing.B) {
|
||||
func BenchmarkMemoryCache_MemoryGrowth(b *testing.B) {
|
||||
cache := cache.NewMemory(0)
|
||||
ctx := context.Background()
|
||||
b.ReportAllocs()
|
||||
|
||||
sizes := []int{100, 1000, 10000, 100000}
|
||||
|
||||
@ -378,12 +388,12 @@ func BenchmarkMemoryCache_MemoryGrowth(b *testing.B) {
|
||||
b.Run(fmt.Sprintf("%d_items", size), func(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
for b.Loop() {
|
||||
// Clear cache
|
||||
cache.Drain(ctx)
|
||||
|
||||
// Add new items
|
||||
for j := 0; j < size; j++ {
|
||||
for j := range size {
|
||||
key := "key-" + strconv.Itoa(j)
|
||||
value := "value-" + strconv.Itoa(j)
|
||||
cache.Set(ctx, key, value)
|
||||
|
||||
18
pkg/cache/redis.go
vendored
18
pkg/cache/redis.go
vendored
@ -26,9 +26,8 @@ end
|
||||
hgetallAndDeleteScript = `
|
||||
local items = redis.call('HGETALL', KEYS[1])
|
||||
if #items > 0 then
|
||||
for i = 1, #items, 2 do
|
||||
redis.call('HDEL', KEYS[1], items[i])
|
||||
end
|
||||
local ok = pcall(redis.call, 'UNLINK', KEYS[1])
|
||||
if not ok then redis.call('DEL', KEYS[1]) end
|
||||
end
|
||||
return items
|
||||
`
|
||||
@ -128,14 +127,15 @@ func (r *redisCache) Set(ctx context.Context, key string, value string, opts ...
|
||||
}
|
||||
options.apply(opts...)
|
||||
|
||||
if err := r.client.HSet(ctx, r.key, key, value).Err(); err != nil {
|
||||
return fmt.Errorf("can't set cache item: %w", err)
|
||||
}
|
||||
|
||||
_, err := r.client.Pipelined(ctx, func(p redis.Pipeliner) error {
|
||||
p.HSet(ctx, r.key, key, value)
|
||||
if !options.validUntil.IsZero() {
|
||||
if err := r.client.HExpireAt(ctx, r.key, options.validUntil, key).Err(); err != nil {
|
||||
return fmt.Errorf("can't set cache item ttl: %w", err)
|
||||
p.HExpireAt(ctx, r.key, options.validUntil, key)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't set cache item: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user