From 95d46ff410cdee5530babfd4dce3a7b95f3f907a Mon Sep 17 00:00:00 2001 From: Fred Carle Date: Fri, 31 May 2024 00:33:05 -0400 Subject: [PATCH] reduce number of leaked go routines. This helps the current race condition a lot. --- net/node.go | 76 +++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 56 insertions(+), 20 deletions(-) diff --git a/net/node.go b/net/node.go index ffd60e52fb..7b403720e6 100644 --- a/net/node.go +++ b/net/node.go @@ -70,8 +70,9 @@ type Node struct { // receives an event when a pushLog request has been processed. pushLogEvent chan EvtReceivedPushLog - ctx context.Context - cancel context.CancelFunc + ctx context.Context + cancel context.CancelFunc + dhtClose func() error } // NewNode creates a new network node instance of DefraDB, wired into libp2p. @@ -101,8 +102,11 @@ func NewNode( fin := finalizer.NewFinalizer() + ctx, cancel := context.WithCancel(ctx) + peerstore, err := pstoreds.NewPeerstore(ctx, db.Peerstore(), pstoreds.DefaultOpts()) if err != nil { + cancel() return nil, fin.Cleanup(err) } fin.Add(peerstore) @@ -111,6 +115,7 @@ func NewNode( // generate an ephemeral private key key, err := crypto.GenerateEd25519() if err != nil { + cancel() return nil, fin.Cleanup(err) } options.PrivateKey = key @@ -119,6 +124,7 @@ func NewNode( // unmarshal the private key bytes privateKey, err := libp2pCrypto.UnmarshalEd25519PrivateKey(options.PrivateKey) if err != nil { + cancel() return nil, fin.Cleanup(err) } @@ -149,6 +155,7 @@ func NewNode( h, err := libp2p.New(libp2pOpts...) if err != nil { + cancel() return nil, fin.Cleanup(err) } log.InfoContext( @@ -167,12 +174,10 @@ func NewNode( pubsub.WithFloodPublish(true), ) if err != nil { + cancel() return nil, fin.Cleanup(err) } } - - ctx, cancel := context.WithCancel(ctx) - peer, err := NewPeer( ctx, db, @@ -201,6 +206,7 @@ func NewNode( DB: db, ctx: ctx, cancel: cancel, + dhtClose: ddht.Close, } n.subscribeToPeerConnectionEvents() @@ -268,12 +274,21 @@ func (n *Node) subscribeToPeerConnectionEvents() { return } go func() { - for e := range sub.Out() { + for { select { - case n.peerEvent <- e.(event.EvtPeerConnectednessChanged): - default: - <-n.peerEvent - n.peerEvent <- e.(event.EvtPeerConnectednessChanged) + case <-n.ctx.Done(): + sub.Close() + return + case e, ok := <-sub.Out(): + if !ok { + return + } + select { + case n.peerEvent <- e.(event.EvtPeerConnectednessChanged): + default: + <-n.peerEvent + n.peerEvent <- e.(event.EvtPeerConnectednessChanged) + } } } }() @@ -290,12 +305,21 @@ func (n *Node) subscribeToPubSubEvents() { return } go func() { - for e := range sub.Out() { + for { select { - case n.pubSubEvent <- e.(EvtPubSub): - default: - <-n.pubSubEvent - n.pubSubEvent <- e.(EvtPubSub) + case <-n.ctx.Done(): + sub.Close() + return + case e, ok := <-sub.Out(): + if !ok { + return + } + select { + case n.pubSubEvent <- e.(EvtPubSub): + default: + <-n.pubSubEvent + n.pubSubEvent <- e.(EvtPubSub) + } } } }() @@ -312,12 +336,21 @@ func (n *Node) subscribeToPushLogEvents() { return } go func() { - for e := range sub.Out() { + for { select { - case n.pushLogEvent <- e.(EvtReceivedPushLog): - default: - <-n.pushLogEvent - n.pushLogEvent <- e.(EvtReceivedPushLog) + case <-n.ctx.Done(): + sub.Close() + return + case e, ok := <-sub.Out(): + if !ok { + return + } + select { + case n.pushLogEvent <- e.(EvtReceivedPushLog): + default: + <-n.pushLogEvent + n.pushLogEvent <- e.(EvtReceivedPushLog) + } } } }() @@ -428,5 +461,8 @@ func (n Node) Close() { if n.Peer != nil { n.Peer.Close() } + if n.dhtClose != nil { + n.dhtClose() + } n.DB.Close() }