Skip to content

Commit

Permalink
Merge pull request #5466 from oasisprotocol/kostko/fix/light-client-b…
Browse files Browse the repository at this point in the history
…lock-peers

go/consensus/cometbft/light: Try multiple sources when fetching blocks
  • Loading branch information
kostko authored Nov 24, 2023
2 parents 12ec9ca + a0bd142 commit 2870101
Show file tree
Hide file tree
Showing 13 changed files with 246 additions and 54 deletions.
1 change: 1 addition & 0 deletions .changelog/5466.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/consensus/cometbft/light: Try multiple sources when fetching blocks
4 changes: 4 additions & 0 deletions .changelog/5466.cfg.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Add `num_light_blocks_kept` configuration option

Located under `consensus.prune`, it allows configuring the number of light
blocks that are kept in the local trusted store (defaulting to 10000).
4 changes: 4 additions & 0 deletions go/consensus/api/light.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ type LightService interface {

// LightClient is a consensus light client interface.
type LightClient interface {
// GetStoredLightBlock retrieves a light block from the local light block store without doing
// any remote queries.
GetStoredLightBlock(height int64) (*LightBlock, error)

// GetLightBlock queries peers for a specific light block.
GetLightBlock(ctx context.Context, height int64) (*LightBlock, rpc.PeerFeedback, error)

Expand Down
15 changes: 5 additions & 10 deletions go/consensus/cometbft/abci/prune.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,11 @@ import (

"github.com/oasisprotocol/oasis-core/go/common/logging"
"github.com/oasisprotocol/oasis-core/go/consensus/cometbft/api"
"github.com/oasisprotocol/oasis-core/go/consensus/cometbft/config"
nodedb "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api"
)

const (
// PruneDefault is the default PruneStrategy.
PruneDefault = pruneNone

pruneNone = "none"
pruneKeepN = "keep_n"

// LogEventABCIPruneDelete is a log event value that signals an ABCI pruning
// delete event.
LogEventABCIPruneDelete = "cometbft/abci/prune"
Expand All @@ -38,19 +33,19 @@ const (
func (s PruneStrategy) String() string {
switch s {
case PruneNone:
return pruneNone
return config.PruneStrategyNone
case PruneKeepN:
return pruneKeepN
return config.PruneStrategyKeepN
default:
return "[unknown]"
}
}

func (s *PruneStrategy) FromString(str string) error {
switch strings.ToLower(str) {
case pruneNone:
case config.PruneStrategyNone:
*s = PruneNone
case pruneKeepN:
case config.PruneStrategyKeepN:
*s = PruneKeepN
default:
return fmt.Errorf("abci/pruner: unknown pruning strategy: '%v'", str)
Expand Down
8 changes: 7 additions & 1 deletion go/consensus/cometbft/cometbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ func New(
}

// NewLightClient creates a new CometBFT light client service.
func NewLightClient(ctx context.Context, dataDir string, genesis *genesisAPI.Document, consensus consensusAPI.Backend, p2p rpc.P2P) (lightAPI.ClientService, error) {
func NewLightClient(
ctx context.Context,
dataDir string,
genesis *genesisAPI.Document,
consensus consensusAPI.Backend,
p2p rpc.P2P,
) (lightAPI.ClientService, error) {
return light.New(ctx, dataDir, genesis, consensus, p2p)
}
16 changes: 13 additions & 3 deletions go/consensus/cometbft/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ type SubmissionConfig struct {
MaxFee uint64 `yaml:"max_fee"`
}

const (
// PruneStrategyNone is the identifier of the strategy that disables pruning.
PruneStrategyNone = "none"
// PruneStrategyKeepN is the identifier of the strategy that keeps the last N versions.
PruneStrategyKeepN = "keep_n"
)

// PruneConfig is the CometBFT ABCI state pruning configuration structure.
type PruneConfig struct {
// ABCI state pruning strategy.
Expand All @@ -95,6 +102,8 @@ type PruneConfig struct {
NumKept uint64 `yaml:"num_kept"`
// ABCI state pruning interval.
Interval time.Duration `yaml:"interval"`
// Light blocks kept in trusted store.
NumLightBlocksKept uint16 `yaml:"num_light_blocks_kept"`
}

// CheckpointerConfig is the CometBFT ABCI state pruning configuration structure.
Expand Down Expand Up @@ -200,9 +209,10 @@ func DefaultConfig() Config {
HaltHeight: 0,
UpgradeStopDelay: 60 * time.Second,
Prune: PruneConfig{
Strategy: "none",
NumKept: 3600,
Interval: 2 * time.Minute,
Strategy: PruneStrategyNone,
NumKept: 3600,
Interval: 2 * time.Minute,
NumLightBlocksKept: 10000,
},
Checkpointer: CheckpointerConfig{
Disabled: false,
Expand Down
4 changes: 0 additions & 4 deletions go/consensus/cometbft/full/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
"github.com/oasisprotocol/oasis-core/go/consensus/cometbft/db"
lightAPI "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/light/api"
"github.com/oasisprotocol/oasis-core/go/consensus/metrics"
lightP2P "github.com/oasisprotocol/oasis-core/go/consensus/p2p/light"
genesisAPI "github.com/oasisprotocol/oasis-core/go/genesis/api"
cmflags "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/flags"
cmmetrics "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/metrics"
Expand Down Expand Up @@ -473,9 +472,6 @@ func (t *fullService) RegisterP2PService(p2p p2pAPI.Service) error {
}
t.p2p = p2p

// Register consensus protocol server.
t.p2p.RegisterProtocolServer(lightP2P.NewServer(t.p2p, t.genesis.ChainContext(), t))

return nil
}

Expand Down
21 changes: 21 additions & 0 deletions go/consensus/cometbft/light/api/light.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api

import (
"context"
"fmt"

cmtlight "github.com/cometbft/cometbft/light"
cmtlightprovider "github.com/cometbft/cometbft/light/provider"
Expand Down Expand Up @@ -41,6 +42,9 @@ type Provider interface {

// PeerID returns the identifier of the peer backing the provider.
PeerID() string

// RefreshPeer notifies the provider to attempt to replace a peer with a fresh one.
RefreshPeer()
}

// ClientConfig is the configuration for the light client.
Expand All @@ -51,3 +55,20 @@ type ClientConfig struct {
// TrustOptions are CometBFT light client trust options.
TrustOptions cmtlight.TrustOptions
}

// NewLightBlock creates a new consensus.LightBlock from a CometBFT light block.
func NewLightBlock(clb *cmttypes.LightBlock) (*consensus.LightBlock, error) {
plb, err := clb.ToProto()
if err != nil {
return nil, fmt.Errorf("failed to marshal light block: %w", err)
}
meta, err := plb.Marshal()
if err != nil {
return nil, fmt.Errorf("failed to marshal light block: %w", err)
}

return &consensus.LightBlock{
Height: clb.Height,
Meta: meta,
}, nil
}
63 changes: 55 additions & 8 deletions go/consensus/cometbft/light/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,70 @@ type lightClient struct {
tmc *cmtlight.Client
}

func tryProviders[R any](
ctx context.Context,
lc *lightClient,
fn func(api.Provider) (R, rpc.PeerFeedback, error),
) (R, rpc.PeerFeedback, error) {
// Primary provider.
providers := append([]api.Provider{}, lc.tmc.Primary().(api.Provider))
// Additional providers.
for _, provider := range lc.tmc.Witnesses() {
providers = append(providers, provider.(api.Provider))
}

var (
result R
err error
)
for _, provider := range providers {
if ctx.Err() != nil {
return result, nil, ctx.Err()
}

var pf rpc.PeerFeedback
result, pf, err = fn(provider)
if err == nil {
return result, pf, nil
}
}

// Trigger primary provider refresh if everything fails.
providers[0].RefreshPeer()

return result, nil, err
}

// GetStoredBlock implements api.Client.
func (lc *lightClient) GetStoredLightBlock(height int64) (*consensus.LightBlock, error) {
clb, err := lc.tmc.TrustedLightBlock(height)
if err != nil {
return nil, err
}
return api.NewLightBlock(clb)
}

// GetLightBlock implements api.Client.
func (lc *lightClient) GetLightBlock(ctx context.Context, height int64) (*consensus.LightBlock, rpc.PeerFeedback, error) {
return lc.getPrimary().GetLightBlock(ctx, height)
return tryProviders(ctx, lc, func(p api.Provider) (*consensus.LightBlock, rpc.PeerFeedback, error) {
return p.GetLightBlock(ctx, height)
})
}

// GetParameters implements api.Client.
func (lc *lightClient) GetParameters(ctx context.Context, height int64) (*consensus.Parameters, rpc.PeerFeedback, error) {
return lc.getPrimary().GetParameters(ctx, height)
return tryProviders(ctx, lc, func(p api.Provider) (*consensus.Parameters, rpc.PeerFeedback, error) {
return p.GetParameters(ctx, height)
})
}

// SubmitEvidence implements api.Client.
func (lc *lightClient) SubmitEvidence(ctx context.Context, evidence *consensus.Evidence) (rpc.PeerFeedback, error) {
return lc.getPrimary().SubmitEvidence(ctx, evidence)
_, pf, err := tryProviders(ctx, lc, func(p api.Provider) (any, rpc.PeerFeedback, error) {
pf, err := p.SubmitEvidence(ctx, evidence)
return nil, pf, err
})
return pf, err
}

// GetVerifiedLightBlock implements Client.
Expand All @@ -49,7 +100,7 @@ func (lc *lightClient) GetVerifiedLightBlock(ctx context.Context, height int64)

// GetVerifiedLightBlock implements Client.
func (lc *lightClient) GetVerifiedParameters(ctx context.Context, height int64) (*cmtproto.ConsensusParams, error) {
p, pf, err := lc.getPrimary().GetParameters(ctx, height)
p, pf, err := lc.GetParameters(ctx, height)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -89,10 +140,6 @@ func (lc *lightClient) GetVerifiedParameters(ctx context.Context, height int64)
return &paramsPB, nil
}

func (lc *lightClient) getPrimary() api.Provider {
return lc.tmc.Primary().(api.Provider)
}

// NewInternalClient creates an internal and non-persistent light client.
//
// This client is instantiated from the provided (obtained out of bound) trusted block
Expand Down
38 changes: 29 additions & 9 deletions go/consensus/cometbft/light/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ type providersPool struct {
// NewLightClientProvider implements LightClientProvidersPool.
func (p *providersPool) NewLightClientProvider() api.Provider {
provider := &lightClientProvider{
initCh: make(chan struct{}),
chainID: p.chainID,
p2pMgr: p.p2pMgr,
rc: p.rc,
logger: logging.GetLogger("cometbft/light/p2p"),
pool: p,
initCh: make(chan struct{}),
chainID: p.chainID,
p2pMgr: p.p2pMgr,
rc: p.rc,
refreshCh: make(chan struct{}, 1),
logger: logging.GetLogger("cometbft/light/p2p"),
pool: p,
}

go provider.worker(p.ctx)
Expand Down Expand Up @@ -87,6 +88,8 @@ type lightClientProvider struct {
p2pMgr rpc.PeerManager
rc rpc.Client

refreshCh chan struct{}

// Guarded by lock.
l sync.RWMutex
peerID *core.PeerID
Expand All @@ -106,6 +109,9 @@ func (lp *lightClientProvider) worker(ctx context.Context) {

for {
select {
case <-lp.refreshCh:
// Explicitly requested peer refresh.
lp.refreshPeer()
case update := <-ch:
peerID := lp.getPeer()
switch {
Expand Down Expand Up @@ -185,7 +191,7 @@ func (lp *lightClientProvider) Initialized() <-chan struct{} {
return lp.initCh
}

// Initialized implements api.Provider.
// PeerID implements api.Provider.
func (lp *lightClientProvider) PeerID() string {
peer := lp.getPeer()
if peer == nil {
Expand All @@ -196,6 +202,14 @@ func (lp *lightClientProvider) PeerID() string {
return peer.String()
}

// RefreshPeer implements api.Provider.
func (lp *lightClientProvider) RefreshPeer() {
select {
case lp.refreshCh <- struct{}{}:
default:
}
}

// MalevolentProvider implements api.Provider.
func (lp *lightClientProvider) MalevolentProvider(peerID string) {
lp.p2pMgr.RecordBadPeer(core.PeerID(peerID))
Expand Down Expand Up @@ -231,6 +245,12 @@ func (lp *lightClientProvider) SubmitEvidence(ctx context.Context, evidence *con
return pf, nil
}

// GetStoredLightBlock implements api.Provider.
func (lp *lightClientProvider) GetStoredLightBlock(_ int64) (*consensus.LightBlock, error) {
// The remote client provider stores no blocks locally.
return nil, consensus.ErrVersionNotFound
}

// GetLightBlock implements api.Provider.
func (lp *lightClientProvider) GetLightBlock(ctx context.Context, height int64) (*consensus.LightBlock, rpc.PeerFeedback, error) {
peerID := lp.getPeer()
Expand All @@ -247,7 +267,7 @@ func (lp *lightClientProvider) GetLightBlock(ctx context.Context, height int64)
// Ensure peer returned the block for the queried height.
if rsp.Height != height {
pf.RecordBadPeer()
return nil, nil, err
return nil, nil, consensus.ErrVersionNotFound
}

return &rsp, pf, nil
Expand All @@ -269,7 +289,7 @@ func (lp *lightClientProvider) GetParameters(ctx context.Context, height int64)
// Ensure peer returned the parameters for the queried height.
if rsp.Height != height {
pf.RecordBadPeer()
return nil, nil, err
return nil, nil, consensus.ErrVersionNotFound
}

return &rsp, pf, nil
Expand Down
Loading

0 comments on commit 2870101

Please sign in to comment.