Skip to content

Commit

Permalink
Lint
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberhorsey committed Nov 27, 2024
1 parent e9914e3 commit 42ff91c
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 35 deletions.
44 changes: 24 additions & 20 deletions packages/taiko-client/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,27 +146,31 @@ func (d *Driver) Start() error {

if d.p2pnetwork != nil {
go d.p2pnetwork.DiscoverPeers(d.ctx)
go p2p.JoinTopic[softblocks.TransactionBatch](d.ctx, d.p2pnetwork, p2p.TopicNameSoftBlocks, func(ctx context.Context, txBatch softblocks.TransactionBatch) error {
_, err := d.l2ChainSyncer.BlobSyncer().InsertSoftBlockFromTransactionsBatch(
ctx,
txBatch.BlockID,
txBatch.ID,
d.txListDecompressor.TryDecompress(
d.rpc.L2.ChainID,
new(big.Int).SetUint64(txBatch.BlockID),
txBatch.TransactionsList,
true,
),
txBatch.BatchMarker,
txBatch.BlockParams,
)

if err != nil {
slog.Error("error inserting soft block", "error", err)
go func() {
if err := p2p.JoinTopic[softblocks.TransactionBatch](d.ctx, d.p2pnetwork, p2p.TopicNameSoftBlocks, func(ctx context.Context, txBatch softblocks.TransactionBatch) error {
_, err := d.l2ChainSyncer.BlobSyncer().InsertSoftBlockFromTransactionsBatch(
ctx,
txBatch.BlockID,
txBatch.ID,
d.txListDecompressor.TryDecompress(
d.rpc.L2.ChainID,
new(big.Int).SetUint64(txBatch.BlockID),
txBatch.TransactionsList,
true,
),
txBatch.BatchMarker,
txBatch.BlockParams,
)

if err != nil {
slog.Error("error inserting soft block", "error", err)
}

return err
}); err != nil {
log.Error("Failed to join soft block topic", "error", err)
}

return err
})
}()
}

return nil
Expand Down
19 changes: 8 additions & 11 deletions packages/taiko-client/pkg/p2p/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ const rendezvous = "taiko-p2p"

const TopicNameSoftBlocks = "soft-blocks"

const defaultMessageDiscoveryInterval = 1 * time.Second

type topicHandlerFunc[T any] func(context.Context, T) error

type Network struct {
Expand All @@ -36,15 +34,15 @@ type Network struct {
routingDiscovery *discovery.RoutingDiscovery
topics map[string]*pubsub.Topic
topicHandlers map[string]any
bootstrapNodeUrl string
bootstrapNodeURL string
localFullAddr string
fullAddr string
peers []*peer.AddrInfo
peersMutex sync.Mutex
receivedMessages int
}

func NewNetwork(ctx context.Context, bootstrapNodeUrl string, port uint64) (*Network, error) {
func NewNetwork(ctx context.Context, bootstrapNodeURL string, port uint64) (*Network, error) {
host, err := libp2p.New(
libp2p.ListenAddrs(multiaddr.StringCast(fmt.Sprintf("/ip4/0.0.0.0/tcp/%v", port))),
)
Expand All @@ -69,15 +67,15 @@ func NewNetwork(ctx context.Context, bootstrapNodeUrl string, port uint64) (*Net
host: host,
topics: make(map[string]*pubsub.Topic),
topicHandlers: make(map[string]any),
bootstrapNodeUrl: bootstrapNodeUrl,
bootstrapNodeURL: bootstrapNodeURL,
localFullAddr: localFullAddr,
fullAddr: fullAddr,
peers: make([]*peer.AddrInfo, 0),
}

n.acceptIncomingPeers()

err = bootstrapDHT(ctx, n, bootstrapNodeUrl, host, kademliaDHT)
err = bootstrapDHT(ctx, n, bootstrapNodeURL, host, kademliaDHT)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -130,14 +128,14 @@ func (n *Network) startAdvertising(ctx context.Context) {
case <-ctx.Done():
return
case <-t.C:
n.routingDiscovery.Advertise(ctx, rendezvous)
_, _ = n.routingDiscovery.Advertise(ctx, rendezvous)
}
}
}

func (n *Network) acceptIncomingPeers() {
n.host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(nw network.Network, conn network.Conn) {
ConnectedF: func(_ network.Network, conn network.Conn) {
peerID := conn.RemotePeer()

addrInfo := peer.AddrInfo{
Expand Down Expand Up @@ -206,7 +204,7 @@ func (n *Network) DiscoverPeers(ctx context.Context) {
// Publish a message to the network
func Publish[T any](ctx context.Context, n *Network, topicName string, msg T) error {
if n.topics[topicName] == nil {
return errors.New("Topic not found")
return errors.New("topic not registered")
}

data, err := json.Marshal(msg)
Expand All @@ -223,7 +221,7 @@ func Publish[T any](ctx context.Context, n *Network, topicName string, msg T) er
return nil
}

func JoinTopic[T any](ctx context.Context, n *Network, topicName string, topicHandler topicHandlerFunc[T]) error {
func JoinTopic[T any](_ context.Context, n *Network, topicName string, topicHandler topicHandlerFunc[T]) error {
topic, err := n.ps.Join(topicName)
if err != nil {
return err
Expand Down Expand Up @@ -298,7 +296,6 @@ func bootstrapDHT(ctx context.Context, n *Network, addr string, host host.Host,
if err := host.Connect(ctx, *peerInfo); err != nil {
slog.Error("unable to connect to bootstrap peer", "peerID", peerInfo.ID, "err", err)
} else {

n.peersMutex.Lock()
// Check if peer is already added
for _, p := range n.peers {
Expand Down
9 changes: 5 additions & 4 deletions packages/taiko-client/pkg/p2p/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,23 @@ func Test_Network(t *testing.T) {

defer cancel()

assert.Nil(t, JoinTopic(context.Background(), n, "test", func(ctx context.Context, data []byte) error {
assert.Nil(t, JoinTopic(context.Background(), n, "test", func(_ context.Context, data []byte) error {
slog.Info("Node n received message", "data", string(data))
assert.Equal(t, data, []byte("hello"))
return nil
}))

assert.Nil(t, JoinTopic(context.Background(), n2, "test", func(ctx context.Context, data []byte) error {
assert.Nil(t, JoinTopic(context.Background(), n2, "test", func(_ context.Context, data []byte) error {
slog.Info("Node n2 received message", "data", string(data))
assert.Equal(t, data, []byte("hello"))
return nil
}))

go SubscribeToTopic[[]byte](ctx, n, "test")
go func() {
assert.Nil(t, SubscribeToTopic[[]byte](ctx, n, "test"))
}()

assert.Nil(t, Publish(context.Background(), n2, "test", []byte("hello")))

assert.Equal(t, 1, n.receivedMessages)

}

0 comments on commit 42ff91c

Please sign in to comment.