From ae018dc23b485a770a7b0580375e2f42f819e276 Mon Sep 17 00:00:00 2001 From: Javad Date: Thu, 16 Jan 2025 18:25:52 +0330 Subject: [PATCH] feat: add publisher raw block --- www/zmq/publisher_block_info.go | 2 + www/zmq/publisher_raw_block.go | 29 +++++++++++-- www/zmq/publisher_raw_block_test.go | 65 +++++++++++++++++++++++++++++ 3 files changed, 93 insertions(+), 3 deletions(-) create mode 100644 www/zmq/publisher_raw_block_test.go diff --git a/www/zmq/publisher_block_info.go b/www/zmq/publisher_block_info.go index b3118fba9..963e3b47c 100644 --- a/www/zmq/publisher_block_info.go +++ b/www/zmq/publisher_block_info.go @@ -33,6 +33,8 @@ func (b *blockInfoPub) onNewBlock(blk *block.Block) { if err := b.zmqSocket.Send(message); err != nil { b.logger.Error("zmq publish message error", "err", err, "publisher", b.TopicName()) + + return } b.logger.Debug("zmq published message success", diff --git a/www/zmq/publisher_raw_block.go b/www/zmq/publisher_raw_block.go index 1b75a852c..4d7b26335 100644 --- a/www/zmq/publisher_raw_block.go +++ b/www/zmq/publisher_raw_block.go @@ -1,6 +1,8 @@ package zmq import ( + "bytes" + "github.com/go-zeromq/zmq4" "github.com/pactus-project/pactus/types/block" "github.com/pactus-project/pactus/util/logger" @@ -20,7 +22,28 @@ func newRawBlockPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher { } } -func (*rawBlockPub) onNewBlock(_ *block.Block) { - // TODO implement me - panic("implement me") +func (r *rawBlockPub) onNewBlock(blk *block.Block) { + rawHeader := make([]byte, 0) + buf := bytes.NewBuffer(rawHeader) + + if err := blk.Header().Encode(buf); err != nil { + r.logger.Error("failed to encode block header", "err", err, "publisher", r.TopicName()) + + return + } + + rawMsg := r.makeTopicMsg(buf.Bytes(), blk.Height()) + message := zmq4.NewMsg(rawMsg) + + if err := r.zmqSocket.Send(message); err != nil { + r.logger.Error("zmq publish message error", "err", err, "publisher", r.TopicName()) + + return + } + + r.logger.Debug("zmq published message success", + "publisher", r.TopicName(), + "block_height", blk.Height()) + + r.seqNo++ } diff --git a/www/zmq/publisher_raw_block_test.go b/www/zmq/publisher_raw_block_test.go new file mode 100644 index 000000000..997d41155 --- /dev/null +++ b/www/zmq/publisher_raw_block_test.go @@ -0,0 +1,65 @@ +package zmq + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "testing" + + "github.com/go-zeromq/zmq4" + "github.com/pactus-project/pactus/types/block" + "github.com/pactus-project/pactus/util/testsuite" + "github.com/stretchr/testify/require" +) + +func TestRawBlockPublisher(t *testing.T) { + port := testsuite.FindFreePort() + addr := fmt.Sprintf("tcp://localhost:%d", port) + conf := DefaultConfig() + conf.ZmqPubRawBlock = addr + + td := setup(t, conf) + defer td.closeServer() + + td.server.Publishers() + + sub := zmq4.NewSub(context.TODO(), zmq4.WithAutomaticReconnect(false)) + + err := sub.Dial(addr) + require.NoError(t, err) + + err = sub.SetOption(zmq4.OptionSubscribe, string(TopicRawBlock.Bytes())) + require.NoError(t, err) + + blk, _ := td.TestSuite.GenerateTestBlock(td.RandHeight()) + + td.eventCh <- blk + + received, err := sub.Recv() + require.NoError(t, err) + + require.NotNil(t, received.Frames) + require.GreaterOrEqual(t, len(received.Frames), 1) + + msg := received.Frames[0] + + topic := msg[:2] + blockHeader := msg[2 : len(msg)-8] + height := binary.BigEndian.Uint32(msg[140 : len(msg)-4]) + seqNo := binary.BigEndian.Uint32(msg[len(msg)-4:]) + + buf := bytes.NewBuffer(blockHeader) + header := new(block.Header) + + require.NoError(t, header.Decode(buf)) + + require.NotNil(t, header) + require.Equal(t, uint32(0), seqNo) + require.Equal(t, blk.Height(), height) + require.Equal(t, TopicRawBlock.Bytes(), topic) + require.Equal(t, header.PrevBlockHash(), blk.Header().PrevBlockHash()) + require.Equal(t, header.StateRoot(), blk.Header().StateRoot()) + + require.NoError(t, sub.Close()) +}