From 61fe0196aa0cb2bed3a71468de582984e97d5598 Mon Sep 17 00:00:00 2001 From: Arkadiusz Osowski Date: Tue, 31 Dec 2024 11:35:18 +0100 Subject: [PATCH] chore: document new logic --- cmd/arc/services/blocktx.go | 27 ++++++++ cmd/arc/services/metamorph.go | 29 +++++++++ internal/blocktx/bcnet/mcast/listener.go | 25 +++++++- internal/metamorph/bcnet/mcast/multicaster.go | 29 +++++++++ internal/metamorph/bcnet/mediator.go | 42 +++++++++---- internal/multicast/group.go | 62 ++++++++++++++----- 6 files changed, 184 insertions(+), 30 deletions(-) diff --git a/cmd/arc/services/blocktx.go b/cmd/arc/services/blocktx.go index da1832043..8dbd1dfd7 100644 --- a/cmd/arc/services/blocktx.go +++ b/cmd/arc/services/blocktx.go @@ -209,6 +209,33 @@ func NewBlocktxStore(logger *slog.Logger, dbConfig *config.DbConfig, tracingConf return s, err } +// setupBcNetworkCommunication initializes the Bloctx blockchain network communication layer, configuring it +// to operate in either classic (P2P-only) or hybrid (P2P and multicast) mode. +// +// Parameters: +// - `l *slog.Logger`: Logger instance for logging events. +// - `arcConfig *config.ArcConfig`: Configuration object containing blockchain network settings. +// - `store store.BlocktxStore`: A storage interface for blockchain transactions. +// - `blockRequestCh chan<- blocktx_p2p.BlockRequest`: Channel for handling block requests. +// - `blockProcessCh chan<- *bcnet.BlockMessage`: Channel for processing block messages. +// +// Returns: +// - `manager *p2p.PeerManager`: Manages P2P peers. +// - `mcastListener *mcast.Listener`: Handles multicast communication, or `nil` if not in hybrid mode. +// - `err error`: Error if any issue occurs during setup. +// +// Key Details: +// - **Mode Handling**: +// - `"classic"` mode uses `blocktx_p2p.NewMsgHandler` for P2P communication only. +// - `"hybrid"` mode uses `blocktx_p2p.NewHybridMsgHandler` for P2P and multicast communication. +// +// - **Error Cleanup**: Ensures resources like peers and multicast listeners are properly cleaned up on errors. +// - **Peer Management**: Connects to configured peers and initializes a PeerManager for handling P2P connections. +// - **Multicast Communication**: In hybrid mode, joins a multicast group for blockchain updates and uses correct P2P message handler. +// +// Message Handlers: +// - `blocktx_p2p.NewMsgHandler`: Used in classic mode, handles all blockchain communication exclusively via P2P. +// - `blocktx_p2p.NewHybridMsgHandler`: Used in hybrid mode, seamlessly integrates P2P communication with multicast group updates. func setupBcNetworkCommunication(l *slog.Logger, arcConfig *config.ArcConfig, store store.BlocktxStore, blockRequestCh chan<- blocktx_p2p.BlockRequest, blockProcessCh chan<- *bcnet.BlockMessage) (manager *p2p.PeerManager, mcastListener *mcast.Listener, err error) { defer func() { // cleanup on error diff --git a/cmd/arc/services/metamorph.go b/cmd/arc/services/metamorph.go index 8835f1d36..04d3d3e40 100644 --- a/cmd/arc/services/metamorph.go +++ b/cmd/arc/services/metamorph.go @@ -283,6 +283,35 @@ func NewMetamorphStore(dbConfig *config.DbConfig, tracingConfig *config.TracingC return s, err } +// setupMtmBcNetworkCommunication initializes the Metamorph blockchain network communication, configuring it +// to operate in either classic (P2P-only) or hybrid (P2P and multicast) mode. +// +// Parameters: +// - `l *slog.Logger`: Logger instance for event logging. +// - `s store.MetamorphStore`: Storage interface for Metamorph operations. +// - `arcConfig *config.ArcConfig`: Configuration object for blockchain network settings. +// - `mediatorOpts []bcnet.Option`: Additional options for the mediator. +// +// Returns: +// - `mediator *bcnet.Mediator`: Coordinates communication between P2P and multicast layers. +// - `messenger *p2p.NetworkMessenger`: Handles P2P message delivery - used by mediator only. +// - `manager *p2p.PeerManager`: Manages the lifecycle of P2P peers. +// - `multicaster *mcast.Multicaster`: Handles multicast message broadcasting and listening - used by mediator only. +// - `messageCh chan *metamorph_p2p.TxStatusMessage`: Channel for handling transaction status messages. +// - `err error`: Error if any part of the setup fails. +// +// Key Details: +// - **Mode Handling**: +// - `"classic"`: Uses `metamorph_p2p.NewMsgHandler` for exclusive P2P communication. +// - `"hybrid"`: Uses `metamorph_p2p.NewHybridMsgHandler` for integration of P2P and multicast group communication. +// +// - **Error Cleanup**: Cleans up resources such as the messenger, manager, and multicaster on failure. +// - **Peer Management**: Establishes connections to P2P peers and configures them with write handlers and buffer sizes. +// - **Multicast Communication**: In hybrid mode, joins multicast groups for transaction and rejection messages. +// +// Message Handlers: +// - `metamorph_p2p.NewMsgHandler`: Used in classic mode, handling all communication via P2P. +// - `metamorph_p2p.NewHybridMsgHandler`: Used in hybrid mode, integrating P2P communication with multicast group updates. func setupMtmBcNetworkCommunication(l *slog.Logger, s store.MetamorphStore, arcConfig *config.ArcConfig, mediatorOpts []bcnet.Option) ( mediator *bcnet.Mediator, messenger *p2p.NetworkMessenger, manager *p2p.PeerManager, multicaster *mcast.Multicaster, messageCh chan *metamorph_p2p.TxStatusMessage, err error) { diff --git a/internal/blocktx/bcnet/mcast/listener.go b/internal/blocktx/bcnet/mcast/listener.go index c79440ec2..3526203c8 100644 --- a/internal/blocktx/bcnet/mcast/listener.go +++ b/internal/blocktx/bcnet/mcast/listener.go @@ -16,6 +16,27 @@ var ErrUnableToCastWireMessage = errors.New("unable to cast wire.Message to bloc var _ multicast.MessageHandlerI = (*Listener)(nil) +// Listener is a multicast message listener specifically designed for processing blocks messages. +// +// Responsibilities: +// - Connects to a multicast group and listens for block messages (`CmdBlock`). +// - Processes incoming block messages by marking them for processing in the storage to ensure that only one instance processes a block at a time. +// - Ignores non-block messages to optimize processing and maintain focus on relevant data. +// +// Key Methods: +// - `NewMcastListener`: Initializes a new Listener instance, setting up the multicast group for reading block messages. +// - `Connect`: Establishes the connection to the multicast group. +// - `Disconnect`: Leaves the multicast group. +// - `OnReceive`: Handles received multicast messages, verifying their type and ensuring proper processing of block messages. +// - `OnSend`: Placeholder for handling messages sent to the multicast group, currently ignored. +// +// Behavior: +// - On receiving a block message (`CmdBlock`): +// 1. Verifies the message type. +// 2. Tries to mark the block as being processed by the current instance. +// 3. Forwards the block message to the `receiveCh` for further handling. +// - Logs and gracefully handles errors in block processing, ensuring robustness in a distributed system. + type Listener struct { hostname string @@ -48,7 +69,7 @@ func (l *Listener) Disconnect() { l.blockGroup.Disconnect() } -// OnReceive should be fire & forget +// OnReceive handles received messages from multicast group func (l *Listener) OnReceive(msg wire.Message) { if msg.Command() == wire.CmdBlock { blockMsg, ok := msg.(*bcnet.BlockMessage) @@ -83,7 +104,7 @@ func (l *Listener) OnReceive(msg wire.Message) { // ignore other messages } -// OnSend should be fire & forget +// OnSend handles sent messages to multicast group func (l *Listener) OnSend(_ wire.Message) { // ignore } diff --git a/internal/metamorph/bcnet/mcast/multicaster.go b/internal/metamorph/bcnet/mcast/multicaster.go index cde598408..2ec1df586 100644 --- a/internal/metamorph/bcnet/mcast/multicaster.go +++ b/internal/metamorph/bcnet/mcast/multicaster.go @@ -15,6 +15,35 @@ var ErrTxRejectedByPeer = errors.New("transaction rejected by peer") var _ multicast.MessageHandlerI = (*Multicaster)(nil) +// Multicaster facilitates the transmission and reception of transaction and reject messages over multicast groups. +// +// Fields: +// - `messageCh`: Channel used to send status messages for transactions, such as acceptance or rejection. +// - `txGroup`: Multicast group for transmitting transaction (`MsgTx`) messages. +// - `rejectGroup`: Multicast group for receiving rejection (`MsgReject`) messages. +// +// Responsibilities: +// - Establishes and manages connections to multicast groups for sending and receiving blockchain transaction messages. +// - Handles the transmission of transactions and processes rejections received from the network. +// +// Key Methods: +// - `NewMulticaster`: Initializes a new `Multicaster` instance with specified multicast group addresses, network, and message channel. +// - `Connect`: Connects to both `txGroup` (for sending) and `rejectGroup` (for receiving). +// - `Disconnect`: Disconnects from all multicast groups, cleaning up resources. +// - `SendTx`: Encodes and sends a raw transaction to the multicast `txGroup`. +// - `OnReceive`: Processes messages received via the `rejectGroup` and updates the `messageCh` with rejection status. +// - `OnSend`: Processes messages sent via the `txGroup` and updates the `messageCh` with sent-to-network status. +// +// Behavior: +// - On receiving a `MsgReject` (`CmdReject`): +// 1. Extracts the rejection reason and transaction hash. +// 2. Sends a `TxStatusMessage` to the `messageCh` indicating rejection status. +// +// - On sending a `MsgTx` (`CmdTx`): +// 1. Extracts the transaction hash. +// 2. Sends a `TxStatusMessage` to the `messageCh` indicating the transaction was sent to the network. +// +// - Ignores unsupported or irrelevant message types for both sending and receiving. type Multicaster struct { logger *slog.Logger messageCh chan<- *metamorph_p2p.TxStatusMessage diff --git a/internal/metamorph/bcnet/mediator.go b/internal/metamorph/bcnet/mediator.go index 688ec6e8c..3b32d3857 100644 --- a/internal/metamorph/bcnet/mediator.go +++ b/internal/metamorph/bcnet/mediator.go @@ -13,6 +13,37 @@ import ( "go.opentelemetry.io/otel/attribute" ) +// Mediator acts as the central communication hub between metamorph processor and blockchain network, +// coordinating the interactions between the peer-to-peer messenger (p2p) and the multicast system (mcast). +// It is responsible for handling transactions and peer interactions depending on the operating mode (classic or hybrid). +// +// Fields: +// - `classic`: A flag indicating if the system is operating in classic mode (`true`) or hybrid mode (`false`). +// - `p2pMessenger`: The component responsible for managing peer-to-peer communications, including requesting and announcing transactions. +// - `mcaster`: The component responsible for sending transactions over multicast networks in hybrid mode. +// +// Methods: +// - `NewMediator`: Initializes a new `Mediator` with the specified logging, mode (classic or hybrid), +// p2p messenger, multicast system, and optional tracing attributes. +// - `AskForTxAsync`: Asynchronously requests a transaction by its hash from the network via P2P. +// - `AnnounceTxAsync`: Asynchronously announces a transaction to the network. +// In classic mode, it uses `p2pMessenger` to announce the transaction. In hybrid mode, it uses `mcaster` to send the transaction via multicast. +// +// Usage: +// - The `Mediator` abstracts the differences between classic (peer-to-peer) and hybrid (peer-to-peer and multicast) modes. +// - In classic mode, transactions are communicated directly with peers, whereas in hybrid mode, multicast groups are used for broader network communication. +// - Tracing functionality allows monitoring the flow of transactions and network requests, providing insights into system performance and behavior. +type Mediator struct { + logger *slog.Logger + classic bool + + p2pMessenger *p2p.NetworkMessenger + mcaster *mcast.Multicaster + + tracingEnabled bool + tracingAttributes []attribute.KeyValue +} + type Option func(*Mediator) func WithTracer(attr ...attribute.KeyValue) Option { @@ -28,17 +59,6 @@ func WithTracer(attr ...attribute.KeyValue) Option { } } -type Mediator struct { - logger *slog.Logger - classic bool - - p2pMessenger *p2p.NetworkMessenger - mcaster *mcast.Multicaster - - tracingEnabled bool - tracingAttributes []attribute.KeyValue -} - func NewMediator(l *slog.Logger, classic bool, messenger *p2p.NetworkMessenger, mcaster *mcast.Multicaster, opts ...Option) *Mediator { mode := "classic" if !classic { diff --git a/internal/multicast/group.go b/internal/multicast/group.go index 8d2f40a5d..6f0bc860a 100644 --- a/internal/multicast/group.go +++ b/internal/multicast/group.go @@ -13,23 +13,22 @@ import ( "golang.org/x/net/ipv6" ) -type ModeFlag uint8 - -const ( - Read ModeFlag = 1 << iota - Write -) - -func (flag ModeFlag) Has(v ModeFlag) bool { - return v&flag != 0 -} - -type MessageHandlerI interface { - // OnReceive should be fire & forget - OnReceive(msg wire.Message) - // OnSend should be fire & forget - OnSend(msg wire.Message) -} +// WORK IN PROGRESS +// +// +// The Group structure represents a multicast communication group for exchanging messages +// over a network. It is designed to support both reading and writing messages through the +// IPv6 multicast group. The Group abstracts the complexities of managing connections, handling +// messages, and interfacing with the multicast group. Key features include: +// +// - **Multicast Address Management:** Supports setting up multicast UDP connections, +// joining groups, and resolving addresses. +// - **Mode Configurability:** Allows configuring the group in read-only, write-only, or +// read-write mode through the `ModeFlag`. +// - **Transparent Communication:** Ensures seamless handling of protocol-specific +// messages via handlers and encapsulated logic. +// - **Error Handling and Logging:** Integrates structured logging for diagnostics and error +// tracking, aiding maintainability and debugging. type Group[T wire.Message] struct { execWg sync.WaitGroup @@ -53,6 +52,35 @@ type Group[T wire.Message] struct { mh MessageHandlerI } +// MessageHandlerI is an interface that defines the contract for handling messages +// in the multicast group communication. It provides two primary methods: +// +// - **OnReceive(msg wire.Message):** Triggered when a message is received from the +// multicast group. This method should be implemented to handle received messages in +// a fire-and-forget manner, ensuring that processing does not block the main communication flow. +// +// - **OnSend(msg wire.Message):** Triggered when a message is successfully sent to the +// multicast group. This method allows implementing custom actions or logging after +// message transmission, also in a fire-and-forget manner. +// +// By defining this interface, the Group structure decouples the message handling logic +// from the underlying communication mechanism, providing extensibility and modularity. +type MessageHandlerI interface { + OnReceive(msg wire.Message) + OnSend(msg wire.Message) +} + +type ModeFlag uint8 + +const ( + Read ModeFlag = 1 << iota + Write +) + +func (flag ModeFlag) Has(v ModeFlag) bool { + return v&flag != 0 +} + func NewGroup[T wire.Message](l *slog.Logger, mh MessageHandlerI, addr string, mode ModeFlag, network wire.BitcoinNet /*TODO: add opts*/) *Group[T] { var tmp T l = l.With(