Skip to content

Commit

Permalink
reduce number of leaked go routines.
Browse files Browse the repository at this point in the history
This helps the current race condition a lot.
  • Loading branch information
fredcarle committed May 31, 2024
1 parent b58687a commit 95d46ff
Showing 1 changed file with 56 additions and 20 deletions.
76 changes: 56 additions & 20 deletions net/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ type Node struct {
// receives an event when a pushLog request has been processed.
pushLogEvent chan EvtReceivedPushLog

ctx context.Context
cancel context.CancelFunc
ctx context.Context
cancel context.CancelFunc
dhtClose func() error
}

// NewNode creates a new network node instance of DefraDB, wired into libp2p.
Expand Down Expand Up @@ -101,8 +102,11 @@ func NewNode(

fin := finalizer.NewFinalizer()

ctx, cancel := context.WithCancel(ctx)

peerstore, err := pstoreds.NewPeerstore(ctx, db.Peerstore(), pstoreds.DefaultOpts())
if err != nil {
cancel()
return nil, fin.Cleanup(err)
}
fin.Add(peerstore)
Expand All @@ -111,6 +115,7 @@ func NewNode(
// generate an ephemeral private key
key, err := crypto.GenerateEd25519()
if err != nil {
cancel()
return nil, fin.Cleanup(err)
}
options.PrivateKey = key
Expand All @@ -119,6 +124,7 @@ func NewNode(
// unmarshal the private key bytes
privateKey, err := libp2pCrypto.UnmarshalEd25519PrivateKey(options.PrivateKey)
if err != nil {
cancel()
return nil, fin.Cleanup(err)
}

Expand Down Expand Up @@ -149,6 +155,7 @@ func NewNode(

h, err := libp2p.New(libp2pOpts...)
if err != nil {
cancel()
return nil, fin.Cleanup(err)
}
log.InfoContext(
Expand All @@ -167,12 +174,10 @@ func NewNode(
pubsub.WithFloodPublish(true),
)
if err != nil {
cancel()
return nil, fin.Cleanup(err)
}
}

ctx, cancel := context.WithCancel(ctx)

peer, err := NewPeer(
ctx,
db,
Expand Down Expand Up @@ -201,6 +206,7 @@ func NewNode(
DB: db,
ctx: ctx,
cancel: cancel,
dhtClose: ddht.Close,
}

n.subscribeToPeerConnectionEvents()
Expand Down Expand Up @@ -268,12 +274,21 @@ func (n *Node) subscribeToPeerConnectionEvents() {
return
}
go func() {
for e := range sub.Out() {
for {
select {
case n.peerEvent <- e.(event.EvtPeerConnectednessChanged):
default:
<-n.peerEvent
n.peerEvent <- e.(event.EvtPeerConnectednessChanged)
case <-n.ctx.Done():
sub.Close()
return
case e, ok := <-sub.Out():
if !ok {
return
}
select {
case n.peerEvent <- e.(event.EvtPeerConnectednessChanged):
default:
<-n.peerEvent
n.peerEvent <- e.(event.EvtPeerConnectednessChanged)
}
}
}
}()
Expand All @@ -290,12 +305,21 @@ func (n *Node) subscribeToPubSubEvents() {
return
}
go func() {
for e := range sub.Out() {
for {
select {
case n.pubSubEvent <- e.(EvtPubSub):
default:
<-n.pubSubEvent
n.pubSubEvent <- e.(EvtPubSub)
case <-n.ctx.Done():
sub.Close()
return
case e, ok := <-sub.Out():
if !ok {
return
}
select {
case n.pubSubEvent <- e.(EvtPubSub):
default:
<-n.pubSubEvent
n.pubSubEvent <- e.(EvtPubSub)
}
}
}
}()
Expand All @@ -312,12 +336,21 @@ func (n *Node) subscribeToPushLogEvents() {
return
}
go func() {
for e := range sub.Out() {
for {
select {
case n.pushLogEvent <- e.(EvtReceivedPushLog):
default:
<-n.pushLogEvent
n.pushLogEvent <- e.(EvtReceivedPushLog)
case <-n.ctx.Done():
sub.Close()
return
case e, ok := <-sub.Out():
if !ok {
return
}
select {
case n.pushLogEvent <- e.(EvtReceivedPushLog):
default:
<-n.pushLogEvent
n.pushLogEvent <- e.(EvtReceivedPushLog)
}
}
}
}()
Expand Down Expand Up @@ -428,5 +461,8 @@ func (n Node) Close() {
if n.Peer != nil {
n.Peer.Close()
}
if n.dhtClose != nil {
n.dhtClose()
}
n.DB.Close()
}

0 comments on commit 95d46ff

Please sign in to comment.