Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARCO-205: use cache for updating txs in metamorph #601

Merged
merged 8 commits into from
Oct 11, 2024
Merged
1 change: 1 addition & 0 deletions cmd/arc/services/metamorph.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore

processor, err := metamorph.NewProcessor(
metamorphStore,
cacheStore,
pm,
statusMessageCh,
processorOpts...,
Expand Down
7 changes: 6 additions & 1 deletion internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ import (
"time"
)

var ErrCacheNotFound = errors.New("key not found in cache")
var (
ErrCacheNotFound = errors.New("key not found in cache")
ErrCacheFailedToSet = errors.New("failed to set value in cache")
ErrCacheFailedToDel = errors.New("failed to delete value from cache")
ErrCacheFailedToGet = errors.New("failed to get value from cache")
)

type Store interface {
Get(key string) ([]byte, error)
Expand Down
4 changes: 2 additions & 2 deletions internal/cache/freecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (f *FreecacheStore) Get(key string) ([]byte, error) {
if errors.Is(err, freecache.ErrNotFound) {
return nil, ErrCacheNotFound
}
return nil, err
return nil, errors.Join(ErrCacheFailedToGet, err)
}
return value, nil
}
Expand All @@ -35,7 +35,7 @@ func (f *FreecacheStore) Get(key string) ([]byte, error) {
func (f *FreecacheStore) Set(key string, value []byte, ttl time.Duration) error {
err := f.cache.Set([]byte(key), value, int(ttl.Seconds()))
if err != nil {
return err
return errors.Join(ErrCacheFailedToSet, err)
}
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions internal/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (r *RedisStore) Get(key string) ([]byte, error) {
if errors.Is(err, redis.Nil) {
return nil, ErrCacheNotFound
} else if err != nil {
return nil, err
return nil, errors.Join(ErrCacheFailedToGet, err)
}
return []byte(result), nil
}
Expand All @@ -37,7 +37,7 @@ func (r *RedisStore) Get(key string) ([]byte, error) {
func (r *RedisStore) Set(key string, value []byte, ttl time.Duration) error {
err := r.client.Set(r.ctx, key, value, ttl).Err()
if err != nil {
return err
return errors.Join(ErrCacheFailedToSet, err)
}
return nil
}
Expand All @@ -46,7 +46,7 @@ func (r *RedisStore) Set(key string, value []byte, ttl time.Duration) error {
func (r *RedisStore) Del(key string) error {
result, err := r.client.Del(r.ctx, key).Result()
if err != nil {
return err
return errors.Join(ErrCacheFailedToDel, err)
}
if result == 0 {
return ErrCacheNotFound
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package integrationtest
import (
"context"
"database/sql"
"github.com/bitcoin-sv/arc/internal/cache"
"github.com/coocood/freecache"
"log"
"log/slog"
"net/url"
Expand All @@ -40,6 +42,7 @@ import (
const (
zmqTopic = "invalidtx"
msgDoubleSpendAttempted = "7b2266726f6d426c6f636b223a2066616c73652c22736f75726365223a2022703270222c2261646472657373223a20226e6f6465323a3138333333222c226e6f64654964223a20312c2274786964223a202238653735616531306638366438613433303434613534633363353764363630643230636462373465323333626534623563393062613735326562646337653838222c2273697a65223a203139312c22686578223a202230313030303030303031313134386239653931646336383232313635306539363861366164613863313531373135656135373864623130376336623563333362363762376636376630323030303030303030366134373330343430323230313863396166396334626634653736383932376263363335363233623434383362656261656334343433396165613838356363666430363163373731636435613032323034613839626531333534613038613539643466316636323235343937366532373466316333333334383334373137363462623936633565393837626539663365343132313033303830373637393438326663343533323461386133326166643832333730646337316365383966373936376536636635646139646430356330366665356137616666666666666666303130613030303030303030303030303030313937366139313434613037363038353032653464646131363662333830343130613633663066653962383830666532383861633030303030303030222c226973496e76616c6964223a20747275652c22697356616c69646174696f6e4572726f72223a2066616c73652c2269734d697373696e67496e70757473223a2066616c73652c226973446f75626c655370656e644465746563746564223a2066616c73652c2269734d656d706f6f6c436f6e666c6963744465746563746564223a20747275652c2269734e6f6e46696e616c223a2066616c73652c22697356616c69646174696f6e54696d656f75744578636565646564223a2066616c73652c2269735374616e646172645478223a20747275652c2272656a656374696f6e436f6465223a203235382c2272656a656374696f6e526561736f6e223a202274786e2d6d656d706f6f6c2d636f6e666c696374222c22636f6c6c6964656457697468223a205b7b2274786964223a202264363461646663653662313035646336626466343735343934393235626630363830326134316130353832353836663333633262313664353337613062376236222c2273697a65223a203139312c22686578223a202230313030303030303031313134386239653931646336383232313635306539363861366164613863313531373135656135373864623130376336623563333362363762376636376630323030303030303030366134373330343430323230376361326162353332623936303130333362316464636138303838353433396366343433666264663262616463656637303964383930616434373661346162353032323032653730666565353935313462313763353635336138313834643730646232646363643062613339623731663730643239386231643939313764333837396663343132313033303830373637393438326663343533323461386133326166643832333730646337316365383966373936376536636635646139646430356330366665356137616666666666666666303130613030303030303030303030303030313937366139313435313335306233653933363037613437616136623161653964343937616336656135366130623132383861633030303030303030227d5d2c2272656a656374696f6e54696d65223a2022323032342d30372d32355431313a30313a35365a227d"
baseCacheSize = 100 * 1024 * 1024
)

// msgDoubleSpendAttempted contains these hashes
Expand Down Expand Up @@ -120,9 +123,11 @@ func TestDoubleSpendDetection(t *testing.T) {
require.NoError(t, err)
defer metamorphStore.Close(context.Background())

cStore := cache.NewFreecacheStore(freecache.NewCache(baseCacheSize))

pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}}

processor, err := metamorph.NewProcessor(metamorphStore, pm, statusMessageChannel,
processor, err := metamorph.NewProcessor(metamorphStore, cStore, pm, statusMessageChannel,
metamorph.WithMinedTxsChan(minedTxChannel),
metamorph.WithNow(func() time.Time { return time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC) }),
metamorph.WithProcessStatusUpdatesInterval(200*time.Millisecond),
Expand Down
30 changes: 23 additions & 7 deletions internal/metamorph/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"google.golang.org/protobuf/proto"

"github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api"
"github.com/bitcoin-sv/arc/internal/cache"
"github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api"
"github.com/bitcoin-sv/arc/internal/metamorph/store"
)
Expand Down Expand Up @@ -57,6 +58,7 @@ var (

type Processor struct {
store store.MetamorphStore
cacheStore cache.Store
hostname string
pm p2p.PeerManagerI
mqClient MessageQueueClient
Expand Down Expand Up @@ -105,7 +107,7 @@ type CallbackSender interface {
SendCallback(data *store.StoreData)
}

func NewProcessor(s store.MetamorphStore, pm p2p.PeerManagerI, statusMessageChannel chan *PeerTxMessage, opts ...Option) (*Processor, error) {
func NewProcessor(s store.MetamorphStore, c cache.Store, pm p2p.PeerManagerI, statusMessageChannel chan *PeerTxMessage, opts ...Option) (*Processor, error) {
if s == nil {
return nil, ErrStoreNil
}
Expand All @@ -121,6 +123,7 @@ func NewProcessor(s store.MetamorphStore, pm p2p.PeerManagerI, statusMessageChan

p := &Processor{
store: s,
cacheStore: c,
hostname: hostname,
pm: pm,
mapExpiryTime: mapExpiryTimeDefault,
Expand Down Expand Up @@ -397,28 +400,41 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() {
go func() {
defer p.waitGroup.Done()

statusUpdatesMap := map[chainhash.Hash]store.UpdateStatus{}
//statusUpdatesMap := map[chainhash.Hash]store.UpdateStatus{}
pawellewandowski98 marked this conversation as resolved.
Show resolved Hide resolved

for {
select {
case <-p.ctx.Done():
return
case statusUpdate := <-p.storageStatusUpdateCh:
// Ensure no duplicate statuses
updateStatusMap(statusUpdatesMap, statusUpdate)
actualUpdateStatusMap, err := p.updateStatusMap(statusUpdate)
if err != nil {
p.logger.Error("failed to update status", slog.String("err", err.Error()))
return
}

if len(statusUpdatesMap) >= p.processStatusUpdatesBatchSize {
p.checkAndUpdate(statusUpdatesMap)
statusUpdatesMap = map[chainhash.Hash]store.UpdateStatus{}
if len(actualUpdateStatusMap) >= p.processStatusUpdatesBatchSize {
p.checkAndUpdate(actualUpdateStatusMap)

err = p.cacheStore.Del(CacheStatusUpdateKey)
pawellewandowski98 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
p.logger.Error("failed to reset status update map", slog.String("err", err.Error()))
}

// Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed.
// This prevents unnecessary immediate updates and maintains the intended time interval between batches.
ticker.Reset(p.processStatusUpdatesInterval)
}
case <-ticker.C:
statusUpdatesMap := p.getStatusUpdateMap()
if len(statusUpdatesMap) > 0 {
p.checkAndUpdate(statusUpdatesMap)
statusUpdatesMap = map[chainhash.Hash]store.UpdateStatus{}

err := p.cacheStore.Del(CacheStatusUpdateKey)
if err != nil {
p.logger.Error("failed to reset status update map", slog.String("err", err.Error()))
}

// Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed.
// This prevents unnecessary immediate updates and maintains the intended time interval between batches.
Expand Down
80 changes: 79 additions & 1 deletion internal/metamorph/processor_helpers.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,26 @@
package metamorph

import (
"encoding/json"
"errors"
"github.com/bitcoin-sv/arc/internal/metamorph/store"
"github.com/libsv/go-p2p/chaincfg/chainhash"
)

const CacheStatusUpdateKey = "status-updates"

var (
ErrFailedToSerialize = errors.New("failed to serialize value")
ErrFailedToDeserialize = errors.New("failed to deserialize value")
)

func (p *Processor) GetProcessorMapSize() int {
return p.responseProcessor.getMapLen()
}

func updateStatusMap(statusUpdatesMap map[chainhash.Hash]store.UpdateStatus, statusUpdate store.UpdateStatus) {
func (p *Processor) updateStatusMap(statusUpdate store.UpdateStatus) (map[chainhash.Hash]store.UpdateStatus, error) {
statusUpdatesMap := p.getStatusUpdateMap()

foundStatusUpdate, found := statusUpdatesMap[statusUpdate.Hash]

if !found || shouldUpdateStatus(statusUpdate, foundStatusUpdate) {
Expand All @@ -19,6 +30,40 @@ func updateStatusMap(statusUpdatesMap map[chainhash.Hash]store.UpdateStatus, sta

statusUpdatesMap[statusUpdate.Hash] = statusUpdate
}

err := p.setStatusUpdateMap(statusUpdatesMap)
if err != nil {
return nil, err
}

return statusUpdatesMap, nil
}

func (p *Processor) setStatusUpdateMap(statusUpdatesMap map[chainhash.Hash]store.UpdateStatus) error {
bytes, err := serializeStatusMap(statusUpdatesMap)
if err != nil {
return err
}

err = p.cacheStore.Set(CacheStatusUpdateKey, bytes, processStatusUpdatesIntervalDefault)
if err != nil {
return err
}
return nil
}

func (p *Processor) getStatusUpdateMap() map[chainhash.Hash]store.UpdateStatus {
existingMap, err := p.cacheStore.Get(CacheStatusUpdateKey)

if err == nil {
statusUpdatesMap, err := deserializeStatusMap(existingMap)
if err == nil {
return statusUpdatesMap
}
}

// If the key doesn't exist or there was an error unmarshalling the value return new map
return make(map[chainhash.Hash]store.UpdateStatus)
}

func shouldUpdateStatus(new, found store.UpdateStatus) bool {
Expand Down Expand Up @@ -74,3 +119,36 @@ func mergeUnique(arr1, arr2 []string) []string {

return uniqueSlice
}

func serializeStatusMap(updateStatusMap map[chainhash.Hash]store.UpdateStatus) ([]byte, error) {
serializeMap := make(map[string]store.UpdateStatus)
for k, v := range updateStatusMap {
serializeMap[k.String()] = v
}

bytes, err := json.Marshal(serializeMap)
if err != nil {
return nil, errors.Join(ErrFailedToSerialize, err)
}
return bytes, nil
}

func deserializeStatusMap(data []byte) (map[chainhash.Hash]store.UpdateStatus, error) {
serializeMap := make(map[string]store.UpdateStatus)
updateStatusMap := make(map[chainhash.Hash]store.UpdateStatus)

err := json.Unmarshal(data, &serializeMap)
if err != nil {
return nil, errors.Join(ErrFailedToDeserialize, err)
}

for k, v := range serializeMap {
hash, err := chainhash.NewHashFromStr(k)
if err != nil {
return nil, errors.Join(ErrFailedToDeserialize, err)
}
updateStatusMap[*hash] = v
}

return updateStatusMap, nil
}
Loading
Loading