Skip to content

Commit

Permalink
chore: document new logic
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadiuszos4chain committed Dec 31, 2024
1 parent 21082c8 commit 61fe019
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 30 deletions.
27 changes: 27 additions & 0 deletions cmd/arc/services/blocktx.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,33 @@ func NewBlocktxStore(logger *slog.Logger, dbConfig *config.DbConfig, tracingConf
return s, err
}

// setupBcNetworkCommunication initializes the Bloctx blockchain network communication layer, configuring it
// to operate in either classic (P2P-only) or hybrid (P2P and multicast) mode.
//
// Parameters:
// - `l *slog.Logger`: Logger instance for logging events.
// - `arcConfig *config.ArcConfig`: Configuration object containing blockchain network settings.
// - `store store.BlocktxStore`: A storage interface for blockchain transactions.
// - `blockRequestCh chan<- blocktx_p2p.BlockRequest`: Channel for handling block requests.
// - `blockProcessCh chan<- *bcnet.BlockMessage`: Channel for processing block messages.
//
// Returns:
// - `manager *p2p.PeerManager`: Manages P2P peers.
// - `mcastListener *mcast.Listener`: Handles multicast communication, or `nil` if not in hybrid mode.
// - `err error`: Error if any issue occurs during setup.
//
// Key Details:
// - **Mode Handling**:
// - `"classic"` mode uses `blocktx_p2p.NewMsgHandler` for P2P communication only.
// - `"hybrid"` mode uses `blocktx_p2p.NewHybridMsgHandler` for P2P and multicast communication.
//
// - **Error Cleanup**: Ensures resources like peers and multicast listeners are properly cleaned up on errors.
// - **Peer Management**: Connects to configured peers and initializes a PeerManager for handling P2P connections.
// - **Multicast Communication**: In hybrid mode, joins a multicast group for blockchain updates and uses correct P2P message handler.
//
// Message Handlers:
// - `blocktx_p2p.NewMsgHandler`: Used in classic mode, handles all blockchain communication exclusively via P2P.
// - `blocktx_p2p.NewHybridMsgHandler`: Used in hybrid mode, seamlessly integrates P2P communication with multicast group updates.
func setupBcNetworkCommunication(l *slog.Logger, arcConfig *config.ArcConfig, store store.BlocktxStore, blockRequestCh chan<- blocktx_p2p.BlockRequest, blockProcessCh chan<- *bcnet.BlockMessage) (manager *p2p.PeerManager, mcastListener *mcast.Listener, err error) {
defer func() {
// cleanup on error
Expand Down
29 changes: 29 additions & 0 deletions cmd/arc/services/metamorph.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,35 @@ func NewMetamorphStore(dbConfig *config.DbConfig, tracingConfig *config.TracingC
return s, err
}

// setupMtmBcNetworkCommunication initializes the Metamorph blockchain network communication, configuring it
// to operate in either classic (P2P-only) or hybrid (P2P and multicast) mode.
//
// Parameters:
// - `l *slog.Logger`: Logger instance for event logging.
// - `s store.MetamorphStore`: Storage interface for Metamorph operations.
// - `arcConfig *config.ArcConfig`: Configuration object for blockchain network settings.
// - `mediatorOpts []bcnet.Option`: Additional options for the mediator.
//
// Returns:
// - `mediator *bcnet.Mediator`: Coordinates communication between P2P and multicast layers.
// - `messenger *p2p.NetworkMessenger`: Handles P2P message delivery - used by mediator only.
// - `manager *p2p.PeerManager`: Manages the lifecycle of P2P peers.
// - `multicaster *mcast.Multicaster`: Handles multicast message broadcasting and listening - used by mediator only.
// - `messageCh chan *metamorph_p2p.TxStatusMessage`: Channel for handling transaction status messages.
// - `err error`: Error if any part of the setup fails.
//
// Key Details:
// - **Mode Handling**:
// - `"classic"`: Uses `metamorph_p2p.NewMsgHandler` for exclusive P2P communication.
// - `"hybrid"`: Uses `metamorph_p2p.NewHybridMsgHandler` for integration of P2P and multicast group communication.
//
// - **Error Cleanup**: Cleans up resources such as the messenger, manager, and multicaster on failure.
// - **Peer Management**: Establishes connections to P2P peers and configures them with write handlers and buffer sizes.
// - **Multicast Communication**: In hybrid mode, joins multicast groups for transaction and rejection messages.
//
// Message Handlers:
// - `metamorph_p2p.NewMsgHandler`: Used in classic mode, handling all communication via P2P.
// - `metamorph_p2p.NewHybridMsgHandler`: Used in hybrid mode, integrating P2P communication with multicast group updates.
func setupMtmBcNetworkCommunication(l *slog.Logger, s store.MetamorphStore, arcConfig *config.ArcConfig, mediatorOpts []bcnet.Option) (
mediator *bcnet.Mediator, messenger *p2p.NetworkMessenger, manager *p2p.PeerManager, multicaster *mcast.Multicaster,
messageCh chan *metamorph_p2p.TxStatusMessage, err error) {
Expand Down
25 changes: 23 additions & 2 deletions internal/blocktx/bcnet/mcast/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,27 @@ var ErrUnableToCastWireMessage = errors.New("unable to cast wire.Message to bloc

var _ multicast.MessageHandlerI = (*Listener)(nil)

// Listener is a multicast message listener specifically designed for processing blocks messages.
//
// Responsibilities:
// - Connects to a multicast group and listens for block messages (`CmdBlock`).
// - Processes incoming block messages by marking them for processing in the storage to ensure that only one instance processes a block at a time.
// - Ignores non-block messages to optimize processing and maintain focus on relevant data.
//
// Key Methods:
// - `NewMcastListener`: Initializes a new Listener instance, setting up the multicast group for reading block messages.
// - `Connect`: Establishes the connection to the multicast group.
// - `Disconnect`: Leaves the multicast group.
// - `OnReceive`: Handles received multicast messages, verifying their type and ensuring proper processing of block messages.
// - `OnSend`: Placeholder for handling messages sent to the multicast group, currently ignored.
//
// Behavior:
// - On receiving a block message (`CmdBlock`):
// 1. Verifies the message type.
// 2. Tries to mark the block as being processed by the current instance.
// 3. Forwards the block message to the `receiveCh` for further handling.
// - Logs and gracefully handles errors in block processing, ensuring robustness in a distributed system.

type Listener struct {
hostname string

Expand Down Expand Up @@ -48,7 +69,7 @@ func (l *Listener) Disconnect() {
l.blockGroup.Disconnect()
}

// OnReceive should be fire & forget
// OnReceive handles received messages from multicast group
func (l *Listener) OnReceive(msg wire.Message) {
if msg.Command() == wire.CmdBlock {
blockMsg, ok := msg.(*bcnet.BlockMessage)
Expand Down Expand Up @@ -83,7 +104,7 @@ func (l *Listener) OnReceive(msg wire.Message) {
// ignore other messages
}

// OnSend should be fire & forget
// OnSend handles sent messages to multicast group
func (l *Listener) OnSend(_ wire.Message) {
// ignore
}
29 changes: 29 additions & 0 deletions internal/metamorph/bcnet/mcast/multicaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,35 @@ var ErrTxRejectedByPeer = errors.New("transaction rejected by peer")

var _ multicast.MessageHandlerI = (*Multicaster)(nil)

// Multicaster facilitates the transmission and reception of transaction and reject messages over multicast groups.
//
// Fields:
// - `messageCh`: Channel used to send status messages for transactions, such as acceptance or rejection.
// - `txGroup`: Multicast group for transmitting transaction (`MsgTx`) messages.
// - `rejectGroup`: Multicast group for receiving rejection (`MsgReject`) messages.
//
// Responsibilities:
// - Establishes and manages connections to multicast groups for sending and receiving blockchain transaction messages.
// - Handles the transmission of transactions and processes rejections received from the network.
//
// Key Methods:
// - `NewMulticaster`: Initializes a new `Multicaster` instance with specified multicast group addresses, network, and message channel.
// - `Connect`: Connects to both `txGroup` (for sending) and `rejectGroup` (for receiving).
// - `Disconnect`: Disconnects from all multicast groups, cleaning up resources.
// - `SendTx`: Encodes and sends a raw transaction to the multicast `txGroup`.
// - `OnReceive`: Processes messages received via the `rejectGroup` and updates the `messageCh` with rejection status.
// - `OnSend`: Processes messages sent via the `txGroup` and updates the `messageCh` with sent-to-network status.
//
// Behavior:
// - On receiving a `MsgReject` (`CmdReject`):
// 1. Extracts the rejection reason and transaction hash.
// 2. Sends a `TxStatusMessage` to the `messageCh` indicating rejection status.
//
// - On sending a `MsgTx` (`CmdTx`):
// 1. Extracts the transaction hash.
// 2. Sends a `TxStatusMessage` to the `messageCh` indicating the transaction was sent to the network.
//
// - Ignores unsupported or irrelevant message types for both sending and receiving.
type Multicaster struct {
logger *slog.Logger
messageCh chan<- *metamorph_p2p.TxStatusMessage
Expand Down
42 changes: 31 additions & 11 deletions internal/metamorph/bcnet/mediator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,37 @@ import (
"go.opentelemetry.io/otel/attribute"
)

// Mediator acts as the central communication hub between metamorph processor and blockchain network,
// coordinating the interactions between the peer-to-peer messenger (p2p) and the multicast system (mcast).
// It is responsible for handling transactions and peer interactions depending on the operating mode (classic or hybrid).
//
// Fields:
// - `classic`: A flag indicating if the system is operating in classic mode (`true`) or hybrid mode (`false`).
// - `p2pMessenger`: The component responsible for managing peer-to-peer communications, including requesting and announcing transactions.
// - `mcaster`: The component responsible for sending transactions over multicast networks in hybrid mode.
//
// Methods:
// - `NewMediator`: Initializes a new `Mediator` with the specified logging, mode (classic or hybrid),
// p2p messenger, multicast system, and optional tracing attributes.
// - `AskForTxAsync`: Asynchronously requests a transaction by its hash from the network via P2P.
// - `AnnounceTxAsync`: Asynchronously announces a transaction to the network.
// In classic mode, it uses `p2pMessenger` to announce the transaction. In hybrid mode, it uses `mcaster` to send the transaction via multicast.
//
// Usage:
// - The `Mediator` abstracts the differences between classic (peer-to-peer) and hybrid (peer-to-peer and multicast) modes.
// - In classic mode, transactions are communicated directly with peers, whereas in hybrid mode, multicast groups are used for broader network communication.
// - Tracing functionality allows monitoring the flow of transactions and network requests, providing insights into system performance and behavior.
type Mediator struct {
logger *slog.Logger
classic bool

p2pMessenger *p2p.NetworkMessenger
mcaster *mcast.Multicaster

tracingEnabled bool
tracingAttributes []attribute.KeyValue
}

type Option func(*Mediator)

func WithTracer(attr ...attribute.KeyValue) Option {
Expand All @@ -28,17 +59,6 @@ func WithTracer(attr ...attribute.KeyValue) Option {
}
}

type Mediator struct {
logger *slog.Logger
classic bool

p2pMessenger *p2p.NetworkMessenger
mcaster *mcast.Multicaster

tracingEnabled bool
tracingAttributes []attribute.KeyValue
}

func NewMediator(l *slog.Logger, classic bool, messenger *p2p.NetworkMessenger, mcaster *mcast.Multicaster, opts ...Option) *Mediator {
mode := "classic"
if !classic {
Expand Down
62 changes: 45 additions & 17 deletions internal/multicast/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,22 @@ import (
"golang.org/x/net/ipv6"
)

type ModeFlag uint8

const (
Read ModeFlag = 1 << iota
Write
)

func (flag ModeFlag) Has(v ModeFlag) bool {
return v&flag != 0
}

type MessageHandlerI interface {
// OnReceive should be fire & forget
OnReceive(msg wire.Message)
// OnSend should be fire & forget
OnSend(msg wire.Message)
}
// WORK IN PROGRESS
//
//
// The Group structure represents a multicast communication group for exchanging messages
// over a network. It is designed to support both reading and writing messages through the
// IPv6 multicast group. The Group abstracts the complexities of managing connections, handling
// messages, and interfacing with the multicast group. Key features include:
//
// - **Multicast Address Management:** Supports setting up multicast UDP connections,
// joining groups, and resolving addresses.
// - **Mode Configurability:** Allows configuring the group in read-only, write-only, or
// read-write mode through the `ModeFlag`.
// - **Transparent Communication:** Ensures seamless handling of protocol-specific
// messages via handlers and encapsulated logic.
// - **Error Handling and Logging:** Integrates structured logging for diagnostics and error
// tracking, aiding maintainability and debugging.

type Group[T wire.Message] struct {
execWg sync.WaitGroup
Expand All @@ -53,6 +52,35 @@ type Group[T wire.Message] struct {
mh MessageHandlerI
}

// MessageHandlerI is an interface that defines the contract for handling messages
// in the multicast group communication. It provides two primary methods:
//
// - **OnReceive(msg wire.Message):** Triggered when a message is received from the
// multicast group. This method should be implemented to handle received messages in
// a fire-and-forget manner, ensuring that processing does not block the main communication flow.
//
// - **OnSend(msg wire.Message):** Triggered when a message is successfully sent to the
// multicast group. This method allows implementing custom actions or logging after
// message transmission, also in a fire-and-forget manner.
//
// By defining this interface, the Group structure decouples the message handling logic
// from the underlying communication mechanism, providing extensibility and modularity.
type MessageHandlerI interface {
OnReceive(msg wire.Message)
OnSend(msg wire.Message)
}

type ModeFlag uint8

const (
Read ModeFlag = 1 << iota
Write
)

func (flag ModeFlag) Has(v ModeFlag) bool {
return v&flag != 0
}

func NewGroup[T wire.Message](l *slog.Logger, mh MessageHandlerI, addr string, mode ModeFlag, network wire.BitcoinNet /*TODO: add opts*/) *Group[T] {
var tmp T
l = l.With(
Expand Down

0 comments on commit 61fe019

Please sign in to comment.