Skip to content

Commit

Permalink
Merge pull request #37 from ClarkChenc/handle_heimdallclient_graceful…
Browse files Browse the repository at this point in the history
…_shutdown

Handle heimdallclient shutdown when retry heimdall data
  • Loading branch information
zhang0125 authored Feb 1, 2023
2 parents 963711a + e50e858 commit 4ecc3dd
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 5 deletions.
11 changes: 11 additions & 0 deletions consensus/bor/bor.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ var (
// errOutOfRangeChain is returned if an authorization list is attempted to
// be modified via out-of-range or non-contiguous headers.
errOutOfRangeChain = errors.New("out of range or non-contiguous chain")

// errShutdownDetected is returned if a shutdown signal is detected
errShutdownDetected = errors.New("shutdown detected")
)

// SignerFn is a signer callback function to request a header to be signed by a
Expand Down Expand Up @@ -888,6 +891,11 @@ func (c *Bor) APIs(chain consensus.ChainHeaderReader) []rpc.API {
}}
}

// StopClient implements consensus.Engine. It will close any information fetching client before closing engine.
func (c *Bor) StopClient() {
c.HeimdallClient.Close()
}

// Close implements consensus.Engine. It's a noop for bor as there are no background threads.
func (c *Bor) Close() error {
return nil
Expand Down Expand Up @@ -1137,6 +1145,9 @@ func (c *Bor) CommitStates(
"fromID", lastStateID+1,
"to", to.Format(time.RFC3339))
eventRecords, err := c.HeimdallClient.FetchStateSyncEvents(lastStateID+1, to.Unix())
if err != nil {
return nil, err
}
if c.config.OverrideStateSyncRecords != nil {
if val, ok := c.config.OverrideStateSyncRecords[strconv.FormatUint(number, 10)]; ok {
if val < len(eventRecords) {
Expand Down
34 changes: 29 additions & 5 deletions consensus/bor/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ type IHeimdallClient interface {
Fetch(path string, query string) (*ResponseWithHeight, error)
FetchWithRetry(path string, query string) (*ResponseWithHeight, error)
FetchStateSyncEvents(fromID uint64, to int64) ([]*EventRecordWithTime, error)
Close()
}

type HeimdallClient struct {
urlString string
client http.Client
closeCh chan struct{}
}

func NewHeimdallClient(urlString string) (*HeimdallClient, error) {
Expand All @@ -40,10 +42,16 @@ func NewHeimdallClient(urlString string) (*HeimdallClient, error) {
client: http.Client{
Timeout: time.Duration(5 * time.Second),
},
closeCh: make(chan struct{}),
}
return h, nil
}

func (h *HeimdallClient) Close() {
close(h.closeCh)
h.client.CloseIdleConnections()
}

func (h *HeimdallClient) FetchStateSyncEvents(fromID uint64, to int64) ([]*EventRecordWithTime, error) {
eventRecords := make([]*EventRecordWithTime, 0)
for {
Expand Down Expand Up @@ -96,13 +104,29 @@ func (h *HeimdallClient) FetchWithRetry(rawPath string, rawQuery string) (*Respo
u.Path = rawPath
u.RawQuery = rawQuery

retryCount := 1
res, err := h.internalFetch(u)
if err == nil && res != nil {
return res, nil
}

retryTicker := time.NewTicker(5 * time.Second)
defer retryTicker.Stop()

for {
res, err := h.internalFetch(u)
if err == nil && res != nil {
return res, nil
select {
case <-h.closeCh:
log.Info("Shutdown detected, heimdall client terminates request")
return nil, errShutdownDetected

case <-retryTicker.C:
log.Info("Retrying again in 5 seconds to fetch Heimdall data", "path", u.Path, "retryCount", retryCount)
retryCount += 1
res, err := h.internalFetch(u)
if err == nil && res != nil {
return res, nil
}
}
log.Info("Retrying again in 5 seconds for next Heimdall span", "path", u.Path)
time.Sleep(5 * time.Second)
}
}

Expand Down
3 changes: 3 additions & 0 deletions consensus/clique/clique.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,9 @@ func (c *Clique) SealHash(header *types.Header) common.Hash {
return SealHash(header)
}

// StopClient implements consensus.Engine. It will close any information fetching client before closing engine.
func (c *Clique) StopClient() {}

// Close implements consensus.Engine. It's a noop for clique as there are no background threads.
func (c *Clique) Close() error {
return nil
Expand Down
4 changes: 4 additions & 0 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ type Engine interface {
// APIs returns the RPC APIs this consensus engine provides.
APIs(chain ChainHeaderReader) []rpc.API

// StopClient terminates any background client maintained by the consensus engine.
// It is used to release resources related with information fetching.
StopClient()

// Close terminates any background threads maintained by the consensus engine.
Close() error
}
Expand Down
2 changes: 2 additions & 0 deletions consensus/ethash/ethash.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,8 @@ func NewShared() *Ethash {
return &Ethash{shared: sharedEthash}
}

func (ethhash *Ethash) StopClient() {}

// Close closes the exit channel to notify all backend threads exiting.
func (ethash *Ethash) Close() error {
ethash.closeOnce.Do(func() {
Expand Down
4 changes: 4 additions & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,15 +576,19 @@ func (s *Ethereum) Stop() error {
// Stop all the peer-related stuff first.
s.ethDialCandidates.Close()
s.snapDialCandidates.Close()

s.engine.StopClient()
s.handler.Stop()

// Then stop everything else.
s.bloomIndexer.Close()
close(s.closeBloomHandler)

s.txPool.Stop()
s.miner.Stop()
s.blockchain.Stop()
s.engine.Close()

rawdb.PopUncleanShutdownMarker(s.chainDb)
s.chainDb.Close()
s.eventMux.Stop()
Expand Down
2 changes: 2 additions & 0 deletions tests/bor/mocks/IHeimdallClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 4ecc3dd

Please sign in to comment.