Skip to content

Commit

Permalink
feat(ARCO-283): add option to store data in cache for one hash
Browse files Browse the repository at this point in the history
  • Loading branch information
pawellewandowski98 committed Nov 15, 2024
1 parent cf3b643 commit e886abc
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 49 deletions.
20 changes: 11 additions & 9 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@ import (
)

var (
ErrCacheNotFound = errors.New("key not found in cache")
ErrCacheFailedToSet = errors.New("failed to set value in cache")
ErrCacheFailedToDel = errors.New("failed to delete value from cache")
ErrCacheFailedToGet = errors.New("failed to get value from cache")
ErrCacheFailedToScan = errors.New("failed to scan cache")
ErrCacheNotFound = errors.New("key not found in cache")
ErrCacheFailedToSet = errors.New("failed to set value in cache")
ErrCacheFailedToDel = errors.New("failed to delete value from cache")
ErrCacheFailedToGet = errors.New("failed to get value from cache")
ErrCacheFailedToScan = errors.New("failed to scan cache")
ErrCacheFailedToGetCount = errors.New("failed to get count from cache")
)

type Store interface {
Get(key string) ([]byte, error)
GetAllWithPrefix(prefix string) (map[string][]byte, error)
Set(key string, value []byte, ttl time.Duration) error
Del(keys ...string) error
Get(hash *string, key string) ([]byte, error)
GetAllForHash(hash string) (map[string][]byte, error)
Set(hash *string, key string, value []byte, ttl time.Duration) error
Del(hash *string, keys ...string) error
CountElementsForHash(hash string) (int64, error)
}
102 changes: 81 additions & 21 deletions internal/cache/in_memory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cache

import (
"errors"
"sync"
"time"
)
Expand All @@ -13,8 +14,27 @@ func NewMemoryStore() *MemoryStore {
return &MemoryStore{}
}

