Skip to content

Commit

Permalink
bump libocr; add context (#490)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 authored Oct 9, 2024
1 parent 167715a commit 8166e65
Show file tree
Hide file tree
Showing 80 changed files with 864 additions and 1,009 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
github.com/santhosh-tekuri/jsonschema/v5 v5.2.0
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7
github.com/smartcontractkit/libocr v0.0.0-20240717100443-f6226e09bee7
github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0
go.opentelemetry.io/otel v1.28.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7 h1:12ijqMM9tvYVEm+nR826WsrNi6zCKpwBhuApq127wHs=
github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7/go.mod h1:FX7/bVdoep147QQhsOPkYsPEXhGZjeYx6lBSaSXtZOA=
github.com/smartcontractkit/libocr v0.0.0-20240717100443-f6226e09bee7 h1:e38V5FYE7DA1JfKXeD5Buo/7lczALuVXlJ8YNTAUxcw=
github.com/smartcontractkit/libocr v0.0.0-20240717100443-f6226e09bee7/go.mod h1:fb1ZDVXACvu4frX3APHZaEBp0xi1DIm34DcA0CwTsZM=
github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12 h1:NzZGjaqez21I3DU7objl3xExTH4fxYvzTqar8DC6360=
github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12/go.mod h1:fb1ZDVXACvu4frX3APHZaEBp0xi1DIm34DcA0CwTsZM=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
Expand Down
2 changes: 1 addition & 1 deletion pkg/capabilities/consensus/ocr3/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func newFactory(s *requests.Store, c *capability, batchSize int, outcomePruningT
}, nil
}

