Skip to content

Commit

Permalink
feat(ARCO-283): add transaction for redis operations
Browse files Browse the repository at this point in the history
  • Loading branch information
pawellewandowski98 committed Nov 19, 2024
1 parent 60353e7 commit be2cd5c
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 20 deletions.
14 changes: 8 additions & 6 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,19 @@ import (
)

var (
ErrCacheNotFound = errors.New("key not found in cache")
ErrCacheFailedToSet = errors.New("failed to set value in cache")
ErrCacheFailedToDel = errors.New("failed to delete value from cache")
ErrCacheFailedToGet = errors.New("failed to get value from cache")
ErrCacheFailedToScan = errors.New("failed to scan cache")
ErrCacheFailedToGetCount = errors.New("failed to get count from cache")
ErrCacheNotFound = errors.New("key not found in cache")
ErrCacheFailedToSet = errors.New("failed to set value in cache")
ErrCacheFailedToDel = errors.New("failed to delete value from cache")
ErrCacheFailedToGet = errors.New("failed to get value from cache")
ErrCacheFailedToScan = errors.New("failed to scan cache")
ErrCacheFailedToGetCount = errors.New("failed to get count from cache")
ErrCacheFailedToExecuteTx = errors.New("failed to execute transaction")
)

type Store interface {
Get(hash *string, key string) ([]byte, error)
GetAllForHash(hash string) (map[string][]byte, error)
GetAllForHashAndDelete(hash string) (map[string][]byte, error)
Set(hash *string, key string, value []byte, ttl time.Duration) error
Del(hash *string, keys ...string) error
CountElementsForHash(hash string) (int64, error)
Expand Down
15 changes: 15 additions & 0 deletions internal/cache/in_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,21 @@ func (s *MemoryStore) GetAllForHash(hash string) (map[string][]byte, error) {
return hashMap, nil
}

// GetAllForHashAndDelete retrieves all key-value pairs for a specific hash and deletes the hash.
func (s *MemoryStore) GetAllForHashAndDelete(hash string) (map[string][]byte, error) {
hashMap, err := s.GetAllForHash(hash)
if err != nil {
return nil, err
}

err = s.Del(nil, hash)
if err != nil {
return nil, err
}

return hashMap, nil
}

// CountElementsForHash returns the number of elements in a hash in memory.
func (s *MemoryStore) CountElementsForHash(hash string) (int64, error) {
hashMap, err := s.GetAllForHash(hash)
Expand Down
21 changes: 21 additions & 0 deletions internal/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,27 @@ func (r *RedisStore) GetAllForHash(hash string) (map[string][]byte, error) {
return result, nil
}

// GetAllForHashAndDelete retrieves all key-value pairs for a specific hash and remove them from cache, all in one transaction.
func (r *RedisStore) GetAllForHashAndDelete(hash string) (map[string][]byte, error) {
tx := r.client.TxPipeline()

getAllCmd := tx.HGetAll(r.ctx, hash)
tx.Del(r.ctx, hash)

_, err := tx.Exec(r.ctx)
if err != nil {
return nil, errors.Join(ErrCacheFailedToExecuteTx, err)
}

getAllCmdResult := getAllCmd.Val()

result := make(map[string][]byte)
for field, value := range getAllCmdResult {
result[field] = []byte(value)
}
return result, nil
}

// CountElementsForHash returns the number of elements in a hash.
func (r *RedisStore) CountElementsForHash(hash string) (int64, error) {
count, err := r.client.HLen(r.ctx, hash).Result()
Expand Down
31 changes: 19 additions & 12 deletions internal/metamorph/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,27 +427,27 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() {
}

if statusUpdateCount >= p.processStatusUpdatesBatchSize {
statusUpdatesMap, err := p.getAllTransactionStatuses()
err := p.checkAndUpdate(ctx)
if err != nil {
p.logger.Error("failed to get all transaction statuses", slog.String("err", err.Error()))
return
p.logger.Error("failed to check and update statuses", slog.String("err", err.Error()))
}

p.checkAndUpdate(ctx, statusUpdatesMap)

// Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed.
// This prevents unnecessary immediate updates and maintains the intended time interval between batches.
ticker.Reset(p.processStatusUpdatesInterval)
}
case <-ticker.C:
statusUpdatesMap, err := p.getAllTransactionStatuses()
statusUpdateCount, err := p.getStatusUpdateCount()
if err != nil {
p.logger.Error("failed to get all transaction statuses", slog.String("err", err.Error()))
p.logger.Error("failed to get status update count", slog.String("err", err.Error()))
return
}

if len(statusUpdatesMap) > 0 {
p.checkAndUpdate(ctx, statusUpdatesMap)
if statusUpdateCount > 0 {
err := p.checkAndUpdate(ctx)
if err != nil {
p.logger.Error("failed to check and update statuses", slog.String("err", err.Error()))
}

// Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed.
// This prevents unnecessary immediate updates and maintains the intended time interval between batches.
Expand All @@ -458,12 +458,17 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() {
}()
}

func (p *Processor) checkAndUpdate(ctx context.Context, statusUpdatesMap StatusUpdateMap) {
func (p *Processor) checkAndUpdate(ctx context.Context) error {
ctx, span := tracing.StartTracing(ctx, "checkAndUpdate", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)

statusUpdatesMap, err := p.getAndDeleteAllTransactionStatuses()
if err != nil {
return err
}

if len(statusUpdatesMap) == 0 {
return
return nil
}

statusUpdates := make([]store.UpdateStatus, 0, len(statusUpdatesMap))
Expand All @@ -477,7 +482,7 @@ func (p *Processor) checkAndUpdate(ctx context.Context, statusUpdatesMap StatusU
}
}

err := p.statusUpdateWithCallback(ctx, statusUpdates, doubleSpendUpdates)
err = p.statusUpdateWithCallback(ctx, statusUpdates, doubleSpendUpdates)
if err != nil {
p.logger.Error("failed to bulk update statuses", slog.String("err", err.Error()))
}
Expand All @@ -486,6 +491,8 @@ func (p *Processor) checkAndUpdate(ctx context.Context, statusUpdatesMap StatusU
if err != nil {
p.logger.Error("failed to clear status update cache", slog.String("err", err.Error()))
}

return nil
}

func (p *Processor) statusUpdateWithCallback(ctx context.Context, statusUpdates, doubleSpendUpdates []store.UpdateStatus) error {
Expand Down
4 changes: 2 additions & 2 deletions internal/metamorph/processor_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ func (p *Processor) getTransactionStatus(hash chainhash.Hash) (*store.UpdateStat
return &status, nil
}

func (p *Processor) getAllTransactionStatuses() (StatusUpdateMap, error) {
func (p *Processor) getAndDeleteAllTransactionStatuses() (StatusUpdateMap, error) {
statuses := make(StatusUpdateMap)
keys, err := p.cacheStore.GetAllForHash(CacheStatusUpdateHash)
keys, err := p.cacheStore.GetAllForHashAndDelete(CacheStatusUpdateHash)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit be2cd5c

Please sign in to comment.