From 7303948aed37203f45fb61f8eaf81a11c9862740 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= <42514703+boecklim@users.noreply.github.com> Date: Wed, 15 Jan 2025 14:22:38 +0100 Subject: [PATCH] refactor: block processing (#742) --- cmd/arc/services/k8s_watcher.go | 11 +- go.mod | 9 +- go.sum | 14 +- .../blocktx/blocktx_api/blocktx_api.pb.go | 45 +-- .../blocktx/blocktx_api/blocktx_api.proto | 3 - .../blocktx_api/blocktx_api_grpc.pb.go | 48 +-- internal/blocktx/blocktx_mocks.go | 3 - internal/blocktx/client.go | 33 -- internal/blocktx/client_test.go | 77 ---- internal/blocktx/interface.go | 4 - internal/blocktx/mocks/blocktx_api_mock.go | 64 +--- internal/blocktx/mocks/blocktx_client_mock.go | 176 --------- internal/blocktx/mocks/peer_manager_mock.go | 339 ------------------ internal/blocktx/processor.go | 118 +----- internal/blocktx/processor_test.go | 80 +---- internal/blocktx/server.go | 24 +- internal/blocktx/server_test.go | 70 ---- .../blocktx/store/mocks/blocktx_store_mock.go | 197 +++------- .../blocktx.block_processing.yaml | 11 +- .../block_processing/blocktx.blocks.yaml | 36 +- .../blocktx.block_processing.yaml | 3 - .../get_block_gaps/blocktx.blocks.yaml | 4 - .../store/postgresql/get_block_gaps.go | 12 +- .../000021_block_transactions.up.sql | 6 +- .../000022_block_processing.down.sql | 3 + .../migrations/000022_block_processing.up.sql | 3 + .../blocktx/store/postgresql/postgres_test.go | 55 +-- .../store/postgresql/set_block_processing.go | 114 +++--- internal/blocktx/store/store.go | 35 +- internal/k8s_watcher/watcher.go | 73 +--- internal/k8s_watcher/watcher_test.go | 94 +---- internal/p2p/mocks/wire_msg_mock.go | 11 +- internal/p2p/p2p_mocks.go | 3 + internal/p2p/wire.go | 14 + internal/validator/helpers_mock.go | 5 - ...erface_mock.go => merkle_verifier_mock.go} | 0 ...er_interface_mock.go => tx_finder_mock.go} | 0 internal/validator/validator_mocks.go | 5 + test/submit_01_single_test.go | 32 +- 39 files changed, 316 insertions(+), 1518 deletions(-) delete mode 100644 internal/blocktx/mocks/blocktx_client_mock.go delete mode 100644 internal/blocktx/mocks/peer_manager_mock.go delete mode 100644 internal/blocktx/store/postgresql/fixtures/get_block_gaps/blocktx.block_processing.yaml create mode 100644 internal/blocktx/store/postgresql/migrations/000022_block_processing.down.sql create mode 100644 internal/blocktx/store/postgresql/migrations/000022_block_processing.up.sql create mode 100644 internal/p2p/wire.go delete mode 100644 internal/validator/helpers_mock.go rename internal/validator/mocks/{merkle_verifier_interface_mock.go => merkle_verifier_mock.go} (100%) rename internal/validator/mocks/{tx_finder_interface_mock.go => tx_finder_mock.go} (100%) create mode 100644 internal/validator/validator_mocks.go diff --git a/cmd/arc/services/k8s_watcher.go b/cmd/arc/services/k8s_watcher.go index ba08d5fd5..446265c3f 100644 --- a/cmd/arc/services/k8s_watcher.go +++ b/cmd/arc/services/k8s_watcher.go @@ -5,8 +5,6 @@ import ( "log/slog" "github.com/bitcoin-sv/arc/config" - "github.com/bitcoin-sv/arc/internal/blocktx" - "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" "github.com/bitcoin-sv/arc/internal/k8s_watcher" "github.com/bitcoin-sv/arc/internal/k8s_watcher/k8s_client" "github.com/bitcoin-sv/arc/internal/metamorph" @@ -28,14 +26,7 @@ func StartK8sWatcher(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), return nil, fmt.Errorf("failed to get k8s-client: %v", err) } - blocktxConn, err := blocktx.DialGRPC(arcConfig.Blocktx.DialAddr, arcConfig.Prometheus.Endpoint, arcConfig.GrpcMessageSize, nil) - if err != nil { - return nil, fmt.Errorf("failed to connect to block-tx server: %v", err) - } - - blocktxClient := blocktx.NewClient(blocktx_api.NewBlockTxAPIClient(blocktxConn)) - - k8sWatcher := k8s_watcher.New(metamorphClient, blocktxClient, k8sClient, arcConfig.K8sWatcher.Namespace, k8s_watcher.WithLogger(logger)) + k8sWatcher := k8s_watcher.New(metamorphClient, k8sClient, arcConfig.K8sWatcher.Namespace, k8s_watcher.WithLogger(logger)) err = k8sWatcher.Start() if err != nil { return nil, fmt.Errorf("faile to start k8s-watcher: %v", err) diff --git a/go.mod b/go.mod index 33e70724a..9821f0b53 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/google/uuid v1.6.0 github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 - github.com/jackc/pgx/v5 v5.3.1 + github.com/jackc/pgx/v5 v5.7.2 github.com/jedib0t/go-pretty/v6 v6.5.9 github.com/jmoiron/sqlx v1.3.5 github.com/labstack/echo-contrib v0.17.1 @@ -46,7 +46,6 @@ require ( go.opentelemetry.io/otel/sdk v1.28.0 go.opentelemetry.io/otel/trace v1.31.0 golang.org/x/sync v0.10.0 - google.golang.org/genproto/googleapis/rpc v0.0.0-20240805194559-2c9e96a0b5d4 google.golang.org/grpc v1.65.0 google.golang.org/protobuf v1.34.2 gopkg.in/yaml.v3 v3.0.1 @@ -101,7 +100,8 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/invopop/yaml v0.3.1 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect - github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.9 // indirect @@ -158,7 +158,7 @@ require ( go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.32.0 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect - golang.org/x/net v0.30.0 // indirect + golang.org/x/net v0.34.0 // indirect golang.org/x/oauth2 v0.22.0 // indirect golang.org/x/sys v0.29.0 // indirect golang.org/x/term v0.28.0 // indirect @@ -166,6 +166,7 @@ require ( golang.org/x/time v0.7.0 // indirect golang.org/x/tools v0.24.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240805194559-2c9e96a0b5d4 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240805194559-2c9e96a0b5d4 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index e993f8e85..88a6c49f4 100644 --- a/go.sum +++ b/go.sum @@ -165,14 +165,16 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgproto3/v2 v2.3.2 h1:7eY55bdBeCz1F2fTzSz69QC+pG46jYq9/jtSPiJ5nn0= github.com/jackc/pgproto3/v2 v2.3.2/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= -github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= -github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= github.com/jackc/pgtype v1.14.0 h1:y+xUdabmyMkJLyApYuPj38mW+aAIqCe5uuBB51rH3Vw= github.com/jackc/pgtype v1.14.0/go.mod h1:LUMuVrfsFfdKGLw+AFFVv6KtHOFMwRgDDzBt76IqCA4= github.com/jackc/pgx/v4 v4.18.1 h1:YP7G1KABtKpB5IHrO9vYwSrCOhs7p3uqhvhhQBptya0= github.com/jackc/pgx/v4 v4.18.1/go.mod h1:FydWkUyadDmdNH/mHnGob881GawxeEm7TcMCzkb+qQE= -github.com/jackc/pgx/v5 v5.3.1 h1:Fcr8QJ1ZeLi5zsPZqQeUZhNhxfkkKBOgJuYkJHoBOtU= -github.com/jackc/pgx/v5 v5.3.1/go.mod h1:t3JDKnCBlYIc0ewLF0Q7B8MXmoIaBOZj/ic7iHozM/8= +github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI= +github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jedib0t/go-pretty/v6 v6.5.9 h1:ACteMBRrrmm1gMsXe9PSTOClQ63IXDUt03H5U+UV8OU= github.com/jedib0t/go-pretty/v6 v6.5.9/go.mod h1:zbn98qrYlh95FIhwwsbIip0LYpwSG8SUOScs+v9/t0E= github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g= @@ -417,8 +419,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= -golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= +golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= +golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= golang.org/x/oauth2 v0.22.0 h1:BzDx2FehcG7jJwgWLELCdmLuxk2i+x9UDpSiss2u0ZA= golang.org/x/oauth2 v0.22.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/internal/blocktx/blocktx_api/blocktx_api.pb.go b/internal/blocktx/blocktx_api/blocktx_api.pb.go index ed33cc5cb..37f2000f2 100644 --- a/internal/blocktx/blocktx_api/blocktx_api.pb.go +++ b/internal/blocktx/blocktx_api/blocktx_api.pb.go @@ -852,7 +852,7 @@ var file_internal_blocktx_blocktx_api_blocktx_api_proto_rawDesc = []byte{ 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x4c, 0x4f, 0x4e, 0x47, 0x45, 0x53, 0x54, 0x10, 0x0a, 0x12, 0x09, 0x0a, 0x05, 0x53, 0x54, 0x41, 0x4c, 0x45, 0x10, 0x14, 0x12, 0x0c, 0x0a, 0x08, 0x4f, 0x52, 0x50, 0x48, 0x41, 0x4e, - 0x45, 0x44, 0x10, 0x1e, 0x32, 0xdd, 0x03, 0x0a, 0x0a, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x54, 0x78, + 0x45, 0x44, 0x10, 0x1e, 0x32, 0xe6, 0x02, 0x0a, 0x0a, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x54, 0x78, 0x41, 0x50, 0x49, 0x12, 0x3f, 0x0a, 0x06, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1b, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x74, 0x78, 0x5f, @@ -867,23 +867,16 @@ var file_internal_blocktx_blocktx_api_blocktx_api_proto_rawDesc = []byte{ 0x16, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x74, 0x78, 0x5f, 0x61, 0x70, 0x69, 0x2e, 0x43, 0x6c, 0x65, 0x61, 0x72, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x21, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x74, 0x78, 0x5f, 0x61, 0x70, 0x69, 0x2e, 0x52, 0x6f, 0x77, 0x73, 0x41, 0x66, 0x66, 0x65, 0x63, 0x74, - 0x65, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x75, 0x0a, 0x1c, - 0x44, 0x65, 0x6c, 0x55, 0x6e, 0x66, 0x69, 0x6e, 0x69, 0x73, 0x68, 0x65, 0x64, 0x42, 0x6c, 0x6f, - 0x63, 0x6b, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x69, 0x6e, 0x67, 0x12, 0x30, 0x2e, 0x62, - 0x6c, 0x6f, 0x63, 0x6b, 0x74, 0x78, 0x5f, 0x61, 0x70, 0x69, 0x2e, 0x44, 0x65, 0x6c, 0x55, 0x6e, - 0x66, 0x69, 0x6e, 0x69, 0x73, 0x68, 0x65, 0x64, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x50, 0x72, 0x6f, - 0x63, 0x65, 0x73, 0x73, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, - 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x74, 0x78, 0x5f, 0x61, 0x70, 0x69, 0x2e, 0x52, 0x6f, 0x77, - 0x73, 0x41, 0x66, 0x66, 0x65, 0x63, 0x74, 0x65, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x00, 0x12, 0x6f, 0x0a, 0x11, 0x56, 0x65, 0x72, 0x69, 0x66, 0x79, 0x4d, 0x65, 0x72, - 0x6b, 0x6c, 0x65, 0x52, 0x6f, 0x6f, 0x74, 0x73, 0x12, 0x2b, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, - 0x74, 0x78, 0x5f, 0x61, 0x70, 0x69, 0x2e, 0x4d, 0x65, 0x72, 0x6b, 0x6c, 0x65, 0x52, 0x6f, 0x6f, - 0x74, 0x73, 0x56, 0x65, 0x72, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2b, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x74, 0x78, 0x5f, - 0x61, 0x70, 0x69, 0x2e, 0x4d, 0x65, 0x72, 0x6b, 0x6c, 0x65, 0x52, 0x6f, 0x6f, 0x74, 0x56, 0x65, - 0x72, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x00, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x3b, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x74, - 0x78, 0x5f, 0x61, 0x70, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x6f, 0x0a, 0x11, + 0x56, 0x65, 0x72, 0x69, 0x66, 0x79, 0x4d, 0x65, 0x72, 0x6b, 0x6c, 0x65, 0x52, 0x6f, 0x6f, 0x74, + 0x73, 0x12, 0x2b, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x74, 0x78, 0x5f, 0x61, 0x70, 0x69, 0x2e, + 0x4d, 0x65, 0x72, 0x6b, 0x6c, 0x65, 0x52, 0x6f, 0x6f, 0x74, 0x73, 0x56, 0x65, 0x72, 0x69, 0x66, + 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2b, + 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x74, 0x78, 0x5f, 0x61, 0x70, 0x69, 0x2e, 0x4d, 0x65, 0x72, + 0x6b, 0x6c, 0x65, 0x52, 0x6f, 0x6f, 0x74, 0x56, 0x65, 0x72, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x0f, 0x5a, + 0x0d, 0x2e, 0x3b, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x74, 0x78, 0x5f, 0x61, 0x70, 0x69, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -927,15 +920,13 @@ var file_internal_blocktx_blocktx_api_blocktx_api_proto_depIdxs = []int32{ 14, // 6: blocktx_api.BlockTxAPI.Health:input_type -> google.protobuf.Empty 7, // 7: blocktx_api.BlockTxAPI.ClearBlocks:input_type -> blocktx_api.ClearData 7, // 8: blocktx_api.BlockTxAPI.ClearRegisteredTransactions:input_type -> blocktx_api.ClearData - 9, // 9: blocktx_api.BlockTxAPI.DelUnfinishedBlockProcessing:input_type -> blocktx_api.DelUnfinishedBlockProcessingRequest - 11, // 10: blocktx_api.BlockTxAPI.VerifyMerkleRoots:input_type -> blocktx_api.MerkleRootsVerificationRequest - 1, // 11: blocktx_api.BlockTxAPI.Health:output_type -> blocktx_api.HealthResponse - 8, // 12: blocktx_api.BlockTxAPI.ClearBlocks:output_type -> blocktx_api.RowsAffectedResponse - 8, // 13: blocktx_api.BlockTxAPI.ClearRegisteredTransactions:output_type -> blocktx_api.RowsAffectedResponse - 8, // 14: blocktx_api.BlockTxAPI.DelUnfinishedBlockProcessing:output_type -> blocktx_api.RowsAffectedResponse - 12, // 15: blocktx_api.BlockTxAPI.VerifyMerkleRoots:output_type -> blocktx_api.MerkleRootVerificationResponse - 11, // [11:16] is the sub-list for method output_type - 6, // [6:11] is the sub-list for method input_type + 11, // 9: blocktx_api.BlockTxAPI.VerifyMerkleRoots:input_type -> blocktx_api.MerkleRootsVerificationRequest + 1, // 10: blocktx_api.BlockTxAPI.Health:output_type -> blocktx_api.HealthResponse + 8, // 11: blocktx_api.BlockTxAPI.ClearBlocks:output_type -> blocktx_api.RowsAffectedResponse + 8, // 12: blocktx_api.BlockTxAPI.ClearRegisteredTransactions:output_type -> blocktx_api.RowsAffectedResponse + 12, // 13: blocktx_api.BlockTxAPI.VerifyMerkleRoots:output_type -> blocktx_api.MerkleRootVerificationResponse + 10, // [10:14] is the sub-list for method output_type + 6, // [6:10] is the sub-list for method input_type 6, // [6:6] is the sub-list for extension type_name 6, // [6:6] is the sub-list for extension extendee 0, // [0:6] is the sub-list for field type_name diff --git a/internal/blocktx/blocktx_api/blocktx_api.proto b/internal/blocktx/blocktx_api/blocktx_api.proto index 78e5ed0f7..9fc2e6d99 100644 --- a/internal/blocktx/blocktx_api/blocktx_api.proto +++ b/internal/blocktx/blocktx_api/blocktx_api.proto @@ -17,9 +17,6 @@ service BlockTxAPI { // ClearRegisteredTransactions clears registered transactions rpc ClearRegisteredTransactions(ClearData) returns (RowsAffectedResponse) {} - // DelUnfinishedBlockProcessing deletes unfinished block processing - rpc DelUnfinishedBlockProcessing(DelUnfinishedBlockProcessingRequest) returns (RowsAffectedResponse) {} - // VerifyMerkleRoots verifies the merkle roots existance in blocktx db and returns unverified block heights rpc VerifyMerkleRoots(MerkleRootsVerificationRequest) returns (MerkleRootVerificationResponse) {} } diff --git a/internal/blocktx/blocktx_api/blocktx_api_grpc.pb.go b/internal/blocktx/blocktx_api/blocktx_api_grpc.pb.go index c31176459..5bb4fea12 100644 --- a/internal/blocktx/blocktx_api/blocktx_api_grpc.pb.go +++ b/internal/blocktx/blocktx_api/blocktx_api_grpc.pb.go @@ -20,11 +20,10 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - BlockTxAPI_Health_FullMethodName = "/blocktx_api.BlockTxAPI/Health" - BlockTxAPI_ClearBlocks_FullMethodName = "/blocktx_api.BlockTxAPI/ClearBlocks" - BlockTxAPI_ClearRegisteredTransactions_FullMethodName = "/blocktx_api.BlockTxAPI/ClearRegisteredTransactions" - BlockTxAPI_DelUnfinishedBlockProcessing_FullMethodName = "/blocktx_api.BlockTxAPI/DelUnfinishedBlockProcessing" - BlockTxAPI_VerifyMerkleRoots_FullMethodName = "/blocktx_api.BlockTxAPI/VerifyMerkleRoots" + BlockTxAPI_Health_FullMethodName = "/blocktx_api.BlockTxAPI/Health" + BlockTxAPI_ClearBlocks_FullMethodName = "/blocktx_api.BlockTxAPI/ClearBlocks" + BlockTxAPI_ClearRegisteredTransactions_FullMethodName = "/blocktx_api.BlockTxAPI/ClearRegisteredTransactions" + BlockTxAPI_VerifyMerkleRoots_FullMethodName = "/blocktx_api.BlockTxAPI/VerifyMerkleRoots" ) // BlockTxAPIClient is the client API for BlockTxAPI service. @@ -37,8 +36,6 @@ type BlockTxAPIClient interface { ClearBlocks(ctx context.Context, in *ClearData, opts ...grpc.CallOption) (*RowsAffectedResponse, error) // ClearRegisteredTransactions clears registered transactions ClearRegisteredTransactions(ctx context.Context, in *ClearData, opts ...grpc.CallOption) (*RowsAffectedResponse, error) - // DelUnfinishedBlockProcessing deletes unfinished block processing - DelUnfinishedBlockProcessing(ctx context.Context, in *DelUnfinishedBlockProcessingRequest, opts ...grpc.CallOption) (*RowsAffectedResponse, error) // VerifyMerkleRoots verifies the merkle roots existance in blocktx db and returns unverified block heights VerifyMerkleRoots(ctx context.Context, in *MerkleRootsVerificationRequest, opts ...grpc.CallOption) (*MerkleRootVerificationResponse, error) } @@ -81,16 +78,6 @@ func (c *blockTxAPIClient) ClearRegisteredTransactions(ctx context.Context, in * return out, nil } -func (c *blockTxAPIClient) DelUnfinishedBlockProcessing(ctx context.Context, in *DelUnfinishedBlockProcessingRequest, opts ...grpc.CallOption) (*RowsAffectedResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - out := new(RowsAffectedResponse) - err := c.cc.Invoke(ctx, BlockTxAPI_DelUnfinishedBlockProcessing_FullMethodName, in, out, cOpts...) - if err != nil { - return nil, err - } - return out, nil -} - func (c *blockTxAPIClient) VerifyMerkleRoots(ctx context.Context, in *MerkleRootsVerificationRequest, opts ...grpc.CallOption) (*MerkleRootVerificationResponse, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(MerkleRootVerificationResponse) @@ -111,8 +98,6 @@ type BlockTxAPIServer interface { ClearBlocks(context.Context, *ClearData) (*RowsAffectedResponse, error) // ClearRegisteredTransactions clears registered transactions ClearRegisteredTransactions(context.Context, *ClearData) (*RowsAffectedResponse, error) - // DelUnfinishedBlockProcessing deletes unfinished block processing - DelUnfinishedBlockProcessing(context.Context, *DelUnfinishedBlockProcessingRequest) (*RowsAffectedResponse, error) // VerifyMerkleRoots verifies the merkle roots existance in blocktx db and returns unverified block heights VerifyMerkleRoots(context.Context, *MerkleRootsVerificationRequest) (*MerkleRootVerificationResponse, error) mustEmbedUnimplementedBlockTxAPIServer() @@ -134,9 +119,6 @@ func (UnimplementedBlockTxAPIServer) ClearBlocks(context.Context, *ClearData) (* func (UnimplementedBlockTxAPIServer) ClearRegisteredTransactions(context.Context, *ClearData) (*RowsAffectedResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method ClearRegisteredTransactions not implemented") } -func (UnimplementedBlockTxAPIServer) DelUnfinishedBlockProcessing(context.Context, *DelUnfinishedBlockProcessingRequest) (*RowsAffectedResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method DelUnfinishedBlockProcessing not implemented") -} func (UnimplementedBlockTxAPIServer) VerifyMerkleRoots(context.Context, *MerkleRootsVerificationRequest) (*MerkleRootVerificationResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method VerifyMerkleRoots not implemented") } @@ -215,24 +197,6 @@ func _BlockTxAPI_ClearRegisteredTransactions_Handler(srv interface{}, ctx contex return interceptor(ctx, in, info, handler) } -func _BlockTxAPI_DelUnfinishedBlockProcessing_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(DelUnfinishedBlockProcessingRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(BlockTxAPIServer).DelUnfinishedBlockProcessing(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: BlockTxAPI_DelUnfinishedBlockProcessing_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(BlockTxAPIServer).DelUnfinishedBlockProcessing(ctx, req.(*DelUnfinishedBlockProcessingRequest)) - } - return interceptor(ctx, in, info, handler) -} - func _BlockTxAPI_VerifyMerkleRoots_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(MerkleRootsVerificationRequest) if err := dec(in); err != nil { @@ -270,10 +234,6 @@ var BlockTxAPI_ServiceDesc = grpc.ServiceDesc{ MethodName: "ClearRegisteredTransactions", Handler: _BlockTxAPI_ClearRegisteredTransactions_Handler, }, - { - MethodName: "DelUnfinishedBlockProcessing", - Handler: _BlockTxAPI_DelUnfinishedBlockProcessing_Handler, - }, { MethodName: "VerifyMerkleRoots", Handler: _BlockTxAPI_VerifyMerkleRoots_Handler, diff --git a/internal/blocktx/blocktx_mocks.go b/internal/blocktx/blocktx_mocks.go index 79b566e2b..aac49f978 100644 --- a/internal/blocktx/blocktx_mocks.go +++ b/internal/blocktx/blocktx_mocks.go @@ -11,6 +11,3 @@ package blocktx // from client.go //go:generate moq -pkg mocks -out ./mocks/merkle_roots_verifier_mock.go . MerkleRootsVerifier - -// from client.go -//go:generate moq -pkg mocks -out ./mocks/blocktx_client_mock.go . Watcher diff --git a/internal/blocktx/client.go b/internal/blocktx/client.go index 1dc1b410e..b12cfe6d7 100644 --- a/internal/blocktx/client.go +++ b/internal/blocktx/client.go @@ -4,7 +4,6 @@ import ( "context" "google.golang.org/grpc" - "google.golang.org/protobuf/types/known/emptypb" "github.com/bitcoin-sv/arc/config" "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" @@ -12,15 +11,8 @@ import ( ) // check if Client implements all necessary interfaces -var _ Watcher = &Client{} var _ MerkleRootsVerifier = &Client{} -type Watcher interface { - Health(ctx context.Context) error - ClearBlocks(ctx context.Context, retentionDays int32) (int64, error) - DelUnfinishedBlockProcessing(ctx context.Context, processedBy string) (int64, error) -} - // MerkleRootsVerifier verifies the merkle roots existence in blocktx db and returns unverified block heights. type MerkleRootsVerifier interface { // VerifyMerkleRoots verifies the merkle roots existence in blocktx db and returns unverified block heights. @@ -44,31 +36,6 @@ func NewClient(client blocktx_api.BlockTxAPIClient) *Client { return btc } -func (btc *Client) Health(ctx context.Context) error { - _, err := btc.client.Health(ctx, &emptypb.Empty{}) - if err != nil { - return err - } - - return nil -} - -func (btc *Client) DelUnfinishedBlockProcessing(ctx context.Context, processedBy string) (int64, error) { - resp, err := btc.client.DelUnfinishedBlockProcessing(ctx, &blocktx_api.DelUnfinishedBlockProcessingRequest{ProcessedBy: processedBy}) - if err != nil { - return 0, err - } - return resp.Rows, nil -} - -func (btc *Client) ClearBlocks(ctx context.Context, retentionDays int32) (int64, error) { - resp, err := btc.client.ClearBlocks(ctx, &blocktx_api.ClearData{RetentionDays: retentionDays}) - if err != nil { - return 0, err - } - return resp.Rows, nil -} - func (btc *Client) VerifyMerkleRoots(ctx context.Context, merkleRootVerificationRequest []MerkleRootVerificationRequest) ([]uint64, error) { merkleRoots := make([]*blocktx_api.MerkleRootVerificationRequest, 0) diff --git a/internal/blocktx/client_test.go b/internal/blocktx/client_test.go index 9ed51f76f..ad564444d 100644 --- a/internal/blocktx/client_test.go +++ b/internal/blocktx/client_test.go @@ -14,83 +14,6 @@ import ( "github.com/bitcoin-sv/arc/internal/blocktx/mocks" ) -func TestClient_DelUnfinishedBlockProcessing(t *testing.T) { - tt := []struct { - name string - delErr error - - expectedErrorStr string - }{ - { - name: "success", - }, - { - name: "err", - delErr: errors.New("failed to delete unfinished block processing"), - - expectedErrorStr: "failed to delete unfinished block processing", - }, - } - - for _, tc := range tt { - t.Run(tc.name, func(t *testing.T) { - apiClient := &mocks.BlockTxAPIClientMock{ - DelUnfinishedBlockProcessingFunc: func(_ context.Context, _ *blocktx_api.DelUnfinishedBlockProcessingRequest, _ ...grpc.CallOption) (*blocktx_api.RowsAffectedResponse, error) { - return &blocktx_api.RowsAffectedResponse{}, tc.delErr - }, - } - client := blocktx.NewClient(apiClient) - - _, err := client.DelUnfinishedBlockProcessing(context.Background(), "test-1") - if tc.expectedErrorStr != "" { - require.ErrorContains(t, err, tc.expectedErrorStr) - return - } - - require.NoError(t, err) - }) - } -} - -func TestClient_ClearBlocks(t *testing.T) { - tt := []struct { - name string - clearErr error - - expectedErrorStr string - }{ - { - name: "success", - }, - { - name: "err", - clearErr: errors.New("failed to clear data"), - - expectedErrorStr: "failed to clear data", - }, - } - - for _, tc := range tt { - t.Run(tc.name, func(t *testing.T) { - apiClient := &mocks.BlockTxAPIClientMock{ - ClearBlocksFunc: func(_ context.Context, _ *blocktx_api.ClearData, _ ...grpc.CallOption) (*blocktx_api.RowsAffectedResponse, error) { - return &blocktx_api.RowsAffectedResponse{Rows: 5}, tc.clearErr - }, - } - client := blocktx.NewClient(apiClient) - - res, err := client.ClearBlocks(context.Background(), 1) - if tc.expectedErrorStr != "" { - require.ErrorContains(t, err, tc.expectedErrorStr) - return - } - - require.NoError(t, err) - require.Equal(t, int64(5), res) - }) - } -} - func TestClient_VerifyMerkleRoots(t *testing.T) { tt := []struct { name string diff --git a/internal/blocktx/interface.go b/internal/blocktx/interface.go index 9fa6d5264..a17b15f6d 100644 --- a/internal/blocktx/interface.go +++ b/internal/blocktx/interface.go @@ -1,15 +1,11 @@ package blocktx import ( - "errors" - "github.com/libsv/go-p2p" "github.com/libsv/go-p2p/chaincfg/chainhash" "github.com/libsv/go-p2p/wire" ) -var ErrMerklePathNotFoundForTransaction = errors.New("merkle path not found for transaction") - type BlockRequest struct { Hash *chainhash.Hash Peer p2p.PeerI diff --git a/internal/blocktx/mocks/blocktx_api_mock.go b/internal/blocktx/mocks/blocktx_api_mock.go index 3bc561e9b..aca054e5f 100644 --- a/internal/blocktx/mocks/blocktx_api_mock.go +++ b/internal/blocktx/mocks/blocktx_api_mock.go @@ -27,9 +27,6 @@ var _ blocktx_api.BlockTxAPIClient = &BlockTxAPIClientMock{} // ClearRegisteredTransactionsFunc: func(ctx context.Context, in *blocktx_api.ClearData, opts ...grpc.CallOption) (*blocktx_api.RowsAffectedResponse, error) { // panic("mock out the ClearRegisteredTransactions method") // }, -// DelUnfinishedBlockProcessingFunc: func(ctx context.Context, in *blocktx_api.DelUnfinishedBlockProcessingRequest, opts ...grpc.CallOption) (*blocktx_api.RowsAffectedResponse, error) { -// panic("mock out the DelUnfinishedBlockProcessing method") -// }, // HealthFunc: func(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*blocktx_api.HealthResponse, error) { // panic("mock out the Health method") // }, @@ -49,9 +46,6 @@ type BlockTxAPIClientMock struct { // ClearRegisteredTransactionsFunc mocks the ClearRegisteredTransactions method. ClearRegisteredTransactionsFunc func(ctx context.Context, in *blocktx_api.ClearData, opts ...grpc.CallOption) (*blocktx_api.RowsAffectedResponse, error) - // DelUnfinishedBlockProcessingFunc mocks the DelUnfinishedBlockProcessing method. - DelUnfinishedBlockProcessingFunc func(ctx context.Context, in *blocktx_api.DelUnfinishedBlockProcessingRequest, opts ...grpc.CallOption) (*blocktx_api.RowsAffectedResponse, error) - // HealthFunc mocks the Health method. HealthFunc func(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*blocktx_api.HealthResponse, error) @@ -78,15 +72,6 @@ type BlockTxAPIClientMock struct { // Opts is the opts argument value. Opts []grpc.CallOption } - // DelUnfinishedBlockProcessing holds details about calls to the DelUnfinishedBlockProcessing method. - DelUnfinishedBlockProcessing []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // In is the in argument value. - In *blocktx_api.DelUnfinishedBlockProcessingRequest - // Opts is the opts argument value. - Opts []grpc.CallOption - } // Health holds details about calls to the Health method. Health []struct { // Ctx is the ctx argument value. @@ -106,11 +91,10 @@ type BlockTxAPIClientMock struct { Opts []grpc.CallOption } } - lockClearBlocks sync.RWMutex - lockClearRegisteredTransactions sync.RWMutex - lockDelUnfinishedBlockProcessing sync.RWMutex - lockHealth sync.RWMutex - lockVerifyMerkleRoots sync.RWMutex + lockClearBlocks sync.RWMutex + lockClearRegisteredTransactions sync.RWMutex + lockHealth sync.RWMutex + lockVerifyMerkleRoots sync.RWMutex } // ClearBlocks calls ClearBlocksFunc. @@ -193,46 +177,6 @@ func (mock *BlockTxAPIClientMock) ClearRegisteredTransactionsCalls() []struct { return calls } -// DelUnfinishedBlockProcessing calls DelUnfinishedBlockProcessingFunc. -func (mock *BlockTxAPIClientMock) DelUnfinishedBlockProcessing(ctx context.Context, in *blocktx_api.DelUnfinishedBlockProcessingRequest, opts ...grpc.CallOption) (*blocktx_api.RowsAffectedResponse, error) { - if mock.DelUnfinishedBlockProcessingFunc == nil { - panic("BlockTxAPIClientMock.DelUnfinishedBlockProcessingFunc: method is nil but BlockTxAPIClient.DelUnfinishedBlockProcessing was just called") - } - callInfo := struct { - Ctx context.Context - In *blocktx_api.DelUnfinishedBlockProcessingRequest - Opts []grpc.CallOption - }{ - Ctx: ctx, - In: in, - Opts: opts, - } - mock.lockDelUnfinishedBlockProcessing.Lock() - mock.calls.DelUnfinishedBlockProcessing = append(mock.calls.DelUnfinishedBlockProcessing, callInfo) - mock.lockDelUnfinishedBlockProcessing.Unlock() - return mock.DelUnfinishedBlockProcessingFunc(ctx, in, opts...) -} - -// DelUnfinishedBlockProcessingCalls gets all the calls that were made to DelUnfinishedBlockProcessing. -// Check the length with: -// -// len(mockedBlockTxAPIClient.DelUnfinishedBlockProcessingCalls()) -func (mock *BlockTxAPIClientMock) DelUnfinishedBlockProcessingCalls() []struct { - Ctx context.Context - In *blocktx_api.DelUnfinishedBlockProcessingRequest - Opts []grpc.CallOption -} { - var calls []struct { - Ctx context.Context - In *blocktx_api.DelUnfinishedBlockProcessingRequest - Opts []grpc.CallOption - } - mock.lockDelUnfinishedBlockProcessing.RLock() - calls = mock.calls.DelUnfinishedBlockProcessing - mock.lockDelUnfinishedBlockProcessing.RUnlock() - return calls -} - // Health calls HealthFunc. func (mock *BlockTxAPIClientMock) Health(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*blocktx_api.HealthResponse, error) { if mock.HealthFunc == nil { diff --git a/internal/blocktx/mocks/blocktx_client_mock.go b/internal/blocktx/mocks/blocktx_client_mock.go deleted file mode 100644 index 2eed4073e..000000000 --- a/internal/blocktx/mocks/blocktx_client_mock.go +++ /dev/null @@ -1,176 +0,0 @@ -// Code generated by moq; DO NOT EDIT. -// github.com/matryer/moq - -package mocks - -import ( - "context" - "github.com/bitcoin-sv/arc/internal/blocktx" - "sync" -) - -// Ensure, that WatcherMock does implement blocktx.Watcher. -// If this is not the case, regenerate this file with moq. -var _ blocktx.Watcher = &WatcherMock{} - -// WatcherMock is a mock implementation of blocktx.Watcher. -// -// func TestSomethingThatUsesWatcher(t *testing.T) { -// -// // make and configure a mocked blocktx.Watcher -// mockedWatcher := &WatcherMock{ -// ClearBlocksFunc: func(ctx context.Context, retentionDays int32) (int64, error) { -// panic("mock out the ClearBlocks method") -// }, -// DelUnfinishedBlockProcessingFunc: func(ctx context.Context, processedBy string) (int64, error) { -// panic("mock out the DelUnfinishedBlockProcessing method") -// }, -// HealthFunc: func(ctx context.Context) error { -// panic("mock out the Health method") -// }, -// } -// -// // use mockedWatcher in code that requires blocktx.Watcher -// // and then make assertions. -// -// } -type WatcherMock struct { - // ClearBlocksFunc mocks the ClearBlocks method. - ClearBlocksFunc func(ctx context.Context, retentionDays int32) (int64, error) - - // DelUnfinishedBlockProcessingFunc mocks the DelUnfinishedBlockProcessing method. - DelUnfinishedBlockProcessingFunc func(ctx context.Context, processedBy string) (int64, error) - - // HealthFunc mocks the Health method. - HealthFunc func(ctx context.Context) error - - // calls tracks calls to the methods. - calls struct { - // ClearBlocks holds details about calls to the ClearBlocks method. - ClearBlocks []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // RetentionDays is the retentionDays argument value. - RetentionDays int32 - } - // DelUnfinishedBlockProcessing holds details about calls to the DelUnfinishedBlockProcessing method. - DelUnfinishedBlockProcessing []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // ProcessedBy is the processedBy argument value. - ProcessedBy string - } - // Health holds details about calls to the Health method. - Health []struct { - // Ctx is the ctx argument value. - Ctx context.Context - } - } - lockClearBlocks sync.RWMutex - lockDelUnfinishedBlockProcessing sync.RWMutex - lockHealth sync.RWMutex -} - -// ClearBlocks calls ClearBlocksFunc. -func (mock *WatcherMock) ClearBlocks(ctx context.Context, retentionDays int32) (int64, error) { - if mock.ClearBlocksFunc == nil { - panic("WatcherMock.ClearBlocksFunc: method is nil but Watcher.ClearBlocks was just called") - } - callInfo := struct { - Ctx context.Context - RetentionDays int32 - }{ - Ctx: ctx, - RetentionDays: retentionDays, - } - mock.lockClearBlocks.Lock() - mock.calls.ClearBlocks = append(mock.calls.ClearBlocks, callInfo) - mock.lockClearBlocks.Unlock() - return mock.ClearBlocksFunc(ctx, retentionDays) -} - -// ClearBlocksCalls gets all the calls that were made to ClearBlocks. -// Check the length with: -// -// len(mockedWatcher.ClearBlocksCalls()) -func (mock *WatcherMock) ClearBlocksCalls() []struct { - Ctx context.Context - RetentionDays int32 -} { - var calls []struct { - Ctx context.Context - RetentionDays int32 - } - mock.lockClearBlocks.RLock() - calls = mock.calls.ClearBlocks - mock.lockClearBlocks.RUnlock() - return calls -} - -// DelUnfinishedBlockProcessing calls DelUnfinishedBlockProcessingFunc. -func (mock *WatcherMock) DelUnfinishedBlockProcessing(ctx context.Context, processedBy string) (int64, error) { - if mock.DelUnfinishedBlockProcessingFunc == nil { - panic("WatcherMock.DelUnfinishedBlockProcessingFunc: method is nil but Watcher.DelUnfinishedBlockProcessing was just called") - } - callInfo := struct { - Ctx context.Context - ProcessedBy string - }{ - Ctx: ctx, - ProcessedBy: processedBy, - } - mock.lockDelUnfinishedBlockProcessing.Lock() - mock.calls.DelUnfinishedBlockProcessing = append(mock.calls.DelUnfinishedBlockProcessing, callInfo) - mock.lockDelUnfinishedBlockProcessing.Unlock() - return mock.DelUnfinishedBlockProcessingFunc(ctx, processedBy) -} - -// DelUnfinishedBlockProcessingCalls gets all the calls that were made to DelUnfinishedBlockProcessing. -// Check the length with: -// -// len(mockedWatcher.DelUnfinishedBlockProcessingCalls()) -func (mock *WatcherMock) DelUnfinishedBlockProcessingCalls() []struct { - Ctx context.Context - ProcessedBy string -} { - var calls []struct { - Ctx context.Context - ProcessedBy string - } - mock.lockDelUnfinishedBlockProcessing.RLock() - calls = mock.calls.DelUnfinishedBlockProcessing - mock.lockDelUnfinishedBlockProcessing.RUnlock() - return calls -} - -// Health calls HealthFunc. -func (mock *WatcherMock) Health(ctx context.Context) error { - if mock.HealthFunc == nil { - panic("WatcherMock.HealthFunc: method is nil but Watcher.Health was just called") - } - callInfo := struct { - Ctx context.Context - }{ - Ctx: ctx, - } - mock.lockHealth.Lock() - mock.calls.Health = append(mock.calls.Health, callInfo) - mock.lockHealth.Unlock() - return mock.HealthFunc(ctx) -} - -// HealthCalls gets all the calls that were made to Health. -// Check the length with: -// -// len(mockedWatcher.HealthCalls()) -func (mock *WatcherMock) HealthCalls() []struct { - Ctx context.Context -} { - var calls []struct { - Ctx context.Context - } - mock.lockHealth.RLock() - calls = mock.calls.Health - mock.lockHealth.RUnlock() - return calls -} diff --git a/internal/blocktx/mocks/peer_manager_mock.go b/internal/blocktx/mocks/peer_manager_mock.go deleted file mode 100644 index 646f80919..000000000 --- a/internal/blocktx/mocks/peer_manager_mock.go +++ /dev/null @@ -1,339 +0,0 @@ -// Code generated by moq; DO NOT EDIT. -// github.com/matryer/moq - -package mocks - -import ( - "github.com/bitcoin-sv/arc/internal/blocktx" - "github.com/libsv/go-p2p" - "github.com/libsv/go-p2p/chaincfg/chainhash" - "sync" -) - -// Ensure, that PeerManagerMock does implement blocktx.PeerManager. -// If this is not the case, regenerate this file with moq. -var _ blocktx.PeerManager = &PeerManagerMock{} - -// PeerManagerMock is a mock implementation of blocktx.PeerManager. -// -// func TestSomethingThatUsesPeerManager(t *testing.T) { -// -// // make and configure a mocked blocktx.PeerManager -// mockedPeerManager := &PeerManagerMock{ -// AddPeerFunc: func(peer p2p.PeerI) error { -// panic("mock out the AddPeer method") -// }, -// AnnounceBlockFunc: func(blockHash *chainhash.Hash, peers []p2p.PeerI) []p2p.PeerI { -// panic("mock out the AnnounceBlock method") -// }, -// AnnounceTransactionFunc: func(txHash *chainhash.Hash, peers []p2p.PeerI) []p2p.PeerI { -// panic("mock out the AnnounceTransaction method") -// }, -// GetPeersFunc: func() []p2p.PeerI { -// panic("mock out the GetPeers method") -// }, -// RequestBlockFunc: func(blockHash *chainhash.Hash) p2p.PeerI { -// panic("mock out the RequestBlock method") -// }, -// RequestTransactionFunc: func(txHash *chainhash.Hash) p2p.PeerI { -// panic("mock out the RequestTransaction method") -// }, -// ShutdownFunc: func() { -// panic("mock out the Shutdown method") -// }, -// } -// -// // use mockedPeerManager in code that requires blocktx.PeerManager -// // and then make assertions. -// -// } -type PeerManagerMock struct { - // AddPeerFunc mocks the AddPeer method. - AddPeerFunc func(peer p2p.PeerI) error - - // AnnounceBlockFunc mocks the AnnounceBlock method. - AnnounceBlockFunc func(blockHash *chainhash.Hash, peers []p2p.PeerI) []p2p.PeerI - - // AnnounceTransactionFunc mocks the AnnounceTransaction method. - AnnounceTransactionFunc func(txHash *chainhash.Hash, peers []p2p.PeerI) []p2p.PeerI - - // GetPeersFunc mocks the GetPeers method. - GetPeersFunc func() []p2p.PeerI - - // RequestBlockFunc mocks the RequestBlock method. - RequestBlockFunc func(blockHash *chainhash.Hash) p2p.PeerI - - // RequestTransactionFunc mocks the RequestTransaction method. - RequestTransactionFunc func(txHash *chainhash.Hash) p2p.PeerI - - // ShutdownFunc mocks the Shutdown method. - ShutdownFunc func() - - // calls tracks calls to the methods. - calls struct { - // AddPeer holds details about calls to the AddPeer method. - AddPeer []struct { - // Peer is the peer argument value. - Peer p2p.PeerI - } - // AnnounceBlock holds details about calls to the AnnounceBlock method. - AnnounceBlock []struct { - // BlockHash is the blockHash argument value. - BlockHash *chainhash.Hash - // Peers is the peers argument value. - Peers []p2p.PeerI - } - // AnnounceTransaction holds details about calls to the AnnounceTransaction method. - AnnounceTransaction []struct { - // TxHash is the txHash argument value. - TxHash *chainhash.Hash - // Peers is the peers argument value. - Peers []p2p.PeerI - } - // GetPeers holds details about calls to the GetPeers method. - GetPeers []struct { - } - // RequestBlock holds details about calls to the RequestBlock method. - RequestBlock []struct { - // BlockHash is the blockHash argument value. - BlockHash *chainhash.Hash - } - // RequestTransaction holds details about calls to the RequestTransaction method. - RequestTransaction []struct { - // TxHash is the txHash argument value. - TxHash *chainhash.Hash - } - // Shutdown holds details about calls to the Shutdown method. - Shutdown []struct { - } - } - lockAddPeer sync.RWMutex - lockAnnounceBlock sync.RWMutex - lockAnnounceTransaction sync.RWMutex - lockGetPeers sync.RWMutex - lockRequestBlock sync.RWMutex - lockRequestTransaction sync.RWMutex - lockShutdown sync.RWMutex -} - -// AddPeer calls AddPeerFunc. -func (mock *PeerManagerMock) AddPeer(peer p2p.PeerI) error { - if mock.AddPeerFunc == nil { - panic("PeerManagerMock.AddPeerFunc: method is nil but PeerManager.AddPeer was just called") - } - callInfo := struct { - Peer p2p.PeerI - }{ - Peer: peer, - } - mock.lockAddPeer.Lock() - mock.calls.AddPeer = append(mock.calls.AddPeer, callInfo) - mock.lockAddPeer.Unlock() - return mock.AddPeerFunc(peer) -} - -// AddPeerCalls gets all the calls that were made to AddPeer. -// Check the length with: -// -// len(mockedPeerManager.AddPeerCalls()) -func (mock *PeerManagerMock) AddPeerCalls() []struct { - Peer p2p.PeerI -} { - var calls []struct { - Peer p2p.PeerI - } - mock.lockAddPeer.RLock() - calls = mock.calls.AddPeer - mock.lockAddPeer.RUnlock() - return calls -} - -// AnnounceBlock calls AnnounceBlockFunc. -func (mock *PeerManagerMock) AnnounceBlock(blockHash *chainhash.Hash, peers []p2p.PeerI) []p2p.PeerI { - if mock.AnnounceBlockFunc == nil { - panic("PeerManagerMock.AnnounceBlockFunc: method is nil but PeerManager.AnnounceBlock was just called") - } - callInfo := struct { - BlockHash *chainhash.Hash - Peers []p2p.PeerI - }{ - BlockHash: blockHash, - Peers: peers, - } - mock.lockAnnounceBlock.Lock() - mock.calls.AnnounceBlock = append(mock.calls.AnnounceBlock, callInfo) - mock.lockAnnounceBlock.Unlock() - return mock.AnnounceBlockFunc(blockHash, peers) -} - -// AnnounceBlockCalls gets all the calls that were made to AnnounceBlock. -// Check the length with: -// -// len(mockedPeerManager.AnnounceBlockCalls()) -func (mock *PeerManagerMock) AnnounceBlockCalls() []struct { - BlockHash *chainhash.Hash - Peers []p2p.PeerI -} { - var calls []struct { - BlockHash *chainhash.Hash - Peers []p2p.PeerI - } - mock.lockAnnounceBlock.RLock() - calls = mock.calls.AnnounceBlock - mock.lockAnnounceBlock.RUnlock() - return calls -} - -// AnnounceTransaction calls AnnounceTransactionFunc. -func (mock *PeerManagerMock) AnnounceTransaction(txHash *chainhash.Hash, peers []p2p.PeerI) []p2p.PeerI { - if mock.AnnounceTransactionFunc == nil { - panic("PeerManagerMock.AnnounceTransactionFunc: method is nil but PeerManager.AnnounceTransaction was just called") - } - callInfo := struct { - TxHash *chainhash.Hash - Peers []p2p.PeerI - }{ - TxHash: txHash, - Peers: peers, - } - mock.lockAnnounceTransaction.Lock() - mock.calls.AnnounceTransaction = append(mock.calls.AnnounceTransaction, callInfo) - mock.lockAnnounceTransaction.Unlock() - return mock.AnnounceTransactionFunc(txHash, peers) -} - -// AnnounceTransactionCalls gets all the calls that were made to AnnounceTransaction. -// Check the length with: -// -// len(mockedPeerManager.AnnounceTransactionCalls()) -func (mock *PeerManagerMock) AnnounceTransactionCalls() []struct { - TxHash *chainhash.Hash - Peers []p2p.PeerI -} { - var calls []struct { - TxHash *chainhash.Hash - Peers []p2p.PeerI - } - mock.lockAnnounceTransaction.RLock() - calls = mock.calls.AnnounceTransaction - mock.lockAnnounceTransaction.RUnlock() - return calls -} - -// GetPeers calls GetPeersFunc. -func (mock *PeerManagerMock) GetPeers() []p2p.PeerI { - if mock.GetPeersFunc == nil { - panic("PeerManagerMock.GetPeersFunc: method is nil but PeerManager.GetPeers was just called") - } - callInfo := struct { - }{} - mock.lockGetPeers.Lock() - mock.calls.GetPeers = append(mock.calls.GetPeers, callInfo) - mock.lockGetPeers.Unlock() - return mock.GetPeersFunc() -} - -// GetPeersCalls gets all the calls that were made to GetPeers. -// Check the length with: -// -// len(mockedPeerManager.GetPeersCalls()) -func (mock *PeerManagerMock) GetPeersCalls() []struct { -} { - var calls []struct { - } - mock.lockGetPeers.RLock() - calls = mock.calls.GetPeers - mock.lockGetPeers.RUnlock() - return calls -} - -// RequestBlock calls RequestBlockFunc. -func (mock *PeerManagerMock) RequestBlock(blockHash *chainhash.Hash) p2p.PeerI { - if mock.RequestBlockFunc == nil { - panic("PeerManagerMock.RequestBlockFunc: method is nil but PeerManager.RequestBlock was just called") - } - callInfo := struct { - BlockHash *chainhash.Hash - }{ - BlockHash: blockHash, - } - mock.lockRequestBlock.Lock() - mock.calls.RequestBlock = append(mock.calls.RequestBlock, callInfo) - mock.lockRequestBlock.Unlock() - return mock.RequestBlockFunc(blockHash) -} - -// RequestBlockCalls gets all the calls that were made to RequestBlock. -// Check the length with: -// -// len(mockedPeerManager.RequestBlockCalls()) -func (mock *PeerManagerMock) RequestBlockCalls() []struct { - BlockHash *chainhash.Hash -} { - var calls []struct { - BlockHash *chainhash.Hash - } - mock.lockRequestBlock.RLock() - calls = mock.calls.RequestBlock - mock.lockRequestBlock.RUnlock() - return calls -} - -// RequestTransaction calls RequestTransactionFunc. -func (mock *PeerManagerMock) RequestTransaction(txHash *chainhash.Hash) p2p.PeerI { - if mock.RequestTransactionFunc == nil { - panic("PeerManagerMock.RequestTransactionFunc: method is nil but PeerManager.RequestTransaction was just called") - } - callInfo := struct { - TxHash *chainhash.Hash - }{ - TxHash: txHash, - } - mock.lockRequestTransaction.Lock() - mock.calls.RequestTransaction = append(mock.calls.RequestTransaction, callInfo) - mock.lockRequestTransaction.Unlock() - return mock.RequestTransactionFunc(txHash) -} - -// RequestTransactionCalls gets all the calls that were made to RequestTransaction. -// Check the length with: -// -// len(mockedPeerManager.RequestTransactionCalls()) -func (mock *PeerManagerMock) RequestTransactionCalls() []struct { - TxHash *chainhash.Hash -} { - var calls []struct { - TxHash *chainhash.Hash - } - mock.lockRequestTransaction.RLock() - calls = mock.calls.RequestTransaction - mock.lockRequestTransaction.RUnlock() - return calls -} - -// Shutdown calls ShutdownFunc. -func (mock *PeerManagerMock) Shutdown() { - if mock.ShutdownFunc == nil { - panic("PeerManagerMock.ShutdownFunc: method is nil but PeerManager.Shutdown was just called") - } - callInfo := struct { - }{} - mock.lockShutdown.Lock() - mock.calls.Shutdown = append(mock.calls.Shutdown, callInfo) - mock.lockShutdown.Unlock() - mock.ShutdownFunc() -} - -// ShutdownCalls gets all the calls that were made to Shutdown. -// Check the length with: -// -// len(mockedPeerManager.ShutdownCalls()) -func (mock *PeerManagerMock) ShutdownCalls() []struct { -} { - var calls []struct { - } - mock.lockShutdown.RLock() - calls = mock.calls.Shutdown - mock.lockShutdown.RUnlock() - return calls -} diff --git a/internal/blocktx/processor.go b/internal/blocktx/processor.go index 2dfe49578..36c3613cb 100644 --- a/internal/blocktx/processor.go +++ b/internal/blocktx/processor.go @@ -12,7 +12,6 @@ import ( "sync/atomic" "time" - "github.com/cenkalti/backoff/v4" "github.com/libsv/go-bc" "github.com/libsv/go-p2p" "github.com/libsv/go-p2p/chaincfg/chainhash" @@ -48,6 +47,7 @@ const ( registerTxsBatchSizeDefault = 100 registerRequestTxBatchSizeDefault = 100 waitForBlockProcessing = 5 * time.Minute + lockTime = 5 * time.Minute parallellism = 5 ) @@ -68,7 +68,6 @@ type Processor struct { registerRequestTxsBatchSize int tracingEnabled bool tracingAttributes []attribute.KeyValue - processGuardsMap sync.Map stats *processorStats statCollectionInterval time.Duration @@ -155,29 +154,6 @@ func (p *Processor) Start(statsEnabled bool) error { func (p *Processor) StartBlockRequesting() { p.waitGroup.Add(1) - waitUntilFree := func(ctx context.Context) bool { - t := time.NewTicker(time.Second) - defer t.Stop() - - for { - bhs, err := p.store.GetBlockHashesProcessingInProgress(p.ctx, p.hostname) - if err != nil { - p.logger.Error("failed to get block hashes where processing in progress", slog.String("err", err.Error())) - } - - if len(bhs) < maxBlocksInProgress && err == nil { - return true - } - - select { - case <-ctx.Done(): - return false - - case <-t.C: - } - } - } - go func() { defer p.waitGroup.Done() for { @@ -188,15 +164,13 @@ func (p *Processor) StartBlockRequesting() { hash := req.Hash peer := req.Peer - if ok := waitUntilFree(p.ctx); !ok { - continue - } - // lock block for the current instance to process - processedBy, err := p.store.SetBlockProcessing(p.ctx, hash, p.hostname) + processedBy, err := p.store.SetBlockProcessing(p.ctx, hash, p.hostname, lockTime, maxBlocksInProgress) if err != nil { - // block is already being processed by another blocktx instance - if errors.Is(err, store.ErrBlockProcessingDuplicateKey) { + if errors.Is(err, store.ErrBlockProcessingMaximumReached) { + p.logger.Debug("block processing maximum reached", slog.String("hash", hash.String()), slog.String("processed_by", processedBy)) + continue + } else if errors.Is(err, store.ErrBlockProcessingInProgress) { p.logger.Debug("block processing already in progress", slog.String("hash", hash.String()), slog.String("processed_by", processedBy)) continue } @@ -210,7 +184,6 @@ func (p *Processor) StartBlockRequesting() { _ = msg.AddInvVect(wire.NewInvVect(wire.InvTypeBlock, hash)) // ignore error at this point _ = peer.WriteMsg(msg) - p.startBlockProcessGuard(p.ctx, hash) p.logger.Info("Block request message sent to peer", slog.String("hash", hash.String()), slog.String("peer", peer.String())) } } @@ -238,14 +211,12 @@ func (p *Processor) StartBlockProcessing() { err = p.processBlock(blockMsg) if err != nil { p.logger.Error("block processing failed", slog.String("hash", hash.String()), slog.String("err", err.Error())) - p.unlockBlock(p.ctx, &hash) continue } storeErr := p.store.MarkBlockAsDone(p.ctx, &hash, blockMsg.Size, uint64(len(blockMsg.TransactionHashes))) if storeErr != nil { p.logger.Error("unable to mark block as processed", slog.String("hash", hash.String()), slog.String("err", storeErr.Error())) - p.unlockBlock(p.ctx, &hash) continue } @@ -264,66 +235,6 @@ func (p *Processor) StartBlockProcessing() { }() } -func (p *Processor) startBlockProcessGuard(ctx context.Context, hash *chainhash.Hash) { - p.waitGroup.Add(1) - - execCtx, stopFn := context.WithCancel(ctx) - p.processGuardsMap.Store(*hash, stopFn) - - go func() { - defer p.waitGroup.Done() - defer p.processGuardsMap.Delete(*hash) - - select { - case <-execCtx.Done(): - // we may do nothing here: - // 1. block processing is completed, or - // 2. processor is shutting down – all unprocessed blocks are released in the Shutdown func - return - - case <-time.After(p.maxBlockProcessingDuration): - // check if block was processed successfully - block, _ := p.store.GetBlock(execCtx, hash) - - if block != nil { - return // success - } - - p.logger.Warn(fmt.Sprintf("block was not processed after %v. Unlock the block to be processed later", waitForBlockProcessing), slog.String("hash", hash.String())) - p.unlockBlock(execCtx, hash) - } - }() -} - -func (p *Processor) stopBlockProcessGuard(hash *chainhash.Hash) { - stopFn, found := p.processGuardsMap.Load(*hash) - if found { - stopFn.(context.CancelFunc)() - } -} - -// unlock block for future processing -func (p *Processor) unlockBlock(ctx context.Context, hash *chainhash.Hash) { - // use closures for retries - unlockFn := func() error { - _, err := p.store.DelBlockProcessing(ctx, hash, p.hostname) - if errors.Is(err, store.ErrBlockNotFound) { - return nil // block is already unlocked - } - - return err - } - - var bo backoff.BackOff - bo = backoff.NewConstantBackOff(100 * time.Millisecond) - bo = backoff.WithContext(bo, ctx) - bo = backoff.WithMaxRetries(bo, 5) - - if unlockErr := backoff.Retry(unlockFn, bo); unlockErr != nil { - p.logger.ErrorContext(ctx, "failed to delete block processing", slog.String("hash", hash.String()), slog.String("err", unlockErr.Error())) - } -} - func (p *Processor) StartProcessRegisterTxs() { p.waitGroup.Add(1) txHashes := make([][]byte, 0, p.registerTxsBatchSize) @@ -469,9 +380,6 @@ func (p *Processor) processBlock(blockMsg *p2p.BlockMessage) (err error) { var block *blocktx_api.Block blockHash := blockMsg.Header.BlockHash() - // release guardian - defer p.stopBlockProcessGuard(&blockHash) - ctx, span := tracing.StartTracing(ctx, "processBlock", p.tracingEnabled, p.tracingAttributes...) defer func() { if span != nil { @@ -1059,18 +967,4 @@ func (p *Processor) calculateMerklePaths(ctx context.Context, txs []store.BlockT func (p *Processor) Shutdown() { p.cancelAll() p.waitGroup.Wait() - - // unlock unprocessed blocks - bhs, err := p.store.GetBlockHashesProcessingInProgress(context.Background(), p.hostname) - if err != nil { - p.logger.Error("reading unprocessing blocks on shutdown failed", slog.Any("err", err)) - return - } - - for _, bh := range bhs { - _, err := p.store.DelBlockProcessing(context.Background(), bh, p.hostname) - if err != nil { - p.logger.Error("unlocking unprocessed block on shutdown failed", slog.String("hash", bh.String()), slog.Any("err", err)) - } - } } diff --git a/internal/blocktx/processor_test.go b/internal/blocktx/processor_test.go index b293992db..d226fc794 100644 --- a/internal/blocktx/processor_test.go +++ b/internal/blocktx/processor_test.go @@ -177,8 +177,7 @@ func TestHandleBlock(t *testing.T) { GetBlockTransactionsHashesFunc: func(_ context.Context, _ []byte) ([]*chainhash.Hash, error) { return nil, nil }, - MarkBlockAsDoneFunc: func(_ context.Context, _ *chainhash.Hash, _ uint64, _ uint64) error { return nil }, - GetBlockHashesProcessingInProgressFunc: func(_ context.Context, _ string) ([]*chainhash.Hash, error) { return nil, nil }, + MarkBlockAsDoneFunc: func(_ context.Context, _ *chainhash.Hash, _ uint64, _ uint64) error { return nil }, } storeMock.InsertBlockTransactionsFunc = func(_ context.Context, _ uint64, txsWithMerklePaths []store.TxHashWithMerkleTreeIndex) error { @@ -437,9 +436,6 @@ func TestHandleBlockReorgAndOrphans(t *testing.T) { MarkBlockAsDoneFunc: func(_ context.Context, _ *chainhash.Hash, _, _ uint64) error { return nil }, - DelBlockProcessingFunc: func(_ context.Context, _ *chainhash.Hash, _ string) (int64, error) { - return 0, nil - }, } // build peer manager and processor @@ -511,9 +507,6 @@ func TestStartProcessRegisterTxs(t *testing.T) { RegisterTransactionsFunc: func(_ context.Context, _ [][]byte) error { return registerErrTest }, - GetBlockHashesProcessingInProgressFunc: func(_ context.Context, _ string) ([]*chainhash.Hash, error) { - return nil, nil - }, GetBlockTransactionsHashesFunc: func(_ context.Context, _ []byte) ([]*chainhash.Hash, error) { return nil, nil }, @@ -563,48 +556,35 @@ func TestStartBlockRequesting(t *testing.T) { setBlockProcessingErr error bhsProcInProg []*chainhash.Hash - expectedSetBlockProcessingCalls int - expectedDelBlockProcessingCalls int - expectedDelBlockProcessingErrors int - expectedGetBlockHashesProcessingInProgressCalls int - expectedPeerWriteMessageCalls int + expectedSetBlockProcessingCalls int + expectedPeerWriteMessageCalls int }{ { name: "process block", - expectedSetBlockProcessingCalls: 1, - expectedGetBlockHashesProcessingInProgressCalls: 1, - expectedPeerWriteMessageCalls: 1, + expectedSetBlockProcessingCalls: 1, + expectedPeerWriteMessageCalls: 1, }, { - name: "block already processed", - setBlockProcessingErr: store.ErrBlockProcessingDuplicateKey, + name: "block processing maximum reached", + setBlockProcessingErr: store.ErrBlockProcessingMaximumReached, - expectedSetBlockProcessingCalls: 1, - expectedGetBlockHashesProcessingInProgressCalls: 1, - expectedPeerWriteMessageCalls: 0, + expectedSetBlockProcessingCalls: 1, + expectedPeerWriteMessageCalls: 0, }, { - name: "failed to set block processing", - setBlockProcessingErr: errors.New("failed to set block processing"), + name: "block processing already in progress", + setBlockProcessingErr: store.ErrBlockProcessingInProgress, - expectedSetBlockProcessingCalls: 1, - expectedGetBlockHashesProcessingInProgressCalls: 1, - expectedPeerWriteMessageCalls: 0, + expectedSetBlockProcessingCalls: 1, + expectedPeerWriteMessageCalls: 0, }, { - name: "max blocks being processed reached", - bhsProcInProg: []*chainhash.Hash{ - testdata.Block1Hash, testdata.Block2Hash, - testdata.Block1Hash, testdata.Block2Hash, - testdata.Block1Hash, testdata.Block2Hash, - testdata.Block1Hash, testdata.Block2Hash, - testdata.Block1Hash, testdata.Block2Hash, - }, + name: "failed to set block processing", + setBlockProcessingErr: errors.New("failed to set block processing"), - expectedSetBlockProcessingCalls: 0, - expectedGetBlockHashesProcessingInProgressCalls: 1, - expectedPeerWriteMessageCalls: 0, + expectedSetBlockProcessingCalls: 1, + expectedPeerWriteMessageCalls: 0, }, } @@ -612,21 +592,10 @@ func TestStartBlockRequesting(t *testing.T) { t.Run(tc.name, func(t *testing.T) { // given setBlockProcessingErrTest := tc.setBlockProcessingErr - bhsProcInProgErr := tc.bhsProcInProg storeMock := &storeMocks.BlocktxStoreMock{ - SetBlockProcessingFunc: func(_ context.Context, _ *chainhash.Hash, _ string) (string, error) { + SetBlockProcessingFunc: func(_ context.Context, _ *chainhash.Hash, _ string, _ time.Duration, _ int) (string, error) { return "abc", setBlockProcessingErrTest }, - GetBlockHashesProcessingInProgressFunc: func(_ context.Context, _ string) ([]*chainhash.Hash, error) { - return bhsProcInProgErr, nil - }, - } - storeMock.DelBlockProcessingFunc = func(_ context.Context, _ *chainhash.Hash, _ string) (int64, error) { - j := len(storeMock.DelBlockProcessingCalls()) - if j <= tc.expectedDelBlockProcessingErrors { - return 0, errors.New("DelBlockProcessing failed") - } - return 1, nil } peerMock := &mocks.PeerMock{ @@ -660,8 +629,6 @@ func TestStartBlockRequesting(t *testing.T) { // then defer sut.Shutdown() - require.Equal(t, tc.expectedGetBlockHashesProcessingInProgressCalls, len(storeMock.GetBlockHashesProcessingInProgressCalls())) - require.Equal(t, tc.expectedDelBlockProcessingCalls, len(storeMock.DelBlockProcessingCalls())) require.Equal(t, tc.expectedSetBlockProcessingCalls, len(storeMock.SetBlockProcessingCalls())) require.Equal(t, tc.expectedPeerWriteMessageCalls, len(peerMock.WriteMsgCalls())) }) @@ -746,9 +713,6 @@ func TestStartProcessRequestTxs(t *testing.T) { BlockHeight: 1, }}, tc.getMinedErr }, - GetBlockHashesProcessingInProgressFunc: func(_ context.Context, _ string) ([]*chainhash.Hash, error) { - return nil, nil - }, GetBlockTransactionsHashesFunc: func(_ context.Context, _ []byte) ([]*chainhash.Hash, error) { return []*chainhash.Hash{testdata.TX1Hash}, nil }, @@ -816,12 +780,6 @@ func TestStart(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { // given - storeMock := &storeMocks.BlocktxStoreMock{ - GetBlockHashesProcessingInProgressFunc: func(_ context.Context, _ string) ([]*chainhash.Hash, error) { - return nil, nil - }, - } - mqClient := &mocks.MessageQueueClientMock{ SubscribeFunc: func(topic string, _ func([]byte) error) error { err, ok := tc.topicErr[topic] @@ -832,7 +790,7 @@ func TestStart(t *testing.T) { }, } logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})) - sut, err := blocktx.NewProcessor(logger, storeMock, nil, nil, blocktx.WithMessageQueueClient(mqClient)) + sut, err := blocktx.NewProcessor(logger, nil, nil, nil, blocktx.WithMessageQueueClient(mqClient)) require.NoError(t, err) // when diff --git a/internal/blocktx/server.go b/internal/blocktx/server.go index e1ae3b3d9..6a5b99193 100644 --- a/internal/blocktx/server.go +++ b/internal/blocktx/server.go @@ -60,6 +60,11 @@ func (s *Server) Health(_ context.Context, _ *emptypb.Empty) (*blocktx_api.Healt } func (s *Server) ClearBlocks(ctx context.Context, clearData *blocktx_api.ClearData) (*blocktx_api.RowsAffectedResponse, error) { + _, err := s.store.ClearBlocktxTable(ctx, clearData.GetRetentionDays(), "block_processing") + if err != nil { + return nil, err + } + return s.store.ClearBlocktxTable(ctx, clearData.GetRetentionDays(), "blocks") } @@ -67,25 +72,6 @@ func (s *Server) ClearRegisteredTransactions(ctx context.Context, clearData *blo return s.store.ClearBlocktxTable(ctx, clearData.GetRetentionDays(), "registered_transactions") } -func (s *Server) DelUnfinishedBlockProcessing(ctx context.Context, req *blocktx_api.DelUnfinishedBlockProcessingRequest) (*blocktx_api.RowsAffectedResponse, error) { - bhs, err := s.store.GetBlockHashesProcessingInProgress(ctx, req.GetProcessedBy()) - if err != nil { - return &blocktx_api.RowsAffectedResponse{}, err - } - - var rowsTotal int64 - for _, bh := range bhs { - rows, err := s.store.DelBlockProcessing(ctx, bh, req.GetProcessedBy()) - if err != nil { - return &blocktx_api.RowsAffectedResponse{}, err - } - - rowsTotal += rows - } - - return &blocktx_api.RowsAffectedResponse{Rows: rowsTotal}, nil -} - func (s *Server) VerifyMerkleRoots(ctx context.Context, req *blocktx_api.MerkleRootsVerificationRequest) (*blocktx_api.MerkleRootVerificationResponse, error) { return s.store.VerifyMerkleRoots(ctx, req.GetMerkleRoots(), s.maxAllowedBlockHeightMismatch) } diff --git a/internal/blocktx/server_test.go b/internal/blocktx/server_test.go index 18a7090bf..9c479f20e 100644 --- a/internal/blocktx/server_test.go +++ b/internal/blocktx/server_test.go @@ -1,21 +1,16 @@ package blocktx_test import ( - "context" - "errors" "log/slog" "os" "testing" "time" "github.com/libsv/go-p2p" - "github.com/libsv/go-p2p/chaincfg/chainhash" "github.com/stretchr/testify/require" "github.com/bitcoin-sv/arc/internal/blocktx" - "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" storeMocks "github.com/bitcoin-sv/arc/internal/blocktx/store/mocks" - "github.com/bitcoin-sv/arc/internal/testdata" ) func TestListenAndServe(t *testing.T) { @@ -47,68 +42,3 @@ func TestListenAndServe(t *testing.T) { }) } } - -func TestDelUnfinishedBlock(t *testing.T) { - tt := []struct { - name string - getBlockHashesProcessingInProgressErr error - delBlockProcessingErr error - - expectedRows int64 - expectedErrorStr string - }{ - { - name: "success", - - expectedRows: 6, - }, - { - name: "error - getBlockHashesProcessingInProgress", - getBlockHashesProcessingInProgressErr: errors.New("failed to get block hashes processing in progress"), - - expectedErrorStr: "failed to get block hashes processing in progress", - expectedRows: 0, - }, - { - name: "error - delBlockProcessingErr", - delBlockProcessingErr: errors.New("failed to delete block processing error"), - - expectedErrorStr: "failed to delete block processing error", - expectedRows: 0, - }, - } - - for _, tc := range tt { - t.Run(tc.name, func(t *testing.T) { - // given - logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})) - storeMock := &storeMocks.BlocktxStoreMock{ - GetBlockHashesProcessingInProgressFunc: func(_ context.Context, _ string) ([]*chainhash.Hash, error) { - return []*chainhash.Hash{testdata.TX1Hash, testdata.TX2Hash}, tc.getBlockHashesProcessingInProgressErr - }, - - DelBlockProcessingFunc: func(_ context.Context, _ *chainhash.Hash, _ string) (int64, error) { - return 3, tc.delBlockProcessingErr - }, - } - - sut, err := blocktx.NewServer("", 0, logger, storeMock, nil, 0, nil) - require.NoError(t, err) - defer sut.GracefulStop() - - // when - resp, err := sut.DelUnfinishedBlockProcessing(context.Background(), &blocktx_api.DelUnfinishedBlockProcessingRequest{ - ProcessedBy: "host", - }) - - // then - if tc.expectedErrorStr != "" { - require.ErrorContains(t, err, tc.expectedErrorStr) - return - } - - require.NoError(t, err) - require.Equal(t, tc.expectedRows, resp.Rows) - }) - } -} diff --git a/internal/blocktx/store/mocks/blocktx_store_mock.go b/internal/blocktx/store/mocks/blocktx_store_mock.go index 121e6b991..b218fe7da 100644 --- a/internal/blocktx/store/mocks/blocktx_store_mock.go +++ b/internal/blocktx/store/mocks/blocktx_store_mock.go @@ -9,6 +9,7 @@ import ( "github.com/bitcoin-sv/arc/internal/blocktx/store" "github.com/libsv/go-p2p/chaincfg/chainhash" "sync" + "time" ) // Ensure, that BlocktxStoreMock does implement store.BlocktxStore. @@ -27,18 +28,12 @@ var _ store.BlocktxStore = &BlocktxStoreMock{} // CloseFunc: func() error { // panic("mock out the Close method") // }, -// DelBlockProcessingFunc: func(ctx context.Context, hash *chainhash.Hash, processedBy string) (int64, error) { -// panic("mock out the DelBlockProcessing method") -// }, // GetBlockFunc: func(ctx context.Context, hash *chainhash.Hash) (*blocktx_api.Block, error) { // panic("mock out the GetBlock method") // }, // GetBlockGapsFunc: func(ctx context.Context, heightRange int) ([]*store.BlockGap, error) { // panic("mock out the GetBlockGaps method") // }, -// GetBlockHashesProcessingInProgressFunc: func(ctx context.Context, processedBy string) ([]*chainhash.Hash, error) { -// panic("mock out the GetBlockHashesProcessingInProgress method") -// }, // GetBlockTransactionsHashesFunc: func(ctx context.Context, blockHash []byte) ([]*chainhash.Hash, error) { // panic("mock out the GetBlockTransactionsHashes method") // }, @@ -78,7 +73,7 @@ var _ store.BlocktxStore = &BlocktxStoreMock{} // RegisterTransactionsFunc: func(ctx context.Context, txHashes [][]byte) error { // panic("mock out the RegisterTransactions method") // }, -// SetBlockProcessingFunc: func(ctx context.Context, hash *chainhash.Hash, processedBy string) (string, error) { +// SetBlockProcessingFunc: func(ctx context.Context, hash *chainhash.Hash, setProcessedBy string, lockTime time.Duration, maxParallelProcessing int) (string, error) { // panic("mock out the SetBlockProcessing method") // }, // UpdateBlocksStatusesFunc: func(ctx context.Context, blockStatusUpdates []store.BlockStatusUpdate) error { @@ -103,18 +98,12 @@ type BlocktxStoreMock struct { // CloseFunc mocks the Close method. CloseFunc func() error - // DelBlockProcessingFunc mocks the DelBlockProcessing method. - DelBlockProcessingFunc func(ctx context.Context, hash *chainhash.Hash, processedBy string) (int64, error) - // GetBlockFunc mocks the GetBlock method. GetBlockFunc func(ctx context.Context, hash *chainhash.Hash) (*blocktx_api.Block, error) // GetBlockGapsFunc mocks the GetBlockGaps method. GetBlockGapsFunc func(ctx context.Context, heightRange int) ([]*store.BlockGap, error) - // GetBlockHashesProcessingInProgressFunc mocks the GetBlockHashesProcessingInProgress method. - GetBlockHashesProcessingInProgressFunc func(ctx context.Context, processedBy string) ([]*chainhash.Hash, error) - // GetBlockTransactionsHashesFunc mocks the GetBlockTransactionsHashes method. GetBlockTransactionsHashesFunc func(ctx context.Context, blockHash []byte) ([]*chainhash.Hash, error) @@ -155,7 +144,7 @@ type BlocktxStoreMock struct { RegisterTransactionsFunc func(ctx context.Context, txHashes [][]byte) error // SetBlockProcessingFunc mocks the SetBlockProcessing method. - SetBlockProcessingFunc func(ctx context.Context, hash *chainhash.Hash, processedBy string) (string, error) + SetBlockProcessingFunc func(ctx context.Context, hash *chainhash.Hash, setProcessedBy string, lockTime time.Duration, maxParallelProcessing int) (string, error) // UpdateBlocksStatusesFunc mocks the UpdateBlocksStatuses method. UpdateBlocksStatusesFunc func(ctx context.Context, blockStatusUpdates []store.BlockStatusUpdate) error @@ -180,15 +169,6 @@ type BlocktxStoreMock struct { // Close holds details about calls to the Close method. Close []struct { } - // DelBlockProcessing holds details about calls to the DelBlockProcessing method. - DelBlockProcessing []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // Hash is the hash argument value. - Hash *chainhash.Hash - // ProcessedBy is the processedBy argument value. - ProcessedBy string - } // GetBlock holds details about calls to the GetBlock method. GetBlock []struct { // Ctx is the ctx argument value. @@ -203,13 +183,6 @@ type BlocktxStoreMock struct { // HeightRange is the heightRange argument value. HeightRange int } - // GetBlockHashesProcessingInProgress holds details about calls to the GetBlockHashesProcessingInProgress method. - GetBlockHashesProcessingInProgress []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // ProcessedBy is the processedBy argument value. - ProcessedBy string - } // GetBlockTransactionsHashes holds details about calls to the GetBlockTransactionsHashes method. GetBlockTransactionsHashes []struct { // Ctx is the ctx argument value. @@ -309,8 +282,12 @@ type BlocktxStoreMock struct { Ctx context.Context // Hash is the hash argument value. Hash *chainhash.Hash - // ProcessedBy is the processedBy argument value. - ProcessedBy string + // SetProcessedBy is the setProcessedBy argument value. + SetProcessedBy string + // LockTime is the lockTime argument value. + LockTime time.Duration + // MaxParallelProcessing is the maxParallelProcessing argument value. + MaxParallelProcessing int } // UpdateBlocksStatuses holds details about calls to the UpdateBlocksStatuses method. UpdateBlocksStatuses []struct { @@ -336,29 +313,27 @@ type BlocktxStoreMock struct { MaxAllowedBlockHeightMismatch int } } - lockClearBlocktxTable sync.RWMutex - lockClose sync.RWMutex - lockDelBlockProcessing sync.RWMutex - lockGetBlock sync.RWMutex - lockGetBlockGaps sync.RWMutex - lockGetBlockHashesProcessingInProgress sync.RWMutex - lockGetBlockTransactionsHashes sync.RWMutex - lockGetChainTip sync.RWMutex - lockGetLongestBlockByHeight sync.RWMutex - lockGetLongestChainFromHeight sync.RWMutex - lockGetMinedTransactions sync.RWMutex - lockGetOrphansBackToNonOrphanAncestor sync.RWMutex - lockGetRegisteredTxsByBlockHashes sync.RWMutex - lockGetStaleChainBackFromHash sync.RWMutex - lockGetStats sync.RWMutex - lockInsertBlockTransactions sync.RWMutex - lockMarkBlockAsDone sync.RWMutex - lockPing sync.RWMutex - lockRegisterTransactions sync.RWMutex - lockSetBlockProcessing sync.RWMutex - lockUpdateBlocksStatuses sync.RWMutex - lockUpsertBlock sync.RWMutex - lockVerifyMerkleRoots sync.RWMutex + lockClearBlocktxTable sync.RWMutex + lockClose sync.RWMutex + lockGetBlock sync.RWMutex + lockGetBlockGaps sync.RWMutex + lockGetBlockTransactionsHashes sync.RWMutex + lockGetChainTip sync.RWMutex + lockGetLongestBlockByHeight sync.RWMutex + lockGetLongestChainFromHeight sync.RWMutex + lockGetMinedTransactions sync.RWMutex + lockGetOrphansBackToNonOrphanAncestor sync.RWMutex + lockGetRegisteredTxsByBlockHashes sync.RWMutex + lockGetStaleChainBackFromHash sync.RWMutex + lockGetStats sync.RWMutex + lockInsertBlockTransactions sync.RWMutex + lockMarkBlockAsDone sync.RWMutex + lockPing sync.RWMutex + lockRegisterTransactions sync.RWMutex + lockSetBlockProcessing sync.RWMutex + lockUpdateBlocksStatuses sync.RWMutex + lockUpsertBlock sync.RWMutex + lockVerifyMerkleRoots sync.RWMutex } // ClearBlocktxTable calls ClearBlocktxTableFunc. @@ -428,46 +403,6 @@ func (mock *BlocktxStoreMock) CloseCalls() []struct { return calls } -// DelBlockProcessing calls DelBlockProcessingFunc. -func (mock *BlocktxStoreMock) DelBlockProcessing(ctx context.Context, hash *chainhash.Hash, processedBy string) (int64, error) { - if mock.DelBlockProcessingFunc == nil { - panic("BlocktxStoreMock.DelBlockProcessingFunc: method is nil but BlocktxStore.DelBlockProcessing was just called") - } - callInfo := struct { - Ctx context.Context - Hash *chainhash.Hash - ProcessedBy string - }{ - Ctx: ctx, - Hash: hash, - ProcessedBy: processedBy, - } - mock.lockDelBlockProcessing.Lock() - mock.calls.DelBlockProcessing = append(mock.calls.DelBlockProcessing, callInfo) - mock.lockDelBlockProcessing.Unlock() - return mock.DelBlockProcessingFunc(ctx, hash, processedBy) -} - -// DelBlockProcessingCalls gets all the calls that were made to DelBlockProcessing. -// Check the length with: -// -// len(mockedBlocktxStore.DelBlockProcessingCalls()) -func (mock *BlocktxStoreMock) DelBlockProcessingCalls() []struct { - Ctx context.Context - Hash *chainhash.Hash - ProcessedBy string -} { - var calls []struct { - Ctx context.Context - Hash *chainhash.Hash - ProcessedBy string - } - mock.lockDelBlockProcessing.RLock() - calls = mock.calls.DelBlockProcessing - mock.lockDelBlockProcessing.RUnlock() - return calls -} - // GetBlock calls GetBlockFunc. func (mock *BlocktxStoreMock) GetBlock(ctx context.Context, hash *chainhash.Hash) (*blocktx_api.Block, error) { if mock.GetBlockFunc == nil { @@ -540,42 +475,6 @@ func (mock *BlocktxStoreMock) GetBlockGapsCalls() []struct { return calls } -// GetBlockHashesProcessingInProgress calls GetBlockHashesProcessingInProgressFunc. -func (mock *BlocktxStoreMock) GetBlockHashesProcessingInProgress(ctx context.Context, processedBy string) ([]*chainhash.Hash, error) { - if mock.GetBlockHashesProcessingInProgressFunc == nil { - panic("BlocktxStoreMock.GetBlockHashesProcessingInProgressFunc: method is nil but BlocktxStore.GetBlockHashesProcessingInProgress was just called") - } - callInfo := struct { - Ctx context.Context - ProcessedBy string - }{ - Ctx: ctx, - ProcessedBy: processedBy, - } - mock.lockGetBlockHashesProcessingInProgress.Lock() - mock.calls.GetBlockHashesProcessingInProgress = append(mock.calls.GetBlockHashesProcessingInProgress, callInfo) - mock.lockGetBlockHashesProcessingInProgress.Unlock() - return mock.GetBlockHashesProcessingInProgressFunc(ctx, processedBy) -} - -// GetBlockHashesProcessingInProgressCalls gets all the calls that were made to GetBlockHashesProcessingInProgress. -// Check the length with: -// -// len(mockedBlocktxStore.GetBlockHashesProcessingInProgressCalls()) -func (mock *BlocktxStoreMock) GetBlockHashesProcessingInProgressCalls() []struct { - Ctx context.Context - ProcessedBy string -} { - var calls []struct { - Ctx context.Context - ProcessedBy string - } - mock.lockGetBlockHashesProcessingInProgress.RLock() - calls = mock.calls.GetBlockHashesProcessingInProgress - mock.lockGetBlockHashesProcessingInProgress.RUnlock() - return calls -} - // GetBlockTransactionsHashes calls GetBlockTransactionsHashesFunc. func (mock *BlocktxStoreMock) GetBlockTransactionsHashes(ctx context.Context, blockHash []byte) ([]*chainhash.Hash, error) { if mock.GetBlockTransactionsHashesFunc == nil { @@ -1049,23 +948,27 @@ func (mock *BlocktxStoreMock) RegisterTransactionsCalls() []struct { } // SetBlockProcessing calls SetBlockProcessingFunc. -func (mock *BlocktxStoreMock) SetBlockProcessing(ctx context.Context, hash *chainhash.Hash, processedBy string) (string, error) { +func (mock *BlocktxStoreMock) SetBlockProcessing(ctx context.Context, hash *chainhash.Hash, setProcessedBy string, lockTime time.Duration, maxParallelProcessing int) (string, error) { if mock.SetBlockProcessingFunc == nil { panic("BlocktxStoreMock.SetBlockProcessingFunc: method is nil but BlocktxStore.SetBlockProcessing was just called") } callInfo := struct { - Ctx context.Context - Hash *chainhash.Hash - ProcessedBy string + Ctx context.Context + Hash *chainhash.Hash + SetProcessedBy string + LockTime time.Duration + MaxParallelProcessing int }{ - Ctx: ctx, - Hash: hash, - ProcessedBy: processedBy, + Ctx: ctx, + Hash: hash, + SetProcessedBy: setProcessedBy, + LockTime: lockTime, + MaxParallelProcessing: maxParallelProcessing, } mock.lockSetBlockProcessing.Lock() mock.calls.SetBlockProcessing = append(mock.calls.SetBlockProcessing, callInfo) mock.lockSetBlockProcessing.Unlock() - return mock.SetBlockProcessingFunc(ctx, hash, processedBy) + return mock.SetBlockProcessingFunc(ctx, hash, setProcessedBy, lockTime, maxParallelProcessing) } // SetBlockProcessingCalls gets all the calls that were made to SetBlockProcessing. @@ -1073,14 +976,18 @@ func (mock *BlocktxStoreMock) SetBlockProcessing(ctx context.Context, hash *chai // // len(mockedBlocktxStore.SetBlockProcessingCalls()) func (mock *BlocktxStoreMock) SetBlockProcessingCalls() []struct { - Ctx context.Context - Hash *chainhash.Hash - ProcessedBy string + Ctx context.Context + Hash *chainhash.Hash + SetProcessedBy string + LockTime time.Duration + MaxParallelProcessing int } { var calls []struct { - Ctx context.Context - Hash *chainhash.Hash - ProcessedBy string + Ctx context.Context + Hash *chainhash.Hash + SetProcessedBy string + LockTime time.Duration + MaxParallelProcessing int } mock.lockSetBlockProcessing.RLock() calls = mock.calls.SetBlockProcessing diff --git a/internal/blocktx/store/postgresql/fixtures/block_processing/blocktx.block_processing.yaml b/internal/blocktx/store/postgresql/fixtures/block_processing/blocktx.block_processing.yaml index 68229a06a..132b33608 100644 --- a/internal/blocktx/store/postgresql/fixtures/block_processing/blocktx.block_processing.yaml +++ b/internal/blocktx/store/postgresql/fixtures/block_processing/blocktx.block_processing.yaml @@ -1,5 +1,14 @@ +# completed +- block_hash: 0xb71ab063c5f96cad71cdc59dcc94182a20a69cbd7eed2d070000000000000000 + processed_by: pod-2 + inserted_at: 2023-12-22 11:40:00 +# in progress - block_hash: 0xf97e20396f02ab990ed31b9aec70c240f48b7e5ea239aa050000000000000000 processed_by: pod-2 -- block_hash: 0xb71ab063c5f96cad71cdc59dcc94182a20a69cbd7eed2d070000000000000000 + inserted_at: 2023-12-22 11:50:00 + +# in progress +- block_hash: 0xc20b4d510e1a7a4ab3da30e55676de0884b4cb79139ccc0a0000000000000000 processed_by: pod-2 + inserted_at: 2023-12-22 12:10:00 diff --git a/internal/blocktx/store/postgresql/fixtures/block_processing/blocktx.blocks.yaml b/internal/blocktx/store/postgresql/fixtures/block_processing/blocktx.blocks.yaml index 4e04686ca..7f08d4586 100644 --- a/internal/blocktx/store/postgresql/fixtures/block_processing/blocktx.blocks.yaml +++ b/internal/blocktx/store/postgresql/fixtures/block_processing/blocktx.blocks.yaml @@ -1,17 +1,33 @@ -- inserted_at: 2023-12-10 14:00:00 +- inserted_at: 2023-12-22 11:40:00 id: 1 - hash: 0xf97e20396f02ab990ed31b9aec70c240f48b7e5ea239aa050000000000000000 - prevhash: 0xb71ab063c5f96cad71cdc59dcc94182a20a69cbd7eed2d070000000000000000 - merkleroot: 0x7f4019eb006f5333cce752df387fa8443035c22291eb771ee5b16a02b81c8483 - height: 822013 - size: 86840000 - tx_count: 23477 -- inserted_at: 2023-12-15 14:00:00 - id: 2 hash: 0xb71ab063c5f96cad71cdc59dcc94182a20a69cbd7eed2d070000000000000000 prevhash: 0x3a03313b727fa08c170fab2660c225d52b4d85516c92a0020000000000000000 merkleroot: 0x3eeee879a8a08fc537a04682178687bb0e58a5103938eafc349705a2acb06410 height: 822012 - processed_at: 2023-12-15 14:10:00 + processed_at: 2023-12-22 11:45:00 size: 3030000 tx_count: 856 +- inserted_at: 2023-12-22 11:50:00 + id: 2 + hash: 0xf97e20396f02ab990ed31b9aec70c240f48b7e5ea239aa050000000000000000 + prevhash: 0xb71ab063c5f96cad71cdc59dcc94182a20a69cbd7eed2d070000000000000000 + merkleroot: 0x7f4019eb006f5333cce752df387fa8443035c22291eb771ee5b16a02b81c8483 + height: 822013 + size: 86840000 + tx_count: 23477 +- inserted_at: 2023-12-10 12:00:00 + id: 3 + hash: 0xc20b4d510e1a7a4ab3da30e55676de0884b4cb79139ccc0a0000000000000000 + prevhash: 0xf97e20396f02ab990ed31b9aec70c240f48b7e5ea239aa050000000000000000 + merkleroot: 0x7f4019eb006f5333cce752df387fa8443035c22291eb771ee5b16a02b81c8483 + height: 822014 + size: 86840000 + tx_count: 23477 +- inserted_at: 2023-12-10 12:10:00 + id: 4 + hash: 0x51e9e0bacaf8ff4e993ca083aabbbd9bf56e724508d159fe2d43360500000000 + prevhash: 0xc20b4d510e1a7a4ab3da30e55676de0884b4cb79139ccc0a0000000000000000 + merkleroot: 0x7f4019eb006f5333cce752df387fa8443035c22291eb771ee5b16a02b81c8483 + height: 822015 + size: 86840000 + tx_count: 23477 diff --git a/internal/blocktx/store/postgresql/fixtures/get_block_gaps/blocktx.block_processing.yaml b/internal/blocktx/store/postgresql/fixtures/get_block_gaps/blocktx.block_processing.yaml deleted file mode 100644 index f8ba9c24a..000000000 --- a/internal/blocktx/store/postgresql/fixtures/get_block_gaps/blocktx.block_processing.yaml +++ /dev/null @@ -1,3 +0,0 @@ - -- block_hash: 0x3a03313b727fa08c170fab2660c225d52b4d85516c92a0020000000000000000 - processed_by: pod-2 diff --git a/internal/blocktx/store/postgresql/fixtures/get_block_gaps/blocktx.blocks.yaml b/internal/blocktx/store/postgresql/fixtures/get_block_gaps/blocktx.blocks.yaml index d3641f5a3..4b26b6f3f 100644 --- a/internal/blocktx/store/postgresql/fixtures/get_block_gaps/blocktx.blocks.yaml +++ b/internal/blocktx/store/postgresql/fixtures/get_block_gaps/blocktx.blocks.yaml @@ -22,9 +22,6 @@ processed_at: 2023-12-10 14:10:00 size: 244000000 tx_count: 4437 - -# block 822011 is being processed (in table block_processing) - - inserted_at: 2023-12-10 14:00:00 id: 2 hash: 0xb71ab063c5f96cad71cdc59dcc94182a20a69cbd7eed2d070000000000000000 @@ -131,4 +128,3 @@ processed_at: 2023-12-15 15:00:00 size: 8630000 tx_count: 36724 - diff --git a/internal/blocktx/store/postgresql/get_block_gaps.go b/internal/blocktx/store/postgresql/get_block_gaps.go index 6c4a83a6c..7c89ccbca 100644 --- a/internal/blocktx/store/postgresql/get_block_gaps.go +++ b/internal/blocktx/store/postgresql/get_block_gaps.go @@ -3,8 +3,9 @@ package postgresql import ( "context" - "github.com/bitcoin-sv/arc/internal/blocktx/store" "github.com/libsv/go-p2p/chaincfg/chainhash" + + "github.com/bitcoin-sv/arc/internal/blocktx/store" ) func (p *PostgreSQL) GetBlockGaps(ctx context.Context, blockHeightRange int) ([]*store.BlockGap, error) { @@ -15,10 +16,7 @@ func (p *PostgreSQL) GetBlockGaps(ctx context.Context, blockHeightRange int) ([] // // 2. Add to result from 1. all blocks from the blocks table that are unprocessed yet. // - // 3. Combine the result from 1. and 2. with block_processing table and remove all - // results that are being currently processed. - // - // 4. Sort by height. + // 3. Sort by height descending q := ` SELECT DISTINCT all_missing.missing_height, all_missing.missing_hash FROM ( @@ -32,9 +30,7 @@ func (p *PostgreSQL) GetBlockGaps(ctx context.Context, blockHeightRange int) ([] WHERE unprocessed.processed_at IS NULL AND unprocessed.height > (SELECT max(height) - $1 FROM blocktx.blocks) ) AS all_missing - LEFT JOIN blocktx.block_processing bp ON bp.block_hash = all_missing.missing_hash - WHERE bp.block_hash IS NULL - ORDER BY all_missing.missing_height ASC; + ORDER BY all_missing.missing_height DESC; ` rows, err := p.db.QueryContext(ctx, q, blockHeightRange) diff --git a/internal/blocktx/store/postgresql/migrations/000021_block_transactions.up.sql b/internal/blocktx/store/postgresql/migrations/000021_block_transactions.up.sql index 8ad6bd846..43869dab8 100644 --- a/internal/blocktx/store/postgresql/migrations/000021_block_transactions.up.sql +++ b/internal/blocktx/store/postgresql/migrations/000021_block_transactions.up.sql @@ -19,8 +19,8 @@ CREATE INDEX IF NOT EXISTS ix_registered_transactions_inserted_at ON blocktx.reg INSERT INTO blocktx.registered_transactions SELECT t.hash AS hash FROM blocktx.transactions t WHERE t.is_registered = TRUE; -DROP INDEX ix_block_transactions_map_inserted_at; +DROP INDEX blocktx.ix_block_transactions_map_inserted_at; DROP TABLE blocktx.block_transactions_map; -DROP INDEX ix_transactions_inserted_at; -DROP INDEX ux_transactions_hash; +DROP INDEX blocktx.ix_transactions_inserted_at; +DROP INDEX blocktx.ux_transactions_hash; DROP TABLE blocktx.transactions; diff --git a/internal/blocktx/store/postgresql/migrations/000022_block_processing.down.sql b/internal/blocktx/store/postgresql/migrations/000022_block_processing.down.sql new file mode 100644 index 000000000..9eb92ffd3 --- /dev/null +++ b/internal/blocktx/store/postgresql/migrations/000022_block_processing.down.sql @@ -0,0 +1,3 @@ +ALTER TABLE blocktx.block_processing ADD PRIMARY KEY (block_hash); + +DROP INDEX ix_block_processing_inserted_at; diff --git a/internal/blocktx/store/postgresql/migrations/000022_block_processing.up.sql b/internal/blocktx/store/postgresql/migrations/000022_block_processing.up.sql new file mode 100644 index 000000000..bb86d65cc --- /dev/null +++ b/internal/blocktx/store/postgresql/migrations/000022_block_processing.up.sql @@ -0,0 +1,3 @@ +ALTER TABLE blocktx.block_processing DROP CONSTRAINT block_processing_pkey; + +CREATE INDEX ix_block_processing_inserted_at ON blocktx.block_processing (inserted_at); diff --git a/internal/blocktx/store/postgresql/postgres_test.go b/internal/blocktx/store/postgresql/postgres_test.go index 3747eff6d..4ff579784 100644 --- a/internal/blocktx/store/postgresql/postgres_test.go +++ b/internal/blocktx/store/postgresql/postgres_test.go @@ -227,6 +227,7 @@ func TestPostgresDB(t *testing.T) { prepareDb(t, postgresDB, "fixtures/get_block_gaps") hash822014 := testutils.RevChainhash(t, "67708796ef57464ed9eaf2a663d3da32372e4c2fb65558020000000000000000") + hash822011 := testutils.RevChainhash(t, "3a03313b727fa08c170fab2660c225d52b4d85516c92a0020000000000000000") hash822019 := testutils.RevChainhash(t, "5696fc6e504b6aa2ae5d9c46b9418192dc61bd1b2e3364030000000000000000") hash822020 := testutils.RevChainhash(t, "76404890880cb36ce68100abb05b3a958e17c0ed274d5c0a0000000000000000") hash822009 := testutils.RevChainhash(t, "4ad773b1a464129a0ed8c7a8c71bb98175f0f01da1793f0e0000000000000000") @@ -254,6 +255,10 @@ func TestPostgresDB(t *testing.T) { Height: 822020, Hash: hash822020, }, + { // processing not finished + Height: 822011, + Hash: hash822011, + }, } // when @@ -559,36 +564,36 @@ func TestPostgresDB(t *testing.T) { require.Len(t, bts, 5) }) - t.Run("set/get/del block processing", func(t *testing.T) { + t.Run("set block processing", func(t *testing.T) { prepareDb(t, postgresDB, "fixtures/block_processing") - bh1 := testutils.RevChainhash(t, "747468cf7e6639ba9aa277ade1cf27639b0f214cec5719020000000000000000") - - processedBy, err := postgresDB.SetBlockProcessing(ctx, bh1, "pod-1") + var processedBy string + var err error + const lockTime = 15 * time.Minute + + // block already processed by pod-2 + bh2 := testutils.RevChainhash(t, "f97e20396f02ab990ed31b9aec70c240f48b7e5ea239aa050000000000000000") + processedBy, err = postgresDB.SetBlockProcessing(ctx, bh2, "pod-1", lockTime, 2) + require.ErrorIs(t, err, store.ErrBlockProcessingInProgress) + require.Equal(t, "pod-2", processedBy) + + // pod-2 already processing 2 blocks => maximum reached + bh3 := testutils.RevChainhash(t, "51e9e0bacaf8ff4e993ca083aabbbd9bf56e724508d159fe2d43360500000000") + processedBy, err = postgresDB.SetBlockProcessing(ctx, bh3, "pod-2", lockTime, 2) + require.ErrorIs(t, err, store.ErrBlockProcessingMaximumReached) + require.Equal(t, "", processedBy) + + // successfully insert new block processing + bh0 := testutils.RevChainhash(t, "747468cf7e6639ba9aa277ade1cf27639b0f214cec5719020000000000000000") + processedBy, err = postgresDB.SetBlockProcessing(ctx, bh0, "pod-1", lockTime, 2) require.NoError(t, err) require.Equal(t, "pod-1", processedBy) - // set a second time, expect error - processedBy, err = postgresDB.SetBlockProcessing(ctx, bh1, "pod-1") - require.ErrorIs(t, err, store.ErrBlockProcessingDuplicateKey) - require.Equal(t, "pod-1", processedBy) - - bhInProgress := testutils.RevChainhash(t, "f97e20396f02ab990ed31b9aec70c240f48b7e5ea239aa050000000000000000") - - blockHashes, err := postgresDB.GetBlockHashesProcessingInProgress(ctx, "pod-2") - require.NoError(t, err) - require.Len(t, blockHashes, 1) - require.True(t, bhInProgress.IsEqual(blockHashes[0])) - - _, err = postgresDB.DelBlockProcessing(ctx, bhInProgress, "pod-1") - require.ErrorIs(t, err, store.ErrBlockNotFound) - - _, err = postgresDB.DelBlockProcessing(ctx, bhInProgress, "pod-2") - require.NoError(t, err) - - blockHashes, err = postgresDB.GetBlockHashesProcessingInProgress(ctx, "pod-2") - require.NoError(t, err) - require.Len(t, blockHashes, 0) + // pod-1 already processing 1 block => maximum reached + bh1 := testutils.RevChainhash(t, "f64b7221f28256a0f47521121d0f63b91223d7fe603993c5e676460e00000000") + processedBy, err = postgresDB.SetBlockProcessing(ctx, bh1, "pod-1", lockTime, 1) + require.ErrorIs(t, err, store.ErrBlockProcessingMaximumReached) + require.Equal(t, "", processedBy) }) t.Run("mark block as done", func(t *testing.T) { diff --git a/internal/blocktx/store/postgresql/set_block_processing.go b/internal/blocktx/store/postgresql/set_block_processing.go index 2d2a61b4e..f205c3e59 100644 --- a/internal/blocktx/store/postgresql/set_block_processing.go +++ b/internal/blocktx/store/postgresql/set_block_processing.go @@ -2,95 +2,79 @@ package postgresql import ( "context" + "database/sql" + "encoding/binary" "errors" + "fmt" + "time" - "github.com/jackc/pgx/v5/pgconn" "github.com/libsv/go-p2p/chaincfg/chainhash" "github.com/bitcoin-sv/arc/internal/blocktx/store" - "github.com/bitcoin-sv/arc/internal/tracing" ) -func (p *PostgreSQL) SetBlockProcessing(ctx context.Context, hash *chainhash.Hash, setProcessedBy string) (string, error) { +// SetBlockProcessing tries to insert a record to the block processing table in order to mark a certain block as being processed by an instance. A new entry will be inserted successfully if there is no entry from any instance inserted less than `lockTime` ago and if there are less than `maxParallelProcessing` blocks currently being processed by the instance denoted by `setProcessedBy`. +func (p *PostgreSQL) SetBlockProcessing(ctx context.Context, hash *chainhash.Hash, setProcessedBy string, lockTime time.Duration, maxParallelProcessing int) (string, error) { // Try to set a block as being processed by this instance + tx, err := p.db.Begin() + if err != nil { + return "", err + } + + data := binary.BigEndian.Uint32(hash[0:5]) + + _, err = tx.ExecContext(ctx, `SELECT pg_advisory_xact_lock($1)`, data) + if err != nil { + rollBackErr := tx.Rollback() + if rollBackErr != nil { + return "", errors.Join(err, fmt.Errorf("failed to rollback: %v", rollBackErr)) + } + return "", err + } + qInsert := ` INSERT INTO blocktx.block_processing (block_hash, processed_by) - VALUES ($1, $2) + SELECT $1, $2 + WHERE NOT EXISTS ( + (SELECT 1 FROM blocktx.block_processing bp WHERE bp.block_hash = $1 AND inserted_at > $3) + UNION + (SELECT 1 FROM blocktx.block_processing bp + LEFT JOIN blocktx.blocks b ON b.hash = bp.block_hash + WHERE b.processed_at IS NULL AND bp.processed_by = $2 + OFFSET $4) + ) RETURNING processed_by ` var processedBy string - err := p.db.QueryRowContext(ctx, qInsert, hash[:], setProcessedBy).Scan(&processedBy) + err = p.db.QueryRowContext(ctx, qInsert, hash[:], setProcessedBy, p.now().Add(-1*lockTime), maxParallelProcessing-1).Scan(&processedBy) if err != nil { - var pqErr *pgconn.PgError + rollBackErr := tx.Rollback() + if rollBackErr != nil { + err = errors.Join(err, fmt.Errorf("failed to rollback: %v", rollBackErr)) + } - // Error 23505 is: "duplicate key violates unique constraint" - if errors.As(err, &pqErr) && pqErr.Code == "23505" { - err = p.db.QueryRowContext(ctx, `SELECT processed_by FROM blocktx.block_processing WHERE block_hash = $1`, hash[:]).Scan(&processedBy) - if err != nil { - return "", errors.Join(store.ErrFailedToSetBlockProcessing, err) + if errors.Is(err, sql.ErrNoRows) { + var currentlyProcessedBy string + err = p.db.QueryRowContext(ctx, `SELECT processed_by FROM blocktx.block_processing WHERE block_hash = $1 AND inserted_at > $2 ORDER BY inserted_at DESC LIMIT 1`, hash[:], p.now().Add(-1*lockTime)).Scan(¤tlyProcessedBy) + if err == nil { + return currentlyProcessedBy, errors.Join(err, store.ErrBlockProcessingInProgress) } - return processedBy, store.ErrBlockProcessingDuplicateKey + return "", errors.Join(err, store.ErrBlockProcessingMaximumReached) } - } - - return processedBy, nil -} - -func (p *PostgreSQL) DelBlockProcessing(ctx context.Context, hash *chainhash.Hash, processedBy string) (rowsAffected int64, err error) { - ctx, span := tracing.StartTracing(ctx, "DelBlockProcessing", p.tracingEnabled, p.tracingAttributes...) - defer func() { - tracing.EndTracing(span, err) - }() - - q := ` - DELETE FROM blocktx.block_processing WHERE block_hash = $1 AND processed_by = $2; - ` - res, err := p.db.ExecContext(ctx, q, hash[:], processedBy) - if err != nil { - return 0, err - } - rowsAffected, _ = res.RowsAffected() - if rowsAffected != 1 { - return 0, store.ErrBlockNotFound + return "", errors.Join(store.ErrFailedToSetBlockProcessing, err) } - return rowsAffected, nil -} - -func (p *PostgreSQL) GetBlockHashesProcessingInProgress(ctx context.Context, processedBy string) ([]*chainhash.Hash, error) { - // Check how many blocks this instance is currently processing - q := ` - SELECT bp.block_hash FROM blocktx.block_processing bp - LEFT JOIN blocktx.blocks b ON b.hash = bp.block_hash - WHERE b.processed_at IS NULL AND bp.processed_by = $1; - ` - - rows, err := p.db.QueryContext(ctx, q, processedBy) + err = tx.Commit() if err != nil { - return nil, err - } - defer rows.Close() - - hashes := make([]*chainhash.Hash, 0) - - for rows.Next() { - var hash []byte - - err = rows.Scan(&hash) - if err != nil { - return nil, err + rollBackErr := tx.Rollback() + if rollBackErr != nil { + return "", errors.Join(err, fmt.Errorf("failed to rollback: %v", rollBackErr)) } - - txHash, err := chainhash.NewHash(hash) - if err != nil { - return nil, err - } - - hashes = append(hashes, txHash) + return "", err } - return hashes, nil + return processedBy, nil } diff --git a/internal/blocktx/store/store.go b/internal/blocktx/store/store.go index c0a94e450..aa1996613 100644 --- a/internal/blocktx/store/store.go +++ b/internal/blocktx/store/store.go @@ -3,6 +3,7 @@ package store import ( "context" "errors" + "time" "github.com/libsv/go-p2p/chaincfg/chainhash" @@ -10,22 +11,20 @@ import ( ) var ( - ErrNotFound = errors.New("not found") - ErrBlockProcessingDuplicateKey = errors.New("block hash already exists") - ErrBlockNotFound = errors.New("block not found") - ErrUnableToPrepareStatement = errors.New("unable to prepare statement") - ErrUnableToDeleteRows = errors.New("unable to delete rows") - ErrUnableToGetSQLConnection = errors.New("unable to get or create sql connection") - ErrFailedToInsertBlock = errors.New("failed to insert block") - ErrFailedToUpdateBlockStatuses = errors.New("failed to update block statuses") - ErrFailedToOpenDB = errors.New("failed to open postgres database") - ErrFailedToInsertTransactions = errors.New("failed to bulk insert transactions") - ErrFailedToGetRows = errors.New("failed to get rows") - ErrFailedToSetBlockProcessing = errors.New("failed to set block processing") - ErrFailedToUpsertTransactions = errors.New("failed to upsert transactions") - ErrFailedToUpsertBlockTransactionsMap = errors.New("failed to upsert block transactions map") - ErrFailedToParseHash = errors.New("failed to parse hash") - ErrMismatchedTxIDsAndMerklePathLength = errors.New("mismatched tx IDs and merkle path length") + ErrNotFound = errors.New("not found") + ErrBlockProcessingMaximumReached = errors.New("block processing maximum reached") + ErrBlockProcessingInProgress = errors.New("block processing already in progress") + ErrBlockNotFound = errors.New("block not found") + ErrUnableToPrepareStatement = errors.New("unable to prepare statement") + ErrUnableToDeleteRows = errors.New("unable to delete rows") + ErrUnableToGetSQLConnection = errors.New("unable to get or create sql connection") + ErrFailedToInsertBlock = errors.New("failed to insert block") + ErrFailedToUpdateBlockStatuses = errors.New("failed to update block statuses") + ErrFailedToOpenDB = errors.New("failed to open postgres database") + ErrFailedToInsertTransactions = errors.New("failed to bulk insert transactions") + ErrFailedToGetRows = errors.New("failed to get rows") + ErrFailedToSetBlockProcessing = errors.New("failed to set block processing") + ErrFailedToParseHash = errors.New("failed to parse hash") ) type Stats struct { @@ -51,9 +50,7 @@ type BlocktxStore interface { UpdateBlocksStatuses(ctx context.Context, blockStatusUpdates []BlockStatusUpdate) error GetStats(ctx context.Context) (*Stats, error) - SetBlockProcessing(ctx context.Context, hash *chainhash.Hash, processedBy string) (string, error) - GetBlockHashesProcessingInProgress(ctx context.Context, processedBy string) ([]*chainhash.Hash, error) - DelBlockProcessing(ctx context.Context, hash *chainhash.Hash, processedBy string) (int64, error) + SetBlockProcessing(ctx context.Context, hash *chainhash.Hash, setProcessedBy string, lockTime time.Duration, maxParallelProcessing int) (string, error) VerifyMerkleRoots(ctx context.Context, merkleRoots []*blocktx_api.MerkleRootVerificationRequest, maxAllowedBlockHeightMismatch int) (*blocktx_api.MerkleRootVerificationResponse, error) Ping(ctx context.Context) error diff --git a/internal/k8s_watcher/watcher.go b/internal/k8s_watcher/watcher.go index 6315d2511..842a367de 100644 --- a/internal/k8s_watcher/watcher.go +++ b/internal/k8s_watcher/watcher.go @@ -9,14 +9,12 @@ import ( "sync" "time" - "github.com/bitcoin-sv/arc/internal/blocktx" "github.com/bitcoin-sv/arc/internal/metamorph" ) const ( logLevelDefault = slog.LevelInfo metamorphService = "metamorph" - blocktxService = "blocktx" intervalDefault = 15 * time.Second maxRetries = 5 retryIntervalDefault = 2 * time.Second @@ -28,7 +26,6 @@ type K8sClient interface { type Watcher struct { metamorphClient metamorph.TransactionMaintainer - blocktxClient blocktx.Watcher k8sClient K8sClient logger *slog.Logger tickerMetamorph Ticker @@ -55,10 +52,9 @@ func WithRetryInterval(d time.Duration) func(*Watcher) { type ServerOption func(f *Watcher) // New The K8s watcher listens to events coming from Kubernetes. If it detects a metamorph pod which was terminated, then it sets records locked by this pod to unlocked. This is a safety measure for the case that metamorph is terminated ungracefully where it misses to unlock its records itself. -func New(metamorphClient metamorph.TransactionMaintainer, blocktxClient blocktx.Watcher, k8sClient K8sClient, namespace string, opts ...ServerOption) *Watcher { +func New(metamorphClient metamorph.TransactionMaintainer, k8sClient K8sClient, namespace string, opts ...ServerOption) *Watcher { watcher := &Watcher{ metamorphClient: metamorphClient, - blocktxClient: blocktxClient, k8sClient: k8sClient, namespace: namespace, @@ -98,79 +94,12 @@ func WithMetamorphTicker(t Ticker) func(*Watcher) { } } -func WithBlocktxTicker(t Ticker) func(*Watcher) { - return func(p *Watcher) { - p.tickerBlocktx = t - } -} - func (c *Watcher) Start() error { c.watchMetamorph() - c.watchBlocktx() - return nil } -func (c *Watcher) watchBlocktx() { - ctx, cancel := context.WithCancel(context.Background()) - c.shutdownBlocktx = cancel - c.waitGroup.Add(1) - go func() { - var runningPods map[string]struct{} - defer c.waitGroup.Done() - for { - select { - case <-ctx.Done(): - return - case <-c.tickerBlocktx.Tick(): - // Update the list of running pods. Detect those which have been terminated and call them to delete unfinished block processing - ctx := context.Background() - runningPodsK8s, err := c.k8sClient.GetRunningPodNames(ctx, c.namespace, blocktxService) - if err != nil { - c.logger.Error("failed to get pods", slog.String("err", err.Error())) - continue - } - - for podName := range runningPods { - // Ignore all other services than blocktx - if !strings.Contains(podName, blocktxService) { - continue - } - - _, found := runningPodsK8s[podName] - if !found { - // A previously running pod has been terminated => set records locked by this pod unlocked - - retryTicker := time.NewTicker(c.retryInterval) - i := 0 - - retryLoop: - for range retryTicker.C { - i++ - - if i > maxRetries { - c.logger.Error(fmt.Sprintf("Failed to delete unfinished block processing after %d retries", maxRetries), slog.String("pod-name", podName)) - break retryLoop - } - - rows, err := c.blocktxClient.DelUnfinishedBlockProcessing(ctx, podName) - if err != nil { - c.logger.Error("Failed to delete unfinished block processing", slog.String("pod-name", podName), slog.String("err", err.Error())) - continue - } - c.logger.Info("Deleted unfinished block processing", slog.Int64("rows-affected", rows), slog.String("pod-name", podName)) - break - } - } - } - - runningPods = runningPodsK8s - } - } - }() -} - func (c *Watcher) watchMetamorph() { ctx, cancel := context.WithCancel(context.Background()) c.shutdownMetamorph = cancel diff --git a/internal/k8s_watcher/watcher_test.go b/internal/k8s_watcher/watcher_test.go index 2385bdc37..9baed4706 100644 --- a/internal/k8s_watcher/watcher_test.go +++ b/internal/k8s_watcher/watcher_test.go @@ -9,7 +9,6 @@ import ( "github.com/stretchr/testify/require" - btxMocks "github.com/bitcoin-sv/arc/internal/blocktx/mocks" "github.com/bitcoin-sv/arc/internal/k8s_watcher" "github.com/bitcoin-sv/arc/internal/k8s_watcher/mocks" mtmMocks "github.com/bitcoin-sv/arc/internal/metamorph/mocks" @@ -68,8 +67,6 @@ func TestStartMetamorphWatcher(t *testing.T) { return 3, nil }, } - blocktxMock := &btxMocks.WatcherMock{} - iteration := 0 getPodNamesErrTest := tc.getPodNamesErr podNamestTest := tc.podNames @@ -96,7 +93,7 @@ func TestStartMetamorphWatcher(t *testing.T) { StopFunc: func() {}, } - watcher := k8s_watcher.New(metamorphMock, blocktxMock, k8sClientMock, "test-namespace", k8s_watcher.WithMetamorphTicker(ticker), + watcher := k8s_watcher.New(metamorphMock, k8sClientMock, "test-namespace", k8s_watcher.WithMetamorphTicker(ticker), k8s_watcher.WithLogger(slog.Default()), k8s_watcher.WithRetryInterval(20*time.Millisecond), ) @@ -113,92 +110,3 @@ func TestStartMetamorphWatcher(t *testing.T) { }) } } - -func TestStartBlocktxWatcher(t *testing.T) { - tt := []struct { - name string - podNames []map[string]struct{} - getPodNamesErr error - setUnlockedErr error - - expectedBlocktxDelUnfinishedBlockProcessingFunc int - }{ - { - name: "unlock records for metamorph-pod-2", - podNames: []map[string]struct{}{ - {"blocktx-pod-1": {}, "blocktx-pod-2": {}, "api-pod-1": {}, "metamorph-pod-1": {}}, - {"blocktx-pod-1": {}, "metamorph-pod-1": {}}, - {"blocktx-pod-1": {}, "blocktx-pod-3": {}, "api-pod-2": {}, "metamorph-pod-1": {}}, - }, - - expectedBlocktxDelUnfinishedBlockProcessingFunc: 1, - }, - { - name: "error - get pod names", - podNames: []map[string]struct{}{{"": {}}}, - getPodNamesErr: errors.New("failed to get pod names"), - - expectedBlocktxDelUnfinishedBlockProcessingFunc: 0, - }, - { - name: "error - set unlocked", - podNames: []map[string]struct{}{ - {"blocktx-pod-1": {}, "blocktx-pod-2": {}}, - {"blocktx-pod-1": {}}, - {"blocktx-pod-1": {}, "blocktx-pod-3": {}}, - }, - setUnlockedErr: errors.New("failed to set unlocked"), - - expectedBlocktxDelUnfinishedBlockProcessingFunc: 1, - }, - } - - for _, tc := range tt { - t.Run(tc.name, func(t *testing.T) { - metamorphMock := &mtmMocks.TransactionMaintainerMock{} - blocktxMock := &btxMocks.WatcherMock{ - DelUnfinishedBlockProcessingFunc: func(_ context.Context, _ string) (int64, error) { return 0, nil }, - } - - iteration := 0 - getPodNamesErrTest := tc.getPodNamesErr - podNamesTest := tc.podNames - k8sClientMock := &mocks.K8sClientMock{ - GetRunningPodNamesFunc: func(_ context.Context, _ string, _ string) (map[string]struct{}, error) { - if getPodNamesErrTest != nil { - return nil, getPodNamesErrTest - } - - podNames := podNamesTest[iteration] - - iteration++ - - return podNames, nil - }, - } - - tickerChannel := make(chan time.Time, 1) - - ticker := &mocks.TickerMock{ - TickFunc: func() <-chan time.Time { - return tickerChannel - }, - StopFunc: func() {}, - } - - watcher := k8s_watcher.New(metamorphMock, blocktxMock, k8sClientMock, "test-namespace", k8s_watcher.WithBlocktxTicker(ticker), - k8s_watcher.WithLogger(slog.Default()), - ) - err := watcher.Start() - require.NoError(t, err) - - for range tc.podNames { - tickerChannel <- time.Now() - } - - watcher.Shutdown() - - require.Equal(t, tc.expectedBlocktxDelUnfinishedBlockProcessingFunc, len(blocktxMock.DelUnfinishedBlockProcessingCalls())) - }) - } -} diff --git a/internal/p2p/mocks/wire_msg_mock.go b/internal/p2p/mocks/wire_msg_mock.go index a758fed25..1d17d1b4d 100644 --- a/internal/p2p/mocks/wire_msg_mock.go +++ b/internal/p2p/mocks/wire_msg_mock.go @@ -4,20 +4,21 @@ package mocks import ( + "github.com/bitcoin-sv/arc/internal/p2p" "github.com/libsv/go-p2p/wire" "io" "sync" ) -// Ensure, that MessageMock does implement wire.Message. +// Ensure, that MessageMock does implement p2p.Message. // If this is not the case, regenerate this file with moq. -var _ wire.Message = &MessageMock{} +var _ p2p.Message = &MessageMock{} -// MessageMock is a mock implementation of wire.Message. +// MessageMock is a mock implementation of p2p.Message. // // func TestSomethingThatUsesMessage(t *testing.T) { // -// // make and configure a mocked wire.Message +// // make and configure a mocked p2p.Message // mockedMessage := &MessageMock{ // BsvEncodeFunc: func(writer io.Writer, v uint32, messageEncoding wire.MessageEncoding) error { // panic("mock out the BsvEncode method") @@ -33,7 +34,7 @@ var _ wire.Message = &MessageMock{} // }, // } // -// // use mockedMessage in code that requires wire.Message +// // use mockedMessage in code that requires p2p.Message // // and then make assertions. // // } diff --git a/internal/p2p/p2p_mocks.go b/internal/p2p/p2p_mocks.go index 7600d3044..8071eef22 100644 --- a/internal/p2p/p2p_mocks.go +++ b/internal/p2p/p2p_mocks.go @@ -1,4 +1,7 @@ package p2p //go:generate moq -pkg mocks -out ./mocks/peer_mock.go . PeerI + //go:generate moq -pkg mocks -out ./mocks/message_handler_mock.go . MessageHandlerI + +//go:generate moq -pkg mocks -out ./mocks/wire_msg_mock.go . Message diff --git a/internal/p2p/wire.go b/internal/p2p/wire.go new file mode 100644 index 000000000..529a2c88f --- /dev/null +++ b/internal/p2p/wire.go @@ -0,0 +1,14 @@ +package p2p + +import ( + "io" + + "github.com/libsv/go-p2p/wire" +) + +type Message interface { + Bsvdecode(io.Reader, uint32, wire.MessageEncoding) error + BsvEncode(io.Writer, uint32, wire.MessageEncoding) error + Command() string + MaxPayloadLength(uint32) uint64 +} diff --git a/internal/validator/helpers_mock.go b/internal/validator/helpers_mock.go deleted file mode 100644 index 30e571d5f..000000000 --- a/internal/validator/helpers_mock.go +++ /dev/null @@ -1,5 +0,0 @@ -package validator - -//go:generate moq -pkg mocks -out ./mocks/tx_finder_interface_mock.go . TxFinderI - -//go:generate moq -pkg mocks -out ./mocks/merkle_verifier_interface_mock.go . MerkleVerifierI diff --git a/internal/validator/mocks/merkle_verifier_interface_mock.go b/internal/validator/mocks/merkle_verifier_mock.go similarity index 100% rename from internal/validator/mocks/merkle_verifier_interface_mock.go rename to internal/validator/mocks/merkle_verifier_mock.go diff --git a/internal/validator/mocks/tx_finder_interface_mock.go b/internal/validator/mocks/tx_finder_mock.go similarity index 100% rename from internal/validator/mocks/tx_finder_interface_mock.go rename to internal/validator/mocks/tx_finder_mock.go diff --git a/internal/validator/validator_mocks.go b/internal/validator/validator_mocks.go new file mode 100644 index 000000000..8d2234f49 --- /dev/null +++ b/internal/validator/validator_mocks.go @@ -0,0 +1,5 @@ +package validator + +//go:generate moq -pkg mocks -out ./mocks/tx_finder_mock.go . TxFinderI + +//go:generate moq -pkg mocks -out ./mocks/merkle_verifier_mock.go . MerkleVerifierI diff --git a/test/submit_01_single_test.go b/test/submit_01_single_test.go index 2d92a989c..c5db2bdab 100644 --- a/test/submit_01_single_test.go +++ b/test/submit_01_single_test.go @@ -567,12 +567,15 @@ func TestBatchCallback(t *testing.T) { // then + var errs []error + // verify callbacks were received correctly for i, srv := range callbackServers { t.Logf("listen callbacks on server %s", srv.url) expectedTxsCallbacks := make(map[string]int) // key: txID, value: number of received callbacks for _, tx := range txs { + t.Logf("expected callback - server: %d, tx ID: %s", i, tx.TxID()) expectedTxsCallbacks[tx.TxID()] = 0 } @@ -586,29 +589,34 @@ func TestBatchCallback(t *testing.T) { require.Greater(t, batch.Count, 0) require.NotNil(t, batch.Callbacks) - t.Logf("callback server %d iteration %d, count: %d result[0]: %s", i, j, batch.Count, batch.Callbacks[0].TxStatus) + t.Logf("callback server: %d, callback: %d, count: %d result[0]: %s", i, j, batch.Count, batch.Callbacks[0].TxStatus) for _, callback := range batch.Callbacks { - visitNumber, expectedTx := expectedTxsCallbacks[callback.Txid] - require.True(t, expectedTx) + visitNumber, txWasExpected := expectedTxsCallbacks[callback.Txid] + assert.True(t, txWasExpected) + visitNumber++ expectedTxsCallbacks[callback.Txid] = visitNumber - if visitNumber == callbacksNumber { - delete(expectedTxsCallbacks, callback.Txid) // remove after receiving expected callbacks - } - - require.Equal(t, StatusMined, callback.TxStatus) + assert.Equal(t, StatusMined, callback.TxStatus) } - case err := <-srv.errChan: - t.Fatalf("callback server %d received - failed to parse %d callback %v", i, j, err) + case err = <-srv.errChan: + errs = append(errs, fmt.Errorf("callback received with error - server: %d, callback: %d, err: %v", i, j, err)) + t.Fail() case <-callbackTimeout: - t.Fatalf("callback server %d not received %d callback - timeout", i, j) + errs = append(errs, fmt.Errorf("callback not received - server: %d callback: %d - timeout", i, j)) + t.Fail() } } - require.Empty(t, expectedTxsCallbacks) // ensure all expected callbacks were received + for _, err = range errs { + assert.NoError(t, err) + } + + for txID, receivedCallbacks := range expectedTxsCallbacks { + assert.Equalf(t, expectedCallbacksNumber, receivedCallbacks, "expected callbacks mismatch for tx: %s", txID) + } } }) }