Skip to content

Commit

Permalink
Changes based on PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
amsanghi committed Nov 21, 2024
1 parent d43b392 commit 40baf65
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 34 deletions.
26 changes: 12 additions & 14 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,30 +124,28 @@ func NewInboxReader(tracker *InboxTracker, client *ethclient.Client, l1Reader *h
func (r *InboxReader) Start(ctxIn context.Context) error {
r.StopWaiter.Start(ctxIn, r)
hadError := false
r.LaunchThread(func(ctx context.Context) {
for {
runChan := make(chan struct{}, 1)
err := stopwaiter.CallIterativelyWith[struct{}](
&r.StopWaiterSafe,
func(ctx context.Context, ignored struct{}) time.Duration {
err := r.run(ctx, hadError)
if errors.Is(err, broadcastclient.TransactionStreamerBlockCreationStopped) {
log.Info("stopping block creation in inbox reader because transaction streamer has stopped")
return
close(runChan)
}
if err != nil && !errors.Is(err, context.Canceled) && !strings.Contains(err.Error(), "header not found") {
log.Warn("error reading inbox", "err", err)
hadError = true
} else {
hadError = false
}
interval := time.Second
timer := time.NewTimer(interval)
select {
case <-ctx.Done():
timer.Stop()
return
case <-timer.C:
}
}
})

return time.Second
},
runChan,
)
if err != nil {
return err
}
// Ensure we read the init message before other things start up
for i := 0; ; i++ {
batchCount, err := r.tracker.GetBatchCount()
Expand Down
31 changes: 12 additions & 19 deletions arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,29 +870,22 @@ func (c *SeqCoordinator) Start(ctxIn context.Context) {
err, "newRedisUrl", c.config.NewRedisUrl)
}
}

c.LaunchThread(func(ctx context.Context) {
for {
chooseRedisAndUpdateChan := make(chan struct{}, 1)
err := stopwaiter.CallIterativelyWith[struct{}](
&c.StopWaiterSafe,
func(ctx context.Context, ignored struct{}) time.Duration {
interval, err := c.chooseRedisAndUpdate(ctx, newRedisCoordinator)
if errors.Is(err, broadcastclient.TransactionStreamerBlockCreationStopped) {
log.Info("stopping block creation in sequencer because transaction streamer has stopped")
return
}
if ctx.Err() != nil {
return
}
if interval == time.Duration(0) {
continue
}
timer := time.NewTimer(interval)
select {
case <-ctx.Done():
timer.Stop()
return
case <-timer.C:
close(chooseRedisAndUpdateChan)
}
}
})
return interval
},
chooseRedisAndUpdateChan,
)
if err != nil {
log.Warn("error in starting iterative call to chooseRedisAndUpdate", "err", err)
}
if c.config.ChosenHealthcheckAddr != "" {
c.StopWaiter.LaunchThread(c.launchHealthcheckServer)
}
Expand Down
6 changes: 5 additions & 1 deletion util/stopwaiter/stopwaiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ func CallIterativelyWith[T any](
return s.LaunchThreadSafe(func(ctx context.Context) {
var defaultVal T
var val T
var ok bool
for {
interval := foo(ctx, val)
if ctx.Err() != nil {
Expand All @@ -246,7 +247,10 @@ func CallIterativelyWith[T any](
timer.Stop()
return
case <-timer.C:
case val = <-triggerChan:
case val, ok = <-triggerChan:
if !ok {
return
}
}
}
})
Expand Down

0 comments on commit 40baf65

Please sign in to comment.