From 137a106d74ac10c0ec3101c428d02f19bc1e9e70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Wed, 13 Nov 2024 09:32:11 +0100 Subject: [PATCH 01/26] feat(ARCO-283): remove freecache, combine methods with cache implementation --- cmd/arc/services/cache.go | 6 +-- config/config.go | 9 ++-- config/defaults.go | 7 +-- config/example_config.yaml | 4 +- internal/cache/freecache.go | 50 ------------------- internal/metamorph/health_check_test.go | 2 - .../double_spend_integration_test.go | 6 +-- internal/metamorph/processor.go | 7 ++- internal/metamorph/processor_helpers.go | 36 ++++++------- internal/metamorph/processor_test.go | 36 ++++--------- internal/metamorph/stats_collector_test.go | 6 +-- 11 files changed, 41 insertions(+), 128 deletions(-) delete mode 100644 internal/cache/freecache.go diff --git a/cmd/arc/services/cache.go b/cmd/arc/services/cache.go index 96440184f..1ac37bcda 100644 --- a/cmd/arc/services/cache.go +++ b/cmd/arc/services/cache.go @@ -6,7 +6,6 @@ import ( "github.com/bitcoin-sv/arc/internal/cache" "github.com/bitcoin-sv/arc/config" - "github.com/coocood/freecache" "github.com/go-redis/redis/v8" ) @@ -15,9 +14,8 @@ var ErrCacheUnknownType = errors.New("unknown cache type") // NewCacheStore creates a new CacheStore based on the provided configuration. func NewCacheStore(cacheConfig *config.CacheConfig) (cache.Store, error) { switch cacheConfig.Engine { - case config.FreeCache: - cacheSize := freecache.NewCache(cacheConfig.Freecache.Size) - return cache.NewFreecacheStore(cacheSize), nil + case config.InternalCache: + return nil, nil case config.Redis: c := redis.NewClient(&redis.Options{ Addr: cacheConfig.Redis.Addr, diff --git a/config/config.go b/config/config.go index 171ddb8e5..423686716 100644 --- a/config/config.go +++ b/config/config.go @@ -8,8 +8,8 @@ import ( ) const ( - FreeCache = "freecache" - Redis = "redis" + InternalCache = "internal" + Redis = "redis" ) type ArcConfig struct { @@ -137,9 +137,8 @@ type PostgresConfig struct { } type CacheConfig struct { - Engine string `mapstructure:"engine"` - Freecache *FreeCacheConfig `mapstructure:"freecache"` - Redis *RedisConfig `mapstructure:"redis"` + Engine string `mapstructure:"engine"` + Redis *RedisConfig `mapstructure:"redis"` } type FreeCacheConfig struct { diff --git a/config/defaults.go b/config/defaults.go index ca902819d..8a5844958 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -198,11 +198,8 @@ func getCallbackerConfig() *CallbackerConfig { func getCacheConfig() *CacheConfig { return &CacheConfig{ - Engine: FreeCache, - Freecache: &FreeCacheConfig{ - Size: 10 * 1024 * 1024, // Default size 10MB. - }, - Redis: &RedisConfig{ + Engine: InternalCache, // use internal cache + Redis: &RedisConfig{ // example of Redis config Addr: "localhost:6379", Password: "", DB: 0, diff --git a/config/example_config.yaml b/config/example_config.yaml index 281a953f9..0b56f7d41 100644 --- a/config/example_config.yaml +++ b/config/example_config.yaml @@ -48,9 +48,7 @@ broadcasting: # settings for connection to nodes p2p: 18335 cache: - engine: freecache # cache engine - one of freecache | redis - freecache: # freecache configuration - size: 10000000 # size of cache + engine: redis redis: addr: "localhost:6379" password: "" diff --git a/internal/cache/freecache.go b/internal/cache/freecache.go deleted file mode 100644 index a84460a54..000000000 --- a/internal/cache/freecache.go +++ /dev/null @@ -1,50 +0,0 @@ -package cache - -import ( - "errors" - "time" - - "github.com/coocood/freecache" -) - -// FreecacheStore is an implementation of CacheStore using freecache. -type FreecacheStore struct { - cache *freecache.Cache -} - -// NewFreecacheStore initializes a FreecacheStore. -func NewFreecacheStore(c *freecache.Cache) *FreecacheStore { - return &FreecacheStore{ - cache: c, - } -} - -// Get retrieves a value by key. -func (f *FreecacheStore) Get(key string) ([]byte, error) { - value, err := f.cache.Get([]byte(key)) - if err != nil { - if errors.Is(err, freecache.ErrNotFound) { - return nil, ErrCacheNotFound - } - return nil, errors.Join(ErrCacheFailedToGet, err) - } - return value, nil -} - -// Set stores a value with a TTL. -func (f *FreecacheStore) Set(key string, value []byte, ttl time.Duration) error { - err := f.cache.Set([]byte(key), value, int(ttl.Seconds())) - if err != nil { - return errors.Join(ErrCacheFailedToSet, err) - } - return nil -} - -// Del removes a value by key. -func (f *FreecacheStore) Del(key string) error { - success := f.cache.Del([]byte(key)) - if !success { - return ErrCacheNotFound - } - return nil -} diff --git a/internal/metamorph/health_check_test.go b/internal/metamorph/health_check_test.go index 4f6b6b84b..2dca75711 100644 --- a/internal/metamorph/health_check_test.go +++ b/internal/metamorph/health_check_test.go @@ -14,8 +14,6 @@ import ( storeMocks "github.com/bitcoin-sv/arc/internal/metamorph/store/mocks" ) -const baseCacheSize = 100 * 1024 * 1024 - func TestCheck(t *testing.T) { tt := []struct { name string diff --git a/internal/metamorph/integration_test/double_spend_integration_test.go b/internal/metamorph/integration_test/double_spend_integration_test.go index 7a8e04f13..1f36b0faa 100644 --- a/internal/metamorph/integration_test/double_spend_integration_test.go +++ b/internal/metamorph/integration_test/double_spend_integration_test.go @@ -24,7 +24,6 @@ import ( "testing" "time" - "github.com/coocood/freecache" _ "github.com/golang-migrate/migrate/v4/source/file" _ "github.com/lib/pq" "github.com/libsv/go-p2p/chaincfg/chainhash" @@ -32,7 +31,6 @@ import ( "github.com/stretchr/testify/require" "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" - "github.com/bitcoin-sv/arc/internal/cache" "github.com/bitcoin-sv/arc/internal/metamorph" "github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api" "github.com/bitcoin-sv/arc/internal/metamorph/mocks" @@ -124,11 +122,9 @@ func TestDoubleSpendDetection(t *testing.T) { require.NoError(t, err) defer metamorphStore.Close(context.Background()) - cStore := cache.NewFreecacheStore(freecache.NewCache(baseCacheSize)) - pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}} - processor, err := metamorph.NewProcessor(metamorphStore, cStore, pm, statusMessageChannel, + processor, err := metamorph.NewProcessor(metamorphStore, nil, pm, statusMessageChannel, metamorph.WithMinedTxsChan(minedTxChannel), metamorph.WithNow(func() time.Time { return time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC) }), metamorph.WithProcessStatusUpdatesInterval(200*time.Millisecond), diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index c8dfb9e17..00c646d30 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -417,7 +417,11 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() { return case statusUpdate := <-p.storageStatusUpdateCh: // Ensure no duplicate statuses - updateStatusMap(statusUpdatesMap, statusUpdate) + statusUpdatesMap, err := p.updateStatusMap(statusUpdatesMap, statusUpdate) + if err != nil { + p.logger.Error("failed to update status", slog.String("err", err.Error())) + return + } if len(statusUpdatesMap) >= p.processStatusUpdatesBatchSize { p.checkAndUpdate(ctx, statusUpdatesMap) @@ -429,6 +433,7 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() { } case <-ticker.C: if len(statusUpdatesMap) > 0 { + statusUpdatesMap = p.getStatusUpdateMap(statusUpdatesMap) p.checkAndUpdate(ctx, statusUpdatesMap) statusUpdatesMap = map[chainhash.Hash]store.UpdateStatus{} diff --git a/internal/metamorph/processor_helpers.go b/internal/metamorph/processor_helpers.go index d50f1ba80..47d7f3bff 100644 --- a/internal/metamorph/processor_helpers.go +++ b/internal/metamorph/processor_helpers.go @@ -9,7 +9,7 @@ import ( "github.com/bitcoin-sv/arc/internal/metamorph/store" ) -//lint:file-ignore U1000 Ignore all unused code, functions are temporarily not used +type StatusUpdateMap map[chainhash.Hash]store.UpdateStatus const CacheStatusUpdateKey = "status-updates" @@ -22,7 +22,8 @@ func (p *Processor) GetProcessorMapSize() int { return p.responseProcessor.getMapLen() } -func updateStatusMap(statusUpdatesMap map[chainhash.Hash]store.UpdateStatus, statusUpdate store.UpdateStatus) { +func (p *Processor) updateStatusMap(statusUpdatesMap StatusUpdateMap, statusUpdate store.UpdateStatus) (StatusUpdateMap, error) { + statusUpdatesMap = p.getStatusUpdateMap(statusUpdatesMap) foundStatusUpdate, found := statusUpdatesMap[statusUpdate.Hash] if !found || shouldUpdateStatus(statusUpdate, foundStatusUpdate) { @@ -32,30 +33,18 @@ func updateStatusMap(statusUpdatesMap map[chainhash.Hash]store.UpdateStatus, sta statusUpdatesMap[statusUpdate.Hash] = statusUpdate } -} - -func (p *Processor) updateStatusMap(statusUpdate store.UpdateStatus) (map[chainhash.Hash]store.UpdateStatus, error) { //nolint:unused - statusUpdatesMap := p.getStatusUpdateMap() - - foundStatusUpdate, found := statusUpdatesMap[statusUpdate.Hash] - if !found || shouldUpdateStatus(statusUpdate, foundStatusUpdate) { - if len(statusUpdate.CompetingTxs) > 0 { - statusUpdate.CompetingTxs = mergeUnique(statusUpdate.CompetingTxs, foundStatusUpdate.CompetingTxs) + if p.cacheStore != nil { + err := p.setStatusUpdateMap(statusUpdatesMap) + if err != nil { + return statusUpdatesMap, err } - - statusUpdatesMap[statusUpdate.Hash] = statusUpdate - } - - err := p.setStatusUpdateMap(statusUpdatesMap) - if err != nil { - return nil, err } return statusUpdatesMap, nil } -func (p *Processor) setStatusUpdateMap(statusUpdatesMap map[chainhash.Hash]store.UpdateStatus) error { //nolint:unused +func (p *Processor) setStatusUpdateMap(statusUpdatesMap StatusUpdateMap) error { bytes, err := serializeStatusMap(statusUpdatesMap) if err != nil { return err @@ -68,7 +57,12 @@ func (p *Processor) setStatusUpdateMap(statusUpdatesMap map[chainhash.Hash]store return nil } -func (p *Processor) getStatusUpdateMap() map[chainhash.Hash]store.UpdateStatus { //nolint:unused +func (p *Processor) getStatusUpdateMap(statusUpdateMap StatusUpdateMap) StatusUpdateMap { + // if cache disabled, return the given map + if p.cacheStore == nil { + return statusUpdateMap + } + existingMap, err := p.cacheStore.Get(CacheStatusUpdateKey) if err == nil { @@ -79,7 +73,7 @@ func (p *Processor) getStatusUpdateMap() map[chainhash.Hash]store.UpdateStatus { } // If the key doesn't exist or there was an error unmarshalling the value return new map - return make(map[chainhash.Hash]store.UpdateStatus) + return make(StatusUpdateMap) } func shouldUpdateStatus(new, found store.UpdateStatus) bool { diff --git a/internal/metamorph/processor_test.go b/internal/metamorph/processor_test.go index c37477281..f2e3461f4 100644 --- a/internal/metamorph/processor_test.go +++ b/internal/metamorph/processor_test.go @@ -11,7 +11,6 @@ import ( "testing" "time" - "github.com/coocood/freecache" "github.com/libsv/go-p2p" "github.com/libsv/go-p2p/chaincfg/chainhash" "github.com/stretchr/testify/assert" @@ -19,7 +18,6 @@ import ( "google.golang.org/protobuf/proto" "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" - "github.com/bitcoin-sv/arc/internal/cache" "github.com/bitcoin-sv/arc/internal/metamorph" "github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api" "github.com/bitcoin-sv/arc/internal/metamorph/mocks" @@ -38,8 +36,6 @@ func TestNewProcessor(t *testing.T) { pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}} - cStore := cache.NewFreecacheStore(freecache.NewCache(baseCacheSize)) - tt := []struct { name string store store.MetamorphStore @@ -73,7 +69,7 @@ func TestNewProcessor(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { // when - sut, actualErr := metamorph.NewProcessor(tc.store, cStore, tc.pm, nil, + sut, actualErr := metamorph.NewProcessor(tc.store, nil, tc.pm, nil, metamorph.WithCacheExpiryTime(time.Second*5), metamorph.WithProcessorLogger(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: metamorph.LogLevelDefault}))), ) @@ -125,12 +121,10 @@ func TestStartLockTransactions(t *testing.T) { SetUnlockedByNameFunc: func(_ context.Context, _ string) (int64, error) { return 0, nil }, } - cStore := cache.NewFreecacheStore(freecache.NewCache(baseCacheSize)) - pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}} // when - sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, metamorph.WithLockTxsInterval(20*time.Millisecond)) + sut, err := metamorph.NewProcessor(metamorphStore, nil, pm, nil, metamorph.WithLockTxsInterval(20*time.Millisecond)) require.NoError(t, err) defer sut.Shutdown() sut.StartLockTransactions() @@ -228,8 +222,6 @@ func TestProcessTransaction(t *testing.T) { }, } - cStore := cache.NewFreecacheStore(freecache.NewCache(baseCacheSize)) - pm := &mocks.PeerManagerMock{ AnnounceTransactionFunc: func(txHash *chainhash.Hash, _ []p2p.PeerI) []p2p.PeerI { require.True(t, testdata.TX1Hash.IsEqual(txHash)) @@ -244,7 +236,7 @@ func TestProcessTransaction(t *testing.T) { }, } - sut, err := metamorph.NewProcessor(s, cStore, pm, nil, metamorph.WithMessageQueueClient(publisher)) + sut, err := metamorph.NewProcessor(s, nil, pm, nil, metamorph.WithMessageQueueClient(publisher)) require.NoError(t, err) require.Equal(t, 0, sut.GetProcessorMapSize()) @@ -490,8 +482,6 @@ func TestStartSendStatusForTransaction(t *testing.T) { }, } - cStore := cache.NewFreecacheStore(freecache.NewCache(baseCacheSize)) - pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}} callbackSender := &mocks.CallbackSenderMock{ @@ -502,7 +492,7 @@ func TestStartSendStatusForTransaction(t *testing.T) { statusMessageChannel := make(chan *metamorph.PeerTxMessage, 10) - sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, statusMessageChannel, metamorph.WithNow(func() time.Time { return time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC) }), metamorph.WithProcessStatusUpdatesInterval(200*time.Millisecond), metamorph.WithProcessStatusUpdatesBatchSize(3), metamorph.WithCallbackSender(callbackSender)) + sut, err := metamorph.NewProcessor(metamorphStore, nil, pm, statusMessageChannel, metamorph.WithNow(func() time.Time { return time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC) }), metamorph.WithProcessStatusUpdatesInterval(200*time.Millisecond), metamorph.WithProcessStatusUpdatesBatchSize(3), metamorph.WithCallbackSender(callbackSender)) require.NoError(t, err) // when @@ -641,7 +631,6 @@ func TestStartProcessSubmittedTxs(t *testing.T) { }, SetUnlockedByNameFunc: func(_ context.Context, _ string) (int64, error) { return 0, nil }, } - cStore := cache.NewFreecacheStore(freecache.NewCache(baseCacheSize)) counter := 0 pm := &mocks.PeerManagerMock{ AnnounceTransactionFunc: func(txHash *chainhash.Hash, _ []p2p.PeerI) []p2p.PeerI { @@ -664,7 +653,7 @@ func TestStartProcessSubmittedTxs(t *testing.T) { } const submittedTxsBuffer = 5 submittedTxsChan := make(chan *metamorph_api.TransactionRequest, submittedTxsBuffer) - sut, err := metamorph.NewProcessor(s, cStore, pm, nil, + sut, err := metamorph.NewProcessor(s, nil, pm, nil, metamorph.WithMessageQueueClient(publisher), metamorph.WithSubmittedTxsChan(submittedTxsChan), metamorph.WithProcessStatusUpdatesInterval(20*time.Millisecond), @@ -774,8 +763,6 @@ func TestProcessExpiredTransactions(t *testing.T) { }, } - cStore := cache.NewFreecacheStore(freecache.NewCache(baseCacheSize)) - pm := &mocks.PeerManagerMock{ RequestTransactionFunc: func(_ *chainhash.Hash) p2p.PeerI { return nil @@ -792,7 +779,7 @@ func TestProcessExpiredTransactions(t *testing.T) { }, } - sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, + sut, err := metamorph.NewProcessor(metamorphStore, nil, pm, nil, metamorph.WithMessageQueueClient(publisher), metamorph.WithProcessExpiredTxsInterval(time.Millisecond*20), metamorph.WithMaxRetries(10), @@ -873,10 +860,9 @@ func TestStartProcessMinedCallbacks(t *testing.T) { callbackSender := &mocks.CallbackSenderMock{ SendCallbackFunc: func(_ context.Context, _ *store.Data) {}, } - cStore := cache.NewFreecacheStore(freecache.NewCache(baseCacheSize)) sut, err := metamorph.NewProcessor( metamorphStore, - cStore, + nil, pm, nil, metamorph.WithMinedTxsChan(minedTxsChan), @@ -950,8 +936,6 @@ func TestProcessorHealth(t *testing.T) { }, } - cStore := cache.NewFreecacheStore(freecache.NewCache(baseCacheSize)) - pm := &mocks.PeerManagerMock{ AddPeerFunc: func(_ p2p.PeerI) error { return nil @@ -967,7 +951,7 @@ func TestProcessorHealth(t *testing.T) { ShutdownFunc: func() {}, } - sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, + sut, err := metamorph.NewProcessor(metamorphStore, nil, pm, nil, metamorph.WithProcessExpiredTxsInterval(time.Millisecond*20), metamorph.WithNow(func() time.Time { return time.Date(2033, 1, 1, 1, 0, 0, 0, time.UTC) @@ -1019,8 +1003,6 @@ func TestStart(t *testing.T) { return 0, nil }} - cStore := cache.NewFreecacheStore(freecache.NewCache(baseCacheSize)) - pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}} var subscribeMinedTxsFunction func([]byte) error @@ -1045,7 +1027,7 @@ func TestStart(t *testing.T) { submittedTxsChan := make(chan *metamorph_api.TransactionRequest, 2) minedTxsChan := make(chan *blocktx_api.TransactionBlock, 2) - sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, + sut, err := metamorph.NewProcessor(metamorphStore, nil, pm, nil, metamorph.WithMessageQueueClient(mqClient), metamorph.WithSubmittedTxsChan(submittedTxsChan), metamorph.WithMinedTxsChan(minedTxsChan), diff --git a/internal/metamorph/stats_collector_test.go b/internal/metamorph/stats_collector_test.go index 7bd40efab..2dd35de12 100644 --- a/internal/metamorph/stats_collector_test.go +++ b/internal/metamorph/stats_collector_test.go @@ -3,8 +3,6 @@ package metamorph_test import ( "context" "errors" - "github.com/bitcoin-sv/arc/internal/cache" - "github.com/coocood/freecache" "log/slog" "os" "testing" @@ -51,11 +49,9 @@ func TestStartCollectStats(t *testing.T) { SetUnlockedByNameFunc: func(_ context.Context, _ string) (int64, error) { return 0, nil }, } - cStore := cache.NewFreecacheStore(freecache.NewCache(baseCacheSize)) - pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}} - processor, err := metamorph.NewProcessor(mtmStore, cStore, pm, nil, + processor, err := metamorph.NewProcessor(mtmStore, nil, pm, nil, metamorph.WithProcessorLogger(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: metamorph.LogLevelDefault}))), metamorph.WithStatCollectionInterval(10*time.Millisecond), ) From 71ab858af72f906921f87e39a6b0fde237b47042 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Wed, 13 Nov 2024 09:33:39 +0100 Subject: [PATCH 02/26] fix(ARCO-283): fix linter error --- internal/metamorph/processor.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index 00c646d30..ed99e0833 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -417,12 +417,14 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() { return case statusUpdate := <-p.storageStatusUpdateCh: // Ensure no duplicate statuses - statusUpdatesMap, err := p.updateStatusMap(statusUpdatesMap, statusUpdate) + actualUpdateStatusMap, err := p.updateStatusMap(statusUpdatesMap, statusUpdate) if err != nil { p.logger.Error("failed to update status", slog.String("err", err.Error())) return } + statusUpdatesMap = actualUpdateStatusMap + if len(statusUpdatesMap) >= p.processStatusUpdatesBatchSize { p.checkAndUpdate(ctx, statusUpdatesMap) statusUpdatesMap = map[chainhash.Hash]store.UpdateStatus{} From d80f8fbec953e78e425f0a02952281501ce31f16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Wed, 13 Nov 2024 10:18:10 +0100 Subject: [PATCH 03/26] refactor(ARCO-283): use struct instead of plain map --- internal/metamorph/processor.go | 8 ++++---- internal/metamorph/processor_helpers.go | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index ed99e0833..3005650a1 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -409,7 +409,7 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() { go func() { defer p.waitGroup.Done() - statusUpdatesMap := map[chainhash.Hash]store.UpdateStatus{} + statusUpdatesMap := StatusUpdateMap{} for { select { @@ -427,7 +427,7 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() { if len(statusUpdatesMap) >= p.processStatusUpdatesBatchSize { p.checkAndUpdate(ctx, statusUpdatesMap) - statusUpdatesMap = map[chainhash.Hash]store.UpdateStatus{} + statusUpdatesMap = StatusUpdateMap{} // Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed. // This prevents unnecessary immediate updates and maintains the intended time interval between batches. @@ -437,7 +437,7 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() { if len(statusUpdatesMap) > 0 { statusUpdatesMap = p.getStatusUpdateMap(statusUpdatesMap) p.checkAndUpdate(ctx, statusUpdatesMap) - statusUpdatesMap = map[chainhash.Hash]store.UpdateStatus{} + statusUpdatesMap = StatusUpdateMap{} // Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed. // This prevents unnecessary immediate updates and maintains the intended time interval between batches. @@ -448,7 +448,7 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() { }() } -func (p *Processor) checkAndUpdate(ctx context.Context, statusUpdatesMap map[chainhash.Hash]store.UpdateStatus) { +func (p *Processor) checkAndUpdate(ctx context.Context, statusUpdatesMap StatusUpdateMap) { var err error ctx, span := tracing.StartTracing(ctx, "checkAndUpdate", p.tracingEnabled, p.tracingAttributes...) defer func() { diff --git a/internal/metamorph/processor_helpers.go b/internal/metamorph/processor_helpers.go index 47d7f3bff..4ee16a1d1 100644 --- a/internal/metamorph/processor_helpers.go +++ b/internal/metamorph/processor_helpers.go @@ -130,7 +130,7 @@ func mergeUnique(arr1, arr2 []string) []string { return uniqueSlice } -func serializeStatusMap(updateStatusMap map[chainhash.Hash]store.UpdateStatus) ([]byte, error) { //nolint:unused +func serializeStatusMap(updateStatusMap StatusUpdateMap) ([]byte, error) { //nolint:unused serializeMap := make(map[string]store.UpdateStatus) for k, v := range updateStatusMap { serializeMap[k.String()] = v @@ -143,9 +143,9 @@ func serializeStatusMap(updateStatusMap map[chainhash.Hash]store.UpdateStatus) ( return bytes, nil } -func deserializeStatusMap(data []byte) (map[chainhash.Hash]store.UpdateStatus, error) { //nolint:unused +func deserializeStatusMap(data []byte) (StatusUpdateMap, error) { //nolint:unused serializeMap := make(map[string]store.UpdateStatus) - updateStatusMap := make(map[chainhash.Hash]store.UpdateStatus) + updateStatusMap := make(StatusUpdateMap) err := json.Unmarshal(data, &serializeMap) if err != nil { From ccc9822f5572833817d25cddf303b0a5e73136e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Wed, 13 Nov 2024 12:55:10 +0100 Subject: [PATCH 04/26] feat(ARCO-283): clear cache after update --- internal/metamorph/processor.go | 5 +++++ internal/metamorph/processor_helpers.go | 9 +++++++++ 2 files changed, 14 insertions(+) diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index 3005650a1..ad67d683a 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -474,6 +474,11 @@ func (p *Processor) checkAndUpdate(ctx context.Context, statusUpdatesMap StatusU if err != nil { p.logger.Error("failed to bulk update statuses", slog.String("err", err.Error())) } + + err = p.clearStatusUpdateMap() + if err != nil { + p.logger.Error("failed to clear status update map", slog.String("err", err.Error())) + } } func (p *Processor) statusUpdateWithCallback(ctx context.Context, statusUpdates, doubleSpendUpdates []store.UpdateStatus) (err error) { diff --git a/internal/metamorph/processor_helpers.go b/internal/metamorph/processor_helpers.go index 4ee16a1d1..2f856bd2a 100644 --- a/internal/metamorph/processor_helpers.go +++ b/internal/metamorph/processor_helpers.go @@ -76,6 +76,15 @@ func (p *Processor) getStatusUpdateMap(statusUpdateMap StatusUpdateMap) StatusUp return make(StatusUpdateMap) } +func (p *Processor) clearStatusUpdateMap() error { + if p.cacheStore == nil { + return nil + } + + return p.cacheStore.Del(CacheStatusUpdateKey) + +} + func shouldUpdateStatus(new, found store.UpdateStatus) bool { if new.Status > found.Status { return true From eb6b402b282d713954c4662a318c7b88e7e91145 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Wed, 13 Nov 2024 13:14:23 +0100 Subject: [PATCH 05/26] feat(ARCO-283): add in memory cache --- cmd/arc/services/cache.go | 4 +- config/config.go | 4 +- config/defaults.go | 2 +- internal/cache/in-memory.go | 50 +++++++++++++++++++++++++ internal/metamorph/processor.go | 14 ++----- internal/metamorph/processor_helpers.go | 19 +++------- 6 files changed, 65 insertions(+), 28 deletions(-) create mode 100644 internal/cache/in-memory.go diff --git a/cmd/arc/services/cache.go b/cmd/arc/services/cache.go index 1ac37bcda..9bcb09843 100644 --- a/cmd/arc/services/cache.go +++ b/cmd/arc/services/cache.go @@ -14,8 +14,8 @@ var ErrCacheUnknownType = errors.New("unknown cache type") // NewCacheStore creates a new CacheStore based on the provided configuration. func NewCacheStore(cacheConfig *config.CacheConfig) (cache.Store, error) { switch cacheConfig.Engine { - case config.InternalCache: - return nil, nil + case config.InMemory: + return cache.NewMemoryStore(), nil case config.Redis: c := redis.NewClient(&redis.Options{ Addr: cacheConfig.Redis.Addr, diff --git a/config/config.go b/config/config.go index 423686716..8342ba8c7 100644 --- a/config/config.go +++ b/config/config.go @@ -8,8 +8,8 @@ import ( ) const ( - InternalCache = "internal" - Redis = "redis" + InMemory = "in-memory" + Redis = "redis" ) type ArcConfig struct { diff --git a/config/defaults.go b/config/defaults.go index 8a5844958..072942904 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -198,7 +198,7 @@ func getCallbackerConfig() *CallbackerConfig { func getCacheConfig() *CacheConfig { return &CacheConfig{ - Engine: InternalCache, // use internal cache + Engine: InMemory, // use in memory cache Redis: &RedisConfig{ // example of Redis config Addr: "localhost:6379", Password: "", diff --git a/internal/cache/in-memory.go b/internal/cache/in-memory.go new file mode 100644 index 000000000..1e3d01e8b --- /dev/null +++ b/internal/cache/in-memory.go @@ -0,0 +1,50 @@ +package cache + +import ( + "sync" + "time" +) + +type MemoryStore struct { + data map[string][]byte + mu sync.RWMutex +} + +func NewMemoryStore() *MemoryStore { + return &MemoryStore{ + data: make(map[string][]byte), + } +} + +// Get retrieves a value by key. It returns an error if the key does not exist. +func (s *MemoryStore) Get(key string) ([]byte, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + value, found := s.data[key] + if !found { + return nil, ErrCacheNotFound + } + return value, nil +} + +// Set stores a key-value pair, ignoring the ttl parameter. +func (s *MemoryStore) Set(key string, value []byte, ttl time.Duration) error { + s.mu.Lock() + defer s.mu.Unlock() + + s.data[key] = value + return nil +} + +// Del removes a key from the store. +func (s *MemoryStore) Del(key string) error { + s.mu.Lock() + defer s.mu.Unlock() + + if _, found := s.data[key]; !found { + return ErrCacheNotFound + } + delete(s.data, key) + return nil +} diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index ad67d683a..cd71f1c65 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -409,35 +409,29 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() { go func() { defer p.waitGroup.Done() - statusUpdatesMap := StatusUpdateMap{} - for { select { case <-p.ctx.Done(): return case statusUpdate := <-p.storageStatusUpdateCh: // Ensure no duplicate statuses - actualUpdateStatusMap, err := p.updateStatusMap(statusUpdatesMap, statusUpdate) + actualUpdateStatusMap, err := p.updateStatusMap(statusUpdate) if err != nil { p.logger.Error("failed to update status", slog.String("err", err.Error())) return } - statusUpdatesMap = actualUpdateStatusMap - - if len(statusUpdatesMap) >= p.processStatusUpdatesBatchSize { - p.checkAndUpdate(ctx, statusUpdatesMap) - statusUpdatesMap = StatusUpdateMap{} + if len(actualUpdateStatusMap) >= p.processStatusUpdatesBatchSize { + p.checkAndUpdate(ctx, actualUpdateStatusMap) // Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed. // This prevents unnecessary immediate updates and maintains the intended time interval between batches. ticker.Reset(p.processStatusUpdatesInterval) } case <-ticker.C: + statusUpdatesMap := p.getStatusUpdateMap() if len(statusUpdatesMap) > 0 { - statusUpdatesMap = p.getStatusUpdateMap(statusUpdatesMap) p.checkAndUpdate(ctx, statusUpdatesMap) - statusUpdatesMap = StatusUpdateMap{} // Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed. // This prevents unnecessary immediate updates and maintains the intended time interval between batches. diff --git a/internal/metamorph/processor_helpers.go b/internal/metamorph/processor_helpers.go index 2f856bd2a..de96ff5d4 100644 --- a/internal/metamorph/processor_helpers.go +++ b/internal/metamorph/processor_helpers.go @@ -22,8 +22,8 @@ func (p *Processor) GetProcessorMapSize() int { return p.responseProcessor.getMapLen() } -func (p *Processor) updateStatusMap(statusUpdatesMap StatusUpdateMap, statusUpdate store.UpdateStatus) (StatusUpdateMap, error) { - statusUpdatesMap = p.getStatusUpdateMap(statusUpdatesMap) +func (p *Processor) updateStatusMap(statusUpdate store.UpdateStatus) (StatusUpdateMap, error) { + statusUpdatesMap := p.getStatusUpdateMap() foundStatusUpdate, found := statusUpdatesMap[statusUpdate.Hash] if !found || shouldUpdateStatus(statusUpdate, foundStatusUpdate) { @@ -34,11 +34,9 @@ func (p *Processor) updateStatusMap(statusUpdatesMap StatusUpdateMap, statusUpda statusUpdatesMap[statusUpdate.Hash] = statusUpdate } - if p.cacheStore != nil { - err := p.setStatusUpdateMap(statusUpdatesMap) - if err != nil { - return statusUpdatesMap, err - } + err := p.setStatusUpdateMap(statusUpdatesMap) + if err != nil { + return statusUpdatesMap, err } return statusUpdatesMap, nil @@ -57,12 +55,7 @@ func (p *Processor) setStatusUpdateMap(statusUpdatesMap StatusUpdateMap) error { return nil } -func (p *Processor) getStatusUpdateMap(statusUpdateMap StatusUpdateMap) StatusUpdateMap { - // if cache disabled, return the given map - if p.cacheStore == nil { - return statusUpdateMap - } - +func (p *Processor) getStatusUpdateMap() StatusUpdateMap { existingMap, err := p.cacheStore.Get(CacheStatusUpdateKey) if err == nil { From 9a3056e17ec71e138ff83c1875a1012f23c2b7cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Wed, 13 Nov 2024 13:33:06 +0100 Subject: [PATCH 06/26] fix(ARCO-283): fix linter errors --- internal/cache/in-memory.go | 2 +- internal/metamorph/processor_helpers.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/cache/in-memory.go b/internal/cache/in-memory.go index 1e3d01e8b..e6346dc7c 100644 --- a/internal/cache/in-memory.go +++ b/internal/cache/in-memory.go @@ -29,7 +29,7 @@ func (s *MemoryStore) Get(key string) ([]byte, error) { } // Set stores a key-value pair, ignoring the ttl parameter. -func (s *MemoryStore) Set(key string, value []byte, ttl time.Duration) error { +func (s *MemoryStore) Set(key string, value []byte, _ time.Duration) error { s.mu.Lock() defer s.mu.Unlock() diff --git a/internal/metamorph/processor_helpers.go b/internal/metamorph/processor_helpers.go index de96ff5d4..c31a85e2b 100644 --- a/internal/metamorph/processor_helpers.go +++ b/internal/metamorph/processor_helpers.go @@ -75,7 +75,6 @@ func (p *Processor) clearStatusUpdateMap() error { } return p.cacheStore.Del(CacheStatusUpdateKey) - } func shouldUpdateStatus(new, found store.UpdateStatus) bool { From 9c85066f6f9bd1859abf1f9ff318f4532e3738dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Wed, 13 Nov 2024 16:28:48 +0100 Subject: [PATCH 07/26] fix(ARCO-283): fix PR comments --- internal/cache/{in-memory.go => in_memory.go} | 0 .../double_spend_integration_test.go | 4 ++- internal/metamorph/processor.go | 2 +- internal/metamorph/processor_helpers.go | 8 ----- internal/metamorph/processor_test.go | 34 ++++++++++++++----- 5 files changed, 29 insertions(+), 19 deletions(-) rename internal/cache/{in-memory.go => in_memory.go} (100%) diff --git a/internal/cache/in-memory.go b/internal/cache/in_memory.go similarity index 100% rename from internal/cache/in-memory.go rename to internal/cache/in_memory.go diff --git a/internal/metamorph/integration_test/double_spend_integration_test.go b/internal/metamorph/integration_test/double_spend_integration_test.go index 1f36b0faa..dd6ce5ef6 100644 --- a/internal/metamorph/integration_test/double_spend_integration_test.go +++ b/internal/metamorph/integration_test/double_spend_integration_test.go @@ -31,6 +31,7 @@ import ( "github.com/stretchr/testify/require" "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" + "github.com/bitcoin-sv/arc/internal/cache" "github.com/bitcoin-sv/arc/internal/metamorph" "github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api" "github.com/bitcoin-sv/arc/internal/metamorph/mocks" @@ -123,8 +124,9 @@ func TestDoubleSpendDetection(t *testing.T) { defer metamorphStore.Close(context.Background()) pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}} + cStore := cache.NewMemoryStore() - processor, err := metamorph.NewProcessor(metamorphStore, nil, pm, statusMessageChannel, + processor, err := metamorph.NewProcessor(metamorphStore, cStore, pm, statusMessageChannel, metamorph.WithMinedTxsChan(minedTxChannel), metamorph.WithNow(func() time.Time { return time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC) }), metamorph.WithProcessStatusUpdatesInterval(200*time.Millisecond), diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index cd71f1c65..f0a802895 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -469,7 +469,7 @@ func (p *Processor) checkAndUpdate(ctx context.Context, statusUpdatesMap StatusU p.logger.Error("failed to bulk update statuses", slog.String("err", err.Error())) } - err = p.clearStatusUpdateMap() + err = p.cacheStore.Del(CacheStatusUpdateKey) if err != nil { p.logger.Error("failed to clear status update map", slog.String("err", err.Error())) } diff --git a/internal/metamorph/processor_helpers.go b/internal/metamorph/processor_helpers.go index c31a85e2b..01f2e7a29 100644 --- a/internal/metamorph/processor_helpers.go +++ b/internal/metamorph/processor_helpers.go @@ -69,14 +69,6 @@ func (p *Processor) getStatusUpdateMap() StatusUpdateMap { return make(StatusUpdateMap) } -func (p *Processor) clearStatusUpdateMap() error { - if p.cacheStore == nil { - return nil - } - - return p.cacheStore.Del(CacheStatusUpdateKey) -} - func shouldUpdateStatus(new, found store.UpdateStatus) bool { if new.Status > found.Status { return true diff --git a/internal/metamorph/processor_test.go b/internal/metamorph/processor_test.go index f2e3461f4..90bf3af83 100644 --- a/internal/metamorph/processor_test.go +++ b/internal/metamorph/processor_test.go @@ -18,6 +18,7 @@ import ( "google.golang.org/protobuf/proto" "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" + "github.com/bitcoin-sv/arc/internal/cache" "github.com/bitcoin-sv/arc/internal/metamorph" "github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api" "github.com/bitcoin-sv/arc/internal/metamorph/mocks" @@ -35,6 +36,7 @@ func TestNewProcessor(t *testing.T) { } pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}} + cStore := cache.NewMemoryStore() tt := []struct { name string @@ -69,7 +71,7 @@ func TestNewProcessor(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { // when - sut, actualErr := metamorph.NewProcessor(tc.store, nil, tc.pm, nil, + sut, actualErr := metamorph.NewProcessor(tc.store, cStore, tc.pm, nil, metamorph.WithCacheExpiryTime(time.Second*5), metamorph.WithProcessorLogger(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: metamorph.LogLevelDefault}))), ) @@ -123,8 +125,10 @@ func TestStartLockTransactions(t *testing.T) { pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}} + cStore := cache.NewMemoryStore() + // when - sut, err := metamorph.NewProcessor(metamorphStore, nil, pm, nil, metamorph.WithLockTxsInterval(20*time.Millisecond)) + sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, metamorph.WithLockTxsInterval(20*time.Millisecond)) require.NoError(t, err) defer sut.Shutdown() sut.StartLockTransactions() @@ -230,13 +234,15 @@ func TestProcessTransaction(t *testing.T) { RequestTransactionFunc: func(_ *chainhash.Hash) p2p.PeerI { return nil }, } + cStore := cache.NewMemoryStore() + publisher := &mocks.MessageQueueClientMock{ PublishFunc: func(_ context.Context, _ string, _ []byte) error { return nil }, } - sut, err := metamorph.NewProcessor(s, nil, pm, nil, metamorph.WithMessageQueueClient(publisher)) + sut, err := metamorph.NewProcessor(s, cStore, pm, nil, metamorph.WithMessageQueueClient(publisher)) require.NoError(t, err) require.Equal(t, 0, sut.GetProcessorMapSize()) @@ -484,6 +490,8 @@ func TestStartSendStatusForTransaction(t *testing.T) { pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}} + cStore := cache.NewMemoryStore() + callbackSender := &mocks.CallbackSenderMock{ SendCallbackFunc: func(_ context.Context, _ *store.Data) { callbackSent <- struct{}{} @@ -492,7 +500,7 @@ func TestStartSendStatusForTransaction(t *testing.T) { statusMessageChannel := make(chan *metamorph.PeerTxMessage, 10) - sut, err := metamorph.NewProcessor(metamorphStore, nil, pm, statusMessageChannel, metamorph.WithNow(func() time.Time { return time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC) }), metamorph.WithProcessStatusUpdatesInterval(200*time.Millisecond), metamorph.WithProcessStatusUpdatesBatchSize(3), metamorph.WithCallbackSender(callbackSender)) + sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, statusMessageChannel, metamorph.WithNow(func() time.Time { return time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC) }), metamorph.WithProcessStatusUpdatesInterval(200*time.Millisecond), metamorph.WithProcessStatusUpdatesBatchSize(3), metamorph.WithCallbackSender(callbackSender)) require.NoError(t, err) // when @@ -646,6 +654,8 @@ func TestStartProcessSubmittedTxs(t *testing.T) { ShutdownFunc: func() {}, } + cStore := cache.NewMemoryStore() + publisher := &mocks.MessageQueueClientMock{ PublishFunc: func(_ context.Context, _ string, _ []byte) error { return nil @@ -653,7 +663,7 @@ func TestStartProcessSubmittedTxs(t *testing.T) { } const submittedTxsBuffer = 5 submittedTxsChan := make(chan *metamorph_api.TransactionRequest, submittedTxsBuffer) - sut, err := metamorph.NewProcessor(s, nil, pm, nil, + sut, err := metamorph.NewProcessor(s, cStore, pm, nil, metamorph.WithMessageQueueClient(publisher), metamorph.WithSubmittedTxsChan(submittedTxsChan), metamorph.WithProcessStatusUpdatesInterval(20*time.Millisecond), @@ -773,13 +783,15 @@ func TestProcessExpiredTransactions(t *testing.T) { ShutdownFunc: func() {}, } + cStore := cache.NewMemoryStore() + publisher := &mocks.MessageQueueClientMock{ PublishFunc: func(_ context.Context, _ string, _ []byte) error { return nil }, } - sut, err := metamorph.NewProcessor(metamorphStore, nil, pm, nil, + sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, metamorph.WithMessageQueueClient(publisher), metamorph.WithProcessExpiredTxsInterval(time.Millisecond*20), metamorph.WithMaxRetries(10), @@ -860,9 +872,10 @@ func TestStartProcessMinedCallbacks(t *testing.T) { callbackSender := &mocks.CallbackSenderMock{ SendCallbackFunc: func(_ context.Context, _ *store.Data) {}, } + cStore := cache.NewMemoryStore() sut, err := metamorph.NewProcessor( metamorphStore, - nil, + cStore, pm, nil, metamorph.WithMinedTxsChan(minedTxsChan), @@ -950,8 +963,9 @@ func TestProcessorHealth(t *testing.T) { }, ShutdownFunc: func() {}, } + cStore := cache.NewMemoryStore() - sut, err := metamorph.NewProcessor(metamorphStore, nil, pm, nil, + sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, metamorph.WithProcessExpiredTxsInterval(time.Millisecond*20), metamorph.WithNow(func() time.Time { return time.Date(2033, 1, 1, 1, 0, 0, 0, time.UTC) @@ -1005,6 +1019,8 @@ func TestStart(t *testing.T) { pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}} + cStore := cache.NewMemoryStore() + var subscribeMinedTxsFunction func([]byte) error var subscribeSubmitTxsFunction func([]byte) error mqClient := &mocks.MessageQueueClientMock{ @@ -1027,7 +1043,7 @@ func TestStart(t *testing.T) { submittedTxsChan := make(chan *metamorph_api.TransactionRequest, 2) minedTxsChan := make(chan *blocktx_api.TransactionBlock, 2) - sut, err := metamorph.NewProcessor(metamorphStore, nil, pm, nil, + sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, metamorph.WithMessageQueueClient(mqClient), metamorph.WithSubmittedTxsChan(submittedTxsChan), metamorph.WithMinedTxsChan(minedTxsChan), From 485328caf048075a21502c960d5ad1bfbcc5ad1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Thu, 14 Nov 2024 12:06:54 +0100 Subject: [PATCH 08/26] refactor(ARCO-283): use sync map in memory cache --- config/example_config.yaml | 4 ++-- internal/cache/cache.go | 9 +++++---- internal/cache/in_memory.go | 34 +++++++++++++--------------------- 3 files changed, 20 insertions(+), 27 deletions(-) diff --git a/config/example_config.yaml b/config/example_config.yaml index 0b56f7d41..10b9b979c 100644 --- a/config/example_config.yaml +++ b/config/example_config.yaml @@ -48,8 +48,8 @@ broadcasting: # settings for connection to nodes p2p: 18335 cache: - engine: redis - redis: + engine: redis # cache engine - in-memory/redis + redis: # redis cache configuration in case that engine: redis addr: "localhost:6379" password: "" db: 1 diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 23e71ca7a..7a16f9ac4 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -6,10 +6,11 @@ import ( ) var ( - ErrCacheNotFound = errors.New("key not found in cache") - ErrCacheFailedToSet = errors.New("failed to set value in cache") - ErrCacheFailedToDel = errors.New("failed to delete value from cache") - ErrCacheFailedToGet = errors.New("failed to get value from cache") + ErrCacheNotFound = errors.New("key not found in cache") + ErrCacheFailedToSet = errors.New("failed to set value in cache") + ErrCacheFailedToDel = errors.New("failed to delete value from cache") + ErrCacheFailedToGet = errors.New("failed to get value from cache") + ErrCacheFailedToMarshalValue = errors.New("failed to marshal value") ) type Store interface { diff --git a/internal/cache/in_memory.go b/internal/cache/in_memory.go index e6346dc7c..6781f3bad 100644 --- a/internal/cache/in_memory.go +++ b/internal/cache/in_memory.go @@ -1,50 +1,42 @@ package cache import ( + "encoding/json" "sync" "time" ) type MemoryStore struct { - data map[string][]byte - mu sync.RWMutex + data sync.Map } func NewMemoryStore() *MemoryStore { - return &MemoryStore{ - data: make(map[string][]byte), - } + return &MemoryStore{} } // Get retrieves a value by key. It returns an error if the key does not exist. func (s *MemoryStore) Get(key string) ([]byte, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - value, found := s.data[key] + value, found := s.data.Load(key) if !found { return nil, ErrCacheNotFound } - return value, nil + + bytes, err := json.Marshal(value) + if err != nil { + return nil, ErrCacheFailedToMarshalValue + } + + return bytes, nil } // Set stores a key-value pair, ignoring the ttl parameter. func (s *MemoryStore) Set(key string, value []byte, _ time.Duration) error { - s.mu.Lock() - defer s.mu.Unlock() - - s.data[key] = value + s.data.Store(key, value) return nil } // Del removes a key from the store. func (s *MemoryStore) Del(key string) error { - s.mu.Lock() - defer s.mu.Unlock() - - if _, found := s.data[key]; !found { - return ErrCacheNotFound - } - delete(s.data, key) + s.data.Delete(key) return nil } From f49c7abe0ca9fc0b2e31fe0e7230547de7550122 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Thu, 14 Nov 2024 12:49:10 +0100 Subject: [PATCH 09/26] test(ARCO-283): fix unit tests --- internal/cache/in_memory.go | 6 +----- internal/metamorph/processor_helpers.go | 4 ++-- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/internal/cache/in_memory.go b/internal/cache/in_memory.go index 6781f3bad..9d5c327e7 100644 --- a/internal/cache/in_memory.go +++ b/internal/cache/in_memory.go @@ -1,7 +1,6 @@ package cache import ( - "encoding/json" "sync" "time" ) @@ -21,10 +20,7 @@ func (s *MemoryStore) Get(key string) ([]byte, error) { return nil, ErrCacheNotFound } - bytes, err := json.Marshal(value) - if err != nil { - return nil, ErrCacheFailedToMarshalValue - } + bytes := value.([]byte) return bytes, nil } diff --git a/internal/metamorph/processor_helpers.go b/internal/metamorph/processor_helpers.go index 01f2e7a29..e95bebe09 100644 --- a/internal/metamorph/processor_helpers.go +++ b/internal/metamorph/processor_helpers.go @@ -123,7 +123,7 @@ func mergeUnique(arr1, arr2 []string) []string { return uniqueSlice } -func serializeStatusMap(updateStatusMap StatusUpdateMap) ([]byte, error) { //nolint:unused +func serializeStatusMap(updateStatusMap StatusUpdateMap) ([]byte, error) { serializeMap := make(map[string]store.UpdateStatus) for k, v := range updateStatusMap { serializeMap[k.String()] = v @@ -136,7 +136,7 @@ func serializeStatusMap(updateStatusMap StatusUpdateMap) ([]byte, error) { //nol return bytes, nil } -func deserializeStatusMap(data []byte) (StatusUpdateMap, error) { //nolint:unused +func deserializeStatusMap(data []byte) (StatusUpdateMap, error) { serializeMap := make(map[string]store.UpdateStatus) updateStatusMap := make(StatusUpdateMap) From c3f74ba8146c2a921ee6fe9db501a3d389c7f6c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Thu, 14 Nov 2024 16:29:16 +0100 Subject: [PATCH 10/26] feat(ARCO-283): save separate transactions in cache --- internal/cache/cache.go | 13 +-- internal/cache/in_memory.go | 17 +++- internal/cache/redis.go | 34 ++++++- internal/metamorph/processor.go | 19 ++-- internal/metamorph/processor_helpers.go | 122 +++++++++++++----------- 5 files changed, 126 insertions(+), 79 deletions(-) diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 7a16f9ac4..25d42b98a 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -6,15 +6,16 @@ import ( ) var ( - ErrCacheNotFound = errors.New("key not found in cache") - ErrCacheFailedToSet = errors.New("failed to set value in cache") - ErrCacheFailedToDel = errors.New("failed to delete value from cache") - ErrCacheFailedToGet = errors.New("failed to get value from cache") - ErrCacheFailedToMarshalValue = errors.New("failed to marshal value") + ErrCacheNotFound = errors.New("key not found in cache") + ErrCacheFailedToSet = errors.New("failed to set value in cache") + ErrCacheFailedToDel = errors.New("failed to delete value from cache") + ErrCacheFailedToGet = errors.New("failed to get value from cache") + ErrCacheFailedToScan = errors.New("failed to scan cache") ) type Store interface { Get(key string) ([]byte, error) + GetAllWithPrefix(prefix string) (map[string][]byte, error) Set(key string, value []byte, ttl time.Duration) error - Del(key string) error + Del(keys ...string) error } diff --git a/internal/cache/in_memory.go b/internal/cache/in_memory.go index 9d5c327e7..35f45a78a 100644 --- a/internal/cache/in_memory.go +++ b/internal/cache/in_memory.go @@ -32,7 +32,20 @@ func (s *MemoryStore) Set(key string, value []byte, _ time.Duration) error { } // Del removes a key from the store. -func (s *MemoryStore) Del(key string) error { - s.data.Delete(key) +func (s *MemoryStore) Del(keys ...string) error { + for _, k := range keys { + s.data.Delete(k) + } return nil } +func (s *MemoryStore) GetAllWithPrefix(prefix string) (map[string][]byte, error) { + keys := make(map[string][]byte) + s.data.Range(func(k, v interface{}) bool { + key := k.(string) + if key[:len(prefix)] == prefix { + keys[key] = v.([]byte) + } + return true + }) + return keys, nil +} diff --git a/internal/cache/redis.go b/internal/cache/redis.go index 932f4ace0..eb5832b2c 100644 --- a/internal/cache/redis.go +++ b/internal/cache/redis.go @@ -43,8 +43,8 @@ func (r *RedisStore) Set(key string, value []byte, ttl time.Duration) error { } // Del removes a value by key. -func (r *RedisStore) Del(key string) error { - result, err := r.client.Del(r.ctx, key).Result() +func (r *RedisStore) Del(keys ...string) error { + result, err := r.client.Del(r.ctx, keys...).Result() if err != nil { return errors.Join(ErrCacheFailedToDel, err) } @@ -53,3 +53,33 @@ func (r *RedisStore) Del(key string) error { } return nil } + +// GetAllWithPrefix retrieves all key-value pairs that match a specific prefix. +func (r *RedisStore) GetAllWithPrefix(prefix string) (map[string][]byte, error) { + var cursor uint64 + results := make(map[string][]byte) + + for { + keys, newCursor, err := r.client.Scan(r.ctx, cursor, prefix+"*", 10).Result() + if err != nil { + return nil, errors.Join(ErrCacheFailedToScan, err) + } + + for _, key := range keys { + value, err := r.client.Get(r.ctx, key).Result() + if errors.Is(err, redis.Nil) { + // Key has been removed between SCAN and GET, skip it + continue + } else if err != nil { + return nil, errors.Join(ErrCacheFailedToGet, err) + } + results[key] = []byte(value) + } + + cursor = newCursor + if cursor == 0 { + break + } + } + return results, nil +} diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index f0a802895..e0a7a4d4a 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -415,21 +415,18 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() { return case statusUpdate := <-p.storageStatusUpdateCh: // Ensure no duplicate statuses - actualUpdateStatusMap, err := p.updateStatusMap(statusUpdate) + err := p.updateStatusMap(statusUpdate) if err != nil { p.logger.Error("failed to update status", slog.String("err", err.Error())) return } - - if len(actualUpdateStatusMap) >= p.processStatusUpdatesBatchSize { - p.checkAndUpdate(ctx, actualUpdateStatusMap) - - // Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed. - // This prevents unnecessary immediate updates and maintains the intended time interval between batches. - ticker.Reset(p.processStatusUpdatesInterval) - } case <-ticker.C: - statusUpdatesMap := p.getStatusUpdateMap() + statusUpdatesMap, err := p.getAllTransactionStatuses() + if err != nil { + p.logger.Error("failed to get all transaction statuses", slog.String("err", err.Error())) + return + } + if len(statusUpdatesMap) > 0 { p.checkAndUpdate(ctx, statusUpdatesMap) @@ -469,7 +466,7 @@ func (p *Processor) checkAndUpdate(ctx context.Context, statusUpdatesMap StatusU p.logger.Error("failed to bulk update statuses", slog.String("err", err.Error())) } - err = p.cacheStore.Del(CacheStatusUpdateKey) + err = p.clearCache(statusUpdatesMap) if err != nil { p.logger.Error("failed to clear status update map", slog.String("err", err.Error())) } diff --git a/internal/metamorph/processor_helpers.go b/internal/metamorph/processor_helpers.go index e95bebe09..2b47910a2 100644 --- a/internal/metamorph/processor_helpers.go +++ b/internal/metamorph/processor_helpers.go @@ -6,12 +6,16 @@ import ( "github.com/libsv/go-p2p/chaincfg/chainhash" + "github.com/bitcoin-sv/arc/internal/cache" "github.com/bitcoin-sv/arc/internal/metamorph/store" ) type StatusUpdateMap map[chainhash.Hash]store.UpdateStatus -const CacheStatusUpdateKey = "status-updates" +const ( + CacheStatusUpdateKey = "status-updates" + CacheTxPrefix = "tx-" +) var ( ErrFailedToSerialize = errors.New("failed to serialize value") @@ -22,51 +26,86 @@ func (p *Processor) GetProcessorMapSize() int { return p.responseProcessor.getMapLen() } -func (p *Processor) updateStatusMap(statusUpdate store.UpdateStatus) (StatusUpdateMap, error) { - statusUpdatesMap := p.getStatusUpdateMap() - foundStatusUpdate, found := statusUpdatesMap[statusUpdate.Hash] +func (p *Processor) updateStatusMap(statusUpdate store.UpdateStatus) error { + currentStatusUpdate, err := p.getTransactionStatus(statusUpdate.Hash) + if err != nil { + if errors.Is(err, cache.ErrCacheNotFound) { + // if record doesn't exist, save new one + return p.setTransactionStatus(statusUpdate) + } + return err + } - if !found || shouldUpdateStatus(statusUpdate, foundStatusUpdate) { + if shouldUpdateStatus(statusUpdate, *currentStatusUpdate) { if len(statusUpdate.CompetingTxs) > 0 { - statusUpdate.CompetingTxs = mergeUnique(statusUpdate.CompetingTxs, foundStatusUpdate.CompetingTxs) + statusUpdate.CompetingTxs = mergeUnique(statusUpdate.CompetingTxs, currentStatusUpdate.CompetingTxs) } - - statusUpdatesMap[statusUpdate.Hash] = statusUpdate + // TODO: combine status history } - err := p.setStatusUpdateMap(statusUpdatesMap) + return p.setTransactionStatus(statusUpdate) +} + +func (p *Processor) setTransactionStatus(status store.UpdateStatus) error { + bytes, err := json.Marshal(status) if err != nil { - return statusUpdatesMap, err + return errors.Join(ErrFailedToSerialize, err) } - return statusUpdatesMap, nil + err = p.cacheStore.Set(CacheTxPrefix+status.Hash.String(), bytes, processStatusUpdatesIntervalDefault) + if err != nil { + return err + } + return nil } -func (p *Processor) setStatusUpdateMap(statusUpdatesMap StatusUpdateMap) error { - bytes, err := serializeStatusMap(statusUpdatesMap) +func (p *Processor) getTransactionStatus(hash chainhash.Hash) (*store.UpdateStatus, error) { + bytes, err := p.cacheStore.Get(CacheTxPrefix + hash.String()) if err != nil { - return err + return nil, err } - err = p.cacheStore.Set(CacheStatusUpdateKey, bytes, processStatusUpdatesIntervalDefault) + var status store.UpdateStatus + err = json.Unmarshal(bytes, &status) if err != nil { - return err + return nil, err } - return nil + + return &status, nil } -func (p *Processor) getStatusUpdateMap() StatusUpdateMap { - existingMap, err := p.cacheStore.Get(CacheStatusUpdateKey) +func (p *Processor) getAllTransactionStatuses() (StatusUpdateMap, error) { + statuses := make(StatusUpdateMap) + keys, err := p.cacheStore.GetAllWithPrefix(CacheTxPrefix) + if err != nil { + return nil, err + } - if err == nil { - statusUpdatesMap, err := deserializeStatusMap(existingMap) - if err == nil { - return statusUpdatesMap + for key, value := range keys { + hash, err := chainhash.NewHashFromStr(key[len(CacheTxPrefix):]) + if err != nil { + return nil, err + } + + var status store.UpdateStatus + err = json.Unmarshal(value, &status) + if err != nil { + return nil, err } + + statuses[*hash] = status + } + + return statuses, nil +} + +func (p *Processor) clearCache(updateStatusMap StatusUpdateMap) error { + keys := make([]string, len(updateStatusMap)) + for k, _ := range updateStatusMap { + keys = append(keys, CacheTxPrefix+k.String()) } - // If the key doesn't exist or there was an error unmarshalling the value return new map - return make(StatusUpdateMap) + return p.cacheStore.Del(keys...) } func shouldUpdateStatus(new, found store.UpdateStatus) bool { @@ -122,36 +161,3 @@ func mergeUnique(arr1, arr2 []string) []string { return uniqueSlice } - -func serializeStatusMap(updateStatusMap StatusUpdateMap) ([]byte, error) { - serializeMap := make(map[string]store.UpdateStatus) - for k, v := range updateStatusMap { - serializeMap[k.String()] = v - } - - bytes, err := json.Marshal(serializeMap) - if err != nil { - return nil, errors.Join(ErrFailedToSerialize, err) - } - return bytes, nil -} - -func deserializeStatusMap(data []byte) (StatusUpdateMap, error) { - serializeMap := make(map[string]store.UpdateStatus) - updateStatusMap := make(StatusUpdateMap) - - err := json.Unmarshal(data, &serializeMap) - if err != nil { - return nil, errors.Join(ErrFailedToDeserialize, err) - } - - for k, v := range serializeMap { - hash, err := chainhash.NewHashFromStr(k) - if err != nil { - return nil, errors.Join(ErrFailedToDeserialize, err) - } - updateStatusMap[*hash] = v - } - - return updateStatusMap, nil -} From b1573064b1f7d07445ce91623157addfce45c870 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Thu, 14 Nov 2024 17:05:58 +0100 Subject: [PATCH 11/26] fix(ARCO-283): fix build and linter errors --- internal/cache/in_memory.go | 16 +++++++++++++--- internal/metamorph/processor_helpers.go | 2 +- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/internal/cache/in_memory.go b/internal/cache/in_memory.go index 35f45a78a..e410c71dd 100644 --- a/internal/cache/in_memory.go +++ b/internal/cache/in_memory.go @@ -20,7 +20,10 @@ func (s *MemoryStore) Get(key string) ([]byte, error) { return nil, ErrCacheNotFound } - bytes := value.([]byte) + bytes, ok := value.([]byte) + if !ok { + return nil, ErrCacheFailedToGet + } return bytes, nil } @@ -41,9 +44,16 @@ func (s *MemoryStore) Del(keys ...string) error { func (s *MemoryStore) GetAllWithPrefix(prefix string) (map[string][]byte, error) { keys := make(map[string][]byte) s.data.Range(func(k, v interface{}) bool { - key := k.(string) + key, ok := k.(string) + if !ok { + return false + } if key[:len(prefix)] == prefix { - keys[key] = v.([]byte) + value, ok := v.([]byte) + if !ok { + return false + } + keys[key] = value } return true }) diff --git a/internal/metamorph/processor_helpers.go b/internal/metamorph/processor_helpers.go index 2b47910a2..da4941867 100644 --- a/internal/metamorph/processor_helpers.go +++ b/internal/metamorph/processor_helpers.go @@ -101,7 +101,7 @@ func (p *Processor) getAllTransactionStatuses() (StatusUpdateMap, error) { func (p *Processor) clearCache(updateStatusMap StatusUpdateMap) error { keys := make([]string, len(updateStatusMap)) - for k, _ := range updateStatusMap { + for k := range updateStatusMap { keys = append(keys, CacheTxPrefix+k.String()) } From b37d5ee3e55814472e29704ddca47285655ea1b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Fri, 15 Nov 2024 10:44:33 +0100 Subject: [PATCH 12/26] feat(ARCO-283): add option to store data in cache for one hash --- internal/cache/cache.go | 20 ++--- internal/cache/in_memory.go | 102 +++++++++++++++++++----- internal/cache/redis.go | 64 +++++++++++++-- internal/metamorph/processor.go | 24 +++++- internal/metamorph/processor_helpers.go | 26 +++--- 5 files changed, 187 insertions(+), 49 deletions(-) diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 25d42b98a..ef6e501d2 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -6,16 +6,18 @@ import ( ) var ( - ErrCacheNotFound = errors.New("key not found in cache") - ErrCacheFailedToSet = errors.New("failed to set value in cache") - ErrCacheFailedToDel = errors.New("failed to delete value from cache") - ErrCacheFailedToGet = errors.New("failed to get value from cache") - ErrCacheFailedToScan = errors.New("failed to scan cache") + ErrCacheNotFound = errors.New("key not found in cache") + ErrCacheFailedToSet = errors.New("failed to set value in cache") + ErrCacheFailedToDel = errors.New("failed to delete value from cache") + ErrCacheFailedToGet = errors.New("failed to get value from cache") + ErrCacheFailedToScan = errors.New("failed to scan cache") + ErrCacheFailedToGetCount = errors.New("failed to get count from cache") ) type Store interface { - Get(key string) ([]byte, error) - GetAllWithPrefix(prefix string) (map[string][]byte, error) - Set(key string, value []byte, ttl time.Duration) error - Del(keys ...string) error + Get(hash *string, key string) ([]byte, error) + GetAllForHash(hash string) (map[string][]byte, error) + Set(hash *string, key string, value []byte, ttl time.Duration) error + Del(hash *string, keys ...string) error + CountElementsForHash(hash string) (int64, error) } diff --git a/internal/cache/in_memory.go b/internal/cache/in_memory.go index e410c71dd..bf4c05a83 100644 --- a/internal/cache/in_memory.go +++ b/internal/cache/in_memory.go @@ -1,6 +1,7 @@ package cache import ( + "errors" "sync" "time" ) @@ -13,8 +14,27 @@ func NewMemoryStore() *MemoryStore { return &MemoryStore{} } -// Get retrieves a value by key. It returns an error if the key does not exist. -func (s *MemoryStore) Get(key string) ([]byte, error) { +// Get retrieves a value by key and hash (if given) +func (s *MemoryStore) Get(hash *string, key string) ([]byte, error) { + if hash != nil { + hashValue, found := s.data.Load(hash) + if !found { + return nil, ErrCacheNotFound + } + + hashMap, ok := hashValue.(map[string][]byte) + if !ok { + return nil, ErrCacheFailedToGet + } + + fieldValue, exists := hashMap[key] + if !exists { + return nil, ErrCacheNotFound + } + + return fieldValue, nil + } + value, found := s.data.Load(key) if !found { return nil, ErrCacheNotFound @@ -29,33 +49,73 @@ func (s *MemoryStore) Get(key string) ([]byte, error) { } // Set stores a key-value pair, ignoring the ttl parameter. -func (s *MemoryStore) Set(key string, value []byte, _ time.Duration) error { +func (s *MemoryStore) Set(hash *string, key string, value []byte, _ time.Duration) error { + if hash != nil { + raw, _ := s.data.LoadOrStore(*hash, make(map[string][]byte)) + + hashMap, ok := raw.(map[string][]byte) + if !ok { + return ErrCacheFailedToSet + } + + hashMap[key] = value + + s.data.Store(*hash, hashMap) + return nil + } + s.data.Store(key, value) return nil } // Del removes a key from the store. -func (s *MemoryStore) Del(keys ...string) error { +func (s *MemoryStore) Del(hash *string, keys ...string) error { + if hash != nil { + hashValue, found := s.data.Load(*hash) + if !found { + return ErrCacheNotFound + } + + hashMap, ok := hashValue.(map[string][]byte) + if !ok { + return errors.Join(ErrCacheFailedToDel, ErrCacheFailedToGet) + } + + for _, k := range keys { + delete(hashMap, k) + } + + s.data.Store(*hash, hashMap) + return nil + } + for _, k := range keys { s.data.Delete(k) } return nil } -func (s *MemoryStore) GetAllWithPrefix(prefix string) (map[string][]byte, error) { - keys := make(map[string][]byte) - s.data.Range(func(k, v interface{}) bool { - key, ok := k.(string) - if !ok { - return false - } - if key[:len(prefix)] == prefix { - value, ok := v.([]byte) - if !ok { - return false - } - keys[key] = value - } - return true - }) - return keys, nil + +// GetAllForHash retrieves all key-value pairs for a specific hash. +func (s *MemoryStore) GetAllForHash(hash string) (map[string][]byte, error) { + hashValue, found := s.data.Load(hash) + if !found { + return nil, ErrCacheNotFound + } + + hashMap, ok := hashValue.(map[string][]byte) + if !ok { + return nil, ErrCacheFailedToGet + } + + return hashMap, nil +} + +// CountElementsForHash returns the number of elements in a hash in memory. +func (s *MemoryStore) CountElementsForHash(hash string) (int64, error) { + hashMap, err := s.GetAllForHash(hash) + if err != nil { + return 0, err + } + + return int64(len(hashMap)), nil } diff --git a/internal/cache/redis.go b/internal/cache/redis.go index eb5832b2c..2247567f5 100644 --- a/internal/cache/redis.go +++ b/internal/cache/redis.go @@ -22,29 +22,54 @@ func NewRedisStore(ctx context.Context, c redis.UniversalClient) *RedisStore { } } -// Get retrieves a value by key. -func (r *RedisStore) Get(key string) ([]byte, error) { - result, err := r.client.Get(r.ctx, key).Result() +// Get retrieves a value by key and hash (if given). +func (r *RedisStore) Get(hash *string, key string) ([]byte, error) { + var result string + var err error + + if hash == nil { + result, err = r.client.Get(r.ctx, key).Result() + } else { + result, err = r.client.HGet(r.ctx, *hash, key).Result() + } + if errors.Is(err, redis.Nil) { return nil, ErrCacheNotFound } else if err != nil { return nil, errors.Join(ErrCacheFailedToGet, err) } + return []byte(result), nil } -// Set stores a value with a TTL. -func (r *RedisStore) Set(key string, value []byte, ttl time.Duration) error { - err := r.client.Set(r.ctx, key, value, ttl).Err() +// Set stores a value with a TTL for a specific hash (if given). +func (r *RedisStore) Set(hash *string, key string, value []byte, ttl time.Duration) error { + var err error + + if hash == nil { + err = r.client.Set(r.ctx, key, value, ttl).Err() + } else { + err = r.client.HSet(r.ctx, *hash, key, value).Err() + } + if err != nil { return errors.Join(ErrCacheFailedToSet, err) } + return nil } // Del removes a value by key. -func (r *RedisStore) Del(keys ...string) error { - result, err := r.client.Del(r.ctx, keys...).Result() +func (r *RedisStore) Del(hash *string, keys ...string) error { + var result int64 + var err error + + if hash == nil { + result, err = r.client.Del(r.ctx, keys...).Result() + } else { + result, err = r.client.HDel(r.ctx, *hash, keys...).Result() + } + if err != nil { return errors.Join(ErrCacheFailedToDel, err) } @@ -83,3 +108,26 @@ func (r *RedisStore) GetAllWithPrefix(prefix string) (map[string][]byte, error) } return results, nil } + +// GetAllForHash retrieves all key-value pairs for a specific hash. +func (r *RedisStore) GetAllForHash(hash string) (map[string][]byte, error) { + values, err := r.client.HGetAll(r.ctx, hash).Result() + if err != nil { + return nil, errors.Join(ErrCacheFailedToGet, err) + } + + result := make(map[string][]byte) + for field, value := range values { + result[field] = []byte(value) + } + return result, nil +} + +// CountElementsForHash returns the number of elements in a hash. +func (r *RedisStore) CountElementsForHash(hash string) (int64, error) { + count, err := r.client.HLen(r.ctx, hash).Result() + if err != nil { + return 0, errors.Join(ErrCacheFailedToGetCount, err) + } + return count, nil +} diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index e0a7a4d4a..45764146f 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -420,6 +420,28 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() { p.logger.Error("failed to update status", slog.String("err", err.Error())) return } + + statusUpdateCount, err := p.getStatusUpdateCount() + if err != nil { + p.logger.Error("failed to get status update count", slog.String("err", err.Error())) + return + } + + if statusUpdateCount >= p.processStatusUpdatesBatchSize { + statusUpdatesMap, err := p.getAllTransactionStatuses() + if err != nil { + p.logger.Error("failed to get all transaction statuses", slog.String("err", err.Error())) + return + } + + fmt.Println("statusUpdatesMap: ", statusUpdatesMap) + + p.checkAndUpdate(ctx, statusUpdatesMap) + + // Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed. + // This prevents unnecessary immediate updates and maintains the intended time interval between batches. + ticker.Reset(p.processStatusUpdatesInterval) + } case <-ticker.C: statusUpdatesMap, err := p.getAllTransactionStatuses() if err != nil { @@ -468,7 +490,7 @@ func (p *Processor) checkAndUpdate(ctx context.Context, statusUpdatesMap StatusU err = p.clearCache(statusUpdatesMap) if err != nil { - p.logger.Error("failed to clear status update map", slog.String("err", err.Error())) + p.logger.Error("failed to clear status update cache", slog.String("err", err.Error())) } } diff --git a/internal/metamorph/processor_helpers.go b/internal/metamorph/processor_helpers.go index da4941867..0c09c11f6 100644 --- a/internal/metamorph/processor_helpers.go +++ b/internal/metamorph/processor_helpers.go @@ -12,10 +12,7 @@ import ( type StatusUpdateMap map[chainhash.Hash]store.UpdateStatus -const ( - CacheStatusUpdateKey = "status-updates" - CacheTxPrefix = "tx-" -) +var CacheStatusUpdateHash = "status-update" var ( ErrFailedToSerialize = errors.New("failed to serialize value") @@ -52,7 +49,7 @@ func (p *Processor) setTransactionStatus(status store.UpdateStatus) error { return errors.Join(ErrFailedToSerialize, err) } - err = p.cacheStore.Set(CacheTxPrefix+status.Hash.String(), bytes, processStatusUpdatesIntervalDefault) + err = p.cacheStore.Set(&CacheStatusUpdateHash, status.Hash.String(), bytes, processStatusUpdatesIntervalDefault) if err != nil { return err } @@ -60,7 +57,7 @@ func (p *Processor) setTransactionStatus(status store.UpdateStatus) error { } func (p *Processor) getTransactionStatus(hash chainhash.Hash) (*store.UpdateStatus, error) { - bytes, err := p.cacheStore.Get(CacheTxPrefix + hash.String()) + bytes, err := p.cacheStore.Get(&CacheStatusUpdateHash, hash.String()) if err != nil { return nil, err } @@ -76,13 +73,13 @@ func (p *Processor) getTransactionStatus(hash chainhash.Hash) (*store.UpdateStat func (p *Processor) getAllTransactionStatuses() (StatusUpdateMap, error) { statuses := make(StatusUpdateMap) - keys, err := p.cacheStore.GetAllWithPrefix(CacheTxPrefix) + keys, err := p.cacheStore.GetAllForHash(CacheStatusUpdateHash) if err != nil { return nil, err } for key, value := range keys { - hash, err := chainhash.NewHashFromStr(key[len(CacheTxPrefix):]) + hash, err := chainhash.NewHashFromStr(key) if err != nil { return nil, err } @@ -102,10 +99,19 @@ func (p *Processor) getAllTransactionStatuses() (StatusUpdateMap, error) { func (p *Processor) clearCache(updateStatusMap StatusUpdateMap) error { keys := make([]string, len(updateStatusMap)) for k := range updateStatusMap { - keys = append(keys, CacheTxPrefix+k.String()) + keys = append(keys, k.String()) + } + + return p.cacheStore.Del(&CacheStatusUpdateHash, keys...) +} + +func (p *Processor) getStatusUpdateCount() (int, error) { + count, err := p.cacheStore.CountElementsForHash(CacheStatusUpdateHash) + if err != nil { + return 0, err } - return p.cacheStore.Del(keys...) + return int(count), nil } func shouldUpdateStatus(new, found store.UpdateStatus) bool { From 44f425fa8c3ffb36fb5df0556e447c0cbf8eed8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Fri, 15 Nov 2024 10:57:51 +0100 Subject: [PATCH 13/26] chore(ARCO-283): remove unused code --- internal/cache/redis.go | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/internal/cache/redis.go b/internal/cache/redis.go index 2247567f5..fc12b6226 100644 --- a/internal/cache/redis.go +++ b/internal/cache/redis.go @@ -79,36 +79,6 @@ func (r *RedisStore) Del(hash *string, keys ...string) error { return nil } -// GetAllWithPrefix retrieves all key-value pairs that match a specific prefix. -func (r *RedisStore) GetAllWithPrefix(prefix string) (map[string][]byte, error) { - var cursor uint64 - results := make(map[string][]byte) - - for { - keys, newCursor, err := r.client.Scan(r.ctx, cursor, prefix+"*", 10).Result() - if err != nil { - return nil, errors.Join(ErrCacheFailedToScan, err) - } - - for _, key := range keys { - value, err := r.client.Get(r.ctx, key).Result() - if errors.Is(err, redis.Nil) { - // Key has been removed between SCAN and GET, skip it - continue - } else if err != nil { - return nil, errors.Join(ErrCacheFailedToGet, err) - } - results[key] = []byte(value) - } - - cursor = newCursor - if cursor == 0 { - break - } - } - return results, nil -} - // GetAllForHash retrieves all key-value pairs for a specific hash. func (r *RedisStore) GetAllForHash(hash string) (map[string][]byte, error) { values, err := r.client.HGetAll(r.ctx, hash).Result() From 6019cefcedef0d6d7509adfbe7519ee7baa767d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Fri, 15 Nov 2024 11:24:49 +0100 Subject: [PATCH 14/26] chore(ARCO-283): remove log --- internal/metamorph/processor.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index 45764146f..5a99933fa 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -434,8 +434,6 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() { return } - fmt.Println("statusUpdatesMap: ", statusUpdatesMap) - p.checkAndUpdate(ctx, statusUpdatesMap) // Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed. From 6996f76f384d941cceb86cb5bd626498e4c8a759 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Tue, 19 Nov 2024 09:31:58 +0100 Subject: [PATCH 15/26] feat(ARCO-283): add transaction for redis operations --- internal/cache/cache.go | 14 +++++++----- internal/cache/in_memory.go | 15 +++++++++++++ internal/cache/redis.go | 21 ++++++++++++++++++ internal/metamorph/processor.go | 29 +++++++++++++++---------- internal/metamorph/processor_helpers.go | 4 ++-- 5 files changed, 64 insertions(+), 19 deletions(-) diff --git a/internal/cache/cache.go b/internal/cache/cache.go index ef6e501d2..5c8ab9034 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -6,17 +6,19 @@ import ( ) var ( - ErrCacheNotFound = errors.New("key not found in cache") - ErrCacheFailedToSet = errors.New("failed to set value in cache") - ErrCacheFailedToDel = errors.New("failed to delete value from cache") - ErrCacheFailedToGet = errors.New("failed to get value from cache") - ErrCacheFailedToScan = errors.New("failed to scan cache") - ErrCacheFailedToGetCount = errors.New("failed to get count from cache") + ErrCacheNotFound = errors.New("key not found in cache") + ErrCacheFailedToSet = errors.New("failed to set value in cache") + ErrCacheFailedToDel = errors.New("failed to delete value from cache") + ErrCacheFailedToGet = errors.New("failed to get value from cache") + ErrCacheFailedToScan = errors.New("failed to scan cache") + ErrCacheFailedToGetCount = errors.New("failed to get count from cache") + ErrCacheFailedToExecuteTx = errors.New("failed to execute transaction") ) type Store interface { Get(hash *string, key string) ([]byte, error) GetAllForHash(hash string) (map[string][]byte, error) + GetAllForHashAndDelete(hash string) (map[string][]byte, error) Set(hash *string, key string, value []byte, ttl time.Duration) error Del(hash *string, keys ...string) error CountElementsForHash(hash string) (int64, error) diff --git a/internal/cache/in_memory.go b/internal/cache/in_memory.go index bf4c05a83..2140d3474 100644 --- a/internal/cache/in_memory.go +++ b/internal/cache/in_memory.go @@ -110,6 +110,21 @@ func (s *MemoryStore) GetAllForHash(hash string) (map[string][]byte, error) { return hashMap, nil } +// GetAllForHashAndDelete retrieves all key-value pairs for a specific hash and deletes the hash. +func (s *MemoryStore) GetAllForHashAndDelete(hash string) (map[string][]byte, error) { + hashMap, err := s.GetAllForHash(hash) + if err != nil { + return nil, err + } + + err = s.Del(nil, hash) + if err != nil { + return nil, err + } + + return hashMap, nil +} + // CountElementsForHash returns the number of elements in a hash in memory. func (s *MemoryStore) CountElementsForHash(hash string) (int64, error) { hashMap, err := s.GetAllForHash(hash) diff --git a/internal/cache/redis.go b/internal/cache/redis.go index fc12b6226..259345579 100644 --- a/internal/cache/redis.go +++ b/internal/cache/redis.go @@ -93,6 +93,27 @@ func (r *RedisStore) GetAllForHash(hash string) (map[string][]byte, error) { return result, nil } +// GetAllForHashAndDelete retrieves all key-value pairs for a specific hash and remove them from cache, all in one transaction. +func (r *RedisStore) GetAllForHashAndDelete(hash string) (map[string][]byte, error) { + tx := r.client.TxPipeline() + + getAllCmd := tx.HGetAll(r.ctx, hash) + tx.Del(r.ctx, hash) + + _, err := tx.Exec(r.ctx) + if err != nil { + return nil, errors.Join(ErrCacheFailedToExecuteTx, err) + } + + getAllCmdResult := getAllCmd.Val() + + result := make(map[string][]byte) + for field, value := range getAllCmdResult { + result[field] = []byte(value) + } + return result, nil +} + // CountElementsForHash returns the number of elements in a hash. func (r *RedisStore) CountElementsForHash(hash string) (int64, error) { count, err := r.client.HLen(r.ctx, hash).Result() diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index 5a99933fa..db6fede3f 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -428,27 +428,27 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() { } if statusUpdateCount >= p.processStatusUpdatesBatchSize { - statusUpdatesMap, err := p.getAllTransactionStatuses() + err := p.checkAndUpdate(ctx) if err != nil { - p.logger.Error("failed to get all transaction statuses", slog.String("err", err.Error())) - return + p.logger.Error("failed to check and update statuses", slog.String("err", err.Error())) } - p.checkAndUpdate(ctx, statusUpdatesMap) - // Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed. // This prevents unnecessary immediate updates and maintains the intended time interval between batches. ticker.Reset(p.processStatusUpdatesInterval) } case <-ticker.C: - statusUpdatesMap, err := p.getAllTransactionStatuses() + statusUpdateCount, err := p.getStatusUpdateCount() if err != nil { - p.logger.Error("failed to get all transaction statuses", slog.String("err", err.Error())) + p.logger.Error("failed to get status update count", slog.String("err", err.Error())) return } - if len(statusUpdatesMap) > 0 { - p.checkAndUpdate(ctx, statusUpdatesMap) + if statusUpdateCount > 0 { + err := p.checkAndUpdate(ctx) + if err != nil { + p.logger.Error("failed to check and update statuses", slog.String("err", err.Error())) + } // Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed. // This prevents unnecessary immediate updates and maintains the intended time interval between batches. @@ -459,15 +459,20 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() { }() } -func (p *Processor) checkAndUpdate(ctx context.Context, statusUpdatesMap StatusUpdateMap) { +func (p *Processor) checkAndUpdate(ctx context.Context) error { var err error ctx, span := tracing.StartTracing(ctx, "checkAndUpdate", p.tracingEnabled, p.tracingAttributes...) defer func() { tracing.EndTracing(span, err) }() + statusUpdatesMap, err := p.getAndDeleteAllTransactionStatuses() + if err != nil { + return err + } + if len(statusUpdatesMap) == 0 { - return + return nil } statusUpdates := make([]store.UpdateStatus, 0, len(statusUpdatesMap)) @@ -490,6 +495,8 @@ func (p *Processor) checkAndUpdate(ctx context.Context, statusUpdatesMap StatusU if err != nil { p.logger.Error("failed to clear status update cache", slog.String("err", err.Error())) } + + return nil } func (p *Processor) statusUpdateWithCallback(ctx context.Context, statusUpdates, doubleSpendUpdates []store.UpdateStatus) (err error) { diff --git a/internal/metamorph/processor_helpers.go b/internal/metamorph/processor_helpers.go index 0c09c11f6..affb95315 100644 --- a/internal/metamorph/processor_helpers.go +++ b/internal/metamorph/processor_helpers.go @@ -71,9 +71,9 @@ func (p *Processor) getTransactionStatus(hash chainhash.Hash) (*store.UpdateStat return &status, nil } -func (p *Processor) getAllTransactionStatuses() (StatusUpdateMap, error) { +func (p *Processor) getAndDeleteAllTransactionStatuses() (StatusUpdateMap, error) { statuses := make(StatusUpdateMap) - keys, err := p.cacheStore.GetAllForHash(CacheStatusUpdateHash) + keys, err := p.cacheStore.GetAllForHashAndDelete(CacheStatusUpdateHash) if err != nil { return nil, err } From 00495df11f8e5c3512deefdc141f8e3716a603c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Tue, 19 Nov 2024 09:51:00 +0100 Subject: [PATCH 16/26] chore(ARCO-283): check if key exists when counting records --- internal/cache/in_memory.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/cache/in_memory.go b/internal/cache/in_memory.go index 2140d3474..8f6b11113 100644 --- a/internal/cache/in_memory.go +++ b/internal/cache/in_memory.go @@ -129,6 +129,9 @@ func (s *MemoryStore) GetAllForHashAndDelete(hash string) (map[string][]byte, er func (s *MemoryStore) CountElementsForHash(hash string) (int64, error) { hashMap, err := s.GetAllForHash(hash) if err != nil { + if errors.Is(err, ErrCacheNotFound) { + return 0, nil + } return 0, err } From 73fdadca3515b41e605c5eabd9f3ad53c437726a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Tue, 19 Nov 2024 10:52:47 +0100 Subject: [PATCH 17/26] chore(ARCO-283): remove clear cache --- internal/metamorph/processor.go | 5 ----- internal/metamorph/processor_helpers.go | 9 --------- 2 files changed, 14 deletions(-) diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index db6fede3f..5c2642b76 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -491,11 +491,6 @@ func (p *Processor) checkAndUpdate(ctx context.Context) error { p.logger.Error("failed to bulk update statuses", slog.String("err", err.Error())) } - err = p.clearCache(statusUpdatesMap) - if err != nil { - p.logger.Error("failed to clear status update cache", slog.String("err", err.Error())) - } - return nil } diff --git a/internal/metamorph/processor_helpers.go b/internal/metamorph/processor_helpers.go index affb95315..717d912b9 100644 --- a/internal/metamorph/processor_helpers.go +++ b/internal/metamorph/processor_helpers.go @@ -96,15 +96,6 @@ func (p *Processor) getAndDeleteAllTransactionStatuses() (StatusUpdateMap, error return statuses, nil } -func (p *Processor) clearCache(updateStatusMap StatusUpdateMap) error { - keys := make([]string, len(updateStatusMap)) - for k := range updateStatusMap { - keys = append(keys, k.String()) - } - - return p.cacheStore.Del(&CacheStatusUpdateHash, keys...) -} - func (p *Processor) getStatusUpdateCount() (int, error) { count, err := p.cacheStore.CountElementsForHash(CacheStatusUpdateHash) if err != nil { From 1988ea396edf16486719083118598139be9342d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Wed, 20 Nov 2024 10:06:51 +0100 Subject: [PATCH 18/26] feat(ARCO-283): split cache methods --- internal/cache/cache.go | 16 ++-- internal/cache/in_memory.go | 121 ++++++++++++------------ internal/cache/redis.go | 84 +++++++++------- internal/metamorph/processor_helpers.go | 8 +- 4 files changed, 127 insertions(+), 102 deletions(-) diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 5c8ab9034..f3b09ef3e 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -16,10 +16,14 @@ var ( ) type Store interface { - Get(hash *string, key string) ([]byte, error) - GetAllForHash(hash string) (map[string][]byte, error) - GetAllForHashAndDelete(hash string) (map[string][]byte, error) - Set(hash *string, key string, value []byte, ttl time.Duration) error - Del(hash *string, keys ...string) error - CountElementsForHash(hash string) (int64, error) + Get(key string) ([]byte, error) + Set(key string, value []byte, ttl time.Duration) error + Del(keys ...string) error + + MapGet(hash string, key string) ([]byte, error) + MapGetAll(hash string) (map[string][]byte, error) + MapSet(hash string, key string, value []byte) error + MapDel(hash string, keys ...string) error + MapLen(hash string) (int64, error) + MapExtractAll(hash string) (map[string][]byte, error) } diff --git a/internal/cache/in_memory.go b/internal/cache/in_memory.go index 8f6b11113..71b15b6fb 100644 --- a/internal/cache/in_memory.go +++ b/internal/cache/in_memory.go @@ -14,27 +14,8 @@ func NewMemoryStore() *MemoryStore { return &MemoryStore{} } -// Get retrieves a value by key and hash (if given) -func (s *MemoryStore) Get(hash *string, key string) ([]byte, error) { - if hash != nil { - hashValue, found := s.data.Load(hash) - if !found { - return nil, ErrCacheNotFound - } - - hashMap, ok := hashValue.(map[string][]byte) - if !ok { - return nil, ErrCacheFailedToGet - } - - fieldValue, exists := hashMap[key] - if !exists { - return nil, ErrCacheNotFound - } - - return fieldValue, nil - } - +// Get retrieves a value by key. +func (s *MemoryStore) Get(key string) ([]byte, error) { value, found := s.data.Load(key) if !found { return nil, ErrCacheNotFound @@ -49,54 +30,76 @@ func (s *MemoryStore) Get(hash *string, key string) ([]byte, error) { } // Set stores a key-value pair, ignoring the ttl parameter. -func (s *MemoryStore) Set(hash *string, key string, value []byte, _ time.Duration) error { - if hash != nil { - raw, _ := s.data.LoadOrStore(*hash, make(map[string][]byte)) +func (s *MemoryStore) Set(key string, value []byte, _ time.Duration) error { + s.data.Store(key, value) + return nil +} - hashMap, ok := raw.(map[string][]byte) - if !ok { - return ErrCacheFailedToSet - } +// Del removes a key from the store. +func (s *MemoryStore) Del(keys ...string) error { + for _, k := range keys { + s.data.Delete(k) + } + return nil +} - hashMap[key] = value +// MapGet retrieves a value by key and hash. +func (s *MemoryStore) MapGet(hash string, key string) ([]byte, error) { + hashValue, found := s.data.Load(hash) + if !found { + return nil, ErrCacheNotFound + } - s.data.Store(*hash, hashMap) - return nil + hashMap, ok := hashValue.(map[string][]byte) + if !ok { + return nil, ErrCacheFailedToGet } - s.data.Store(key, value) - return nil + fieldValue, exists := hashMap[key] + if !exists { + return nil, ErrCacheNotFound + } + + return fieldValue, nil } -// Del removes a key from the store. -func (s *MemoryStore) Del(hash *string, keys ...string) error { - if hash != nil { - hashValue, found := s.data.Load(*hash) - if !found { - return ErrCacheNotFound - } +// MapSet stores a key-value pair for specific hash. +func (s *MemoryStore) MapSet(hash string, key string, value []byte) error { + raw, _ := s.data.LoadOrStore(hash, make(map[string][]byte)) - hashMap, ok := hashValue.(map[string][]byte) - if !ok { - return errors.Join(ErrCacheFailedToDel, ErrCacheFailedToGet) - } + hashMap, ok := raw.(map[string][]byte) + if !ok { + return ErrCacheFailedToSet + } - for _, k := range keys { - delete(hashMap, k) - } + hashMap[key] = value - s.data.Store(*hash, hashMap) - return nil + s.data.Store(hash, hashMap) + return nil +} + +// MapDel removes a value by key in specific hash. +func (s *MemoryStore) MapDel(hash string, keys ...string) error { + hashValue, found := s.data.Load(hash) + if !found { + return ErrCacheNotFound + } + + hashMap, ok := hashValue.(map[string][]byte) + if !ok { + return errors.Join(ErrCacheFailedToDel, ErrCacheFailedToGet) } for _, k := range keys { - s.data.Delete(k) + delete(hashMap, k) } + + s.data.Store(hash, hashMap) return nil } -// GetAllForHash retrieves all key-value pairs for a specific hash. -func (s *MemoryStore) GetAllForHash(hash string) (map[string][]byte, error) { +// MapGetAll retrieves all key-value pairs for a specific hash. +func (s *MemoryStore) MapGetAll(hash string) (map[string][]byte, error) { hashValue, found := s.data.Load(hash) if !found { return nil, ErrCacheNotFound @@ -110,14 +113,14 @@ func (s *MemoryStore) GetAllForHash(hash string) (map[string][]byte, error) { return hashMap, nil } -// GetAllForHashAndDelete retrieves all key-value pairs for a specific hash and deletes the hash. -func (s *MemoryStore) GetAllForHashAndDelete(hash string) (map[string][]byte, error) { - hashMap, err := s.GetAllForHash(hash) +// MapExtractAll retrieves all key-value pairs for a specific hash and deletes the hash. +func (s *MemoryStore) MapExtractAll(hash string) (map[string][]byte, error) { + hashMap, err := s.MapGetAll(hash) if err != nil { return nil, err } - err = s.Del(nil, hash) + err = s.Del(hash) if err != nil { return nil, err } @@ -125,9 +128,9 @@ func (s *MemoryStore) GetAllForHashAndDelete(hash string) (map[string][]byte, er return hashMap, nil } -// CountElementsForHash returns the number of elements in a hash in memory. -func (s *MemoryStore) CountElementsForHash(hash string) (int64, error) { - hashMap, err := s.GetAllForHash(hash) +// MapLen returns the number of elements in a hash in memory. +func (s *MemoryStore) MapLen(hash string) (int64, error) { + hashMap, err := s.MapGetAll(hash) if err != nil { if errors.Is(err, ErrCacheNotFound) { return 0, nil diff --git a/internal/cache/redis.go b/internal/cache/redis.go index 259345579..3bc851c88 100644 --- a/internal/cache/redis.go +++ b/internal/cache/redis.go @@ -22,16 +22,9 @@ func NewRedisStore(ctx context.Context, c redis.UniversalClient) *RedisStore { } } -// Get retrieves a value by key and hash (if given). -func (r *RedisStore) Get(hash *string, key string) ([]byte, error) { - var result string - var err error - - if hash == nil { - result, err = r.client.Get(r.ctx, key).Result() - } else { - result, err = r.client.HGet(r.ctx, *hash, key).Result() - } +// Get retrieves a value by key. +func (r *RedisStore) Get(key string) ([]byte, error) { + result, err := r.client.Get(r.ctx, key).Result() if errors.Is(err, redis.Nil) { return nil, ErrCacheNotFound @@ -42,15 +35,9 @@ func (r *RedisStore) Get(hash *string, key string) ([]byte, error) { return []byte(result), nil } -// Set stores a value with a TTL for a specific hash (if given). -func (r *RedisStore) Set(hash *string, key string, value []byte, ttl time.Duration) error { - var err error - - if hash == nil { - err = r.client.Set(r.ctx, key, value, ttl).Err() - } else { - err = r.client.HSet(r.ctx, *hash, key, value).Err() - } +// Set stores a value with a TTL for key. +func (r *RedisStore) Set(key string, value []byte, ttl time.Duration) error { + err := r.client.Set(r.ctx, key, value, ttl).Err() if err != nil { return errors.Join(ErrCacheFailedToSet, err) @@ -60,15 +47,45 @@ func (r *RedisStore) Set(hash *string, key string, value []byte, ttl time.Durati } // Del removes a value by key. -func (r *RedisStore) Del(hash *string, keys ...string) error { - var result int64 - var err error - - if hash == nil { - result, err = r.client.Del(r.ctx, keys...).Result() - } else { - result, err = r.client.HDel(r.ctx, *hash, keys...).Result() +func (r *RedisStore) Del(keys ...string) error { + result, err := r.client.Del(r.ctx, keys...).Result() + + if err != nil { + return errors.Join(ErrCacheFailedToDel, err) + } + if result == 0 { + return ErrCacheNotFound } + return nil +} + +// MapGet retrieves a value by key and hash (if given). +func (r *RedisStore) MapGet(hash string, key string) ([]byte, error) { + result, err := r.client.HGet(r.ctx, hash, key).Result() + + if errors.Is(err, redis.Nil) { + return nil, ErrCacheNotFound + } else if err != nil { + return nil, errors.Join(ErrCacheFailedToGet, err) + } + + return []byte(result), nil +} + +// MapSet stores a value for a specific hash. +func (r *RedisStore) MapSet(hash string, key string, value []byte) error { + err := r.client.HSet(r.ctx, hash, key, value).Err() + + if err != nil { + return errors.Join(ErrCacheFailedToSet, err) + } + + return nil +} + +// MapDel removes a value by key in specific hash. +func (r *RedisStore) MapDel(hash string, keys ...string) error { + result, err := r.client.HDel(r.ctx, hash, keys...).Result() if err != nil { return errors.Join(ErrCacheFailedToDel, err) @@ -76,11 +93,12 @@ func (r *RedisStore) Del(hash *string, keys ...string) error { if result == 0 { return ErrCacheNotFound } + return nil } -// GetAllForHash retrieves all key-value pairs for a specific hash. -func (r *RedisStore) GetAllForHash(hash string) (map[string][]byte, error) { +// MapGetAll retrieves all key-value pairs for a specific hash. +func (r *RedisStore) MapGetAll(hash string) (map[string][]byte, error) { values, err := r.client.HGetAll(r.ctx, hash).Result() if err != nil { return nil, errors.Join(ErrCacheFailedToGet, err) @@ -93,8 +111,8 @@ func (r *RedisStore) GetAllForHash(hash string) (map[string][]byte, error) { return result, nil } -// GetAllForHashAndDelete retrieves all key-value pairs for a specific hash and remove them from cache, all in one transaction. -func (r *RedisStore) GetAllForHashAndDelete(hash string) (map[string][]byte, error) { +// MapExtractAll retrieves all key-value pairs for a specific hash and remove them from cache, all in one transaction. +func (r *RedisStore) MapExtractAll(hash string) (map[string][]byte, error) { tx := r.client.TxPipeline() getAllCmd := tx.HGetAll(r.ctx, hash) @@ -114,8 +132,8 @@ func (r *RedisStore) GetAllForHashAndDelete(hash string) (map[string][]byte, err return result, nil } -// CountElementsForHash returns the number of elements in a hash. -func (r *RedisStore) CountElementsForHash(hash string) (int64, error) { +// MapLen returns the number of elements in a hash. +func (r *RedisStore) MapLen(hash string) (int64, error) { count, err := r.client.HLen(r.ctx, hash).Result() if err != nil { return 0, errors.Join(ErrCacheFailedToGetCount, err) diff --git a/internal/metamorph/processor_helpers.go b/internal/metamorph/processor_helpers.go index 717d912b9..72ce5a4bf 100644 --- a/internal/metamorph/processor_helpers.go +++ b/internal/metamorph/processor_helpers.go @@ -49,7 +49,7 @@ func (p *Processor) setTransactionStatus(status store.UpdateStatus) error { return errors.Join(ErrFailedToSerialize, err) } - err = p.cacheStore.Set(&CacheStatusUpdateHash, status.Hash.String(), bytes, processStatusUpdatesIntervalDefault) + err = p.cacheStore.MapSet(CacheStatusUpdateHash, status.Hash.String(), bytes) if err != nil { return err } @@ -57,7 +57,7 @@ func (p *Processor) setTransactionStatus(status store.UpdateStatus) error { } func (p *Processor) getTransactionStatus(hash chainhash.Hash) (*store.UpdateStatus, error) { - bytes, err := p.cacheStore.Get(&CacheStatusUpdateHash, hash.String()) + bytes, err := p.cacheStore.MapGet(CacheStatusUpdateHash, hash.String()) if err != nil { return nil, err } @@ -73,7 +73,7 @@ func (p *Processor) getTransactionStatus(hash chainhash.Hash) (*store.UpdateStat func (p *Processor) getAndDeleteAllTransactionStatuses() (StatusUpdateMap, error) { statuses := make(StatusUpdateMap) - keys, err := p.cacheStore.GetAllForHashAndDelete(CacheStatusUpdateHash) + keys, err := p.cacheStore.MapExtractAll(CacheStatusUpdateHash) if err != nil { return nil, err } @@ -97,7 +97,7 @@ func (p *Processor) getAndDeleteAllTransactionStatuses() (StatusUpdateMap, error } func (p *Processor) getStatusUpdateCount() (int, error) { - count, err := p.cacheStore.CountElementsForHash(CacheStatusUpdateHash) + count, err := p.cacheStore.MapLen(CacheStatusUpdateHash) if err != nil { return 0, err } From b94b62c1cb64ce2107e3175661972ed5b2066eb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Wed, 20 Nov 2024 10:18:19 +0100 Subject: [PATCH 19/26] refactor(ARCO-283): refactor in-memory cache method --- internal/cache/in_memory.go | 18 +++++++++--------- internal/cache/redis.go | 4 ++-- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/internal/cache/in_memory.go b/internal/cache/in_memory.go index 71b15b6fb..527d81bee 100644 --- a/internal/cache/in_memory.go +++ b/internal/cache/in_memory.go @@ -43,7 +43,7 @@ func (s *MemoryStore) Del(keys ...string) error { return nil } -// MapGet retrieves a value by key and hash. +// MapGet retrieves a value by key and hash. Return err if hash or key not found. func (s *MemoryStore) MapGet(hash string, key string) ([]byte, error) { hashValue, found := s.data.Load(hash) if !found { @@ -98,7 +98,7 @@ func (s *MemoryStore) MapDel(hash string, keys ...string) error { return nil } -// MapGetAll retrieves all key-value pairs for a specific hash. +// MapGetAll retrieves all key-value pairs for a specific hash. Return err if hash not found. func (s *MemoryStore) MapGetAll(hash string) (map[string][]byte, error) { hashValue, found := s.data.Load(hash) if !found { @@ -113,16 +113,16 @@ func (s *MemoryStore) MapGetAll(hash string) (map[string][]byte, error) { return hashMap, nil } -// MapExtractAll retrieves all key-value pairs for a specific hash and deletes the hash. +// MapExtractAll retrieves all key-value pairs for a specific hash and deletes the hash. Return err if hash not found. func (s *MemoryStore) MapExtractAll(hash string) (map[string][]byte, error) { - hashMap, err := s.MapGetAll(hash) - if err != nil { - return nil, err + hashValue, found := s.data.LoadAndDelete(hash) + if !found { + return nil, ErrCacheNotFound } - err = s.Del(hash) - if err != nil { - return nil, err + hashMap, ok := hashValue.(map[string][]byte) + if !ok { + return nil, ErrCacheFailedToGet } return hashMap, nil diff --git a/internal/cache/redis.go b/internal/cache/redis.go index 3bc851c88..fb8865b8f 100644 --- a/internal/cache/redis.go +++ b/internal/cache/redis.go @@ -59,7 +59,7 @@ func (r *RedisStore) Del(keys ...string) error { return nil } -// MapGet retrieves a value by key and hash (if given). +// MapGet retrieves a value by key and hash (if given). Return err if hash or key not found. func (r *RedisStore) MapGet(hash string, key string) ([]byte, error) { result, err := r.client.HGet(r.ctx, hash, key).Result() @@ -97,7 +97,7 @@ func (r *RedisStore) MapDel(hash string, keys ...string) error { return nil } -// MapGetAll retrieves all key-value pairs for a specific hash. +// MapGetAll retrieves all key-value pairs for a specific hash. Return err if hash not found. func (r *RedisStore) MapGetAll(hash string) (map[string][]byte, error) { values, err := r.client.HGetAll(r.ctx, hash).Result() if err != nil { From 15b03a7df6ab5bea3a85095e3d4859e570e5c989 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Thu, 21 Nov 2024 13:16:23 +0100 Subject: [PATCH 20/26] fix(ARCO-283): fix PR comments --- internal/cache/cache.go | 12 ++++---- internal/cache/in_memory.go | 40 ++++++++++++------------- internal/cache/redis.go | 38 +++++++++++------------ internal/metamorph/processor_helpers.go | 8 +++-- 4 files changed, 50 insertions(+), 48 deletions(-) diff --git a/internal/cache/cache.go b/internal/cache/cache.go index f3b09ef3e..2198defe8 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -20,10 +20,10 @@ type Store interface { Set(key string, value []byte, ttl time.Duration) error Del(keys ...string) error - MapGet(hash string, key string) ([]byte, error) - MapGetAll(hash string) (map[string][]byte, error) - MapSet(hash string, key string, value []byte) error - MapDel(hash string, keys ...string) error - MapLen(hash string) (int64, error) - MapExtractAll(hash string) (map[string][]byte, error) + MapGet(hashsetKey string, field string) ([]byte, error) + MapGetAll(hashsetKey string) (map[string][]byte, error) + MapSet(hashsetKey string, field string, value []byte) error + MapDel(hashsetKey string, fields ...string) error + MapLen(hashsetKey string) (int64, error) + MapExtractAll(hashsetKey string) (map[string][]byte, error) } diff --git a/internal/cache/in_memory.go b/internal/cache/in_memory.go index 527d81bee..3f5174282 100644 --- a/internal/cache/in_memory.go +++ b/internal/cache/in_memory.go @@ -43,9 +43,9 @@ func (s *MemoryStore) Del(keys ...string) error { return nil } -// MapGet retrieves a value by key and hash. Return err if hash or key not found. -func (s *MemoryStore) MapGet(hash string, key string) ([]byte, error) { - hashValue, found := s.data.Load(hash) +// MapGet retrieves a value by key and hashsetKey. Return err if hashsetKey or key not found. +func (s *MemoryStore) MapGet(hashsetKey string, key string) ([]byte, error) { + hashValue, found := s.data.Load(hashsetKey) if !found { return nil, ErrCacheNotFound } @@ -63,9 +63,9 @@ func (s *MemoryStore) MapGet(hash string, key string) ([]byte, error) { return fieldValue, nil } -// MapSet stores a key-value pair for specific hash. -func (s *MemoryStore) MapSet(hash string, key string, value []byte) error { - raw, _ := s.data.LoadOrStore(hash, make(map[string][]byte)) +// MapSet stores a key-value pair for specific hashsetKey. +func (s *MemoryStore) MapSet(hashsetKey string, key string, value []byte) error { + raw, _ := s.data.LoadOrStore(hashsetKey, make(map[string][]byte)) hashMap, ok := raw.(map[string][]byte) if !ok { @@ -74,13 +74,13 @@ func (s *MemoryStore) MapSet(hash string, key string, value []byte) error { hashMap[key] = value - s.data.Store(hash, hashMap) + s.data.Store(hashsetKey, hashMap) return nil } -// MapDel removes a value by key in specific hash. -func (s *MemoryStore) MapDel(hash string, keys ...string) error { - hashValue, found := s.data.Load(hash) +// MapDel removes a value by key in specific hashsetKey. +func (s *MemoryStore) MapDel(hashsetKey string, keys ...string) error { + hashValue, found := s.data.Load(hashsetKey) if !found { return ErrCacheNotFound } @@ -94,13 +94,13 @@ func (s *MemoryStore) MapDel(hash string, keys ...string) error { delete(hashMap, k) } - s.data.Store(hash, hashMap) + s.data.Store(hashsetKey, hashMap) return nil } -// MapGetAll retrieves all key-value pairs for a specific hash. Return err if hash not found. -func (s *MemoryStore) MapGetAll(hash string) (map[string][]byte, error) { - hashValue, found := s.data.Load(hash) +// MapGetAll retrieves all key-value pairs for a specific hashsetKey. Return err if hashsetKey not found. +func (s *MemoryStore) MapGetAll(hashsetKey string) (map[string][]byte, error) { + hashValue, found := s.data.Load(hashsetKey) if !found { return nil, ErrCacheNotFound } @@ -113,9 +113,9 @@ func (s *MemoryStore) MapGetAll(hash string) (map[string][]byte, error) { return hashMap, nil } -// MapExtractAll retrieves all key-value pairs for a specific hash and deletes the hash. Return err if hash not found. -func (s *MemoryStore) MapExtractAll(hash string) (map[string][]byte, error) { - hashValue, found := s.data.LoadAndDelete(hash) +// MapExtractAll retrieves all key-value pairs for a specific hashsetKey and deletes the hashsetKey. Return err if hashsetKey not found. +func (s *MemoryStore) MapExtractAll(hashsetKey string) (map[string][]byte, error) { + hashValue, found := s.data.LoadAndDelete(hashsetKey) if !found { return nil, ErrCacheNotFound } @@ -128,9 +128,9 @@ func (s *MemoryStore) MapExtractAll(hash string) (map[string][]byte, error) { return hashMap, nil } -// MapLen returns the number of elements in a hash in memory. -func (s *MemoryStore) MapLen(hash string) (int64, error) { - hashMap, err := s.MapGetAll(hash) +// MapLen returns the number of elements in a hashsetKey in memory. +func (s *MemoryStore) MapLen(hashsetKey string) (int64, error) { + hashMap, err := s.MapGetAll(hashsetKey) if err != nil { if errors.Is(err, ErrCacheNotFound) { return 0, nil diff --git a/internal/cache/redis.go b/internal/cache/redis.go index fb8865b8f..2e255843b 100644 --- a/internal/cache/redis.go +++ b/internal/cache/redis.go @@ -59,9 +59,9 @@ func (r *RedisStore) Del(keys ...string) error { return nil } -// MapGet retrieves a value by key and hash (if given). Return err if hash or key not found. -func (r *RedisStore) MapGet(hash string, key string) ([]byte, error) { - result, err := r.client.HGet(r.ctx, hash, key).Result() +// MapGet retrieves a value by key and hashsetKey (if given). Return err if hashsetKey or key not found. +func (r *RedisStore) MapGet(hashsetKey string, key string) ([]byte, error) { + result, err := r.client.HGet(r.ctx, hashsetKey, key).Result() if errors.Is(err, redis.Nil) { return nil, ErrCacheNotFound @@ -72,9 +72,9 @@ func (r *RedisStore) MapGet(hash string, key string) ([]byte, error) { return []byte(result), nil } -// MapSet stores a value for a specific hash. -func (r *RedisStore) MapSet(hash string, key string, value []byte) error { - err := r.client.HSet(r.ctx, hash, key, value).Err() +// MapSet stores a value for a specific hashsetKey. +func (r *RedisStore) MapSet(hashsetKey string, key string, value []byte) error { + err := r.client.HSet(r.ctx, hashsetKey, key, value).Err() if err != nil { return errors.Join(ErrCacheFailedToSet, err) @@ -83,9 +83,9 @@ func (r *RedisStore) MapSet(hash string, key string, value []byte) error { return nil } -// MapDel removes a value by key in specific hash. -func (r *RedisStore) MapDel(hash string, keys ...string) error { - result, err := r.client.HDel(r.ctx, hash, keys...).Result() +// MapDel removes a value by key in specific hashsetKey. +func (r *RedisStore) MapDel(hashsetKey string, keys ...string) error { + result, err := r.client.HDel(r.ctx, hashsetKey, keys...).Result() if err != nil { return errors.Join(ErrCacheFailedToDel, err) @@ -97,9 +97,9 @@ func (r *RedisStore) MapDel(hash string, keys ...string) error { return nil } -// MapGetAll retrieves all key-value pairs for a specific hash. Return err if hash not found. -func (r *RedisStore) MapGetAll(hash string) (map[string][]byte, error) { - values, err := r.client.HGetAll(r.ctx, hash).Result() +// MapGetAll retrieves all key-value pairs for a specific hashsetKey. Return err if hashsetKey not found. +func (r *RedisStore) MapGetAll(hashsetKey string) (map[string][]byte, error) { + values, err := r.client.HGetAll(r.ctx, hashsetKey).Result() if err != nil { return nil, errors.Join(ErrCacheFailedToGet, err) } @@ -111,12 +111,12 @@ func (r *RedisStore) MapGetAll(hash string) (map[string][]byte, error) { return result, nil } -// MapExtractAll retrieves all key-value pairs for a specific hash and remove them from cache, all in one transaction. -func (r *RedisStore) MapExtractAll(hash string) (map[string][]byte, error) { +// MapExtractAll retrieves all key-value pairs for a specific hashsetKey and remove them from cache, all in one transaction. +func (r *RedisStore) MapExtractAll(hashsetKey string) (map[string][]byte, error) { tx := r.client.TxPipeline() - getAllCmd := tx.HGetAll(r.ctx, hash) - tx.Del(r.ctx, hash) + getAllCmd := tx.HGetAll(r.ctx, hashsetKey) + tx.Del(r.ctx, hashsetKey) _, err := tx.Exec(r.ctx) if err != nil { @@ -132,9 +132,9 @@ func (r *RedisStore) MapExtractAll(hash string) (map[string][]byte, error) { return result, nil } -// MapLen returns the number of elements in a hash. -func (r *RedisStore) MapLen(hash string) (int64, error) { - count, err := r.client.HLen(r.ctx, hash).Result() +// MapLen returns the number of elements in a hashsetKey. +func (r *RedisStore) MapLen(hashsetKey string) (int64, error) { + count, err := r.client.HLen(r.ctx, hashsetKey).Result() if err != nil { return 0, errors.Join(ErrCacheFailedToGetCount, err) } diff --git a/internal/metamorph/processor_helpers.go b/internal/metamorph/processor_helpers.go index 72ce5a4bf..8d302009f 100644 --- a/internal/metamorph/processor_helpers.go +++ b/internal/metamorph/processor_helpers.go @@ -3,6 +3,7 @@ package metamorph import ( "encoding/json" "errors" + "log/slog" "github.com/libsv/go-p2p/chaincfg/chainhash" @@ -12,7 +13,7 @@ import ( type StatusUpdateMap map[chainhash.Hash]store.UpdateStatus -var CacheStatusUpdateHash = "status-update" +var CacheStatusUpdateHash = "mtm-tx-status-update" var ( ErrFailedToSerialize = errors.New("failed to serialize value") @@ -81,13 +82,14 @@ func (p *Processor) getAndDeleteAllTransactionStatuses() (StatusUpdateMap, error for key, value := range keys { hash, err := chainhash.NewHashFromStr(key) if err != nil { - return nil, err + p.logger.Error("failed to convert hash from key", slog.String("error", err.Error()), slog.String("key", key)) + continue } var status store.UpdateStatus err = json.Unmarshal(value, &status) if err != nil { - return nil, err + p.logger.Error("failed to unmarshal status", slog.String("error", err.Error()), slog.String("key", key)) } statuses[*hash] = status From 81a6db1f12803cd416f033e11b939ea23730bb76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Thu, 21 Nov 2024 13:16:48 +0100 Subject: [PATCH 21/26] test(ARCO-283): add Redis integration tests --- internal/cache/redis_test.go | 206 ++++++++++++++++++++++++++++++++++ internal/test_utils/docker.go | 48 ++++++++ 2 files changed, 254 insertions(+) create mode 100644 internal/cache/redis_test.go diff --git a/internal/cache/redis_test.go b/internal/cache/redis_test.go new file mode 100644 index 000000000..cbc8f5d0b --- /dev/null +++ b/internal/cache/redis_test.go @@ -0,0 +1,206 @@ +package cache + +import ( + "context" + "fmt" + "log" + "os" + "strconv" + "testing" + "time" + + "github.com/go-redis/redis/v8" + "github.com/ory/dockertest/v3" + "github.com/stretchr/testify/require" + + testutils "github.com/bitcoin-sv/arc/internal/test_utils" +) + +var ( + hostPort int + redisClient *redis.Client +) + +const ( + port = 6379 +) + +func TestMain(m *testing.M) { + os.Exit(testmain(m)) +} + +func testmain(m *testing.M) int { + pool, err := dockertest.NewPool("") + if err != nil { + log.Printf("failed to create pool: %v", err) + return 1 + } + + resource, resourcePort, err := testutils.RunRedis(pool, strconv.Itoa(port), "cache") + if err != nil { + log.Print(err) + return 1 + } + defer func() { + err = pool.Purge(resource) + if err != nil { + log.Fatalf("failed to purge pool: %v", err) + } + }() + + hostPort, err = strconv.Atoi(resourcePort) + if err != nil { + log.Fatalf("failed to convert port to int: %v", err) + } + + return m.Run() +} + +func setup() { + redisClient = redis.NewClient(&redis.Options{ + Addr: fmt.Sprintf("localhost:%d", hostPort), + Password: "", + DB: 1, + }) + + _, err := redisClient.Ping(context.Background()).Result() + if err != nil { + log.Fatalln(err) + } +} + +func TestRedisClient(t *testing.T) { + t.Helper() + if testing.Short() { + t.Skip("skipping integration test") + } + + ctx := context.Background() + + setup() + + redisStore := NewRedisStore(ctx, redisClient) + + t.Run("get/set", func(t *testing.T) { + // given + err := redisStore.Set("key", []byte("value"), 4*time.Second) + require.NoError(t, err) + + // when + res, err := redisStore.Get("key") + require.NoError(t, err) + + // then + require.Equal(t, "value", string(res)) + }) + + t.Run("del", func(t *testing.T) { + // given + err := redisStore.Set("key1", []byte("value1"), 4*time.Second) + require.NoError(t, err) + err = redisStore.Set("key2", []byte("value2"), 4*time.Second) + require.NoError(t, err) + err = redisStore.Set("key3", []byte("value3"), 4*time.Second) + require.NoError(t, err) + + // when + err = redisStore.Del([]string{"key1", "key2", "key3"}...) + + // then + require.NoError(t, err) + }) + + t.Run("map set/get", func(t *testing.T) { + // when + err := redisStore.MapSet("hash", "key1", []byte("value1")) + require.NoError(t, err) + + res, err := redisStore.MapGet("hash", "key1") + require.NoError(t, err) + + // then + require.Equal(t, "value1", string(res)) + }) + + t.Run("map del", func(t *testing.T) { + // given + err := redisStore.MapSet("hash", "key1", []byte("value1")) + require.NoError(t, err) + err = redisStore.MapSet("hash", "key2", []byte("value2")) + require.NoError(t, err) + err = redisStore.MapSet("hash", "key3", []byte("value3")) + require.NoError(t, err) + + // when/then + err = redisStore.MapDel("hash", []string{"key1", "key2"}...) + require.NoError(t, err) + + err = redisStore.MapDel("hash", "key3") + require.NoError(t, err) + + err = redisStore.Del("hash") + }) + + t.Run("map get all", func(t *testing.T) { + // given + err := redisStore.MapSet("hash", "key1", []byte("value1")) + require.NoError(t, err) + err = redisStore.MapSet("hash", "key2", []byte("value2")) + require.NoError(t, err) + err = redisStore.MapSet("hash", "key3", []byte("value3")) + require.NoError(t, err) + + // when + res, err := redisStore.MapGetAll("hash") + require.NoError(t, err) + + // then + expectedResponse := map[string][]byte{"key1": []byte("value1"), "key2": []byte("value2"), "key3": []byte("value3")} + require.Equal(t, expectedResponse, res) + + err = redisStore.Del("hash") + require.NoError(t, err) + }) + + t.Run("map len", func(t *testing.T) { + // given + err := redisStore.MapSet("hash", "key1", []byte("value1")) + require.NoError(t, err) + err = redisStore.MapSet("hash", "key2", []byte("value2")) + require.NoError(t, err) + err = redisStore.MapSet("hash", "key3", []byte("value3")) + require.NoError(t, err) + + // when + res, err := redisStore.MapLen("hash") + require.NoError(t, err) + + // then + require.Equal(t, int64(3), res) + + err = redisStore.Del("hash") + require.NoError(t, err) + }) + + t.Run("map extract all", func(t *testing.T) { + // given + err := redisStore.MapSet("hash", "key1", []byte("value1")) + require.NoError(t, err) + err = redisStore.MapSet("hash", "key2", []byte("value2")) + require.NoError(t, err) + err = redisStore.MapSet("hash", "key3", []byte("value3")) + require.NoError(t, err) + + // when + res, err := redisStore.MapExtractAll("hash") + require.NoError(t, err) + + // then + expectedResponse := map[string][]byte{"key1": []byte("value1"), "key2": []byte("value2"), "key3": []byte("value3")} + require.Equal(t, expectedResponse, res) + + hLen, err := redisStore.MapLen("hash") + require.NoError(t, err) + require.Equal(t, int64(0), hLen) + }) +} diff --git a/internal/test_utils/docker.go b/internal/test_utils/docker.go index 212aa9615..32e5b4445 100644 --- a/internal/test_utils/docker.go +++ b/internal/test_utils/docker.go @@ -1,11 +1,13 @@ package testutils import ( + "context" "errors" "fmt" "net/url" "os" + "github.com/go-redis/redis/v8" "github.com/ordishs/go-bitcoin" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -168,3 +170,49 @@ func RunNode(pool *dockertest.Pool, port, name string, cmds ...string) (*dockert return resource, hostPort, nil } + +func RunRedis(pool *dockertest.Pool, port, name string, cmds ...string) (*dockertest.Resource, string, error) { + opts := dockertest.RunOptions{ + Repository: "redis", + Tag: "7.4.1", + ExposedPorts: []string{"6379"}, + PortBindings: map[docker.Port][]docker.PortBinding{ + "6379": { + {HostIP: "0.0.0.0", HostPort: port}, + }, + }, + Name: name, + Cmd: cmds, + } + + resource, err := pool.RunWithOptions(&opts, func(config *docker.HostConfig) { + // set AutoRemove to true so that stopped container goes away by itself + config.AutoRemove = true + config.RestartPolicy = docker.RestartPolicy{ + Name: "no", + } + }) + if err != nil { + return nil, "", fmt.Errorf("failed to create resource: %v", err) + } + + hostPort := resource.GetPort("6379/tcp") + + err = pool.Retry(func() error { + c := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + Password: "", + DB: 1, + }) + + ctx := context.Background() + status := c.Ping(ctx) + _, err := status.Result() + return err + }) + if err != nil { + return nil, "", fmt.Errorf("failed to create resource: %v", err) + } + + return resource, hostPort, nil +} From 3f95b2b0ca390f9007d5678152d81edd1866f930 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Thu, 21 Nov 2024 13:56:16 +0100 Subject: [PATCH 22/26] fix(ARCO-283): fix linter error --- internal/cache/redis_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/cache/redis_test.go b/internal/cache/redis_test.go index cbc8f5d0b..2a96e7f4d 100644 --- a/internal/cache/redis_test.go +++ b/internal/cache/redis_test.go @@ -139,6 +139,7 @@ func TestRedisClient(t *testing.T) { require.NoError(t, err) err = redisStore.Del("hash") + require.NoError(t, err) }) t.Run("map get all", func(t *testing.T) { From f633e424cc904fa1e8096197b7a289aade1df9b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Thu, 21 Nov 2024 14:50:50 +0100 Subject: [PATCH 23/26] test(ARCO-283): fix Redis test --- internal/cache/redis_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/internal/cache/redis_test.go b/internal/cache/redis_test.go index 2a96e7f4d..ed83a631e 100644 --- a/internal/cache/redis_test.go +++ b/internal/cache/redis_test.go @@ -137,9 +137,6 @@ func TestRedisClient(t *testing.T) { err = redisStore.MapDel("hash", "key3") require.NoError(t, err) - - err = redisStore.Del("hash") - require.NoError(t, err) }) t.Run("map get all", func(t *testing.T) { From 7158db085824336a996ec074556e07d1ad33c0c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Fri, 22 Nov 2024 15:36:09 +0100 Subject: [PATCH 24/26] fix(ARCO-283): update status in cache only when bigger --- internal/metamorph/processor_helpers.go | 20 ++++++++----- internal/metamorph/processor_helpers_test.go | 30 +++++++++++++------- 2 files changed, 32 insertions(+), 18 deletions(-) diff --git a/internal/metamorph/processor_helpers.go b/internal/metamorph/processor_helpers.go index 8d302009f..384a58f03 100644 --- a/internal/metamorph/processor_helpers.go +++ b/internal/metamorph/processor_helpers.go @@ -34,14 +34,16 @@ func (p *Processor) updateStatusMap(statusUpdate store.UpdateStatus) error { return err } + if shouldUpdateCompetingTxs(statusUpdate, *currentStatusUpdate) { + currentStatusUpdate.CompetingTxs = mergeUnique(statusUpdate.CompetingTxs, currentStatusUpdate.CompetingTxs) + } + if shouldUpdateStatus(statusUpdate, *currentStatusUpdate) { - if len(statusUpdate.CompetingTxs) > 0 { - statusUpdate.CompetingTxs = mergeUnique(statusUpdate.CompetingTxs, currentStatusUpdate.CompetingTxs) - } + currentStatusUpdate.Status = statusUpdate.Status // TODO: combine status history } - return p.setTransactionStatus(statusUpdate) + return p.setTransactionStatus(*currentStatusUpdate) } func (p *Processor) setTransactionStatus(status store.UpdateStatus) error { @@ -107,12 +109,16 @@ func (p *Processor) getStatusUpdateCount() (int, error) { return int(count), nil } -func shouldUpdateStatus(new, found store.UpdateStatus) bool { - if new.Status > found.Status { +func shouldUpdateCompetingTxs(new, found store.UpdateStatus) bool { + if new.Status >= found.Status && !unorderedEqual(new.CompetingTxs, found.CompetingTxs) { return true } - if new.Status == found.Status && !unorderedEqual(new.CompetingTxs, found.CompetingTxs) { + return false +} + +func shouldUpdateStatus(new, found store.UpdateStatus) bool { + if new.Status > found.Status { return true } diff --git a/internal/metamorph/processor_helpers_test.go b/internal/metamorph/processor_helpers_test.go index 71dd03928..3c02087f6 100644 --- a/internal/metamorph/processor_helpers_test.go +++ b/internal/metamorph/processor_helpers_test.go @@ -32,10 +32,11 @@ func BenchmarkUnorderedEqual(b *testing.B) { func TestShouldUpdateStatus(t *testing.T) { testCases := []struct { - name string - existingStatus store.UpdateStatus - newStatus store.UpdateStatus - expectedResult bool + name string + existingStatus store.UpdateStatus + newStatus store.UpdateStatus + expectedResultStatus bool + expectedResultCompetingTx bool }{ { name: "new status lower than existing", @@ -45,7 +46,8 @@ func TestShouldUpdateStatus(t *testing.T) { newStatus: store.UpdateStatus{ Status: metamorph_api.Status_ACCEPTED_BY_NETWORK, }, - expectedResult: false, + expectedResultStatus: false, + expectedResultCompetingTx: false, }, { name: "new status higher than existing", @@ -55,7 +57,8 @@ func TestShouldUpdateStatus(t *testing.T) { newStatus: store.UpdateStatus{ Status: metamorph_api.Status_SEEN_ON_NETWORK, }, - expectedResult: true, + expectedResultStatus: true, + expectedResultCompetingTx: false, }, { name: "new status lower than existing, unequal competing txs", @@ -66,7 +69,8 @@ func TestShouldUpdateStatus(t *testing.T) { Status: metamorph_api.Status_ACCEPTED_BY_NETWORK, CompetingTxs: []string{"1234"}, }, - expectedResult: false, + expectedResultStatus: false, + expectedResultCompetingTx: false, }, { name: "statuses equal", @@ -76,7 +80,8 @@ func TestShouldUpdateStatus(t *testing.T) { newStatus: store.UpdateStatus{ Status: metamorph_api.Status_SEEN_ON_NETWORK, }, - expectedResult: false, + expectedResultStatus: false, + expectedResultCompetingTx: false, }, { name: "statuses equal, but unequal competing txs", @@ -88,17 +93,20 @@ func TestShouldUpdateStatus(t *testing.T) { Status: metamorph_api.Status_DOUBLE_SPEND_ATTEMPTED, CompetingTxs: []string{"1234"}, }, - expectedResult: true, + expectedResultStatus: false, + expectedResultCompetingTx: true, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { // when - actualResult := shouldUpdateStatus(tc.newStatus, tc.existingStatus) + actualResultCompetingTx := shouldUpdateCompetingTxs(tc.newStatus, tc.existingStatus) + actualResultStatus := shouldUpdateStatus(tc.newStatus, tc.existingStatus) // then - assert.Equal(t, tc.expectedResult, actualResult) + assert.Equal(t, tc.expectedResultStatus, actualResultStatus) + assert.Equal(t, tc.expectedResultCompetingTx, actualResultCompetingTx) }) } } From fc19ee8ab6ce640b3fbcd374d2beb64cb37f4025 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Fri, 22 Nov 2024 15:39:26 +0100 Subject: [PATCH 25/26] fix(ARCO-283): fix PR comments --- internal/cache/redis.go | 5 +---- internal/metamorph/processor_helpers.go | 1 + 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/internal/cache/redis.go b/internal/cache/redis.go index 2e255843b..43eaa2046 100644 --- a/internal/cache/redis.go +++ b/internal/cache/redis.go @@ -85,14 +85,11 @@ func (r *RedisStore) MapSet(hashsetKey string, key string, value []byte) error { // MapDel removes a value by key in specific hashsetKey. func (r *RedisStore) MapDel(hashsetKey string, keys ...string) error { - result, err := r.client.HDel(r.ctx, hashsetKey, keys...).Result() + err := r.client.HDel(r.ctx, hashsetKey, keys...).Err() if err != nil { return errors.Join(ErrCacheFailedToDel, err) } - if result == 0 { - return ErrCacheNotFound - } return nil } diff --git a/internal/metamorph/processor_helpers.go b/internal/metamorph/processor_helpers.go index 384a58f03..9d8bc9dc7 100644 --- a/internal/metamorph/processor_helpers.go +++ b/internal/metamorph/processor_helpers.go @@ -92,6 +92,7 @@ func (p *Processor) getAndDeleteAllTransactionStatuses() (StatusUpdateMap, error err = json.Unmarshal(value, &status) if err != nil { p.logger.Error("failed to unmarshal status", slog.String("error", err.Error()), slog.String("key", key)) + continue } statuses[*hash] = status From 4fcc557676bc8268281dcf4c7fe36b0951eb5648 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= Date: Fri, 22 Nov 2024 15:40:42 +0100 Subject: [PATCH 26/26] fix(ARCO-283): fix linter error --- internal/metamorph/processor_helpers.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/internal/metamorph/processor_helpers.go b/internal/metamorph/processor_helpers.go index 9d8bc9dc7..4c36bd15a 100644 --- a/internal/metamorph/processor_helpers.go +++ b/internal/metamorph/processor_helpers.go @@ -119,11 +119,7 @@ func shouldUpdateCompetingTxs(new, found store.UpdateStatus) bool { } func shouldUpdateStatus(new, found store.UpdateStatus) bool { - if new.Status > found.Status { - return true - } - - return false + return new.Status > found.Status } // unorderedEqual checks if two string slices contain