// Get retrieves a value by key. It returns an error if the key does not exist.
func (s *MemoryStore) Get(key string) ([]byte, error) {
// Get retrieves a value by key and hash (if given)
func (s *MemoryStore) Get(hash *string, key string) ([]byte, error) {
if hash != nil {
hashValue, found := s.data.Load(hash)
if !found {
return nil, ErrCacheNotFound
}

hashMap, ok := hashValue.(map[string][]byte)
if !ok {
return nil, ErrCacheFailedToGet
}

fieldValue, exists := hashMap[key]
if !exists {
return nil, ErrCacheNotFound
}

return fieldValue, nil
}

value, found := s.data.Load(key)
if !found {
return nil, ErrCacheNotFound
Expand All @@ -29,33 +49,73 @@ func (s *MemoryStore) Get(key string) ([]byte, error) {
}

// Set stores a key-value pair, ignoring the ttl parameter.
func (s *MemoryStore) Set(key string, value []byte, _ time.Duration) error {
func (s *MemoryStore) Set(hash *string, key string, value []byte, _ time.Duration) error {
if hash != nil {
raw, _ := s.data.LoadOrStore(*hash, make(map[string][]byte))

hashMap, ok := raw.(map[string][]byte)
if !ok {
return ErrCacheFailedToSet
}

hashMap[key] = value

s.data.Store(*hash, hashMap)
return nil
}

s.data.Store(key, value)
return nil
}

// Del removes a key from the store.
func (s *MemoryStore) Del(keys ...string) error {
func (s *MemoryStore) Del(hash *string, keys ...string) error {
if hash != nil {
hashValue, found := s.data.Load(*hash)
if !found {
return ErrCacheNotFound
}

hashMap, ok := hashValue.(map[string][]byte)
if !ok {
return errors.Join(ErrCacheFailedToDel, ErrCacheFailedToGet)
}

for _, k := range keys {
delete(hashMap, k)
}

s.data.Store(*hash, hashMap)
return nil
}

for _, k := range keys {
s.data.Delete(k)
}
return nil
}
func (s *MemoryStore) GetAllWithPrefix(prefix string) (map[string][]byte, error) {
keys := make(map[string][]byte)
s.data.Range(func(k, v interface{}) bool {
key, ok := k.(string)
if !ok {
return false
}
if key[:len(prefix)] == prefix {
value, ok := v.([]byte)
if !ok {
return false
}
keys[key] = value
}
return true
})
return keys, nil

// GetAllForHash retrieves all key-value pairs for a specific hash.
func (s *MemoryStore) GetAllForHash(hash string) (map[string][]byte, error) {
hashValue, found := s.data.Load(hash)
if !found {
return nil, ErrCacheNotFound
}

hashMap, ok := hashValue.(map[string][]byte)
if !ok {
return nil, ErrCacheFailedToGet
}

return hashMap, nil
}

// CountElementsForHash returns the number of elements in a hash in memory.
func (s *MemoryStore) CountElementsForHash(hash string) (int64, error) {
hashMap, err := s.GetAllForHash(hash)
if err != nil {
return 0, err
}

return int64(len(hashMap)), nil
}
64 changes: 56 additions & 8 deletions internal/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,54 @@ func NewRedisStore(ctx context.Context, c redis.UniversalClient) *RedisStore {
}
}

// Get retrieves a value by key.
func (r *RedisStore) Get(key string) ([]byte, error) {
result, err := r.client.Get(r.ctx, key).Result()
// Get retrieves a value by key and hash (if given).
func (r *RedisStore) Get(hash *string, key string) ([]byte, error) {
var result string
var err error

if hash == nil {
result, err = r.client.Get(r.ctx, key).Result()
} else {
result, err = r.client.HGet(r.ctx, *hash, key).Result()
}

if errors.Is(err, redis.Nil) {
return nil, ErrCacheNotFound
} else if err != nil {
return nil, errors.Join(ErrCacheFailedToGet, err)
}

return []byte(result), nil
}

// Set stores a value with a TTL.
func (r *RedisStore) Set(key string, value []byte, ttl time.Duration) error {
err := r.client.Set(r.ctx, key, value, ttl).Err()
// Set stores a value with a TTL for a specific hash (if given).
func (r *RedisStore) Set(hash *string, key string, value []byte, ttl time.Duration) error {
var err error

if hash == nil {
err = r.client.Set(r.ctx, key, value, ttl).Err()
} else {
err = r.client.HSet(r.ctx, *hash, key, value).Err()
}

if err != nil {
return errors.Join(ErrCacheFailedToSet, err)
}

return nil
}

// Del removes a value by key.
func (r *RedisStore) Del(keys ...string) error {
result, err := r.client.Del(r.ctx, keys...).Result()
func (r *RedisStore) Del(hash *string, keys ...string) error {
var result int64
var err error

if hash == nil {
result, err = r.client.Del(r.ctx, keys...).Result()
} else {
result, err = r.client.HDel(r.ctx, *hash, keys...).Result()
}

if err != nil {
return errors.Join(ErrCacheFailedToDel, err)
}
Expand Down Expand Up @@ -83,3 +108,26 @@ func (r *RedisStore) GetAllWithPrefix(prefix string) (map[string][]byte, error)
}
return results, nil
}

// GetAllForHash retrieves all key-value pairs for a specific hash.
func (r *RedisStore) GetAllForHash(hash string) (map[string][]byte, error) {
values, err := r.client.HGetAll(r.ctx, hash).Result()
if err != nil {
return nil, errors.Join(ErrCacheFailedToGet, err)
}

result := make(map[string][]byte)
for field, value := range values {
result[field] = []byte(value)
}
return result, nil
}

// CountElementsForHash returns the number of elements in a hash.
func (r *RedisStore) CountElementsForHash(hash string) (int64, error) {
count, err := r.client.HLen(r.ctx, hash).Result()
if err != nil {
return 0, errors.Join(ErrCacheFailedToGetCount, err)
}
return count, nil
}
24 changes: 23 additions & 1 deletion internal/metamorph/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,28 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() {
p.logger.Error("failed to update status", slog.String("err", err.Error()))
return
}

statusUpdateCount, err := p.getStatusUpdateCount()
if err != nil {
p.logger.Error("failed to get status update count", slog.String("err", err.Error()))
return
}

if statusUpdateCount >= p.processStatusUpdatesBatchSize {
statusUpdatesMap, err := p.getAllTransactionStatuses()
if err != nil {
p.logger.Error("failed to get all transaction statuses", slog.String("err", err.Error()))
return
}

fmt.Println("statusUpdatesMap: ", statusUpdatesMap)

p.checkAndUpdate(ctx, statusUpdatesMap)

// Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed.
// This prevents unnecessary immediate updates and maintains the intended time interval between batches.
ticker.Reset(p.processStatusUpdatesInterval)
}
case <-ticker.C:
statusUpdatesMap, err := p.getAllTransactionStatuses()
if err != nil {
Expand Down Expand Up @@ -464,7 +486,7 @@ func (p *Processor) checkAndUpdate(ctx context.Context, statusUpdatesMap StatusU

err = p.clearCache(statusUpdatesMap)
if err != nil {
p.logger.Error("failed to clear status update map", slog.String("err", err.Error()))
p.logger.Error("failed to clear status update cache", slog.String("err", err.Error()))
}
}

Expand Down
26 changes: 16 additions & 10 deletions internal/metamorph/processor_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ import (

type StatusUpdateMap map[chainhash.Hash]store.UpdateStatus

const (
CacheStatusUpdateKey = "status-updates"
CacheTxPrefix = "tx-"
)
var CacheStatusUpdateHash = "status-update"

var (
ErrFailedToSerialize = errors.New("failed to serialize value")
Expand Down Expand Up @@ -52,15 +49,15 @@ func (p *Processor) setTransactionStatus(status store.UpdateStatus) error {
return errors.Join(ErrFailedToSerialize, err)
}

err = p.cacheStore.Set(CacheTxPrefix+status.Hash.String(), bytes, processStatusUpdatesIntervalDefault)
err = p.cacheStore.Set(&CacheStatusUpdateHash, status.Hash.String(), bytes, processStatusUpdatesIntervalDefault)
if err != nil {
return err
}
return nil
}

func (p *Processor) getTransactionStatus(hash chainhash.Hash) (*store.UpdateStatus, error) {
bytes, err := p.cacheStore.Get(CacheTxPrefix + hash.String())
bytes, err := p.cacheStore.Get(&CacheStatusUpdateHash, hash.String())
if err != nil {
return nil, err
}
Expand All @@ -76,13 +73,13 @@ func (p *Processor) getTransactionStatus(hash chainhash.Hash) (*store.UpdateStat

func (p *Processor) getAllTransactionStatuses() (StatusUpdateMap, error) {
statuses := make(StatusUpdateMap)
keys, err := p.cacheStore.GetAllWithPrefix(CacheTxPrefix)
keys, err := p.cacheStore.GetAllForHash(CacheStatusUpdateHash)
if err != nil {
return nil, err
}

for key, value := range keys {
hash, err := chainhash.NewHashFromStr(key[len(CacheTxPrefix):])
hash, err := chainhash.NewHashFromStr(key)
if err != nil {
return nil, err
}
Expand All @@ -102,10 +99,19 @@ func (p *Processor) getAllTransactionStatuses() (StatusUpdateMap, error) {
func (p *Processor) clearCache(updateStatusMap StatusUpdateMap) error {
keys := make([]string, len(updateStatusMap))
for k := range updateStatusMap {
keys = append(keys, CacheTxPrefix+k.String())
keys = append(keys, k.String())
}

return p.cacheStore.Del(&CacheStatusUpdateHash, keys...)
}

func (p *Processor) getStatusUpdateCount() (int, error) {
count, err := p.cacheStore.CountElementsForHash(CacheStatusUpdateHash)
if err != nil {
return 0, err
}

return p.cacheStore.Del(keys...)
return int(count), nil
}

func shouldUpdateStatus(new, found store.UpdateStatus) bool {
Expand Down

0 comments on commit e886abc

Please sign in to comment.