From be2cd5cd2964927e9c8b2f9e7bd01256e55e43c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Tue, 19 Nov 2024 09:31:58 +0100 Subject: [PATCH] feat(ARCO-283): add transaction for redis operations --- internal/cache/cache.go | 14 ++++++----- internal/cache/in_memory.go | 15 ++++++++++++ internal/cache/redis.go | 21 +++++++++++++++++ internal/metamorph/processor.go | 31 +++++++++++++++---------- internal/metamorph/processor_helpers.go | 4 ++-- 5 files changed, 65 insertions(+), 20 deletions(-) diff --git a/internal/cache/cache.go b/internal/cache/cache.go index ef6e501d2..5c8ab9034 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -6,17 +6,19 @@ import ( ) var ( - ErrCacheNotFound = errors.New("key not found in cache") - ErrCacheFailedToSet = errors.New("failed to set value in cache") - ErrCacheFailedToDel = errors.New("failed to delete value from cache") - ErrCacheFailedToGet = errors.New("failed to get value from cache") - ErrCacheFailedToScan = errors.New("failed to scan cache") - ErrCacheFailedToGetCount = errors.New("failed to get count from cache") + ErrCacheNotFound = errors.New("key not found in cache") + ErrCacheFailedToSet = errors.New("failed to set value in cache") + ErrCacheFailedToDel = errors.New("failed to delete value from cache") + ErrCacheFailedToGet = errors.New("failed to get value from cache") + ErrCacheFailedToScan = errors.New("failed to scan cache") + ErrCacheFailedToGetCount = errors.New("failed to get count from cache") + ErrCacheFailedToExecuteTx = errors.New("failed to execute transaction") ) type Store interface { Get(hash *string, key string) ([]byte, error) GetAllForHash(hash string) (map[string][]byte, error) + GetAllForHashAndDelete(hash string) (map[string][]byte, error) Set(hash *string, key string, value []byte, ttl time.Duration) error Del(hash *string, keys ...string) error CountElementsForHash(hash string) (int64, error) diff --git a/internal/cache/in_memory.go b/internal/cache/in_memory.go index bf4c05a83..2140d3474 100644 --- a/internal/cache/in_memory.go +++ b/internal/cache/in_memory.go @@ -110,6 +110,21 @@ func (s *MemoryStore) GetAllForHash(hash string) (map[string][]byte, error) { return hashMap, nil } +// GetAllForHashAndDelete retrieves all key-value pairs for a specific hash and deletes the hash. +func (s *MemoryStore) GetAllForHashAndDelete(hash string) (map[string][]byte, error) { + hashMap, err := s.GetAllForHash(hash) + if err != nil { + return nil, err + } + + err = s.Del(nil, hash) + if err != nil { + return nil, err + } + + return hashMap, nil +} + // CountElementsForHash returns the number of elements in a hash in memory. func (s *MemoryStore) CountElementsForHash(hash string) (int64, error) { hashMap, err := s.GetAllForHash(hash) diff --git a/internal/cache/redis.go b/internal/cache/redis.go index fc12b6226..259345579 100644 --- a/internal/cache/redis.go +++ b/internal/cache/redis.go @@ -93,6 +93,27 @@ func (r *RedisStore) GetAllForHash(hash string) (map[string][]byte, error) { return result, nil } +// GetAllForHashAndDelete retrieves all key-value pairs for a specific hash and remove them from cache, all in one transaction. +func (r *RedisStore) GetAllForHashAndDelete(hash string) (map[string][]byte, error) { + tx := r.client.TxPipeline() + + getAllCmd := tx.HGetAll(r.ctx, hash) + tx.Del(r.ctx, hash) + + _, err := tx.Exec(r.ctx) + if err != nil { + return nil, errors.Join(ErrCacheFailedToExecuteTx, err) + } + + getAllCmdResult := getAllCmd.Val() + + result := make(map[string][]byte) + for field, value := range getAllCmdResult { + result[field] = []byte(value) + } + return result, nil +} + // CountElementsForHash returns the number of elements in a hash. func (r *RedisStore) CountElementsForHash(hash string) (int64, error) { count, err := r.client.HLen(r.ctx, hash).Result() diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index 9ad85b5f5..48ee3184d 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -427,27 +427,27 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() { } if statusUpdateCount >= p.processStatusUpdatesBatchSize { - statusUpdatesMap, err := p.getAllTransactionStatuses() + err := p.checkAndUpdate(ctx) if err != nil { - p.logger.Error("failed to get all transaction statuses", slog.String("err", err.Error())) - return + p.logger.Error("failed to check and update statuses", slog.String("err", err.Error())) } - p.checkAndUpdate(ctx, statusUpdatesMap) - // Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed. // This prevents unnecessary immediate updates and maintains the intended time interval between batches. ticker.Reset(p.processStatusUpdatesInterval) } case <-ticker.C: - statusUpdatesMap, err := p.getAllTransactionStatuses() + statusUpdateCount, err := p.getStatusUpdateCount() if err != nil { - p.logger.Error("failed to get all transaction statuses", slog.String("err", err.Error())) + p.logger.Error("failed to get status update count", slog.String("err", err.Error())) return } - if len(statusUpdatesMap) > 0 { - p.checkAndUpdate(ctx, statusUpdatesMap) + if statusUpdateCount > 0 { + err := p.checkAndUpdate(ctx) + if err != nil { + p.logger.Error("failed to check and update statuses", slog.String("err", err.Error())) + } // Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed. // This prevents unnecessary immediate updates and maintains the intended time interval between batches. @@ -458,12 +458,17 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() { }() } -func (p *Processor) checkAndUpdate(ctx context.Context, statusUpdatesMap StatusUpdateMap) { +func (p *Processor) checkAndUpdate(ctx context.Context) error { ctx, span := tracing.StartTracing(ctx, "checkAndUpdate", p.tracingEnabled, p.tracingAttributes...) defer tracing.EndTracing(span) + statusUpdatesMap, err := p.getAndDeleteAllTransactionStatuses() + if err != nil { + return err + } + if len(statusUpdatesMap) == 0 { - return + return nil } statusUpdates := make([]store.UpdateStatus, 0, len(statusUpdatesMap)) @@ -477,7 +482,7 @@ func (p *Processor) checkAndUpdate(ctx context.Context, statusUpdatesMap StatusU } } - err := p.statusUpdateWithCallback(ctx, statusUpdates, doubleSpendUpdates) + err = p.statusUpdateWithCallback(ctx, statusUpdates, doubleSpendUpdates) if err != nil { p.logger.Error("failed to bulk update statuses", slog.String("err", err.Error())) } @@ -486,6 +491,8 @@ func (p *Processor) checkAndUpdate(ctx context.Context, statusUpdatesMap StatusU if err != nil { p.logger.Error("failed to clear status update cache", slog.String("err", err.Error())) } + + return nil } func (p *Processor) statusUpdateWithCallback(ctx context.Context, statusUpdates, doubleSpendUpdates []store.UpdateStatus) error { diff --git a/internal/metamorph/processor_helpers.go b/internal/metamorph/processor_helpers.go index 0c09c11f6..affb95315 100644 --- a/internal/metamorph/processor_helpers.go +++ b/internal/metamorph/processor_helpers.go @@ -71,9 +71,9 @@ func (p *Processor) getTransactionStatus(hash chainhash.Hash) (*store.UpdateStat return &status, nil } -func (p *Processor) getAllTransactionStatuses() (StatusUpdateMap, error) { +func (p *Processor) getAndDeleteAllTransactionStatuses() (StatusUpdateMap, error) { statuses := make(StatusUpdateMap) - keys, err := p.cacheStore.GetAllForHash(CacheStatusUpdateHash) + keys, err := p.cacheStore.GetAllForHashAndDelete(CacheStatusUpdateHash) if err != nil { return nil, err }