Skip to content

Commit

Permalink
Introduce job spec flag for custom reverted pipeline (#11529)
Browse files Browse the repository at this point in the history
* Introduce job spec flag for custom reverted pipeline

* Disable the flag for V2+

* Rename file after merge conflict
  • Loading branch information
kidambisrinivas authored Dec 13, 2023
1 parent 8b2c48d commit c274c23
Show file tree
Hide file tree
Showing 12 changed files with 149 additions and 25 deletions.
21 changes: 14 additions & 7 deletions core/services/job/job_orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,9 @@ func TestORM_CreateJob_VRFV2(t *testing.T) {
var batchFulfillmentEnabled bool
require.NoError(t, db.Get(&batchFulfillmentEnabled, `SELECT batch_fulfillment_enabled FROM vrf_specs LIMIT 1`))
require.False(t, batchFulfillmentEnabled)
var customRevertsPipelineEnabled bool
require.NoError(t, db.Get(&customRevertsPipelineEnabled, `SELECT custom_reverts_pipeline_enabled FROM vrf_specs LIMIT 1`))
require.False(t, customRevertsPipelineEnabled)
var batchFulfillmentGasMultiplier float64
require.NoError(t, db.Get(&batchFulfillmentGasMultiplier, `SELECT batch_fulfillment_gas_multiplier FROM vrf_specs LIMIT 1`))
require.Equal(t, float64(1.0), batchFulfillmentGasMultiplier)
Expand Down Expand Up @@ -514,13 +517,14 @@ func TestORM_CreateJob_VRFV2Plus(t *testing.T) {
fromAddresses := []string{cltest.NewEIP55Address().String(), cltest.NewEIP55Address().String()}
jb, err := vrfcommon.ValidatedVRFSpec(testspecs.GenerateVRFSpec(
testspecs.VRFSpecParams{
VRFVersion: vrfcommon.V2Plus,
RequestedConfsDelay: 10,
FromAddresses: fromAddresses,
ChunkSize: 25,
BackoffInitialDelay: time.Minute,
BackoffMaxDelay: time.Hour,
GasLanePrice: assets.GWei(100),
VRFVersion: vrfcommon.V2Plus,
RequestedConfsDelay: 10,
FromAddresses: fromAddresses,
ChunkSize: 25,
BackoffInitialDelay: time.Minute,
BackoffMaxDelay: time.Hour,
GasLanePrice: assets.GWei(100),
CustomRevertsPipelineEnabled: true,
}).
Toml())
require.NoError(t, err)
Expand All @@ -534,6 +538,9 @@ func TestORM_CreateJob_VRFV2Plus(t *testing.T) {
var batchFulfillmentEnabled bool
require.NoError(t, db.Get(&batchFulfillmentEnabled, `SELECT batch_fulfillment_enabled FROM vrf_specs LIMIT 1`))
require.False(t, batchFulfillmentEnabled)
var customRevertsPipelineEnabled bool
require.NoError(t, db.Get(&customRevertsPipelineEnabled, `SELECT custom_reverts_pipeline_enabled FROM vrf_specs LIMIT 1`))
require.True(t, customRevertsPipelineEnabled)
var batchFulfillmentGasMultiplier float64
require.NoError(t, db.Get(&batchFulfillmentGasMultiplier, `SELECT batch_fulfillment_gas_multiplier FROM vrf_specs LIMIT 1`))
require.Equal(t, float64(1.0), batchFulfillmentGasMultiplier)
Expand Down
3 changes: 3 additions & 0 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,9 @@ type VRFSpec struct {
// for fulfilling requests. If set to true, batchCoordinatorAddress must be set in
// the job spec.
BatchFulfillmentEnabled bool `toml:"batchFulfillmentEnabled"`
// CustomRevertsPipelineEnabled indicates to the vrf job to run the
// custom reverted txns pipeline along with VRF listener
CustomRevertsPipelineEnabled bool `toml:"customRevertsPipelineEnabled"`
// BatchFulfillmentGasMultiplier is used to determine the final gas estimate for the batch
// fulfillment.
BatchFulfillmentGasMultiplier tomlutils.Float64 `toml:"batchFulfillmentGasMultiplier"`
Expand Down
4 changes: 2 additions & 2 deletions core/services/job/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,14 +326,14 @@ func (o *orm) CreateJob(jb *Job, qopts ...pg.QOpt) error {
evm_chain_id, from_addresses, poll_period, requested_confs_delay,
request_timeout, chunk_size, batch_coordinator_address, batch_fulfillment_enabled,
batch_fulfillment_gas_multiplier, backoff_initial_delay, backoff_max_delay, gas_lane_price,
vrf_owner_address,
vrf_owner_address, custom_reverts_pipeline_enabled,
created_at, updated_at)
VALUES (
:coordinator_address, :public_key, :min_incoming_confirmations,
:evm_chain_id, :from_addresses, :poll_period, :requested_confs_delay,
:request_timeout, :chunk_size, :batch_coordinator_address, :batch_fulfillment_enabled,
:batch_fulfillment_gas_multiplier, :backoff_initial_delay, :backoff_max_delay, :gas_lane_price,
:vrf_owner_address,
:vrf_owner_address, :custom_reverts_pipeline_enabled,
NOW(), NOW())
RETURNING id;`

Expand Down
1 change: 1 addition & 0 deletions core/services/job/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func ValidateSpec(ts string) (Type, error) {
if jb.Pipeline.RequiresPreInsert() && !jb.Type.SupportsAsync() {
return "", errors.Errorf("async=true tasks are not supported for %v", jb.Type)
}
// spec.CustomRevertsPipelineEnabled == false, default is custom reverted txns pipeline disabled

if strings.Contains(ts, "<{}>") {
return "", errors.Errorf("'<{}>' syntax is not supported. Please use \"{}\" instead")
Expand Down
3 changes: 3 additions & 0 deletions core/services/vrf/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
if vrfOwner != nil {
return nil, errors.New("VRF Owner is not supported for VRF V2 Plus")
}
if jb.VRFSpec.CustomRevertsPipelineEnabled {
return nil, errors.New("Custom Reverted Txns Pipeline is not supported for VRF V2 Plus")
}

// Get the LINKNATIVEFEED address with retries
// This is needed because the RPC endpoint may be down so we need to
Expand Down
5 changes: 5 additions & 0 deletions core/store/migrate/migrations/0214_add_custom_reverts_vrf.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- +goose Up
ALTER TABLE vrf_specs ADD COLUMN custom_reverts_pipeline_enabled boolean DEFAULT FALSE NOT NULL;

-- +goose Down
ALTER TABLE vrf_specs DROP COLUMN custom_reverts_pipeline_enabled;
3 changes: 3 additions & 0 deletions core/testdata/testspecs/v2_specs.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ type VRFSpecParams struct {
BatchCoordinatorAddress string
VRFOwnerAddress string
BatchFulfillmentEnabled bool
CustomRevertsPipelineEnabled bool
BatchFulfillmentGasMultiplier float64
MinIncomingConfirmations int
FromAddresses []string
Expand Down Expand Up @@ -403,6 +404,7 @@ evmChainID = "%s"
batchCoordinatorAddress = "%s"
batchFulfillmentEnabled = %v
batchFulfillmentGasMultiplier = %s
customRevertsPipelineEnabled = %v
minIncomingConfirmations = %d
requestedConfsDelay = %d
requestTimeout = "%s"
Expand All @@ -419,6 +421,7 @@ observationSource = """
toml := fmt.Sprintf(template,
jobID, name, coordinatorAddress, params.EVMChainID, batchCoordinatorAddress,
params.BatchFulfillmentEnabled, strconv.FormatFloat(batchFulfillmentGasMultiplier, 'f', 2, 64),
params.CustomRevertsPipelineEnabled,
confirmations, params.RequestedConfsDelay, requestTimeout.String(), publicKey, chunkSize,
params.BackoffInitialDelay.String(), params.BackoffMaxDelay.String(), gasLanePrice.String(),
pollPeriod.String(), observationSource)
Expand Down
38 changes: 22 additions & 16 deletions core/web/presenters/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ func NewCronSpec(spec *job.CronSpec) *CronSpec {
type VRFSpec struct {
BatchCoordinatorAddress *ethkey.EIP55Address `json:"batchCoordinatorAddress"`
BatchFulfillmentEnabled bool `json:"batchFulfillmentEnabled"`
CustomRevertsPipelineEnabled *bool `json:"customRevertsPipelineEnabled,omitempty"`
BatchFulfillmentGasMultiplier float64 `json:"batchFulfillmentGasMultiplier"`
CoordinatorAddress ethkey.EIP55Address `json:"coordinatorAddress"`
PublicKey secp256k1.PublicKey `json:"publicKey"`
Expand All @@ -281,26 +282,31 @@ type VRFSpec struct {
BackoffInitialDelay models.Duration `json:"backoffInitialDelay"`
BackoffMaxDelay models.Duration `json:"backoffMaxDelay"`
GasLanePrice *assets.Wei `json:"gasLanePrice"`
VRFOwnerAddress *ethkey.EIP55Address `json:"vrfOwnerAddress"`
RequestedConfsDelay int64 `json:"requestedConfsDelay"`
VRFOwnerAddress *ethkey.EIP55Address `json:"vrfOwnerAddress,omitempty"`
}

func NewVRFSpec(spec *job.VRFSpec) *VRFSpec {
return &VRFSpec{
BatchCoordinatorAddress: spec.BatchCoordinatorAddress,
BatchFulfillmentEnabled: spec.BatchFulfillmentEnabled,
CoordinatorAddress: spec.CoordinatorAddress,
PublicKey: spec.PublicKey,
FromAddresses: spec.FromAddresses,
PollPeriod: models.MustMakeDuration(spec.PollPeriod),
MinIncomingConfirmations: spec.MinIncomingConfirmations,
CreatedAt: spec.CreatedAt,
UpdatedAt: spec.UpdatedAt,
EVMChainID: spec.EVMChainID,
ChunkSize: spec.ChunkSize,
RequestTimeout: models.MustMakeDuration(spec.RequestTimeout),
BackoffInitialDelay: models.MustMakeDuration(spec.BackoffInitialDelay),
BackoffMaxDelay: models.MustMakeDuration(spec.BackoffMaxDelay),
GasLanePrice: spec.GasLanePrice,
BatchCoordinatorAddress: spec.BatchCoordinatorAddress,
BatchFulfillmentEnabled: spec.BatchFulfillmentEnabled,
BatchFulfillmentGasMultiplier: float64(spec.BatchFulfillmentGasMultiplier),
CustomRevertsPipelineEnabled: &spec.CustomRevertsPipelineEnabled,
CoordinatorAddress: spec.CoordinatorAddress,
PublicKey: spec.PublicKey,
FromAddresses: spec.FromAddresses,
PollPeriod: models.MustMakeDuration(spec.PollPeriod),
MinIncomingConfirmations: spec.MinIncomingConfirmations,
CreatedAt: spec.CreatedAt,
UpdatedAt: spec.UpdatedAt,
EVMChainID: spec.EVMChainID,
ChunkSize: spec.ChunkSize,
RequestTimeout: models.MustMakeDuration(spec.RequestTimeout),
BackoffInitialDelay: models.MustMakeDuration(spec.BackoffInitialDelay),
BackoffMaxDelay: models.MustMakeDuration(spec.BackoffMaxDelay),
GasLanePrice: spec.GasLanePrice,
RequestedConfsDelay: spec.RequestedConfsDelay,
VRFOwnerAddress: spec.VRFOwnerAddress,
}
}

Expand Down
87 changes: 87 additions & 0 deletions core/web/presenters/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ import (
"gopkg.in/guregu/null.v4"

"github.com/smartcontractkit/chainlink-common/pkg/assets"
evmassets "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
clnull "github.com/smartcontractkit/chainlink/v2/core/null"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/signatures/secp256k1"
"github.com/smartcontractkit/chainlink/v2/core/store/models"
"github.com/smartcontractkit/chainlink/v2/core/web/presenters"
)
Expand Down Expand Up @@ -58,6 +60,7 @@ func TestJob(t *testing.T) {
trustedBlockhashStoreBatchSize := int32(20)

var specGasLimit uint32 = 1000
vrfPubKey, _ := secp256k1.NewPublicKeyFromHex("0xede539e216e3a50e69d1c68aa9cc472085876c4002f6e1e6afee0ea63b50a78b00")

testCases := []struct {
name string
Expand Down Expand Up @@ -469,6 +472,90 @@ func TestJob(t *testing.T) {
}
}`,
},
{
name: "vrf job spec",
job: job.Job{
ID: 1,
Name: null.StringFrom("vrf_test"),
Type: job.VRF,
SchemaVersion: 1,
ExternalJobID: uuid.MustParse("0eec7e1d-d0d2-476c-a1a8-72dfb6633f47"),
VRFSpec: &job.VRFSpec{
BatchCoordinatorAddress: &contractAddress,
BatchFulfillmentEnabled: true,
CustomRevertsPipelineEnabled: true,
MinIncomingConfirmations: 1,
CoordinatorAddress: contractAddress,
CreatedAt: timestamp,
UpdatedAt: timestamp,
EVMChainID: evmChainID,
FromAddresses: []ethkey.EIP55Address{fromAddress},
PublicKey: vrfPubKey,
RequestedConfsDelay: 10,
ChunkSize: 25,
BatchFulfillmentGasMultiplier: 1,
GasLanePrice: evmassets.GWei(200),
VRFOwnerAddress: nil,
},
PipelineSpec: &pipeline.Spec{
ID: 1,
DotDagSource: "",
},
},
want: fmt.Sprintf(`
{
"data": {
"type": "jobs",
"id": "1",
"attributes": {
"name": "vrf_test",
"type": "vrf",
"schemaVersion": 1,
"maxTaskDuration": "0s",
"externalJobID": "0eec7e1d-d0d2-476c-a1a8-72dfb6633f47",
"directRequestSpec": null,
"fluxMonitorSpec": null,
"gasLimit": null,
"forwardingAllowed": false,
"cronSpec": null,
"offChainReportingOracleSpec": null,
"offChainReporting2OracleSpec": null,
"keeperSpec": null,
"vrfSpec": {
"batchCoordinatorAddress": "%s",
"batchFulfillmentEnabled": true,
"customRevertsPipelineEnabled": true,
"confirmations": 1,
"coordinatorAddress": "%s",
"createdAt": "2000-01-01T00:00:00Z",
"updatedAt": "2000-01-01T00:00:00Z",
"evmChainID": "42",
"fromAddresses": ["%s"],
"pollPeriod": "0s",
"publicKey": "%s",
"requestedConfsDelay": 10,
"requestTimeout": "0s",
"chunkSize": 25,
"batchFulfillmentGasMultiplier": 1,
"backoffInitialDelay": "0s",
"backoffMaxDelay": "0s",
"gasLanePrice": "200 gwei"
},
"webhookSpec": null,
"blockhashStoreSpec": null,
"blockHeaderFeederSpec": null,
"bootstrapSpec": null,
"pipelineSpec": {
"id": 1,
"jobID": 0,
"dotDagSource": ""
},
"gatewaySpec": null,
"errors": []
}
}
}`, contractAddress, contractAddress, fromAddress, vrfPubKey.String()),
},
{
name: "blockhash store spec",
job: job.Job{
Expand Down
5 changes: 5 additions & 0 deletions core/web/resolver/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,11 @@ func (r *VRFSpecResolver) BatchFulfillmentGasMultiplier() float64 {
return float64(r.spec.BatchFulfillmentGasMultiplier)
}

// CustomRevertsPipelineEnabled resolves the spec's custom reverts pipeline enabled flag.
func (r *VRFSpecResolver) CustomRevertsPipelineEnabled() *bool {
return &r.spec.CustomRevertsPipelineEnabled
}

// ChunkSize resolves the spec's chunk size.
func (r *VRFSpecResolver) ChunkSize() int32 {
return int32(r.spec.ChunkSize)
Expand Down
3 changes: 3 additions & 0 deletions core/web/resolver/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ func TestResolver_VRFSpec(t *testing.T) {
VRFSpec: &job.VRFSpec{
BatchCoordinatorAddress: &batchCoordinatorAddress,
BatchFulfillmentEnabled: true,
CustomRevertsPipelineEnabled: true,
MinIncomingConfirmations: 1,
CoordinatorAddress: coordinatorAddress,
CreatedAt: f.Timestamp(),
Expand Down Expand Up @@ -617,6 +618,7 @@ func TestResolver_VRFSpec(t *testing.T) {
batchCoordinatorAddress
batchFulfillmentEnabled
batchFulfillmentGasMultiplier
customRevertsPipelineEnabled
chunkSize
backoffInitialDelay
backoffMaxDelay
Expand Down Expand Up @@ -644,6 +646,7 @@ func TestResolver_VRFSpec(t *testing.T) {
"batchCoordinatorAddress": "0x0ad9FE7a58216242a8475ca92F222b0640E26B63",
"batchFulfillmentEnabled": true,
"batchFulfillmentGasMultiplier": 1,
"customRevertsPipelineEnabled": true,
"chunkSize": 25,
"backoffInitialDelay": "1m0s",
"backoffMaxDelay": "1h0m0s",
Expand Down
1 change: 1 addition & 0 deletions core/web/schema/type/spec.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type VRFSpec {
batchCoordinatorAddress: String
batchFulfillmentEnabled: Boolean!
batchFulfillmentGasMultiplier: Float!
customRevertsPipelineEnabled: Boolean
chunkSize: Int!
backoffInitialDelay: String!
backoffMaxDelay: String!
Expand Down

0 comments on commit c274c23

Please sign in to comment.