diff --git a/.changeset/red-balloons-repeat.md b/.changeset/red-balloons-repeat.md new file mode 100644 index 0000000000..674cae9602 --- /dev/null +++ b/.changeset/red-balloons-repeat.md @@ -0,0 +1,5 @@ +--- +"ccip": patch +--- + +Commit NewReportingPlugin retries on error diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/factory.go b/core/services/ocr2/plugins/ccip/ccipcommit/factory.go index 95d6398f56..648f62a23a 100644 --- a/core/services/ocr2/plugins/ccip/ccipcommit/factory.go +++ b/core/services/ocr2/plugins/ccip/ccipcommit/factory.go @@ -9,6 +9,7 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2plus/types" cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcommon" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcalc" @@ -64,40 +65,60 @@ func (rf *CommitReportingPluginFactory) UpdateDynamicReaders(ctx context.Context return nil } -// NewReportingPlugin returns the ccip CommitReportingPlugin and satisfies the ReportingPluginFactory interface. +type reportingPluginAndInfo struct { + plugin types.ReportingPlugin + pluginInfo types.ReportingPluginInfo +} + +// NewReportingPlugin registers a new ReportingPlugin func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.ReportingPluginConfig) (types.ReportingPlugin, types.ReportingPluginInfo, error) { - ctx := context.Background() // todo: consider adding some timeout + initialRetryDelay := rf.config.newReportingPluginRetryConfig.InitialDelay + maxDelay := rf.config.newReportingPluginRetryConfig.MaxDelay - destPriceReg, err := rf.config.commitStore.ChangeConfig(ctx, config.OnchainConfig, config.OffchainConfig) + pluginAndInfo, err := ccipcommon.RetryUntilSuccess(rf.NewReportingPluginFn(config), initialRetryDelay, maxDelay) if err != nil { return nil, types.ReportingPluginInfo{}, err } + return pluginAndInfo.plugin, pluginAndInfo.pluginInfo, err +} - priceRegEvmAddr, err := ccipcalc.GenericAddrToEvm(destPriceReg) - if err != nil { - return nil, types.ReportingPluginInfo{}, err - } - if err = rf.UpdateDynamicReaders(ctx, priceRegEvmAddr); err != nil { - return nil, types.ReportingPluginInfo{}, err - } +// NewReportingPluginFn implements the NewReportingPlugin logic. It is defined as a function so that it can easily be +// retried via RetryUntilSuccess. NewReportingPlugin must return successfully in order for the Commit plugin to +// function, hence why we can only keep retrying it until it succeeds. +func (rf *CommitReportingPluginFactory) NewReportingPluginFn(config types.ReportingPluginConfig) func() (reportingPluginAndInfo, error) { + return func() (reportingPluginAndInfo, error) { + ctx := context.Background() // todo: consider adding some timeout - pluginOffChainConfig, err := rf.config.commitStore.OffchainConfig(ctx) - if err != nil { - return nil, types.ReportingPluginInfo{}, err - } + destPriceReg, err := rf.config.commitStore.ChangeConfig(ctx, config.OnchainConfig, config.OffchainConfig) + if err != nil { + return reportingPluginAndInfo{}, err + } - gasPriceEstimator, err := rf.config.commitStore.GasPriceEstimator(ctx) - if err != nil { - return nil, types.ReportingPluginInfo{}, err - } + priceRegEvmAddr, err := ccipcalc.GenericAddrToEvm(destPriceReg) + if err != nil { + return reportingPluginAndInfo{}, err + } + if err = rf.UpdateDynamicReaders(ctx, priceRegEvmAddr); err != nil { + return reportingPluginAndInfo{}, err + } - err = rf.config.priceService.UpdateDynamicConfig(ctx, gasPriceEstimator, rf.destPriceRegReader) - if err != nil { - return nil, types.ReportingPluginInfo{}, err - } + pluginOffChainConfig, err := rf.config.commitStore.OffchainConfig(ctx) + if err != nil { + return reportingPluginAndInfo{}, err + } + + gasPriceEstimator, err := rf.config.commitStore.GasPriceEstimator(ctx) + if err != nil { + return reportingPluginAndInfo{}, err + } - lggr := rf.config.lggr.Named("CommitReportingPlugin") - return &CommitReportingPlugin{ + err = rf.config.priceService.UpdateDynamicConfig(ctx, gasPriceEstimator, rf.destPriceRegReader) + if err != nil { + return reportingPluginAndInfo{}, err + } + + lggr := rf.config.lggr.Named("CommitReportingPlugin") + plugin := &CommitReportingPlugin{ sourceChainSelector: rf.config.sourceChainSelector, sourceNative: rf.config.sourceNative, onRampReader: rf.config.onRampReader, @@ -112,8 +133,9 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin metricsCollector: rf.config.metricsCollector, chainHealthcheck: rf.config.chainHealthcheck, priceService: rf.config.priceService, - }, - types.ReportingPluginInfo{ + } + + pluginInfo := types.ReportingPluginInfo{ Name: "CCIPCommit", UniqueReports: false, // See comment in CommitStore constructor. Limits: types.ReportingPluginLimits{ @@ -121,5 +143,8 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin MaxObservationLength: ccip.MaxObservationLength, MaxReportLength: MaxCommitReportLength, }, - }, nil + } + + return reportingPluginAndInfo{plugin, pluginInfo}, nil + } } diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/factory_test.go b/core/services/ocr2/plugins/ccip/ccipcommit/factory_test.go new file mode 100644 index 0000000000..825026bd17 --- /dev/null +++ b/core/services/ocr2/plugins/ccip/ccipcommit/factory_test.go @@ -0,0 +1,100 @@ +package ccipcommit + +import ( + "errors" + "testing" + "time" + + "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata" + ccipdataprovidermocks "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/ccipdataprovider/mocks" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks" + dbMocks "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdb/mocks" +) + +// Assert that NewReportingPlugin keeps retrying until it succeeds. +// +// NewReportingPlugin makes several calls (e.g. CommitStoreReader.ChangeConfig) that can fail. We use mocks to cause the +// first call to each of these functions to fail, then all subsequent calls succeed. We assert that NewReportingPlugin +// retries a sufficient number of times to get through the transient errors and eventually succeed. +func TestNewReportingPluginRetriesUntilSuccess(t *testing.T) { + commitConfig := CommitPluginStaticConfig{} + + // For this unit test, ensure that there is no delay between retries + commitConfig.newReportingPluginRetryConfig = ccipdata.RetryConfig{ + InitialDelay: 0 * time.Nanosecond, + MaxDelay: 0 * time.Nanosecond, + } + + // Set up the OffRampReader mock + mockCommitStore := new(mocks.CommitStoreReader) + + // The first call is set to return an error, the following calls return a nil error + mockCommitStore. + On("ChangeConfig", mock.Anything, mock.Anything, mock.Anything). + Return(ccip.Address(""), errors.New("")). + Once() + mockCommitStore. + On("ChangeConfig", mock.Anything, mock.Anything, mock.Anything). + Return(ccip.Address("0x7c6e4F0BDe29f83BC394B75a7f313B7E5DbD2d77"), nil). + Times(5) + + mockCommitStore. + On("OffchainConfig", mock.Anything). + Return(ccip.CommitOffchainConfig{}, errors.New("")). + Once() + mockCommitStore. + On("OffchainConfig", mock.Anything). + Return(ccip.CommitOffchainConfig{}, nil). + Times(3) + + mockCommitStore. + On("GasPriceEstimator", mock.Anything). + Return(nil, errors.New("")). + Once() + mockCommitStore. + On("GasPriceEstimator", mock.Anything). + Return(nil, nil). + Times(2) + + commitConfig.commitStore = mockCommitStore + + mockPriceService := new(dbMocks.PriceService) + + mockPriceService. + On("UpdateDynamicConfig", mock.Anything, mock.Anything, mock.Anything). + Return(errors.New("")). + Once() + mockPriceService. + On("UpdateDynamicConfig", mock.Anything, mock.Anything, mock.Anything). + Return(nil) + + commitConfig.priceService = mockPriceService + + priceRegistryProvider := new(ccipdataprovidermocks.PriceRegistry) + priceRegistryProvider. + On("NewPriceRegistryReader", mock.Anything, mock.Anything). + Return(nil, errors.New("")). + Once() + priceRegistryProvider. + On("NewPriceRegistryReader", mock.Anything, mock.Anything). + Return(nil, nil). + Once() + commitConfig.priceRegistryProvider = priceRegistryProvider + + commitConfig.lggr, _ = logger.NewLogger() + + factory := NewCommitReportingPluginFactory(commitConfig) + reportingConfig := types.ReportingPluginConfig{} + reportingConfig.OnchainConfig = []byte{1, 2, 3} + reportingConfig.OffchainConfig = []byte{1, 2, 3} + + // Assert that NewReportingPlugin succeeds despite many transient internal failures (mocked out above) + _, _, err := factory.NewReportingPlugin(reportingConfig) + assert.Equal(t, nil, err) +} diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go b/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go index 97730ce85b..ba87278b4b 100644 --- a/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go +++ b/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go @@ -6,19 +6,19 @@ import ( "fmt" "math/big" "strings" + "time" "github.com/Masterminds/semver/v3" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/pkg/errors" - chainselectors "github.com/smartcontractkit/chain-selectors" - libocr2 "github.com/smartcontractkit/libocr/offchainreporting2plus" "go.uber.org/multierr" - "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" + chainselectors "github.com/smartcontractkit/chain-selectors" + libocr2 "github.com/smartcontractkit/libocr/offchainreporting2plus" commonlogger "github.com/smartcontractkit/chainlink-common/pkg/logger" - + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/commit_store" @@ -46,6 +46,8 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" ) +var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{InitialDelay: time.Second, MaxDelay: 5 * time.Minute} + func NewCommitServices(ctx context.Context, ds sqlutil.DataSource, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, new bool, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string)) ([]job.ServiceCtx, error) { orm, err := cciporm.NewORM(ds) if err != nil { @@ -271,8 +273,8 @@ func jobSpecToCommitPluginConfig(ctx context.Context, orm cciporm.ORM, lggr logg "sourceNative", sourceNative, "sourceRouter", sourceRouter.Address()) return &CommitPluginStaticConfig{ - lggr: commitLggr, - onRampReader: onRampReader, + lggr: commitLggr, + newReportingPluginRetryConfig: defaultNewReportingPluginRetryConfig, onRampReader: onRampReader, offRamp: offRampReader, sourceNative: ccipcalc.EvmAddrToGeneric(sourceNative), sourceChainSelector: params.commitStoreStaticCfg.SourceChainSelector, diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go b/core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go index 3c96940b8d..2f0fc4e795 100644 --- a/core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go +++ b/core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go @@ -50,7 +50,8 @@ type update struct { } type CommitPluginStaticConfig struct { - lggr logger.Logger + lggr logger.Logger + newReportingPluginRetryConfig ccipdata.RetryConfig // Source onRampReader ccipdata.OnRampReader sourceChainSelector uint64