func (o *factory) NewReportingPlugin(config ocr3types.ReportingPluginConfig) (ocr3types.ReportingPlugin[[]byte], ocr3types.ReportingPluginInfo, error) {
func (o *factory) NewReportingPlugin(ctx context.Context, config ocr3types.ReportingPluginConfig) (ocr3types.ReportingPlugin[[]byte], ocr3types.ReportingPluginInfo, error) {
rp, err := newReportingPlugin(o.store, o.capability, o.batchSize, config, o.outcomePruningThreshold, o.lggr)
info := ocr3types.ReportingPluginInfo{
Name: "OCR3 Capability Plugin",
Expand Down
27 changes: 17 additions & 10 deletions pkg/capabilities/consensus/ocr3/reporting_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"slices"
"time"

"github.com/smartcontractkit/libocr/quorumhelper"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"

Expand Down Expand Up @@ -149,15 +150,15 @@ func (r *reportingPlugin) Observation(ctx context.Context, outctx ocr3types.Outc
return proto.MarshalOptions{Deterministic: true}.Marshal(obs)
}

func (r *reportingPlugin) ValidateObservation(outctx ocr3types.OutcomeContext, query types.Query, ao types.AttributedObservation) error {
func (r *reportingPlugin) ValidateObservation(ctx context.Context, outctx ocr3types.OutcomeContext, query types.Query, ao types.AttributedObservation) error {
return nil
}

func (r *reportingPlugin) ObservationQuorum(outctx ocr3types.OutcomeContext, query types.Query) (ocr3types.Quorum, error) {
return ocr3types.QuorumTwoFPlusOne, nil
func (r *reportingPlugin) ObservationQuorum(ctx context.Context, outctx ocr3types.OutcomeContext, query types.Query, aos []types.AttributedObservation) (bool, error) {
return quorumhelper.ObservationCountReachesObservationQuorum(quorumhelper.QuorumTwoFPlusOne, r.config.N, r.config.F, aos), nil
}

func (r *reportingPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, attributedObservations []types.AttributedObservation) (ocr3types.Outcome, error) {
func (r *reportingPlugin) Outcome(ctx context.Context, outctx ocr3types.OutcomeContext, query types.Query, attributedObservations []types.AttributedObservation) (ocr3types.Outcome, error) {
// execution ID -> oracle ID -> list of observations
execIDToOracleObservations := map[string]map[ocrcommon.OracleID][]values.Value{}
seenWorkflowIDs := map[string]int{}
Expand Down Expand Up @@ -350,14 +351,14 @@ func marshalReportInfo(info *pbtypes.ReportInfo, keyID string) ([]byte, error) {
return ip, nil
}

func (r *reportingPlugin) Reports(seqNr uint64, outcome ocr3types.Outcome) ([]ocr3types.ReportWithInfo[[]byte], error) {
func (r *reportingPlugin) Reports(ctx context.Context, seqNr uint64, outcome ocr3types.Outcome) ([]ocr3types.ReportPlus[[]byte], error) {
o := &pbtypes.Outcome{}
err := proto.Unmarshal(outcome, o)
if err != nil {
return nil, err
}

reports := []ocr3types.ReportWithInfo[[]byte]{}
reports := []ocr3types.ReportPlus[[]byte]{}

for _, report := range o.CurrentReports {
if report == nil {
Expand Down Expand Up @@ -438,8 +439,12 @@ func (r *reportingPlugin) Reports(seqNr uint64, outcome ocr3types.Outcome) ([]oc
continue
}

rawReport, err = encoder.Encode(context.Background(), *mv)
rawReport, err = encoder.Encode(ctx, *mv)
if err != nil {
if cerr := ctx.Err(); cerr != nil {
r.lggr.Errorw("report encoding cancelled", "err", cerr)
return nil, cerr
}
r.lggr.Errorw("could not encode report for workflow", "error", err)
continue
}
Expand All @@ -452,9 +457,11 @@ func (r *reportingPlugin) Reports(seqNr uint64, outcome ocr3types.Outcome) ([]oc
}

// Append every report, even if shouldReport = false, to let the transmitter mark the step as complete.
reports = append(reports, ocr3types.ReportWithInfo[[]byte]{
Report: rawReport,
Info: infob,
reports = append(reports, ocr3types.ReportPlus[[]byte]{
ReportWithInfo: ocr3types.ReportWithInfo[[]byte]{
Report: rawReport,
Info: infob,
},
})
}

Expand Down
37 changes: 22 additions & 15 deletions pkg/capabilities/consensus/ocr3/reporting_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/requests"

pbtypes "github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/types"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
Expand Down Expand Up @@ -300,7 +299,7 @@ func TestReportingPlugin_Outcome(t *testing.T) {
},
}

outcome, err := rp.Outcome(ocr3types.OutcomeContext{}, qb, aos)
outcome, err := rp.Outcome(tests.Context(t), ocr3types.OutcomeContext{}, qb, aos)
require.NoError(t, err)

opb := &pbtypes.Outcome{}
Expand All @@ -316,6 +315,7 @@ func TestReportingPlugin_Outcome(t *testing.T) {
}

func TestReportingPlugin_Outcome_NilDerefs(t *testing.T) {
ctx := tests.Context(t)
lggr := logger.Test(t)
s := requests.NewStore()
mcap := &mockCapability{
Expand Down Expand Up @@ -349,7 +349,7 @@ func TestReportingPlugin_Outcome_NilDerefs(t *testing.T) {
{},
}

_, err = rp.Outcome(ocr3types.OutcomeContext{}, qb, aos)
_, err = rp.Outcome(ctx, ocr3types.OutcomeContext{}, qb, aos)
require.NoError(t, err)

obs := &pbtypes.Observations{
Expand All @@ -368,7 +368,7 @@ func TestReportingPlugin_Outcome_NilDerefs(t *testing.T) {
Observer: commontypes.OracleID(1),
},
}
_, err = rp.Outcome(ocr3types.OutcomeContext{}, qb, aos)
_, err = rp.Outcome(ctx, ocr3types.OutcomeContext{}, qb, aos)
require.NoError(t, err)
}

Expand Down Expand Up @@ -408,22 +408,25 @@ func TestReportingPlugin_Reports_ShouldReportFalse(t *testing.T) {
}
pl, err := proto.Marshal(outcome)
require.NoError(t, err)
reports, err := rp.Reports(sqNr, pl)
reports, err := rp.Reports(tests.Context(t), sqNr, pl)
require.NoError(t, err)

assert.Len(t, reports, 1)
gotRep := reports[0]
assert.Len(t, gotRep.Report, 0)
assert.Len(t, gotRep.ReportWithInfo.Report, 0)

ib := gotRep.Info
ib := gotRep.ReportWithInfo.Info
info, err := extractReportInfo(ib)
require.NoError(t, err)

assert.EqualExportedValues(t, info.Id, id)
assert.EqualExportedValues(t, id, info.Id)
assert.False(t, info.ShouldReport)

require.Nil(t, gotRep.TransmissionScheduleOverride)
}

func TestReportingPlugin_Reports_NilDerefs(t *testing.T) {
ctx := tests.Context(t)
lggr := logger.Test(t)
s := requests.NewStore()
mcap := &mockCapability{
Expand Down Expand Up @@ -461,7 +464,7 @@ func TestReportingPlugin_Reports_NilDerefs(t *testing.T) {
}
pl, err := proto.Marshal(outcome)
require.NoError(t, err)
_, err = rp.Reports(sqNr, pl)
_, err = rp.Reports(ctx, sqNr, pl)
require.NoError(t, err)
}

Expand Down Expand Up @@ -511,14 +514,14 @@ func TestReportingPlugin_Reports_ShouldReportTrue(t *testing.T) {
}
pl, err := proto.Marshal(outcome)
require.NoError(t, err)
reports, err := rp.Reports(sqNr, pl)
reports, err := rp.Reports(tests.Context(t), sqNr, pl)
require.NoError(t, err)

assert.Len(t, reports, 1)
gotRep := reports[0]

rep := &pb.Value{}
err = proto.Unmarshal(gotRep.Report, rep)
err = proto.Unmarshal(gotRep.ReportWithInfo.Report, rep)
require.NoError(t, err)

// The workflow ID and execution ID get added to the report.
Expand All @@ -538,15 +541,18 @@ func TestReportingPlugin_Reports_ShouldReportTrue(t *testing.T) {
require.NoError(t, err)
require.Equal(t, nm, fp)

ib := gotRep.Info
ib := gotRep.ReportWithInfo.Info
info, err := extractReportInfo(ib)
require.NoError(t, err)

assert.EqualExportedValues(t, info.Id, id)
assert.True(t, info.ShouldReport)

require.Nil(t, gotRep.TransmissionScheduleOverride)
}

func TestReportingPlugin_Outcome_ShouldPruneOldOutcomes(t *testing.T) {
ctx := tests.Context(t)
lggr := logger.Test(t)
s := requests.NewStore()
mcap := &mockCapability{
Expand Down Expand Up @@ -642,13 +648,13 @@ func TestReportingPlugin_Outcome_ShouldPruneOldOutcomes(t *testing.T) {
},
}

outcome1, err := rp.Outcome(ocr3types.OutcomeContext{SeqNr: 100}, qb, aos)
outcome1, err := rp.Outcome(ctx, ocr3types.OutcomeContext{SeqNr: 100}, qb, aos)
require.NoError(t, err)
opb1 := &pbtypes.Outcome{}
err = proto.Unmarshal(outcome1, opb1)
require.NoError(t, err)

outcome2, err := rp.Outcome(ocr3types.OutcomeContext{SeqNr: defaultOutcomePruningThreshold + 100, PreviousOutcome: outcome1}, qb, aos2)
outcome2, err := rp.Outcome(ctx, ocr3types.OutcomeContext{SeqNr: defaultOutcomePruningThreshold + 100, PreviousOutcome: outcome1}, qb, aos2)
require.NoError(t, err)
opb2 := &pbtypes.Outcome{}
err = proto.Unmarshal(outcome2, opb2)
Expand All @@ -663,6 +669,7 @@ func TestReportingPlugin_Outcome_ShouldPruneOldOutcomes(t *testing.T) {
}

func TestReportPlugin_Outcome_ShouldReturnMedianTimestamp(t *testing.T) {
ctx := tests.Context(t)
lggr := logger.Test(t)
s := requests.NewStore()
mcap := &mockCapability{
Expand Down Expand Up @@ -785,7 +792,7 @@ func TestReportPlugin_Outcome_ShouldReturnMedianTimestamp(t *testing.T) {
},
}

outcome, err := rp.Outcome(ocr3types.OutcomeContext{SeqNr: 100}, qb, aos)
outcome, err := rp.Outcome(ctx, ocr3types.OutcomeContext{SeqNr: 100}, qb, aos)
require.NoError(t, err)
opb1 := &pbtypes.Outcome{}
err = proto.Unmarshal(outcome, opb1)
Expand Down
2 changes: 1 addition & 1 deletion pkg/capabilities/consensus/ocr3/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (c *ContractTransmitter) Transmit(ctx context.Context, configDigest types.C
return err
}

func (c *ContractTransmitter) FromAccount() (types.Account, error) {
func (c *ContractTransmitter) FromAccount(ctx context.Context) (types.Account, error) {
return types.Account(c.fromAccount), nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/codec/modifier_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/codec"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
)

var anyTestBytes = []byte("any test bytes")
Expand All @@ -29,7 +30,7 @@ const anyForEncoding = true
func TestModifierCodec(t *testing.T) {
t.Parallel()

ctx := context.Background()
ctx := tests.Context(t)
mod, err := codec.NewModifierCodec(&testCodec{}, testModifier{})
require.NoError(t, err)

Expand Down
94 changes: 0 additions & 94 deletions pkg/loop/adapters/relay/adapter.go

This file was deleted.

Loading

0 comments on commit 8166e65

Please sign in to comment.