diff --git a/Makefile b/Makefile index 4020840c1..468821d8c 100644 --- a/Makefile +++ b/Makefile @@ -39,6 +39,12 @@ run_e2e_tests_with_tracing: docker compose down --remove-orphans ARC_TRACING_ENABLED=TRUE docker compose up --build blocktx callbacker metamorph api tests jaeger --scale blocktx=4 --scale metamorph=2 --no-attach jaeger +.PHONY: run_e2e_mcast_tests +run_e2e_mcast_tests: + docker compose -f docker-compose-mcast.yaml down --remove-orphans + docker compose -f docker-compose-mcast.yaml up --build mcast_sidecar blocktx metamorph api tests --scale blocktx=6 --exit-code-from tests + docker compose -f docker-compose-mcast.yaml down + .PHONY: test test: go test -race -count=1 ./... diff --git a/cmd/mcast/node_sidecar/Dockerfile b/cmd/mcast/node_sidecar/Dockerfile new file mode 100644 index 000000000..b00334c22 --- /dev/null +++ b/cmd/mcast/node_sidecar/Dockerfile @@ -0,0 +1,42 @@ +FROM golang:1.22.5-alpine3.20 AS build-stage + +ARG APP_COMMIT +ARG APP_VERSION +ARG REPOSITORY="github.com/bitcoin-sv/arc" + +RUN apk --update add ca-certificates + +WORKDIR /app + +COPY go.mod go.sum ./ +RUN go mod download +RUN go mod verify + +COPY cmd/ cmd/ +COPY internal/ internal/ +COPY pkg/ pkg/ +COPY config/ config/ + +# Add grpc_health_probe +RUN GRPC_HEALTH_PROBE_VERSION=v0.4.24 && \ + wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \ + chmod +x /bin/grpc_health_probe + +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags "-X $REPOSITORY/internal/version.Commit=$APP_COMMIT -X $REPOSITORY/internal/version.Version=$APP_VERSION" -o /bsvn_sidecar_linux_amd64 ./cmd/mcast/node_sidecar/main.go + + +# Deploy the application binary into a lean image +FROM scratch + +WORKDIR /service + +COPY --from=build-stage /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ +COPY --from=build-stage /bsvn_sidecar_linux_amd64 /service/sidecar +COPY --from=build-stage /bin/grpc_health_probe /bin/grpc_health_probe +COPY deployments/passwd /etc/passwd + +USER nobody + +EXPOSE 9090 + +CMD ["/service/sidecar"] diff --git a/cmd/mcast/node_sidecar/main.go b/cmd/mcast/node_sidecar/main.go new file mode 100644 index 000000000..713d8f6f7 --- /dev/null +++ b/cmd/mcast/node_sidecar/main.go @@ -0,0 +1,313 @@ +package main + +/* +Description: +This application serves as a sidecar for a Bitcoin SV node, acting as a bridge between the P2P network and multicast groups. +It facilitates communication between these two networks, enabling message exchange in both directions: from P2P to multicast and from multicast to P2P. + +It handles two main types of messages: +1. Block and transaction messages (MsgBlock, MsgTx) transmitted over both P2P and multicast networks. +2. Data requests (e.g., MsgGetData, MsgInv) sent in response to incoming requests on the P2P network. + +The application is configured via a configuration file that specifies: +- P2P node address. +- Multicast group addresses. + +The app: +1. Connects to specified node via P2P. +2. Joins the appropriate multicast groups. +3. Handles sending and receiving messages between P2P and multicast. +*/ + +import ( + "errors" + "flag" + "fmt" + "log" + "log/slog" + "os" + "os/signal" + "sync" + "syscall" + + "github.com/bitcoin-sv/arc/config" + arcLogger "github.com/bitcoin-sv/arc/internal/logger" + "github.com/bitcoin-sv/arc/internal/multicast" + "github.com/bitcoin-sv/arc/internal/p2p" + "github.com/bitcoin-sv/arc/internal/version" + "github.com/libsv/go-p2p/wire" +) + +func main() { + err := run() + if err != nil { + log.Fatalf("Failed to run multicast emulator: %v", err) + } + + os.Exit(0) +} + +func run() error { + configDir, _, _, _, _, _, _ := parseFlags() + arcConfig, err := config.Load(configDir) + if err != nil { + return err + } + + logger, err := arcLogger.NewLogger(arcConfig.LogLevel, arcConfig.LogFormat) + if err != nil { + return fmt.Errorf("failed to create logger: %v", err) + } + + stopFn, err := startMcastSideCar(logger, arcConfig) + if err != nil { + return fmt.Errorf("failed to start Multicast-P2P bridge: %v", err) + } + + // wait for termination signal + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT) + + <-signalChan + + stopFn() + return nil +} + +func startMcastSideCar(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), error) { + logger.Info("Starting Multicast Emulator sidecar", slog.String("version", version.Version), slog.String("commit", version.Commit)) + + network, err := config.GetNetwork(arcConfig.Blocktx.BlockchainNetwork.Network) + if err != nil { + return nil, err + } + + communicationBridge := &MulticastP2PBridge{ + l: logger.With(slog.String("module", "multicast-p2p-bridge")), + } + + // connect to peer + peerCfg := arcConfig.Blocktx.BlockchainNetwork.Peers[0] + peer, err := connectPeer(logger, peerCfg, communicationBridge, network) + if err != nil { + return nil, err + } + communicationBridge.peer = peer + + // connect to mcast groups + blockGroupCfg := arcConfig.Blocktx.BlockchainNetwork.Mcast.McastBlock + blockGroup, err := connectMcastGroup[*wire.MsgBlock](logger, &blockGroupCfg, communicationBridge, network) + if err != nil { + return nil, err + } + communicationBridge.blockGroup = blockGroup + + txGroupCfg := arcConfig.Metamorph.BlockchainNetwork.Mcast.McastTx + txGroup, err := connectMcastGroup[*wire.MsgTx](logger, &txGroupCfg, communicationBridge, network) + if err != nil { + return nil, err + } + communicationBridge.txGroup = txGroup + + // return cleanup function + return func() { + peer.Shutdown() + blockGroup.Disconnect() + txGroup.Disconnect() + }, nil +} + +func connectPeer(logger *slog.Logger, peerCfg *config.PeerConfig, msgHandler p2p.MessageHandlerI, network wire.BitcoinNet) (*p2p.Peer, error) { + peerURL, err := peerCfg.GetP2PUrl() + if err != nil { + return nil, err + } + + peer := p2p.NewPeer(logger, msgHandler, peerURL, network) + connected := peer.Connect() + if !connected { + return nil, errors.New("cannot connect to peer") + } + + return peer, nil +} + +func connectMcastGroup[T wire.Message](logger *slog.Logger, grCfg *config.McastGroup, mcastMsgHandler multicast.MessageHandlerI, network wire.BitcoinNet) (*multicast.Group[T], error) { + mcastGroup := multicast.NewGroup[T](logger, mcastMsgHandler, grCfg.Address, multicast.Read|multicast.Write, network) + connected := mcastGroup.Connect() + if !connected { + return nil, fmt.Errorf("cannot connect to %s multicast group", grCfg.Address) + } + + return mcastGroup, nil +} + +func parseFlags() (string, bool, bool, bool, bool, bool, string) { + startAPI := flag.Bool("api", false, "Start ARC API server") + startMetamorph := flag.Bool("metamorph", false, "Start Metamorph") + startBlockTx := flag.Bool("blocktx", false, "Start BlockTx") + startK8sWatcher := flag.Bool("k8s-watcher", false, "Start K8s-Watcher") + startCallbacker := flag.Bool("callbacker", false, "Start Callbacker") + help := flag.Bool("help", false, "Show help") + dumpConfigFile := flag.String("dump_config", "", "Dump config to specified file and exit") + configDir := flag.String("config", "", "Path to configuration file") + + flag.Parse() + + if *help { + fmt.Println("Usage: main [options]") + os.Exit(0) + } + + return *configDir, *startAPI, *startMetamorph, *startBlockTx, *startK8sWatcher, *startCallbacker, *dumpConfigFile +} + +// MulticastP2PBridge is a bridge between a P2P connection and a multicast groups. +// It facilitates the handling of messages between P2P and multicast communication channels. +// Specifically, it listens for messages from both channels and translates or forwards them accordingly. +type MulticastP2PBridge struct { + l *slog.Logger + + blockGroup *multicast.Group[*wire.MsgBlock] + txGroup *multicast.Group[*wire.MsgTx] + peer *p2p.Peer + + txCache sync.Map +} + +var ( + _ multicast.MessageHandlerI = (*MulticastP2PBridge)(nil) + _ p2p.MessageHandlerI = (*MulticastP2PBridge)(nil) +) + +// implement p2p.MessageHandlerI +// OnReceive handles incoming messages depending on command type +func (b *MulticastP2PBridge) OnReceive(msg wire.Message, peer p2p.PeerI) { + cmd := msg.Command() + switch cmd { + case wire.CmdInv: + invMsg, ok := msg.(*wire.MsgInv) + if !ok { + return + } + b.handleReceivedP2pInvMsg(invMsg, peer) + + case wire.CmdBlock: + blockMsg, ok := msg.(*wire.MsgBlock) + if !ok { + b.l.Error("cannot cast msg to wire.MsgBlock") + return + } + + b.handleReceivedBlockMsg(blockMsg, peer) + + case wire.CmdGetData: + getMsg, ok := msg.(*wire.MsgGetData) + if !ok { + b.l.Error("cannot cast msg to wire.MsgGetData") + return + } + + b.handleReceivedGetDataMsg(getMsg, peer) + + default: + // ignore other msgs + } +} + +// OnSend handles outgoing messages depending on command type +func (b *MulticastP2PBridge) OnSend(_ wire.Message, _ p2p.PeerI) { + // ignore +} + +func (b *MulticastP2PBridge) handleReceivedP2pInvMsg(msg *wire.MsgInv, peer p2p.PeerI) { + for _, inv := range msg.InvList { + if inv.Type == wire.InvTypeBlock { + b.l.Info("Received BlockINV", slog.String("hash", inv.Hash.String()), slog.String("peer", peer.String())) + + b.l.Info("Request Block from peer", slog.String("hash", inv.Hash.String()), slog.String("peer", peer.String())) + msg := wire.NewMsgGetDataSizeHint(1) + _ = msg.AddInvVect(wire.NewInvVect(wire.InvTypeBlock, &inv.Hash)) // ignore error at this point + peer.WriteMsg(msg) + } + + // ignore other inv + } +} + +func (b *MulticastP2PBridge) handleReceivedBlockMsg(blockMsg *wire.MsgBlock, peer p2p.PeerI) { + if b.blockGroup == nil { + b.l.Warn("multicast is not ready yet") + return + } + + b.l.Info("Received BlockMsg", slog.String("hash", blockMsg.BlockHash().String()), slog.String("peer", peer.String())) + b.l.Info("Send BlockMsg to multicast handler", slog.String("hash", blockMsg.BlockHash().String()), slog.String("peer", peer.String())) + b.blockGroup.WriteMsg(blockMsg) +} + +func (b *MulticastP2PBridge) handleReceivedGetDataMsg(getMsg *wire.MsgGetData, peer p2p.PeerI) { + b.l.Info("Peer requested data", slog.String("peer", peer.String())) + for _, inv := range getMsg.InvList { + if inv.Type == wire.InvTypeTx { + // check if have tx in memory and send it to peer + anyMsg, found := b.txCache.Load(inv.Hash) + if found { + txMsg := anyMsg.(*wire.MsgTx) // nolint:errcheck,revive + peer.WriteMsg(txMsg) + b.l.Info("Sent requested data to the peer", slog.String("hash", inv.Hash.String()), slog.String("peer", peer.String())) + } + } + // ignore other inv + } +} + +// implement multicast.MessageHandlerI +// OnReceiveFromMcast handles incoming messages from multicast group depending on command type +func (b *MulticastP2PBridge) OnReceiveFromMcast(msg wire.Message) { + cmd := msg.Command() + switch cmd { + case wire.CmdBlock: + blockmsg := msg.(*wire.MsgBlock) // nolint:errcheck,revive + b.l.Info("Received BlockMsg from multicast", slog.String("hash", blockmsg.BlockHash().String())) + + case wire.CmdTx: + txmsg := msg.(*wire.MsgTx) // nolint:errcheck,revive + b.handleReceivedMcastTxMsg(txmsg) + + default: + b.l.Error("Unexpected msg from multicast group!", slog.String("unexpected.cmd", cmd)) + } +} + +// OnSendToMcast handles outgoing messages to multicast group depending on command type +func (b *MulticastP2PBridge) OnSendToMcast(msg wire.Message) { + cmd := msg.Command() + switch cmd { + case wire.CmdBlock: + blockmsg := msg.(*wire.MsgBlock) // nolint:errcheck,revive + b.l.Info("Sent BlockMsg to multicast", slog.String("hash", blockmsg.BlockHash().String())) + + case wire.CmdTx: + txmsg := msg.(*wire.MsgTx) // nolint:errcheck,revive + b.l.Info("Sent TxMsg to multicast", slog.String("hash", txmsg.TxHash().String())) + + default: + b.l.Error("Unexpected msg sent to multicast group!", slog.String("unexpected.cmd", cmd)) + } +} + +func (b *MulticastP2PBridge) handleReceivedMcastTxMsg(txmsg *wire.MsgTx) { + txHash := txmsg.TxHash() + b.l.Info("Received TxMsg from multicast", slog.String("hash", txHash.String())) + + // save TxMsg for later to send it to peer when it request it + b.txCache.Store(txHash, txmsg) + + // announce tx to peer + b.l.Info("Send INV to peer", slog.String("hash", txHash.String()), slog.String("peer", b.peer.String())) + + invMsg := wire.NewMsgInv() + _ = invMsg.AddInvVect(wire.NewInvVect(wire.InvTypeTx, &txHash)) + b.peer.WriteMsg(invMsg) +} diff --git a/docker-compose-mcast.yaml b/docker-compose-mcast.yaml new file mode 100644 index 000000000..fb4cde97b --- /dev/null +++ b/docker-compose-mcast.yaml @@ -0,0 +1,333 @@ +version: '3' +services: + node1: + container_name: node1 + image: bitcoinsv/bitcoin-sv:1.1.0 + ports: + - "18332:18332" + expose: + - "18332" + - "18333" + - "28332" + healthcheck: + test: [ "CMD", "/entrypoint.sh", "bitcoin-cli", "getinfo" ] + volumes: + - ./test/config/bitcoin.conf:/data/bitcoin.conf + - node1-data:/data + command: [ "/entrypoint.sh", "bitcoind", "-connect=node2:18333", "-connect=node3:18333" ] + networks: + - multicast_bridge + + node2: + container_name: node2 + image: bitcoinsv/bitcoin-sv:1.1.0 + expose: + - "18332" + - "18333" + healthcheck: + test: [ "CMD", "/entrypoint.sh", "bitcoin-cli", "getinfo" ] + volumes: + - ./test/config/bitcoin.conf:/data/bitcoin.conf + - node2-data:/data + command: [ "/entrypoint.sh", "bitcoind", "-connect=node1:18333", "-connect=node3:18333" ] + networks: + - multicast_bridge + + node3: + container_name: node3 + image: bitcoinsv/bitcoin-sv:1.1.0 + expose: + - "18332" + - "18333" + healthcheck: + test: [ "CMD", "/entrypoint.sh", "bitcoin-cli", "getinfo" ] + volumes: + - ./test/config/bitcoin.conf:/data/bitcoin.conf + - node3-data:/data + command: [ "/entrypoint.sh", "bitcoind", "-connect=node1:18333", "-connect=node2:18333" ] + networks: + - multicast_bridge + + db: + image: postgres:15.4 + restart: always + environment: + - POSTGRES_USER=arcuser + - POSTGRES_PASSWORD=arcpass + - POSTGRES_DB=main + healthcheck: + test: [ "CMD-SHELL", "pg_isready", "-d blocktx", "-U arcuser" ] + interval: 5s + timeout: 5s + retries: 5 + ports: + - '5432:5432' + expose: + - "5432" + networks: + - multicast_bridge + + migrate-blocktx: + container_name: migrate-blocktx + image: migrate/migrate:v4.16.2 + entrypoint: + [ + "migrate", + "-path", + "/migrations", + "-database", + "postgres://arcuser:arcpass@db:5432/main?sslmode=disable&x-migrations-table=blocktx", + ] + command: [ "up" ] + volumes: + - ./internal/blocktx/store/postgresql/migrations:/migrations + depends_on: + db: + condition: service_healthy + restart: on-failure + networks: + - multicast_bridge + + migrate-metamorph: + container_name: migrate-metamorph + image: migrate/migrate:v4.16.2 + entrypoint: + [ + "migrate", + "-path", + "/migrations", + "-database", + "postgres://arcuser:arcpass@db:5432/main?sslmode=disable&x-migrations-table=metamorph", + ] + command: [ "up" ] + volumes: + - ./internal/metamorph/store/postgresql/migrations:/migrations + depends_on: + db: + condition: service_healthy + restart: on-failure + networks: + - multicast_bridge + + migrate-callbacker: + container_name: migrate-callbacker + image: migrate/migrate:v4.16.2 + entrypoint: + [ + "migrate", + "-path", + "/migrations", + "-database", + "postgres://arcuser:arcpass@db:5432/main?sslmode=disable&x-migrations-table=callbacker", + ] + command: [ "up" ] + volumes: + - ./internal/callbacker/store/postgresql/migrations:/migrations + depends_on: + db: + condition: service_healthy + restart: on-failure + networks: + - multicast_bridge + + nats-1: + image: nats:2.10.18-alpine3.20 + container_name: nats-server-1 + restart: on-failure + ports: + - "4222:4222" + hostname: nats-server + volumes: + - nats1-data:/data + - ./test/config/nats-server-host-1.conf:/etc/nats/nats-server.conf + healthcheck: + test: wget http://localhost:8222/healthz -q -S -O - + interval: 5s + timeout: 5s + retries: 5 + networks: + - multicast_bridge + + nats-2: + image: nats:2.10.18-alpine3.20 + container_name: nats-server-2 + restart: on-failure + ports: + - "4223:4222" + hostname: nats-server + volumes: + - nats2-data:/data + - ./test/config/nats-server-host-2.conf:/etc/nats/nats-server.conf + healthcheck: + test: wget http://localhost:8222/healthz -q -S -O - + interval: 5s + timeout: 5s + retries: 5 + networks: + - multicast_bridge + + cache: + image: redis + hostname: redis + ports: + - "6379:6379" + volumes: + - redis-data:/data + healthcheck: + test: [ "CMD", "redis-cli", "ping" ] + networks: + - multicast_bridge + + mcast_sidecar: + build: + context: ./ + dockerfile: ./cmd/mcast/node_sidecar/Dockerfile + volumes: + - ./test/config/config_mcast.yaml:/service/config.yaml + command: [ "./sidecar", "-config=." ] + depends_on: + node1: + condition: service_healthy + node2: + condition: service_healthy + node3: + condition: service_healthy + networks: + - multicast_bridge + + blocktx: + build: ./ + expose: + - "8011" + volumes: + - ./test/config/config_mcast.yaml:/service/config.yaml + command: [ "./arc", "-blocktx=true", "-config=." ] + environment: + - ARC_TRACING_ENABLED + depends_on: + nats-1: + condition: service_healthy + nats-2: + condition: service_healthy + node1: + condition: service_healthy + node2: + condition: service_healthy + node3: + condition: service_healthy + migrate-blocktx: + condition: service_completed_successfully + + healthcheck: + test: ["CMD", "/bin/grpc_health_probe", "-addr=:8006", "-service=liveness", "-rpc-timeout=5s"] + interval: 10s + timeout: 5s + retries: 3 + networks: + - multicast_bridge + + callbacker: + build: ./ + expose: + - "8021" + command: [ "./arc", "-callbacker=true", "-config=." ] + environment: + - ARC_TRACING_ENABLED + volumes: + - ./test/config/config_mcast.yaml:/service/config.yaml + depends_on: + nats-1: + condition: service_healthy + nats-2: + condition: service_healthy + migrate-callbacker: + condition: service_completed_successfully + healthcheck: + test: ["CMD", "/bin/grpc_health_probe", "-addr=:8022", "-service=liveness", "-rpc-timeout=5s"] + interval: 10s + timeout: 5s + retries: 3 + networks: + - multicast_bridge + + metamorph: + build: ./ + expose: + - "8001" + command: [ "./arc", "-metamorph=true", "-config=." ] + environment: + - ARC_TRACING_ENABLED + volumes: + - ./test/config/config_mcast.yaml:/service/config.yaml + depends_on: + blocktx: + condition: service_healthy + callbacker: + condition: service_healthy + cache: + condition: service_healthy + migrate-metamorph: + condition: service_completed_successfully + healthcheck: + test: ["CMD", "/bin/grpc_health_probe", "-addr=:8005", "-service=liveness", "-rpc-timeout=5s"] + interval: 10s + timeout: 5s + retries: 3 + networks: + - multicast_bridge + + api: + build: ./ + ports: + - "8011:8011" + - "9090:9090" + - "9999:9999" + - "2112:2112" + expose: + - "9090" + - "2112" + command: [ "./arc", "-api=true", "-config=." ] + environment: + - ARC_TRACING_ENABLED + volumes: + - ./test/config/config.yaml:/service/config.yaml + depends_on: + metamorph: + condition: service_healthy + networks: + - multicast_bridge + + tests: + build: + context: ./ + dockerfile: ./test/Dockerfile + environment: + - TEST_LOCAL_MCAST=TRUE + depends_on: + - api + networks: + - multicast_bridge + +volumes: + node1-data: + external: false + node2-data: + external: false + node3-data: + external: false + nats1-data: + external: false + nats2-data: + external: false + redis-data: + external: false + +networks: + multicast_bridge: + driver: bridge + enable_ipv6: true + ipam: + driver: default + config: + - subnet: "2001:db8:9::/64" + gateway: "2001:db8:9::1" diff --git a/docker-compose.yaml b/docker-compose.yaml index 6990ce3fd..e135323ef 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -197,6 +197,10 @@ services: volumes: - ./test/config/config.yaml:/service/config.yaml depends_on: + nats-1: + condition: service_healthy + nats-2: + condition: service_healthy migrate-callbacker: condition: service_completed_successfully healthcheck: diff --git a/internal/blocktx/bcnet/mcast/listener.go b/internal/blocktx/bcnet/mcast/listener.go index 3526203c8..fc4a08efa 100644 --- a/internal/blocktx/bcnet/mcast/listener.go +++ b/internal/blocktx/bcnet/mcast/listener.go @@ -69,8 +69,8 @@ func (l *Listener) Disconnect() { l.blockGroup.Disconnect() } -// OnReceive handles received messages from multicast group -func (l *Listener) OnReceive(msg wire.Message) { +// OnReceiveFromMcast handles received messages from multicast group +func (l *Listener) OnReceiveFromMcast(msg wire.Message) { if msg.Command() == wire.CmdBlock { blockMsg, ok := msg.(*bcnet.BlockMessage) if !ok { @@ -104,7 +104,7 @@ func (l *Listener) OnReceive(msg wire.Message) { // ignore other messages } -// OnSend handles sent messages to multicast group -func (l *Listener) OnSend(_ wire.Message) { +// OnSendToMcast handles sent messages to multicast group +func (l *Listener) OnSendToMcast(_ wire.Message) { // ignore } diff --git a/internal/metamorph/bcnet/mcast/multicaster.go b/internal/metamorph/bcnet/mcast/multicaster.go index 2ec1df586..02a0ce181 100644 --- a/internal/metamorph/bcnet/mcast/multicaster.go +++ b/internal/metamorph/bcnet/mcast/multicaster.go @@ -90,7 +90,7 @@ func (m *Multicaster) SendTx(rawTx []byte) error { } // OnReceive should be fire & forget -func (m *Multicaster) OnReceive(msg wire.Message) { +func (m *Multicaster) OnReceiveFromMcast(msg wire.Message) { if msg.Command() == wire.CmdReject { rejectMsg, ok := msg.(*wire.MsgReject) if !ok { @@ -109,7 +109,7 @@ func (m *Multicaster) OnReceive(msg wire.Message) { } // OnSend should be fire & forget -func (m *Multicaster) OnSend(msg wire.Message) { +func (m *Multicaster) OnSendToMcast(msg wire.Message) { if msg.Command() == wire.CmdTx { txMsg, ok := msg.(*wire.MsgTx) if !ok { diff --git a/internal/multicast/group.go b/internal/multicast/group.go index 6f0bc860a..fa565e9a6 100644 --- a/internal/multicast/group.go +++ b/internal/multicast/group.go @@ -66,8 +66,8 @@ type Group[T wire.Message] struct { // By defining this interface, the Group structure decouples the message handling logic // from the underlying communication mechanism, providing extensibility and modularity. type MessageHandlerI interface { - OnReceive(msg wire.Message) - OnSend(msg wire.Message) + OnReceiveFromMcast(msg wire.Message) + OnSendToMcast(msg wire.Message) } type ModeFlag uint8 @@ -240,7 +240,7 @@ func (g *Group[T]) listenForMessages() { } g.logger.Log(context.Background(), slogLvlTrace, "Received message") - g.mh.OnReceive(msg) + g.mh.OnReceiveFromMcast(msg) } }() } @@ -272,7 +272,7 @@ func (g *Group[T]) sendMessages() { g.logger.Log(context.Background(), slogLvlTrace, "Sent message") // let client react on sending msg - g.mh.OnSend(msg) + g.mh.OnSendToMcast(msg) } } }() diff --git a/internal/multicast/mocks/message_handler_mock.go b/internal/multicast/mocks/message_handler_mock.go index 64da79f87..e032c74f3 100644 --- a/internal/multicast/mocks/message_handler_mock.go +++ b/internal/multicast/mocks/message_handler_mock.go @@ -19,11 +19,11 @@ var _ multicast.MessageHandlerI = &MessageHandlerIMock{} // // // make and configure a mocked multicast.MessageHandlerI // mockedMessageHandlerI := &MessageHandlerIMock{ -// OnReceiveFunc: func(msg wire.Message) { -// panic("mock out the OnReceive method") +// OnReceiveFromMcastFunc: func(msg wire.Message) { +// panic("mock out the OnReceiveFromMcast method") // }, -// OnSendFunc: func(msg wire.Message) { -// panic("mock out the OnSend method") +// OnSendToMcastFunc: func(msg wire.Message) { +// panic("mock out the OnSendToMcast method") // }, // } // @@ -32,89 +32,89 @@ var _ multicast.MessageHandlerI = &MessageHandlerIMock{} // // } type MessageHandlerIMock struct { - // OnReceiveFunc mocks the OnReceive method. - OnReceiveFunc func(msg wire.Message) + // OnReceiveFromMcastFunc mocks the OnReceiveFromMcast method. + OnReceiveFromMcastFunc func(msg wire.Message) - // OnSendFunc mocks the OnSend method. - OnSendFunc func(msg wire.Message) + // OnSendToMcastFunc mocks the OnSendToMcast method. + OnSendToMcastFunc func(msg wire.Message) // calls tracks calls to the methods. calls struct { - // OnReceive holds details about calls to the OnReceive method. - OnReceive []struct { + // OnReceiveFromMcast holds details about calls to the OnReceiveFromMcast method. + OnReceiveFromMcast []struct { // Msg is the msg argument value. Msg wire.Message } - // OnSend holds details about calls to the OnSend method. - OnSend []struct { + // OnSendToMcast holds details about calls to the OnSendToMcast method. + OnSendToMcast []struct { // Msg is the msg argument value. Msg wire.Message } } - lockOnReceive sync.RWMutex - lockOnSend sync.RWMutex + lockOnReceiveFromMcast sync.RWMutex + lockOnSendToMcast sync.RWMutex } -// OnReceive calls OnReceiveFunc. -func (mock *MessageHandlerIMock) OnReceive(msg wire.Message) { - if mock.OnReceiveFunc == nil { - panic("MessageHandlerIMock.OnReceiveFunc: method is nil but MessageHandlerI.OnReceive was just called") +// OnReceiveFromMcast calls OnReceiveFromMcastFunc. +func (mock *MessageHandlerIMock) OnReceiveFromMcast(msg wire.Message) { + if mock.OnReceiveFromMcastFunc == nil { + panic("MessageHandlerIMock.OnReceiveFromMcastFunc: method is nil but MessageHandlerI.OnReceiveFromMcast was just called") } callInfo := struct { Msg wire.Message }{ Msg: msg, } - mock.lockOnReceive.Lock() - mock.calls.OnReceive = append(mock.calls.OnReceive, callInfo) - mock.lockOnReceive.Unlock() - mock.OnReceiveFunc(msg) + mock.lockOnReceiveFromMcast.Lock() + mock.calls.OnReceiveFromMcast = append(mock.calls.OnReceiveFromMcast, callInfo) + mock.lockOnReceiveFromMcast.Unlock() + mock.OnReceiveFromMcastFunc(msg) } -// OnReceiveCalls gets all the calls that were made to OnReceive. +// OnReceiveFromMcastCalls gets all the calls that were made to OnReceiveFromMcast. // Check the length with: // -// len(mockedMessageHandlerI.OnReceiveCalls()) -func (mock *MessageHandlerIMock) OnReceiveCalls() []struct { +// len(mockedMessageHandlerI.OnReceiveFromMcastCalls()) +func (mock *MessageHandlerIMock) OnReceiveFromMcastCalls() []struct { Msg wire.Message } { var calls []struct { Msg wire.Message } - mock.lockOnReceive.RLock() - calls = mock.calls.OnReceive - mock.lockOnReceive.RUnlock() + mock.lockOnReceiveFromMcast.RLock() + calls = mock.calls.OnReceiveFromMcast + mock.lockOnReceiveFromMcast.RUnlock() return calls } -// OnSend calls OnSendFunc. -func (mock *MessageHandlerIMock) OnSend(msg wire.Message) { - if mock.OnSendFunc == nil { - panic("MessageHandlerIMock.OnSendFunc: method is nil but MessageHandlerI.OnSend was just called") +// OnSendToMcast calls OnSendToMcastFunc. +func (mock *MessageHandlerIMock) OnSendToMcast(msg wire.Message) { + if mock.OnSendToMcastFunc == nil { + panic("MessageHandlerIMock.OnSendToMcastFunc: method is nil but MessageHandlerI.OnSendToMcast was just called") } callInfo := struct { Msg wire.Message }{ Msg: msg, } - mock.lockOnSend.Lock() - mock.calls.OnSend = append(mock.calls.OnSend, callInfo) - mock.lockOnSend.Unlock() - mock.OnSendFunc(msg) + mock.lockOnSendToMcast.Lock() + mock.calls.OnSendToMcast = append(mock.calls.OnSendToMcast, callInfo) + mock.lockOnSendToMcast.Unlock() + mock.OnSendToMcastFunc(msg) } -// OnSendCalls gets all the calls that were made to OnSend. +// OnSendToMcastCalls gets all the calls that were made to OnSendToMcast. // Check the length with: // -// len(mockedMessageHandlerI.OnSendCalls()) -func (mock *MessageHandlerIMock) OnSendCalls() []struct { +// len(mockedMessageHandlerI.OnSendToMcastCalls()) +func (mock *MessageHandlerIMock) OnSendToMcastCalls() []struct { Msg wire.Message } { var calls []struct { Msg wire.Message } - mock.lockOnSend.RLock() - calls = mock.calls.OnSend - mock.lockOnSend.RUnlock() + mock.lockOnSendToMcast.RLock() + calls = mock.calls.OnSendToMcast + mock.lockOnSendToMcast.RUnlock() return calls } diff --git a/internal/multicast/tests/group_test.go b/internal/multicast/tests/group_test.go index be0e0f406..ec3dc2929 100644 --- a/internal/multicast/tests/group_test.go +++ b/internal/multicast/tests/group_test.go @@ -18,12 +18,12 @@ var ( func TestGroupCommunication(t *testing.T) { // given - lMsgHandler := &mocks.MessageHandlerIMock{OnReceiveFunc: func(_ wire.Message) {}} + lMsgHandler := &mocks.MessageHandlerIMock{OnReceiveFromMcastFunc: func(_ wire.Message) {}} listener := multicast.NewGroup[*wire.MsgPing](slog.Default(), lMsgHandler, addr, multicast.Read, bcNet) require.True(t, listener.Connect()) defer listener.Disconnect() - wMsgHandler := &mocks.MessageHandlerIMock{OnSendFunc: func(_ wire.Message) {}} + wMsgHandler := &mocks.MessageHandlerIMock{OnSendToMcastFunc: func(_ wire.Message) {}} writer := multicast.NewGroup[*wire.MsgPing](slog.Default(), wMsgHandler, addr, multicast.Write, bcNet) require.True(t, writer.Connect()) defer writer.Disconnect() @@ -35,11 +35,11 @@ func TestGroupCommunication(t *testing.T) { time.Sleep(200 * time.Millisecond) // then - sentMsgs := wMsgHandler.OnSendCalls() + sentMsgs := wMsgHandler.OnSendToMcastCalls() require.Len(t, sentMsgs, 1, "writer didn't send message") require.Equal(t, msg, (sentMsgs[0].Msg).(*wire.MsgPing)) - receivedMsgs := lMsgHandler.OnReceiveCalls() + receivedMsgs := lMsgHandler.OnReceiveFromMcastCalls() require.Len(t, receivedMsgs, 1, "listener didn't receive message") require.Equal(t, msg, (receivedMsgs[0].Msg).(*wire.MsgPing)) } diff --git a/test/config/config_mcast.yaml b/test/config/config_mcast.yaml new file mode 100644 index 000000000..9a6e8f8ef --- /dev/null +++ b/test/config/config_mcast.yaml @@ -0,0 +1,179 @@ +--- +logFormat: tint +logLevel: INFO +profilerAddr: localhost:9999 +prometheus: + enabled: true + endpoint: /metrics + addr: :2112 +grpcMessageSize: 100000000 +messageQueue: + streaming: + enabled: true + fileStorage: false + URL: nats://nats-1:4222,nats://nats-2:4223 +tracing: + enabled: false + sample: 100 + dialAddr: http://jaeger:4317 + +peerRpc: + password: bitcoin + user: bitcoin + host: node1 + port: 18332 + +cache: + engine: redis + redis: + addr: cache:6379 + password: "" + db: 1 + +metamorph: + listenAddr: "[::]:8001" + dialAddr: metamorph:8001 + db: + mode: postgres + postgres: + host: db + port: 5432 + name: main + user: arcuser + password: arcpass + maxIdleConns: 10 + maxOpenConns: 80 + sslMode: disable + processorCacheExpiryTime: 24h + maxRetries: 1000 + processStatusUpdateInterval: 50ms + monitorPeers: true + checkUtxos: false + profilerAddr: "[::]:9992" + health: + serverDialAddr: "[::]:8005" + minimumHealthyConnections: 2 + rejectCallbackContaining: [ "http://localhost", "https://localhost" ] + stats: + notSeenTimeLimit: 10m + notFinalTimeLimit: 20m + bcnet: + mode: hybrid + network: regtest + peers: + - host: node1 + port: + p2p: 18333 + zmq: 28332 + - host: node2 + port: + p2p: 18333 + zmq: 28332 + - host: node3 + port: + p2p: 18333 + zmq: 28332 + mcast: + tx: + address: "[ff05::1]:9999" + reject: + address: "[ff05::2]:9989" + +blocktx: + listenAddr: "[::]:8011" + dialAddr: blocktx:8011 + healthServerDialAddr: "[::]:8006" + db: + mode: postgres + postgres: + host: db + port: 5432 + name: main + user: arcuser + password: arcpass + maxIdleConns: 10 + maxOpenConns: 80 + sslMode: disable + recordRetentionDays: 28 + profilerAddr: "[::]:9993" + registerTxsInterval: 200ms + fillGaps: + enabled: true + interval: 120s + maxAllowedBlockHeightMismatch: 3 + bcnet: + mode: hybrid + network: regtest + peers: + - host: node1 + port: + p2p: 18333 + zmq: 28332 + - host: node2 + port: + p2p: 18333 + zmq: 28332 + - host: node3 + port: + p2p: 18333 + zmq: 28332 + mcast: + block: + address: "[ff05::3]:9979" + +api: + address: "[::]:9090" + wocApiKey: "mainnet_XXXXXXXXXXXXXXXXXXXX" + wocMainnet: false + requestExtendedLogs: true + processorCacheExpiryTime: 24h + defaultPolicy: + excessiveblocksize: 2000000000 + blockmaxsize: 512000000 + maxtxsizepolicy: 100000000 + maxorphantxsize: 1000000000 + datacarriersize: 4294967295 + maxscriptsizepolicy: 100000000 + maxopsperscriptpolicy: 4294967295 + maxscriptnumlengthpolicy: 10000 + maxpubkeyspermultisigpolicy: 4294967295 + maxtxsigopscountspolicy: 4294967295 + maxstackmemoryusagepolicy: 100000000 + maxstackmemoryusageconsensus: 200000000 + limitancestorcount: 10000 + limitcpfpgroupmemberscount: 25 + maxmempool: 2000000000 + maxmempoolsizedisk: 0 + mempoolmaxpercentcpfp: 10 + acceptnonstdoutputs: true + datacarrier: true + minminingtxfee: 1e-8 + maxstdtxvalidationduration: 3 + maxnonstdtxvalidationduration: 1000 + maxtxchainvalidationbudget: 50 + validationclockcpu: true + minconsolidationfactor: 20 + maxconsolidationinputscriptsize: 150 + minconfconsolidationinput: 6 + minconsolidationinputmaturity: 6 + acceptnonstdconsolidationinput: false + +callbacker: + listenAddr: "[::]:8021" + dialAddr: callbacker:8021 + health: + serverDialAddr: "[::]:8022" + db: + mode: postgres + postgres: + host: db + port: 5432 + name: main + user: arcuser + password: arcpass + maxIdleConns: 10 + maxOpenConns: 80 + sslMode: disable + failedCallbackCheckInterval: 1m + delayDuration: 5s + expiration: 24h diff --git a/test/init_test.go b/test/init_test.go index 061bbab8c..f2b40aa61 100644 --- a/test/init_test.go +++ b/test/init_test.go @@ -20,7 +20,6 @@ func TestMain(m *testing.M) { } log.Printf("current block height: %d", info.Blocks) - os.Exit(m.Run()) } @@ -50,7 +49,10 @@ func setupSut() { if info.Blocks < minNumbeOfBlocks { // generate blocks in part to ensure blocktx is able to process all blocks - const blockBatch = 20 // should be less or equal n*10 where n is number of blocktx instances + blockBatch := float64(20) + if os.Getenv("TEST_LOCAL_MCAST") != "" { + blockBatch = float64(4) + } for { _, err = bitcoind.Generate(blockBatch) @@ -72,6 +74,6 @@ func setupSut() { } } - time.Sleep(5 * time.Second) // wait for fillGaps to fill eventual gaps + time.Sleep(15 * time.Second) // wait for fillGaps to fill eventual gaps } } diff --git a/test/submit_02_batch_test.go b/test/submit_02_batch_test.go index e65b49b15..c43a88866 100644 --- a/test/submit_02_batch_test.go +++ b/test/submit_02_batch_test.go @@ -5,6 +5,7 @@ package test import ( "fmt" "net/http" + "os" "testing" "time" @@ -15,6 +16,10 @@ import ( ) func TestBatchChainedTxs(t *testing.T) { + if os.Getenv("TEST_LOCAL_MCAST") != "" { + t.Skip("Multicasting does't support chained txs yet") + } + t.Run("submit batch of chained transactions", func(t *testing.T) { address, privateKey := node_client.FundNewWallet(t, bitcoind) diff --git a/test/utils.go b/test/utils.go index 76da39afa..17b593964 100644 --- a/test/utils.go +++ b/test/utils.go @@ -301,7 +301,7 @@ func respondToCallback(w http.ResponseWriter, success bool) error { func testTxSubmission(t *testing.T, callbackURL string, token string, callbackBatch bool, tx *sdkTx.Transaction) { t.Helper() - + time.Sleep(100 * time.Millisecond) rawTx, err := tx.EFHex() require.NoError(t, err)