diff --git a/cmd/arc/services/cache.go b/cmd/arc/services/cache.go index 96440184f..9bcb09843 100644 --- a/cmd/arc/services/cache.go +++ b/cmd/arc/services/cache.go @@ -6,7 +6,6 @@ import ( "github.com/bitcoin-sv/arc/internal/cache" "github.com/bitcoin-sv/arc/config" - "github.com/coocood/freecache" "github.com/go-redis/redis/v8" ) @@ -15,9 +14,8 @@ var ErrCacheUnknownType = errors.New("unknown cache type") // NewCacheStore creates a new CacheStore based on the provided configuration. func NewCacheStore(cacheConfig *config.CacheConfig) (cache.Store, error) { switch cacheConfig.Engine { - case config.FreeCache: - cacheSize := freecache.NewCache(cacheConfig.Freecache.Size) - return cache.NewFreecacheStore(cacheSize), nil + case config.InMemory: + return cache.NewMemoryStore(), nil case config.Redis: c := redis.NewClient(&redis.Options{ Addr: cacheConfig.Redis.Addr, diff --git a/config/config.go b/config/config.go index 171ddb8e5..8342ba8c7 100644 --- a/config/config.go +++ b/config/config.go @@ -8,8 +8,8 @@ import ( ) const ( - FreeCache = "freecache" - Redis = "redis" + InMemory = "in-memory" + Redis = "redis" ) type ArcConfig struct { @@ -137,9 +137,8 @@ type PostgresConfig struct { } type CacheConfig struct { - Engine string `mapstructure:"engine"` - Freecache *FreeCacheConfig `mapstructure:"freecache"` - Redis *RedisConfig `mapstructure:"redis"` + Engine string `mapstructure:"engine"` + Redis *RedisConfig `mapstructure:"redis"` } type FreeCacheConfig struct { diff --git a/config/defaults.go b/config/defaults.go index ca902819d..072942904 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -198,11 +198,8 @@ func getCallbackerConfig() *CallbackerConfig { func getCacheConfig() *CacheConfig { return &CacheConfig{ - Engine: FreeCache, - Freecache: &FreeCacheConfig{ - Size: 10 * 1024 * 1024, // Default size 10MB. 
- }, - Redis: &RedisConfig{ + Engine: InMemory, // use in memory cache + Redis: &RedisConfig{ // example of Redis config Addr: "localhost:6379", Password: "", DB: 0, diff --git a/config/example_config.yaml b/config/example_config.yaml index 281a953f9..10b9b979c 100644 --- a/config/example_config.yaml +++ b/config/example_config.yaml @@ -48,10 +48,8 @@ broadcasting: # settings for connection to nodes p2p: 18335 cache: - engine: freecache # cache engine - one of freecache | redis - freecache: # freecache configuration - size: 10000000 # size of cache - redis: + engine: redis # cache engine - in-memory/redis + redis: # redis cache configuration in case that engine: redis addr: "localhost:6379" password: "" db: 1 diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 23e71ca7a..2198defe8 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -6,14 +6,24 @@ import ( ) var ( - ErrCacheNotFound = errors.New("key not found in cache") - ErrCacheFailedToSet = errors.New("failed to set value in cache") - ErrCacheFailedToDel = errors.New("failed to delete value from cache") - ErrCacheFailedToGet = errors.New("failed to get value from cache") + ErrCacheNotFound = errors.New("key not found in cache") + ErrCacheFailedToSet = errors.New("failed to set value in cache") + ErrCacheFailedToDel = errors.New("failed to delete value from cache") + ErrCacheFailedToGet = errors.New("failed to get value from cache") + ErrCacheFailedToScan = errors.New("failed to scan cache") + ErrCacheFailedToGetCount = errors.New("failed to get count from cache") + ErrCacheFailedToExecuteTx = errors.New("failed to execute transaction") ) type Store interface { Get(key string) ([]byte, error) Set(key string, value []byte, ttl time.Duration) error - Del(key string) error + Del(keys ...string) error + + MapGet(hashsetKey string, field string) ([]byte, error) + MapGetAll(hashsetKey string) (map[string][]byte, error) + MapSet(hashsetKey string, field string, value []byte) error + MapDel(hashsetKey string, fields ...string) error + MapLen(hashsetKey string) (int64, error) + MapExtractAll(hashsetKey string) (map[string][]byte, error) } diff --git a/internal/cache/freecache.go b/internal/cache/freecache.go deleted file mode 100644 index a84460a54..000000000 --- a/internal/cache/freecache.go +++ /dev/null @@ -1,50 +0,0 @@ -package cache - -import ( - "errors" - "time" - - "github.com/coocood/freecache" -) - -// FreecacheStore is an implementation of CacheStore using freecache. -type FreecacheStore struct { - cache *freecache.Cache -} - -// NewFreecacheStore initializes a FreecacheStore. -func NewFreecacheStore(c *freecache.Cache) *FreecacheStore { - return &FreecacheStore{ - cache: c, - } -} - -// Get retrieves a value by key. -func (f *FreecacheStore) Get(key string) ([]byte, error) { - value, err := f.cache.Get([]byte(key)) - if err != nil { - if errors.Is(err, freecache.ErrNotFound) { - return nil, ErrCacheNotFound - } - return nil, errors.Join(ErrCacheFailedToGet, err) - } - return value, nil -} - -// Set stores a value with a TTL. -func (f *FreecacheStore) Set(key string, value []byte, ttl time.Duration) error { - err := f.cache.Set([]byte(key), value, int(ttl.Seconds())) - if err != nil { - return errors.Join(ErrCacheFailedToSet, err) - } - return nil -} - -// Del removes a value by key. 
-func (f *FreecacheStore) Del(key string) error { - success := f.cache.Del([]byte(key)) - if !success { - return ErrCacheNotFound - } - return nil -} diff --git a/internal/cache/in_memory.go b/internal/cache/in_memory.go new file mode 100644 index 000000000..3f5174282 --- /dev/null +++ b/internal/cache/in_memory.go @@ -0,0 +1,142 @@ +package cache + +import ( + "errors" + "sync" + "time" +) + +type MemoryStore struct { + data sync.Map +} + +func NewMemoryStore() *MemoryStore { + return &MemoryStore{} +} + +// Get retrieves a value by key. +func (s *MemoryStore) Get(key string) ([]byte, error) { + value, found := s.data.Load(key) + if !found { + return nil, ErrCacheNotFound + } + + bytes, ok := value.([]byte) + if !ok { + return nil, ErrCacheFailedToGet + } + + return bytes, nil +} + +// Set stores a key-value pair, ignoring the ttl parameter. +func (s *MemoryStore) Set(key string, value []byte, _ time.Duration) error { + s.data.Store(key, value) + return nil +} + +// Del removes a key from the store. +func (s *MemoryStore) Del(keys ...string) error { + for _, k := range keys { + s.data.Delete(k) + } + return nil +} + +// MapGet retrieves a value by key and hashsetKey. Return err if hashsetKey or key not found. +func (s *MemoryStore) MapGet(hashsetKey string, key string) ([]byte, error) { + hashValue, found := s.data.Load(hashsetKey) + if !found { + return nil, ErrCacheNotFound + } + + hashMap, ok := hashValue.(map[string][]byte) + if !ok { + return nil, ErrCacheFailedToGet + } + + fieldValue, exists := hashMap[key] + if !exists { + return nil, ErrCacheNotFound + } + + return fieldValue, nil +} + +// MapSet stores a key-value pair for specific hashsetKey. +func (s *MemoryStore) MapSet(hashsetKey string, key string, value []byte) error { + raw, _ := s.data.LoadOrStore(hashsetKey, make(map[string][]byte)) + + hashMap, ok := raw.(map[string][]byte) + if !ok { + return ErrCacheFailedToSet + } + + hashMap[key] = value + + s.data.Store(hashsetKey, hashMap) + return nil +} + +// MapDel removes a value by key in specific hashsetKey. +func (s *MemoryStore) MapDel(hashsetKey string, keys ...string) error { + hashValue, found := s.data.Load(hashsetKey) + if !found { + return ErrCacheNotFound + } + + hashMap, ok := hashValue.(map[string][]byte) + if !ok { + return errors.Join(ErrCacheFailedToDel, ErrCacheFailedToGet) + } + + for _, k := range keys { + delete(hashMap, k) + } + + s.data.Store(hashsetKey, hashMap) + return nil +} + +// MapGetAll retrieves all key-value pairs for a specific hashsetKey. Return err if hashsetKey not found. +func (s *MemoryStore) MapGetAll(hashsetKey string) (map[string][]byte, error) { + hashValue, found := s.data.Load(hashsetKey) + if !found { + return nil, ErrCacheNotFound + } + + hashMap, ok := hashValue.(map[string][]byte) + if !ok { + return nil, ErrCacheFailedToGet + } + + return hashMap, nil +} + +// MapExtractAll retrieves all key-value pairs for a specific hashsetKey and deletes the hashsetKey. Return err if hashsetKey not found. +func (s *MemoryStore) MapExtractAll(hashsetKey string) (map[string][]byte, error) { + hashValue, found := s.data.LoadAndDelete(hashsetKey) + if !found { + return nil, ErrCacheNotFound + } + + hashMap, ok := hashValue.(map[string][]byte) + if !ok { + return nil, ErrCacheFailedToGet + } + + return hashMap, nil +} + +// MapLen returns the number of elements in a hashsetKey in memory. 
+func (s *MemoryStore) MapLen(hashsetKey string) (int64, error) { + hashMap, err := s.MapGetAll(hashsetKey) + if err != nil { + if errors.Is(err, ErrCacheNotFound) { + return 0, nil + } + return 0, err + } + + return int64(len(hashMap)), nil +} diff --git a/internal/cache/redis.go b/internal/cache/redis.go index 932f4ace0..43eaa2046 100644 --- a/internal/cache/redis.go +++ b/internal/cache/redis.go @@ -25,26 +25,31 @@ func NewRedisStore(ctx context.Context, c redis.UniversalClient) *RedisStore { // Get retrieves a value by key. func (r *RedisStore) Get(key string) ([]byte, error) { result, err := r.client.Get(r.ctx, key).Result() + if errors.Is(err, redis.Nil) { return nil, ErrCacheNotFound } else if err != nil { return nil, errors.Join(ErrCacheFailedToGet, err) } + return []byte(result), nil } -// Set stores a value with a TTL. +// Set stores a value with a TTL for key. func (r *RedisStore) Set(key string, value []byte, ttl time.Duration) error { err := r.client.Set(r.ctx, key, value, ttl).Err() + if err != nil { return errors.Join(ErrCacheFailedToSet, err) } + return nil } // Del removes a value by key. -func (r *RedisStore) Del(key string) error { - result, err := r.client.Del(r.ctx, key).Result() +func (r *RedisStore) Del(keys ...string) error { + result, err := r.client.Del(r.ctx, keys...).Result() + if err != nil { return errors.Join(ErrCacheFailedToDel, err) } @@ -53,3 +58,82 @@ func (r *RedisStore) Del(key string) error { } return nil } + +// MapGet retrieves a value by key and hashsetKey (if given). Return err if hashsetKey or key not found. +func (r *RedisStore) MapGet(hashsetKey string, key string) ([]byte, error) { + result, err := r.client.HGet(r.ctx, hashsetKey, key).Result() + + if errors.Is(err, redis.Nil) { + return nil, ErrCacheNotFound + } else if err != nil { + return nil, errors.Join(ErrCacheFailedToGet, err) + } + + return []byte(result), nil +} + +// MapSet stores a value for a specific hashsetKey. +func (r *RedisStore) MapSet(hashsetKey string, key string, value []byte) error { + err := r.client.HSet(r.ctx, hashsetKey, key, value).Err() + + if err != nil { + return errors.Join(ErrCacheFailedToSet, err) + } + + return nil +} + +// MapDel removes a value by key in specific hashsetKey. +func (r *RedisStore) MapDel(hashsetKey string, keys ...string) error { + err := r.client.HDel(r.ctx, hashsetKey, keys...).Err() + + if err != nil { + return errors.Join(ErrCacheFailedToDel, err) + } + + return nil +} + +// MapGetAll retrieves all key-value pairs for a specific hashsetKey. Return err if hashsetKey not found. +func (r *RedisStore) MapGetAll(hashsetKey string) (map[string][]byte, error) { + values, err := r.client.HGetAll(r.ctx, hashsetKey).Result() + if err != nil { + return nil, errors.Join(ErrCacheFailedToGet, err) + } + + result := make(map[string][]byte) + for field, value := range values { + result[field] = []byte(value) + } + return result, nil +} + +// MapExtractAll retrieves all key-value pairs for a specific hashsetKey and remove them from cache, all in one transaction. 
+func (r *RedisStore) MapExtractAll(hashsetKey string) (map[string][]byte, error) { + tx := r.client.TxPipeline() + + getAllCmd := tx.HGetAll(r.ctx, hashsetKey) + tx.Del(r.ctx, hashsetKey) + + _, err := tx.Exec(r.ctx) + if err != nil { + return nil, errors.Join(ErrCacheFailedToExecuteTx, err) + } + + getAllCmdResult := getAllCmd.Val() + + result := make(map[string][]byte) + for field, value := range getAllCmdResult { + result[field] = []byte(value) + } + return result, nil +} + +// MapLen returns the number of elements in a hashsetKey. +func (r *RedisStore) MapLen(hashsetKey string) (int64, error) { + count, err := r.client.HLen(r.ctx, hashsetKey).Result() + if err != nil { + return 0, errors.Join(ErrCacheFailedToGetCount, err) + } + return count, nil +} diff --git a/internal/cache/redis_test.go b/internal/cache/redis_test.go new file mode 100644 index 000000000..ed83a631e --- /dev/null +++ b/internal/cache/redis_test.go @@ -0,0 +1,204 @@ +package cache + +import ( + "context" + "fmt" + "log" + "os" + "strconv" + "testing" + "time" + + "github.com/go-redis/redis/v8" + "github.com/ory/dockertest/v3" + "github.com/stretchr/testify/require" + + testutils "github.com/bitcoin-sv/arc/internal/test_utils" +) + +var ( + hostPort int + redisClient *redis.Client +) + +const ( + port = 6379 +) + +func TestMain(m *testing.M) { + os.Exit(testmain(m)) +} + +func testmain(m *testing.M) int { + pool, err := dockertest.NewPool("") + if err != nil { + log.Printf("failed to create pool: %v", err) + return 1 + } + + resource, resourcePort, err := testutils.RunRedis(pool, strconv.Itoa(port), "cache") + if err != nil { + log.Print(err) + return 1 + } + defer func() { + err = pool.Purge(resource) + if err != nil { + log.Fatalf("failed to purge pool: %v", err) + } + }() + + hostPort, err = strconv.Atoi(resourcePort) + if err != nil { + log.Fatalf("failed to convert port to int: %v", err) + } + + return m.Run() +} + +func setup() { + redisClient = redis.NewClient(&redis.Options{ + Addr: fmt.Sprintf("localhost:%d", hostPort), + Password: "", + DB: 1, + }) + + _, err := redisClient.Ping(context.Background()).Result() + if err != nil { + log.Fatalln(err) + } +} + +func TestRedisClient(t *testing.T) { + t.Helper() + if testing.Short() { + t.Skip("skipping integration test") + } + + ctx := context.Background() + + setup() + + redisStore := NewRedisStore(ctx, redisClient) + + t.Run("get/set", func(t *testing.T) { + // given + err := redisStore.Set("key", []byte("value"), 4*time.Second) + require.NoError(t, err) + + // when + res, err := redisStore.Get("key") + require.NoError(t, err) + + // then + require.Equal(t, "value", string(res)) + }) + + t.Run("del", func(t *testing.T) { + // given + err := redisStore.Set("key1", []byte("value1"), 4*time.Second) + require.NoError(t, err) + err = redisStore.Set("key2", []byte("value2"), 4*time.Second) + require.NoError(t, err) + err = redisStore.Set("key3", []byte("value3"), 4*time.Second) + require.NoError(t, err) + + // when + err = redisStore.Del([]string{"key1", "key2", "key3"}...) 
+ + // then + require.NoError(t, err) + }) + + t.Run("map set/get", func(t *testing.T) { + // when + err := redisStore.MapSet("hash", "key1", []byte("value1")) + require.NoError(t, err) + + res, err := redisStore.MapGet("hash", "key1") + require.NoError(t, err) + + // then + require.Equal(t, "value1", string(res)) + }) + + t.Run("map del", func(t *testing.T) { + // given + err := redisStore.MapSet("hash", "key1", []byte("value1")) + require.NoError(t, err) + err = redisStore.MapSet("hash", "key2", []byte("value2")) + require.NoError(t, err) + err = redisStore.MapSet("hash", "key3", []byte("value3")) + require.NoError(t, err) + + // when/then + err = redisStore.MapDel("hash", []string{"key1", "key2"}...) + require.NoError(t, err) + + err = redisStore.MapDel("hash", "key3") + require.NoError(t, err) + }) + + t.Run("map get all", func(t *testing.T) { + // given + err := redisStore.MapSet("hash", "key1", []byte("value1")) + require.NoError(t, err) + err = redisStore.MapSet("hash", "key2", []byte("value2")) + require.NoError(t, err) + err = redisStore.MapSet("hash", "key3", []byte("value3")) + require.NoError(t, err) + + // when + res, err := redisStore.MapGetAll("hash") + require.NoError(t, err) + + // then + expectedResponse := map[string][]byte{"key1": []byte("value1"), "key2": []byte("value2"), "key3": []byte("value3")} + require.Equal(t, expectedResponse, res) + + err = redisStore.Del("hash") + require.NoError(t, err) + }) + + t.Run("map len", func(t *testing.T) { + // given + err := redisStore.MapSet("hash", "key1", []byte("value1")) + require.NoError(t, err) + err = redisStore.MapSet("hash", "key2", []byte("value2")) + require.NoError(t, err) + err = redisStore.MapSet("hash", "key3", []byte("value3")) + require.NoError(t, err) + + // when + res, err := redisStore.MapLen("hash") + require.NoError(t, err) + + // then + require.Equal(t, int64(3), res) + + err = redisStore.Del("hash") + require.NoError(t, err) + }) + + t.Run("map extract all", func(t *testing.T) { + // given + err := redisStore.MapSet("hash", "key1", []byte("value1")) + require.NoError(t, err) + err = redisStore.MapSet("hash", "key2", []byte("value2")) + require.NoError(t, err) + err = redisStore.MapSet("hash", "key3", []byte("value3")) + require.NoError(t, err) + + // when + res, err := redisStore.MapExtractAll("hash") + require.NoError(t, err) + + // then + expectedResponse := map[string][]byte{"key1": []byte("value1"), "key2": []byte("value2"), "key3": []byte("value3")} + require.Equal(t, expectedResponse, res) + + hLen, err := redisStore.MapLen("hash") + require.NoError(t, err) + require.Equal(t, int64(0), hLen) + }) +} diff --git a/internal/metamorph/health_check_test.go b/internal/metamorph/health_check_test.go index 4f6b6b84b..2dca75711 100644 --- a/internal/metamorph/health_check_test.go +++ b/internal/metamorph/health_check_test.go @@ -14,8 +14,6 @@ import ( storeMocks "github.com/bitcoin-sv/arc/internal/metamorph/store/mocks" ) -const baseCacheSize = 100 * 1024 * 1024 - func TestCheck(t *testing.T) { tt := []struct { name string diff --git a/internal/metamorph/integration_test/double_spend_integration_test.go b/internal/metamorph/integration_test/double_spend_integration_test.go index 7a8e04f13..dd6ce5ef6 100644 --- a/internal/metamorph/integration_test/double_spend_integration_test.go +++ b/internal/metamorph/integration_test/double_spend_integration_test.go @@ -24,7 +24,6 @@ import ( "testing" "time" - "github.com/coocood/freecache" _ "github.com/golang-migrate/migrate/v4/source/file" _ "github.com/lib/pq" 
"github.com/libsv/go-p2p/chaincfg/chainhash" @@ -124,9 +123,8 @@ func TestDoubleSpendDetection(t *testing.T) { require.NoError(t, err) defer metamorphStore.Close(context.Background()) - cStore := cache.NewFreecacheStore(freecache.NewCache(baseCacheSize)) - pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}} + cStore := cache.NewMemoryStore() processor, err := metamorph.NewProcessor(metamorphStore, cStore, pm, statusMessageChannel, metamorph.WithMinedTxsChan(minedTxChannel), diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index c8dfb9e17..5c2642b76 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -409,28 +409,46 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() { go func() { defer p.waitGroup.Done() - statusUpdatesMap := map[chainhash.Hash]store.UpdateStatus{} - for { select { case <-p.ctx.Done(): return case statusUpdate := <-p.storageStatusUpdateCh: // Ensure no duplicate statuses - updateStatusMap(statusUpdatesMap, statusUpdate) + err := p.updateStatusMap(statusUpdate) + if err != nil { + p.logger.Error("failed to update status", slog.String("err", err.Error())) + return + } - if len(statusUpdatesMap) >= p.processStatusUpdatesBatchSize { - p.checkAndUpdate(ctx, statusUpdatesMap) - statusUpdatesMap = map[chainhash.Hash]store.UpdateStatus{} + statusUpdateCount, err := p.getStatusUpdateCount() + if err != nil { + p.logger.Error("failed to get status update count", slog.String("err", err.Error())) + return + } + + if statusUpdateCount >= p.processStatusUpdatesBatchSize { + err := p.checkAndUpdate(ctx) + if err != nil { + p.logger.Error("failed to check and update statuses", slog.String("err", err.Error())) + } // Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed. // This prevents unnecessary immediate updates and maintains the intended time interval between batches. ticker.Reset(p.processStatusUpdatesInterval) } case <-ticker.C: - if len(statusUpdatesMap) > 0 { - p.checkAndUpdate(ctx, statusUpdatesMap) - statusUpdatesMap = map[chainhash.Hash]store.UpdateStatus{} + statusUpdateCount, err := p.getStatusUpdateCount() + if err != nil { + p.logger.Error("failed to get status update count", slog.String("err", err.Error())) + return + } + + if statusUpdateCount > 0 { + err := p.checkAndUpdate(ctx) + if err != nil { + p.logger.Error("failed to check and update statuses", slog.String("err", err.Error())) + } // Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed. // This prevents unnecessary immediate updates and maintains the intended time interval between batches. @@ -441,15 +459,20 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() { }() } -func (p *Processor) checkAndUpdate(ctx context.Context, statusUpdatesMap map[chainhash.Hash]store.UpdateStatus) { +func (p *Processor) checkAndUpdate(ctx context.Context) error { var err error ctx, span := tracing.StartTracing(ctx, "checkAndUpdate", p.tracingEnabled, p.tracingAttributes...) 
defer func() { tracing.EndTracing(span, err) }() + statusUpdatesMap, err := p.getAndDeleteAllTransactionStatuses() + if err != nil { + return err + } + if len(statusUpdatesMap) == 0 { - return + return nil } statusUpdates := make([]store.UpdateStatus, 0, len(statusUpdatesMap)) @@ -467,6 +490,8 @@ func (p *Processor) checkAndUpdate(ctx context.Context, statusUpdatesMap map[cha if err != nil { p.logger.Error("failed to bulk update statuses", slog.String("err", err.Error())) } + + return nil } func (p *Processor) statusUpdateWithCallback(ctx context.Context, statusUpdates, doubleSpendUpdates []store.UpdateStatus) (err error) { diff --git a/internal/metamorph/processor_helpers.go b/internal/metamorph/processor_helpers.go index d50f1ba80..4c36bd15a 100644 --- a/internal/metamorph/processor_helpers.go +++ b/internal/metamorph/processor_helpers.go @@ -3,15 +3,17 @@ package metamorph import ( "encoding/json" "errors" + "log/slog" "github.com/libsv/go-p2p/chaincfg/chainhash" + "github.com/bitcoin-sv/arc/internal/cache" "github.com/bitcoin-sv/arc/internal/metamorph/store" ) -//lint:file-ignore U1000 Ignore all unused code, functions are temporarily not used +type StatusUpdateMap map[chainhash.Hash]store.UpdateStatus -const CacheStatusUpdateKey = "status-updates" +var CacheStatusUpdateHash = "mtm-tx-status-update" var ( ErrFailedToSerialize = errors.New("failed to serialize value") @@ -22,78 +24,104 @@ func (p *Processor) GetProcessorMapSize() int { return p.responseProcessor.getMapLen() } -func updateStatusMap(statusUpdatesMap map[chainhash.Hash]store.UpdateStatus, statusUpdate store.UpdateStatus) { - foundStatusUpdate, found := statusUpdatesMap[statusUpdate.Hash] - - if !found || shouldUpdateStatus(statusUpdate, foundStatusUpdate) { - if len(statusUpdate.CompetingTxs) > 0 { - statusUpdate.CompetingTxs = mergeUnique(statusUpdate.CompetingTxs, foundStatusUpdate.CompetingTxs) +func (p *Processor) updateStatusMap(statusUpdate store.UpdateStatus) error { + currentStatusUpdate, err := p.getTransactionStatus(statusUpdate.Hash) + if err != nil { + if errors.Is(err, cache.ErrCacheNotFound) { + // if record doesn't exist, save new one + return p.setTransactionStatus(statusUpdate) } - - statusUpdatesMap[statusUpdate.Hash] = statusUpdate + return err } -} -func (p *Processor) updateStatusMap(statusUpdate store.UpdateStatus) (map[chainhash.Hash]store.UpdateStatus, error) { //nolint:unused - statusUpdatesMap := p.getStatusUpdateMap() + if shouldUpdateCompetingTxs(statusUpdate, *currentStatusUpdate) { + currentStatusUpdate.CompetingTxs = mergeUnique(statusUpdate.CompetingTxs, currentStatusUpdate.CompetingTxs) + } - foundStatusUpdate, found := statusUpdatesMap[statusUpdate.Hash] + if shouldUpdateStatus(statusUpdate, *currentStatusUpdate) { + currentStatusUpdate.Status = statusUpdate.Status + // TODO: combine status history + } - if !found || shouldUpdateStatus(statusUpdate, foundStatusUpdate) { - if len(statusUpdate.CompetingTxs) > 0 { - statusUpdate.CompetingTxs = mergeUnique(statusUpdate.CompetingTxs, foundStatusUpdate.CompetingTxs) - } + return p.setTransactionStatus(*currentStatusUpdate) +} - statusUpdatesMap[statusUpdate.Hash] = statusUpdate +func (p *Processor) setTransactionStatus(status store.UpdateStatus) error { + bytes, err := json.Marshal(status) + if err != nil { + return errors.Join(ErrFailedToSerialize, err) } - err := p.setStatusUpdateMap(statusUpdatesMap) + err = p.cacheStore.MapSet(CacheStatusUpdateHash, status.Hash.String(), bytes) if err != nil { - return nil, err + return err } - - return 
statusUpdatesMap, nil + return nil } -func (p *Processor) setStatusUpdateMap(statusUpdatesMap map[chainhash.Hash]store.UpdateStatus) error { //nolint:unused - bytes, err := serializeStatusMap(statusUpdatesMap) +func (p *Processor) getTransactionStatus(hash chainhash.Hash) (*store.UpdateStatus, error) { + bytes, err := p.cacheStore.MapGet(CacheStatusUpdateHash, hash.String()) if err != nil { - return err + return nil, err } - err = p.cacheStore.Set(CacheStatusUpdateKey, bytes, processStatusUpdatesIntervalDefault) + var status store.UpdateStatus + err = json.Unmarshal(bytes, &status) if err != nil { - return err + return nil, err } - return nil + + return &status, nil } -func (p *Processor) getStatusUpdateMap() map[chainhash.Hash]store.UpdateStatus { //nolint:unused - existingMap, err := p.cacheStore.Get(CacheStatusUpdateKey) +func (p *Processor) getAndDeleteAllTransactionStatuses() (StatusUpdateMap, error) { + statuses := make(StatusUpdateMap) + keys, err := p.cacheStore.MapExtractAll(CacheStatusUpdateHash) + if err != nil { + return nil, err + } + + for key, value := range keys { + hash, err := chainhash.NewHashFromStr(key) + if err != nil { + p.logger.Error("failed to convert hash from key", slog.String("error", err.Error()), slog.String("key", key)) + continue + } - if err == nil { - statusUpdatesMap, err := deserializeStatusMap(existingMap) - if err == nil { - return statusUpdatesMap + var status store.UpdateStatus + err = json.Unmarshal(value, &status) + if err != nil { + p.logger.Error("failed to unmarshal status", slog.String("error", err.Error()), slog.String("key", key)) + continue } + + statuses[*hash] = status } - // If the key doesn't exist or there was an error unmarshalling the value return new map - return make(map[chainhash.Hash]store.UpdateStatus) + return statuses, nil } -func shouldUpdateStatus(new, found store.UpdateStatus) bool { - if new.Status > found.Status { - return true +func (p *Processor) getStatusUpdateCount() (int, error) { + count, err := p.cacheStore.MapLen(CacheStatusUpdateHash) + if err != nil { + return 0, err } - if new.Status == found.Status && !unorderedEqual(new.CompetingTxs, found.CompetingTxs) { + return int(count), nil +} + +func shouldUpdateCompetingTxs(new, found store.UpdateStatus) bool { + if new.Status >= found.Status && !unorderedEqual(new.CompetingTxs, found.CompetingTxs) { return true } return false } +func shouldUpdateStatus(new, found store.UpdateStatus) bool { + return new.Status > found.Status +} + // unorderedEqual checks if two string slices contain // the same elements, regardless of order func unorderedEqual(sliceOne, sliceTwo []string) bool { @@ -135,36 +163,3 @@ func mergeUnique(arr1, arr2 []string) []string { return uniqueSlice } - -func serializeStatusMap(updateStatusMap map[chainhash.Hash]store.UpdateStatus) ([]byte, error) { //nolint:unused - serializeMap := make(map[string]store.UpdateStatus) - for k, v := range updateStatusMap { - serializeMap[k.String()] = v - } - - bytes, err := json.Marshal(serializeMap) - if err != nil { - return nil, errors.Join(ErrFailedToSerialize, err) - } - return bytes, nil -} - -func deserializeStatusMap(data []byte) (map[chainhash.Hash]store.UpdateStatus, error) { //nolint:unused - serializeMap := make(map[string]store.UpdateStatus) - updateStatusMap := make(map[chainhash.Hash]store.UpdateStatus) - - err := json.Unmarshal(data, &serializeMap) - if err != nil { - return nil, errors.Join(ErrFailedToDeserialize, err) - } - - for k, v := range serializeMap { - hash, err := chainhash.NewHashFromStr(k) 
- if err != nil { - return nil, errors.Join(ErrFailedToDeserialize, err) - } - updateStatusMap[*hash] = v - } - - return updateStatusMap, nil -} diff --git a/internal/metamorph/processor_helpers_test.go b/internal/metamorph/processor_helpers_test.go index 71dd03928..3c02087f6 100644 --- a/internal/metamorph/processor_helpers_test.go +++ b/internal/metamorph/processor_helpers_test.go @@ -32,10 +32,11 @@ func BenchmarkUnorderedEqual(b *testing.B) { func TestShouldUpdateStatus(t *testing.T) { testCases := []struct { - name string - existingStatus store.UpdateStatus - newStatus store.UpdateStatus - expectedResult bool + name string + existingStatus store.UpdateStatus + newStatus store.UpdateStatus + expectedResultStatus bool + expectedResultCompetingTx bool }{ { name: "new status lower than existing", @@ -45,7 +46,8 @@ func TestShouldUpdateStatus(t *testing.T) { newStatus: store.UpdateStatus{ Status: metamorph_api.Status_ACCEPTED_BY_NETWORK, }, - expectedResult: false, + expectedResultStatus: false, + expectedResultCompetingTx: false, }, { name: "new status higher than existing", @@ -55,7 +57,8 @@ func TestShouldUpdateStatus(t *testing.T) { newStatus: store.UpdateStatus{ Status: metamorph_api.Status_SEEN_ON_NETWORK, }, - expectedResult: true, + expectedResultStatus: true, + expectedResultCompetingTx: false, }, { name: "new status lower than existing, unequal competing txs", @@ -66,7 +69,8 @@ func TestShouldUpdateStatus(t *testing.T) { Status: metamorph_api.Status_ACCEPTED_BY_NETWORK, CompetingTxs: []string{"1234"}, }, - expectedResult: false, + expectedResultStatus: false, + expectedResultCompetingTx: false, }, { name: "statuses equal", @@ -76,7 +80,8 @@ func TestShouldUpdateStatus(t *testing.T) { newStatus: store.UpdateStatus{ Status: metamorph_api.Status_SEEN_ON_NETWORK, }, - expectedResult: false, + expectedResultStatus: false, + expectedResultCompetingTx: false, }, { name: "statuses equal, but unequal competing txs", @@ -88,17 +93,20 @@ func TestShouldUpdateStatus(t *testing.T) { Status: metamorph_api.Status_DOUBLE_SPEND_ATTEMPTED, CompetingTxs: []string{"1234"}, }, - expectedResult: true, + expectedResultStatus: false, + expectedResultCompetingTx: true, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { // when - actualResult := shouldUpdateStatus(tc.newStatus, tc.existingStatus) + actualResultCompetingTx := shouldUpdateCompetingTxs(tc.newStatus, tc.existingStatus) + actualResultStatus := shouldUpdateStatus(tc.newStatus, tc.existingStatus) // then - assert.Equal(t, tc.expectedResult, actualResult) + assert.Equal(t, tc.expectedResultStatus, actualResultStatus) + assert.Equal(t, tc.expectedResultCompetingTx, actualResultCompetingTx) }) } } diff --git a/internal/metamorph/processor_test.go b/internal/metamorph/processor_test.go index c37477281..90bf3af83 100644 --- a/internal/metamorph/processor_test.go +++ b/internal/metamorph/processor_test.go @@ -11,7 +11,6 @@ import ( "testing" "time" - "github.com/coocood/freecache" "github.com/libsv/go-p2p" "github.com/libsv/go-p2p/chaincfg/chainhash" "github.com/stretchr/testify/assert" @@ -37,8 +36,7 @@ func TestNewProcessor(t *testing.T) { } pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}} - - cStore := cache.NewFreecacheStore(freecache.NewCache(baseCacheSize)) + cStore := cache.NewMemoryStore() tt := []struct { name string @@ -125,10 +123,10 @@ func TestStartLockTransactions(t *testing.T) { SetUnlockedByNameFunc: func(_ context.Context, _ string) (int64, error) { return 0, nil }, } - cStore := 
cache.NewFreecacheStore(freecache.NewCache(baseCacheSize)) - pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}} + cStore := cache.NewMemoryStore() + // when sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, metamorph.WithLockTxsInterval(20*time.Millisecond)) require.NoError(t, err) @@ -228,8 +226,6 @@ func TestProcessTransaction(t *testing.T) { }, } - cStore := cache.NewFreecacheStore(freecache.NewCache(baseCacheSize)) - pm := &mocks.PeerManagerMock{ AnnounceTransactionFunc: func(txHash *chainhash.Hash, _ []p2p.PeerI) []p2p.PeerI { require.True(t, testdata.TX1Hash.IsEqual(txHash)) @@ -238,6 +234,8 @@ func TestProcessTransaction(t *testing.T) { RequestTransactionFunc: func(_ *chainhash.Hash) p2p.PeerI { return nil }, } + cStore := cache.NewMemoryStore() + publisher := &mocks.MessageQueueClientMock{ PublishFunc: func(_ context.Context, _ string, _ []byte) error { return nil @@ -490,10 +488,10 @@ func TestStartSendStatusForTransaction(t *testing.T) { }, } - cStore := cache.NewFreecacheStore(freecache.NewCache(baseCacheSize)) - pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}} + cStore := cache.NewMemoryStore() + callbackSender := &mocks.CallbackSenderMock{ SendCallbackFunc: func(_ context.Context, _ *store.Data) { callbackSent <- struct{}{} @@ -641,7 +639,6 @@ func TestStartProcessSubmittedTxs(t *testing.T) { }, SetUnlockedByNameFunc: func(_ context.Context, _ string) (int64, error) { return 0, nil }, } - cStore := cache.NewFreecacheStore(freecache.NewCache(baseCacheSize)) counter := 0 pm := &mocks.PeerManagerMock{ AnnounceTransactionFunc: func(txHash *chainhash.Hash, _ []p2p.PeerI) []p2p.PeerI { @@ -657,6 +654,8 @@ func TestStartProcessSubmittedTxs(t *testing.T) { ShutdownFunc: func() {}, } + cStore := cache.NewMemoryStore() + publisher := &mocks.MessageQueueClientMock{ PublishFunc: func(_ context.Context, _ string, _ []byte) error { return nil @@ -774,8 +773,6 @@ func TestProcessExpiredTransactions(t *testing.T) { }, } - cStore := cache.NewFreecacheStore(freecache.NewCache(baseCacheSize)) - pm := &mocks.PeerManagerMock{ RequestTransactionFunc: func(_ *chainhash.Hash) p2p.PeerI { return nil @@ -786,6 +783,8 @@ func TestProcessExpiredTransactions(t *testing.T) { ShutdownFunc: func() {}, } + cStore := cache.NewMemoryStore() + publisher := &mocks.MessageQueueClientMock{ PublishFunc: func(_ context.Context, _ string, _ []byte) error { return nil @@ -873,7 +872,7 @@ func TestStartProcessMinedCallbacks(t *testing.T) { callbackSender := &mocks.CallbackSenderMock{ SendCallbackFunc: func(_ context.Context, _ *store.Data) {}, } - cStore := cache.NewFreecacheStore(freecache.NewCache(baseCacheSize)) + cStore := cache.NewMemoryStore() sut, err := metamorph.NewProcessor( metamorphStore, cStore, @@ -950,8 +949,6 @@ func TestProcessorHealth(t *testing.T) { }, } - cStore := cache.NewFreecacheStore(freecache.NewCache(baseCacheSize)) - pm := &mocks.PeerManagerMock{ AddPeerFunc: func(_ p2p.PeerI) error { return nil @@ -966,6 +963,7 @@ func TestProcessorHealth(t *testing.T) { }, ShutdownFunc: func() {}, } + cStore := cache.NewMemoryStore() sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, metamorph.WithProcessExpiredTxsInterval(time.Millisecond*20), @@ -1019,10 +1017,10 @@ func TestStart(t *testing.T) { return 0, nil }} - cStore := cache.NewFreecacheStore(freecache.NewCache(baseCacheSize)) - pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}} + cStore := cache.NewMemoryStore() + var subscribeMinedTxsFunction func([]byte) error var subscribeSubmitTxsFunction 
func([]byte) error mqClient := &mocks.MessageQueueClientMock{ diff --git a/internal/metamorph/stats_collector_test.go b/internal/metamorph/stats_collector_test.go index 7bd40efab..2dd35de12 100644 --- a/internal/metamorph/stats_collector_test.go +++ b/internal/metamorph/stats_collector_test.go @@ -3,8 +3,6 @@ package metamorph_test import ( "context" "errors" - "github.com/bitcoin-sv/arc/internal/cache" - "github.com/coocood/freecache" "log/slog" "os" "testing" @@ -51,11 +49,9 @@ func TestStartCollectStats(t *testing.T) { SetUnlockedByNameFunc: func(_ context.Context, _ string) (int64, error) { return 0, nil }, } - cStore := cache.NewFreecacheStore(freecache.NewCache(baseCacheSize)) - pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}} - processor, err := metamorph.NewProcessor(mtmStore, cStore, pm, nil, + processor, err := metamorph.NewProcessor(mtmStore, nil, pm, nil, metamorph.WithProcessorLogger(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: metamorph.LogLevelDefault}))), metamorph.WithStatCollectionInterval(10*time.Millisecond), ) diff --git a/internal/test_utils/docker.go b/internal/test_utils/docker.go index 212aa9615..32e5b4445 100644 --- a/internal/test_utils/docker.go +++ b/internal/test_utils/docker.go @@ -1,11 +1,13 @@ package testutils import ( + "context" "errors" "fmt" "net/url" "os" + "github.com/go-redis/redis/v8" "github.com/ordishs/go-bitcoin" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -168,3 +170,49 @@ func RunNode(pool *dockertest.Pool, port, name string, cmds ...string) (*dockert return resource, hostPort, nil } + +func RunRedis(pool *dockertest.Pool, port, name string, cmds ...string) (*dockertest.Resource, string, error) { + opts := dockertest.RunOptions{ + Repository: "redis", + Tag: "7.4.1", + ExposedPorts: []string{"6379"}, + PortBindings: map[docker.Port][]docker.PortBinding{ + "6379": { + {HostIP: "0.0.0.0", HostPort: port}, + }, + }, + Name: name, + Cmd: cmds, + } + + resource, err := pool.RunWithOptions(&opts, func(config *docker.HostConfig) { + // set AutoRemove to true so that stopped container goes away by itself + config.AutoRemove = true + config.RestartPolicy = docker.RestartPolicy{ + Name: "no", + } + }) + if err != nil { + return nil, "", fmt.Errorf("failed to create resource: %v", err) + } + + hostPort := resource.GetPort("6379/tcp") + + err = pool.Retry(func() error { + c := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + Password: "", + DB: 1, + }) + + ctx := context.Background() + status := c.Ping(ctx) + _, err := status.Result() + return err + }) + if err != nil { + return nil, "", fmt.Errorf("failed to create resource: %v", err) + } + + return resource, hostPort, nil +}
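
Usage sketch (illustrative, not part of the patch): a minimal example of the cache.Store hashset API introduced by this change, using the in-memory implementation. It assumes the code lives inside the arc module (internal/cache cannot be imported from outside the repository); the key and field names are made up for illustration, and errors are ignored for brevity.

package main

import (
	"fmt"
	"time"

	"github.com/bitcoin-sv/arc/internal/cache"
)

func main() {
	// Both MemoryStore and RedisStore satisfy the cache.Store interface.
	var store cache.Store = cache.NewMemoryStore()

	// Plain key/value usage; the in-memory store ignores the TTL argument.
	_ = store.Set("example-key", []byte("example-value"), 10*time.Second)

	// Hashset usage, mirroring how the metamorph processor now buffers status updates.
	_ = store.MapSet("example-hashset", "field-1", []byte("value-1"))
	_ = store.MapSet("example-hashset", "field-2", []byte("value-2"))

	count, _ := store.MapLen("example-hashset")
	fmt.Println("buffered entries:", count) // 2

	// MapExtractAll returns every field and deletes the hashset in a single call.
	entries, _ := store.MapExtractAll("example-hashset")
	for field, value := range entries {
		fmt.Printf("%s=%s\n", field, string(value))
	}
}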