diff --git a/blocktx/block_notifier_test.go b/blocktx/block_notifier_test.go index f7d67ade6..5dfdece0e 100644 --- a/blocktx/block_notifier_test.go +++ b/blocktx/block_notifier_test.go @@ -41,7 +41,7 @@ func TestNewBlockNotifier(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { storeMock := &store.InterfaceMock{ - GetBlockGapsFunc: func(ctx context.Context) ([]*store.BlockGap, error) { + GetBlockGapsFunc: func(ctx context.Context, heightRange int) ([]*store.BlockGap, error) { return []*store.BlockGap{ { Height: 822014, diff --git a/blocktx/peer_handler.go b/blocktx/peer_handler.go index b3f9766ea..b38e7af3e 100644 --- a/blocktx/peer_handler.go +++ b/blocktx/peer_handler.go @@ -93,6 +93,7 @@ type PeerHandler struct { transactionStorageBatchSize int peerHandlerCollector *tracing.PeerHandlerCollector startingHeight int + dataRetentionDays int } func init() { @@ -105,6 +106,12 @@ func WithTransactionBatchSize(size int) func(handler *PeerHandler) { } } +func WithRetentionDays(dataRetentionDays int) func(handler *PeerHandler) { + return func(p *PeerHandler) { + p.dataRetentionDays = dataRetentionDays + } +} + func NewPeerHandler(logger *slog.Logger, storeI store.Interface, blockCh chan *blocktx_api.Block, startingHeight int, opts ...func(*PeerHandler)) *PeerHandler { evictionFunc := func(hash chainhash.Hash, peers []p2p.PeerI) bool { msg := wire.NewMsgGetData() @@ -378,6 +385,11 @@ func (bs *PeerHandler) HandleBlock(wireMsg wire.Message, peer p2p.PeerI) error { return nil } +const ( + hoursPerDay = 24 + blocksPerHour = 6 +) + func (bs *PeerHandler) FillGaps(peer p2p.PeerI) error { primary, err := bs.CheckPrimary() if err != nil { @@ -388,7 +400,9 @@ func (bs *PeerHandler) FillGaps(peer p2p.PeerI) error { return nil } - blockHeightGaps, err := bs.store.GetBlockGaps(context.Background()) + heightRange := bs.dataRetentionDays * hoursPerDay * blocksPerHour + + blockHeightGaps, err := bs.store.GetBlockGaps(context.Background(), heightRange) if err != nil { return err } diff --git a/blocktx/peer_handler_test.go b/blocktx/peer_handler_test.go index a36c370ef..45f7551c4 100644 --- a/blocktx/peer_handler_test.go +++ b/blocktx/peer_handler_test.go @@ -379,7 +379,7 @@ func TestFillGaps(t *testing.T) { t.Run(tc.name, func(t *testing.T) { bockChannel := make(chan *blocktx_api.Block, 1) var storeMock = &store.InterfaceMock{ - GetBlockGapsFunc: func(ctx context.Context) ([]*store.BlockGap, error) { + GetBlockGapsFunc: func(ctx context.Context, heightRange int) ([]*store.BlockGap, error) { return tc.blockGaps, tc.getBlockGapsErr }, PrimaryBlocktxFunc: func(ctx context.Context) (string, error) { diff --git a/blocktx/store/interface.go b/blocktx/store/interface.go index 1e50f313b..701c5fc4a 100644 --- a/blocktx/store/interface.go +++ b/blocktx/store/interface.go @@ -35,6 +35,6 @@ type Interface interface { OrphanHeight(ctx context.Context, height uint64) error SetOrphanHeight(ctx context.Context, height uint64, orphaned bool) error GetMinedTransactionsForBlock(ctx context.Context, blockAndSource *blocktx_api.BlockAndSource) (*blocktx_api.MinedTransactions, error) - GetBlockGaps(ctx context.Context) ([]*BlockGap, error) + GetBlockGaps(ctx context.Context, heightRange int) ([]*BlockGap, error) Close() error } diff --git a/blocktx/store/mock.go b/blocktx/store/mock.go index a29e35a47..6c00129b1 100644 --- a/blocktx/store/mock.go +++ b/blocktx/store/mock.go @@ -29,7 +29,7 @@ var _ Interface = &InterfaceMock{} // GetBlockForHeightFunc: func(ctx context.Context, height uint64) (*blocktx_api.Block, error) { // panic("mock out the GetBlockForHeight method") // }, -// GetBlockGapsFunc: func(ctx context.Context) ([]*BlockGap, error) { +// GetBlockGapsFunc: func(ctx context.Context, heightRange int) ([]*BlockGap, error) { // panic("mock out the GetBlockGaps method") // }, // GetBlockTransactionsFunc: func(ctx context.Context, block *blocktx_api.Block) (*blocktx_api.Transactions, error) { @@ -91,7 +91,7 @@ type InterfaceMock struct { GetBlockForHeightFunc func(ctx context.Context, height uint64) (*blocktx_api.Block, error) // GetBlockGapsFunc mocks the GetBlockGaps method. - GetBlockGapsFunc func(ctx context.Context) ([]*BlockGap, error) + GetBlockGapsFunc func(ctx context.Context, heightRange int) ([]*BlockGap, error) // GetBlockTransactionsFunc mocks the GetBlockTransactions method. GetBlockTransactionsFunc func(ctx context.Context, block *blocktx_api.Block) (*blocktx_api.Transactions, error) @@ -158,6 +158,8 @@ type InterfaceMock struct { GetBlockGaps []struct { // Ctx is the ctx argument value. Ctx context.Context + // HeightRange is the heightRange argument value. + HeightRange int } // GetBlockTransactions holds details about calls to the GetBlockTransactions method. GetBlockTransactions []struct { @@ -384,19 +386,21 @@ func (mock *InterfaceMock) GetBlockForHeightCalls() []struct { } // GetBlockGaps calls GetBlockGapsFunc. -func (mock *InterfaceMock) GetBlockGaps(ctx context.Context) ([]*BlockGap, error) { +func (mock *InterfaceMock) GetBlockGaps(ctx context.Context, heightRange int) ([]*BlockGap, error) { if mock.GetBlockGapsFunc == nil { panic("InterfaceMock.GetBlockGapsFunc: method is nil but Interface.GetBlockGaps was just called") } callInfo := struct { - Ctx context.Context + Ctx context.Context + HeightRange int }{ - Ctx: ctx, + Ctx: ctx, + HeightRange: heightRange, } mock.lockGetBlockGaps.Lock() mock.calls.GetBlockGaps = append(mock.calls.GetBlockGaps, callInfo) mock.lockGetBlockGaps.Unlock() - return mock.GetBlockGapsFunc(ctx) + return mock.GetBlockGapsFunc(ctx, heightRange) } // GetBlockGapsCalls gets all the calls that were made to GetBlockGaps. @@ -404,10 +408,12 @@ func (mock *InterfaceMock) GetBlockGaps(ctx context.Context) ([]*BlockGap, error // // len(mockedInterface.GetBlockGapsCalls()) func (mock *InterfaceMock) GetBlockGapsCalls() []struct { - Ctx context.Context + Ctx context.Context + HeightRange int } { var calls []struct { - Ctx context.Context + Ctx context.Context + HeightRange int } mock.lockGetBlockGaps.RLock() calls = mock.calls.GetBlockGaps diff --git a/blocktx/store/sql/fixtures/get_block_gaps/blocks.yaml b/blocktx/store/sql/fixtures/get_block_gaps/blocks.yaml index bbd165414..f84cfc00f 100644 --- a/blocktx/store/sql/fixtures/get_block_gaps/blocks.yaml +++ b/blocktx/store/sql/fixtures/get_block_gaps/blocks.yaml @@ -1,4 +1,16 @@ +- inserted_at: 2023-12-10 14:00:00 + id: 0 + hash: 0x72ad227eaaf73d36bc86f46347310c9b21a360b277c3000a0000000000000000 + prevhash: 0x4ad773b1a464129a0ed8c7a8c71bb98175f0f01da1793f0e0000000000000000 + merkleroot: 0x145b33264b4440278446f4cb5008dcf87e54e7827a215da9621b652eb17eef88 + height: 822010 + processed_at: 2023-12-10 14:10:00 + size: 244000000 + tx_count: 4437 + orphanedyn: false + inserted_at_num: 2023121014 + - inserted_at: 2023-12-10 14:00:00 id: 1 hash: 0xb71ab063c5f96cad71cdc59dcc94182a20a69cbd7eed2d070000000000000000 @@ -48,7 +60,7 @@ hash: 0x76404890880cb36ce68100abb05b3a958e17c0ed274d5c0a0000000000000000 prevhash: 0x5696fc6e504b6aa2ae5d9c46b9418192dc61bd1b2e3364030000000000000000 merkleroot: 0xc458aa382364e216c9c0533175ec8579a544c750ca181b18296e784d1dc53085 - height: 8220120 + height: 822020 processed_at: 2023-12-15 14:50:00 size: 8630000 tx_count: 36724 @@ -59,7 +71,7 @@ hash: 0x1d5fc8020fd68baea5c0cad654f04eb791a81100c51045090000000000000000 prevhash: 0x76404890880cb36ce68100abb05b3a958e17c0ed274d5c0a0000000000000000 merkleroot: 0x3e15f823a7de25c26ce9001d4814a6f0ebc915a1ca4f1ba9cfac720bd941c39c - height: 8220121 + height: 822021 processed_at: 2023-12-15 15:00:00 size: 1620000 tx_count: 5578 diff --git a/blocktx/store/sql/get_block_gaps.go b/blocktx/store/sql/get_block_gaps.go index 00f53574c..0c62b6e67 100644 --- a/blocktx/store/sql/get_block_gaps.go +++ b/blocktx/store/sql/get_block_gaps.go @@ -8,7 +8,7 @@ import ( "github.com/ordishs/gocore" ) -func (s *SQL) GetBlockGaps(ctx context.Context) ([]*store.BlockGap, error) { +func (s *SQL) GetBlockGaps(ctx context.Context, blockHeightRange int) ([]*store.BlockGap, error) { start := gocore.CurrentNanos() defer func() { gocore.NewStat("blocktx").NewStat("GetBlockGaps").AddTime(start) @@ -22,14 +22,14 @@ func (s *SQL) GetBlockGaps(ctx context.Context) ([]*store.BlockGap, error) { SELECT bl.block_heights AS missing_block_height FROM ( SELECT unnest(ARRAY( SELECT a.n - FROM generate_series((SELECT min(height) AS block_height FROM blocks b), (SELECT max(height) AS block_height FROM blocks b)) AS a(n) + FROM generate_series((SELECT max(height) - $1 AS block_height FROM blocks b), (SELECT max(height) AS block_height FROM blocks b)) AS a(n) )) AS block_heights) AS bl LEFT JOIN blocks blks ON blks.height = bl.block_heights WHERE blks.height IS NULL ) AS missing_blocks ON blocks.height = missing_blocks.missing_block_height + 1 ORDER BY missing_blocks.missing_block_height DESC;` - rows, err := s.db.QueryContext(ctx, q) + rows, err := s.db.QueryContext(ctx, q, blockHeightRange) if err != nil { return nil, err } diff --git a/blocktx/store/sql/get_block_gaps_test.go b/blocktx/store/sql/get_block_gaps_test.go index d289a2ec4..e9d30885e 100644 --- a/blocktx/store/sql/get_block_gaps_test.go +++ b/blocktx/store/sql/get_block_gaps_test.go @@ -34,7 +34,7 @@ func (s *GetBlockGapTestSuite) Test() { require.NoError(s.T(), err) ctx := context.Background() - blockGaps, err := st.GetBlockGaps(ctx) + blockGaps, err := st.GetBlockGaps(ctx, 7) require.NoError(s.T(), err) require.Equal(s.T(), 2, len(blockGaps)) @@ -45,12 +45,12 @@ func (s *GetBlockGapTestSuite) Test() { expectedBlockGaps := []*store.BlockGap{ { - Height: 822014, - Hash: hash822014, + Height: 822019, + Hash: hash822019, }, { - Height: 8220119, - Hash: hash822019, + Height: 822014, + Hash: hash822014, }, } diff --git a/cmd/blocktx.go b/cmd/blocktx.go index b347dbf27..a24702564 100644 --- a/cmd/blocktx.go +++ b/cmd/blocktx.go @@ -35,7 +35,12 @@ func StartBlockTx(logger *slog.Logger) (func(), error) { return nil, err } - peerHandler := blocktx.NewPeerHandler(logger, blockStore, blockCh, startingBlockHeight) + recordRetentionDays, err := config.GetInt("blocktx.db.cleanData.recordRetentionDays") + if err != nil { + return nil, err + } + + peerHandler := blocktx.NewPeerHandler(logger, blockStore, blockCh, startingBlockHeight, blocktx.WithRetentionDays(recordRetentionDays)) network, err := config.GetNetwork() if err != nil {