Skip to content

Commit

Permalink
Merge pull request #451 from bitcoin-sv/fix/del-block-processing-rows
Browse files Browse the repository at this point in the history
Del block processing function returns affected rows
  • Loading branch information
boecklim authored Jun 18, 2024
2 parents a9a22b7 + 73c3192 commit 5dc27e4
Show file tree
Hide file tree
Showing 17 changed files with 242 additions and 176 deletions.
8 changes: 4 additions & 4 deletions internal/blocktx/peer_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ func (ph *PeerHandler) HandleBlock(wireMsg wire.Message, _ p2p.PeerI) error {

blockId, err := ph.insertBlock(ctx, &blockHash, &merkleRoot, &previousBlockHash, msg.Height)
if err != nil {
errDel := ph.store.DelBlockProcessing(ctx, &blockHash, ph.hostname)
_, errDel := ph.store.DelBlockProcessing(ctx, &blockHash, ph.hostname)
if errDel != nil {
ph.logger.Error("failed to delete block processing - after inserting block failed", slog.String("hash", blockHash.String()), slog.String("err", errDel.Error()))
}
Expand All @@ -493,15 +493,15 @@ func (ph *PeerHandler) HandleBlock(wireMsg wire.Message, _ p2p.PeerI) error {
calculatedMerkleTree := buildMerkleTreeStoreChainHash(ctx, msg.TransactionHashes)

if !merkleRoot.IsEqual(calculatedMerkleTree[len(calculatedMerkleTree)-1]) {
errDel := ph.store.DelBlockProcessing(ctx, &blockHash, ph.hostname)
_, errDel := ph.store.DelBlockProcessing(ctx, &blockHash, ph.hostname)
if errDel != nil {
ph.logger.Error("failed to delete block processing - after merkle root mismatch", slog.String("hash", blockHash.String()), slog.String("err", errDel.Error()))
}
return fmt.Errorf("merkle root mismatch for block %s", blockHash.String())
}

if err = ph.markTransactionsAsMined(ctx, blockId, calculatedMerkleTree, msg.Height, &blockHash); err != nil {
errDel := ph.store.DelBlockProcessing(ctx, &blockHash, ph.hostname)
_, errDel := ph.store.DelBlockProcessing(ctx, &blockHash, ph.hostname)
if errDel != nil {
ph.logger.Error("failed to delete block processing - after marking transactions as mined failed", slog.String("hash", blockHash.String()), slog.String("err", errDel.Error()))
}
Expand All @@ -518,7 +518,7 @@ func (ph *PeerHandler) HandleBlock(wireMsg wire.Message, _ p2p.PeerI) error {
}

if err = ph.markBlockAsProcessed(ctx, block); err != nil {
errDel := ph.store.DelBlockProcessing(ctx, &blockHash, ph.hostname)
_, errDel := ph.store.DelBlockProcessing(ctx, &blockHash, ph.hostname)
if errDel != nil {
ph.logger.Error("failed to delete block processing - after marking block as processed failed", slog.String("hash", blockHash.String()), slog.String("err", errDel.Error()))
}
Expand Down
19 changes: 11 additions & 8 deletions internal/blocktx/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,32 +81,35 @@ func (s *Server) Health(_ context.Context, _ *emptypb.Empty) (*blocktx_api.Healt
}, nil
}

func (s *Server) ClearTransactions(ctx context.Context, clearData *blocktx_api.ClearData) (*blocktx_api.ClearDataResponse, error) {
func (s *Server) ClearTransactions(ctx context.Context, clearData *blocktx_api.ClearData) (*blocktx_api.RowsAffectedResponse, error) {
return s.store.ClearBlocktxTable(ctx, clearData.GetRetentionDays(), "transactions")
}

func (s *Server) ClearBlocks(ctx context.Context, clearData *blocktx_api.ClearData) (*blocktx_api.ClearDataResponse, error) {
func (s *Server) ClearBlocks(ctx context.Context, clearData *blocktx_api.ClearData) (*blocktx_api.RowsAffectedResponse, error) {
return s.store.ClearBlocktxTable(ctx, clearData.GetRetentionDays(), "blocks")
}

func (s *Server) ClearBlockTransactionsMap(ctx context.Context, clearData *blocktx_api.ClearData) (*blocktx_api.ClearDataResponse, error) {
func (s *Server) ClearBlockTransactionsMap(ctx context.Context, clearData *blocktx_api.ClearData) (*blocktx_api.RowsAffectedResponse, error) {
return s.store.ClearBlocktxTable(ctx, clearData.GetRetentionDays(), "block_transactions_map")
}

func (s *Server) DelUnfinishedBlockProcessing(ctx context.Context, req *blocktx_api.DelUnfinishedBlockProcessingRequest) (*emptypb.Empty, error) {
func (s *Server) DelUnfinishedBlockProcessing(ctx context.Context, req *blocktx_api.DelUnfinishedBlockProcessingRequest) (*blocktx_api.RowsAffectedResponse, error) {
bhs, err := s.store.GetBlockHashesProcessingInProgress(ctx, req.GetProcessedBy())
if err != nil {
return &emptypb.Empty{}, err
return &blocktx_api.RowsAffectedResponse{}, err
}

var rowsTotal int64
for _, bh := range bhs {
err = s.store.DelBlockProcessing(ctx, bh, req.GetProcessedBy())
rows, err := s.store.DelBlockProcessing(ctx, bh, req.GetProcessedBy())
if err != nil {
return &emptypb.Empty{}, err
return &blocktx_api.RowsAffectedResponse{}, err
}

rowsTotal += rows
}

return &emptypb.Empty{}, nil
return &blocktx_api.RowsAffectedResponse{Rows: rowsTotal}, nil
}

func (s *Server) VerifyMerkleRoots(ctx context.Context, req *blocktx_api.MerkleRootsVerificationRequest) (*blocktx_api.MerkleRootVerificationResponse, error) {
Expand Down
66 changes: 66 additions & 0 deletions internal/blocktx/server_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package blocktx

import (
"context"
"errors"
"log/slog"
"os"
"testing"
"time"

"github.com/bitcoin-sv/arc/internal/blocktx/store"
"github.com/bitcoin-sv/arc/internal/testdata"
"github.com/bitcoin-sv/arc/pkg/blocktx/blocktx_api"
"github.com/libsv/go-p2p/chaincfg/chainhash"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -36,3 +41,64 @@ func TestStartGRPCServer(t *testing.T) {
})
}
}

func TestDelUnfinishedBlock(t *testing.T) {
tt := []struct {
name string
getBlockHashesProcessingInProgressErr error
delBlockProcessingErr error

expectedRows int64
expectedErrorStr string
}{
{
name: "success",

expectedRows: 6,
},
{
name: "error - getBlockHashesProcessingInProgress",
getBlockHashesProcessingInProgressErr: errors.New("failed to get block hashes processing in progress"),

expectedErrorStr: "failed to get block hashes processing in progress",
expectedRows: 0,
},
{
name: "error - delBlockProcessingErr",
delBlockProcessingErr: errors.New("failed to delete block processing error"),

expectedErrorStr: "failed to delete block processing error",
expectedRows: 0,
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
storeMock := &store.BlocktxStoreMock{
GetBlockHashesProcessingInProgressFunc: func(ctx context.Context, processedBy string) ([]*chainhash.Hash, error) {
return []*chainhash.Hash{testdata.TX1Hash, testdata.TX2Hash}, tc.getBlockHashesProcessingInProgressErr
},

DelBlockProcessingFunc: func(ctx context.Context, hash *chainhash.Hash, processedBy string) (int64, error) {
return 3, tc.delBlockProcessingErr
},
}

server := NewServer(storeMock, logger, nil, 0)

resp, err := server.DelUnfinishedBlockProcessing(context.Background(), &blocktx_api.DelUnfinishedBlockProcessingRequest{
ProcessedBy: "host",
})

if tc.expectedErrorStr == "" {
require.NoError(t, err)
} else {
require.ErrorContains(t, err, tc.expectedErrorStr)
return
}

require.Equal(t, tc.expectedRows, resp.Rows)
})
}
}
12 changes: 6 additions & 6 deletions internal/blocktx/store/blocktx_store_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/blocktx/store/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ type BlocktxStore interface {
UpsertBlockTransactions(ctx context.Context, blockId uint64, transactions []*blocktx_api.TransactionAndSource, merklePaths []string) (registeredTxs []UpsertBlockTransactionsResult, err error)
MarkBlockAsDone(ctx context.Context, hash *chainhash.Hash, size uint64, txCount uint64) error
GetBlockGaps(ctx context.Context, heightRange int) ([]*BlockGap, error)
ClearBlocktxTable(ctx context.Context, retentionDays int32, table string) (*blocktx_api.ClearDataResponse, error)
ClearBlocktxTable(ctx context.Context, retentionDays int32, table string) (*blocktx_api.RowsAffectedResponse, error)
GetMinedTransactions(ctx context.Context, hashes []*chainhash.Hash) ([]GetMinedTransactionResult, error)

SetBlockProcessing(ctx context.Context, hash *chainhash.Hash, processedBy string) (string, error)
GetBlockHashesProcessingInProgress(ctx context.Context, processedBy string) ([]*chainhash.Hash, error)
DelBlockProcessing(ctx context.Context, hash *chainhash.Hash, processedBy string) error
DelBlockProcessing(ctx context.Context, hash *chainhash.Hash, processedBy string) (int64, error)
VerifyMerkleRoots(ctx context.Context, merkleRoots []*blocktx_api.MerkleRootVerificationRequest, maxAllowedBlockHeightMismatch int) (*blocktx_api.MerkleRootVerificationResponse, error)

Ping(ctx context.Context) error
Expand Down
4 changes: 2 additions & 2 deletions internal/blocktx/store/postgresql/clear_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const (
numericalDateHourLayout = "2006010215"
)

func (p *PostgreSQL) ClearBlocktxTable(ctx context.Context, retentionDays int32, table string) (*blocktx_api.ClearDataResponse, error) {
func (p *PostgreSQL) ClearBlocktxTable(ctx context.Context, retentionDays int32, table string) (*blocktx_api.RowsAffectedResponse, error) {

now := p.now()
deleteBeforeDate := now.Add(-24 * time.Hour * time.Duration(retentionDays))
Expand All @@ -28,5 +28,5 @@ func (p *PostgreSQL) ClearBlocktxTable(ctx context.Context, retentionDays int32,
return nil, fmt.Errorf("unable to delete rows: %v", err)
}
rows, _ := res.RowsAffected()
return &blocktx_api.ClearDataResponse{Rows: rows}, nil
return &blocktx_api.RowsAffectedResponse{Rows: rows}, nil
}
4 changes: 2 additions & 2 deletions internal/blocktx/store/postgresql/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,10 @@ func TestPostgresDB(t *testing.T) {
require.Len(t, blockHashes, 1)
require.True(t, bhInProgress.IsEqual(blockHashes[0]))

err = postgresDB.DelBlockProcessing(ctx, bhInProgress, "pod-1")
_, err = postgresDB.DelBlockProcessing(ctx, bhInProgress, "pod-1")
require.ErrorIs(t, err, store.ErrBlockNotFound)

err = postgresDB.DelBlockProcessing(ctx, bhInProgress, "pod-2")
_, err = postgresDB.DelBlockProcessing(ctx, bhInProgress, "pod-2")
require.NoError(t, err)

blockHashes, err = postgresDB.GetBlockHashesProcessingInProgress(ctx, "pod-2")
Expand Down
9 changes: 4 additions & 5 deletions internal/blocktx/store/postgresql/set_block_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"

"github.com/bitcoin-sv/arc/internal/blocktx/store"
"github.com/lib/pq"
"github.com/libsv/go-p2p/chaincfg/chainhash"
Expand Down Expand Up @@ -38,7 +37,7 @@ func (p *PostgreSQL) SetBlockProcessing(ctx context.Context, hash *chainhash.Has
return processedBy, nil
}

func (p *PostgreSQL) DelBlockProcessing(ctx context.Context, hash *chainhash.Hash, processedBy string) error {
func (p *PostgreSQL) DelBlockProcessing(ctx context.Context, hash *chainhash.Hash, processedBy string) (int64, error) {
if tracer != nil {
var span trace.Span
ctx, span = tracer.Start(ctx, "InsertBlock")
Expand All @@ -51,14 +50,14 @@ func (p *PostgreSQL) DelBlockProcessing(ctx context.Context, hash *chainhash.Has

res, err := p.db.ExecContext(ctx, q, hash[:], processedBy)
if err != nil {
return err
return 0, err
}
rowsAffected, _ := res.RowsAffected()
if rowsAffected != 1 {
return store.ErrBlockNotFound
return 0, store.ErrBlockNotFound
}

return nil
return rowsAffected, nil
}

func (p *PostgreSQL) GetBlockHashesProcessingInProgress(ctx context.Context, processedBy string) ([]*chainhash.Hash, error) {
Expand Down
6 changes: 3 additions & 3 deletions internal/k8s_watcher/mock/blocktx_client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions internal/k8s_watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,12 @@ func (c *Watcher) watchBlocktx() {
break retryLoop
}

err = c.blocktxClient.DelUnfinishedBlockProcessing(ctx, podName)
rows, err := c.blocktxClient.DelUnfinishedBlockProcessing(ctx, podName)
if err != nil {
c.logger.Error("Failed to delete unfinished block processing", slog.String("pod-name", podName), slog.String("err", err.Error()))
continue
}
c.logger.Info("Deleted unfinished block processing", slog.String("pod-name", podName))
c.logger.Info("Deleted unfinished block processing", slog.Int64("rows-affected", rows), slog.String("pod-name", podName))
break
}
}
Expand Down Expand Up @@ -210,16 +210,16 @@ func (c *Watcher) watchMetamorph() {
i++

if i > maxRetries {
c.logger.Error(fmt.Sprintf("failed to unlock metamorph records after %d retries", maxRetries), slog.String("pod-name", podName))
c.logger.Error(fmt.Sprintf("Failed to unlock metamorph records after %d retries", maxRetries), slog.String("pod-name", podName))
break retryLoop
}

resp, err := c.metamorphClient.SetUnlockedByName(ctx, podName)
rows, err := c.metamorphClient.SetUnlockedByName(ctx, podName)
if err != nil {
c.logger.Error("failed to unlock metamorph records", slog.String("pod-name", podName), slog.String("err", err.Error()))
c.logger.Error("Failed to unlock metamorph records", slog.String("pod-name", podName), slog.String("err", err.Error()))
continue
}
c.logger.Info("records unlocked", slog.Int64("rows-affected", resp), slog.String("pod-name", podName))
c.logger.Info("Records unlocked", slog.Int64("rows-affected", rows), slog.String("pod-name", podName))
break
}
}
Expand Down
4 changes: 1 addition & 3 deletions internal/k8s_watcher/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,7 @@ func TestStartBlocktxWatcher(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
metamorphMock := &mock.TransactionMaintainerMock{}
blocktxMock := &mock.BlocktxClientMock{
DelUnfinishedBlockProcessingFunc: func(ctx context.Context, processedBy string) error {
return nil
},
DelUnfinishedBlockProcessingFunc: func(ctx context.Context, processedBy string) (int64, error) { return 0, nil },
}

iteration := 0
Expand Down
Loading

0 comments on commit 5dc27e4

Please sign in to comment.