Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move medianpoc to feeds #9

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cmd/chainlink-feeds/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"github.com/hashicorp/go-plugin"

"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/loop/reportingplugins"
"github.com/smartcontractkit/chainlink-common/pkg/types"

"github.com/smartcontractkit/chainlink-feeds/median"
)
Expand Down Expand Up @@ -35,6 +37,14 @@ func main() {
GRPCOpts: s.GRPCOpts,
},
},
reportingplugins.PluginServiceName: &reportingplugins.GRPCService[types.MedianProvider]{
PluginServer: p,
BrokerConfig: loop.BrokerConfig{
Logger: s.Logger,
StopCh: stop,
GRPCOpts: s.GRPCOpts,
},
},
},
GRPCServer: s.GRPCOpts.NewServer,
})
Expand Down
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ go 1.21.3

require (
github.com/hashicorp/go-plugin v1.5.2
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chainlink-common v0.1.7-0.20231206181640-faad3f11cfad
github.com/smartcontractkit/libocr v0.0.0-20231020123319-d255366a6545
github.com/smartcontractkit/libocr v0.0.0-20231130143053-c5102a9c0fb7
github.com/stretchr/testify v1.8.4
)

require (
Expand Down Expand Up @@ -40,7 +42,6 @@ require (
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.45.0 // indirect
go.opentelemetry.io/otel v1.19.0 // indirect
Expand All @@ -53,7 +54,7 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53 // indirect
golang.org/x/exp v0.0.0-20230810033253-352e893a4cad // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
Expand Down
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,16 @@ github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20231206181640-faad3f11cfad h1:ysPjfbCPJuVxxFZa1Ifv8OPE20pzvnEHjJrPDUo4gT0=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20231206181640-faad3f11cfad/go.mod h1:IdlfCN9rUs8Q/hrOYe8McNBIwEOHEsi0jilb3Cw77xs=
github.com/smartcontractkit/go-plugin v0.0.0-20231003134350-e49dad63b306 h1:ko88+ZznniNJZbZPWAvHQU8SwKAdHngdDZ+pvVgB5ss=
github.com/smartcontractkit/go-plugin v0.0.0-20231003134350-e49dad63b306/go.mod h1:w1sAEES3g3PuV/RzUrgow20W2uErMly84hhD3um1WL4=
github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f h1:hgJif132UCdjo8u43i7iPN1/MFnu49hv7lFGFftCHKU=
github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f/go.mod h1:MvMXoufZAtqExNexqi4cjrNYE9MefKddKylxjS+//n0=
github.com/smartcontractkit/libocr v0.0.0-20231020123319-d255366a6545 h1:qOsw2ETQD/Sb/W2xuYn2KPWjvvsWA0C+l19rWFq8iNg=
github.com/smartcontractkit/libocr v0.0.0-20231020123319-d255366a6545/go.mod h1:2lyRkw/qLQgUWlrWWmq5nj0y90rWeO6Y+v+fCakRgb0=
github.com/smartcontractkit/libocr v0.0.0-20231130143053-c5102a9c0fb7 h1:AA7vf29c6lFsZm+MtIEtXtg6VUOQV6waJo5MUuHfRjQ=
github.com/smartcontractkit/libocr v0.0.0-20231130143053-c5102a9c0fb7/go.mod h1:WcuWFMskcGK0MYZuH5hEhGJOzdJRUFeNEM4PAKlejI4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
Expand Down Expand Up @@ -265,8 +267,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53 h1:5llv2sWeaMSnA3w2kS57ouQQ4pudlXrR0dCgw51QK9o=
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/exp v0.0.0-20230810033253-352e893a4cad h1:g0bG7Z4uG+OgH2QDODnjp6ggkk1bJDsINcuWmJN1iJU=
golang.org/x/exp v0.0.0-20230810033253-352e893a4cad/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down
97 changes: 97 additions & 0 deletions median/data_source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package median

import (
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
"sync"
"time"

"github.com/shopspring/decimal"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/types"
)

type BridgeMetaData struct {
LatestAnswer *big.Int `json:"latestAnswer"`
UpdatedAt *big.Int `json:"updatedAt"` // A unix timestamp
}

func MarshalBridgeMetaData(latestAnswer *big.Int, updatedAt *big.Int) (map[string]interface{}, error) {
b, err := json.Marshal(&BridgeMetaData{LatestAnswer: latestAnswer, UpdatedAt: updatedAt})
if err != nil {
return nil, err
}
var mp map[string]interface{}
err = json.Unmarshal(b, &mp)
if err != nil {
return nil, err
}
return mp, nil
}

type DataSource struct {
pipelineRunner types.PipelineRunnerService
spec string
lggr logger.Logger

current BridgeMetaData
mu sync.RWMutex
}

func (d *DataSource) Observe(ctx context.Context, reportTimestamp ocrtypes.ReportTimestamp) (*big.Int, error) {
md, err := MarshalBridgeMetaData(d.currentAnswer())
if err != nil {
d.lggr.Warnw("unable to attach metadata for run", "err", err)
}

// NOTE: job metadata is automatically attached by the pipeline runner service
vars := types.Vars{
Vars: map[string]interface{}{
"jobRun": md,
},
}

results, err := d.pipelineRunner.ExecuteRun(ctx, d.spec, vars, types.Options{})
if err != nil {
return nil, err
}

finalResults := results.FinalResults()
if len(finalResults) == 0 {
return nil, errors.New("pipeline execution failed: not enough results")
}

finalResult := finalResults[0]
if finalResult.Error != nil {
return nil, fmt.Errorf("pipeline execution failed: %w", finalResult.Error)
}

asDecimal, ok := (finalResult.Value).(decimal.Decimal)
if !ok {
return nil, errors.New("cannot convert observation to decimal")
}

resultAsBigInt := asDecimal.BigInt()
d.updateAnswer(resultAsBigInt)
return resultAsBigInt, nil
}

func (d *DataSource) currentAnswer() (*big.Int, *big.Int) {
d.mu.RLock()
defer d.mu.RUnlock()
return d.current.LatestAnswer, d.current.UpdatedAt
}

func (d *DataSource) updateAnswer(latestAnswer *big.Int) {
d.mu.Lock()
defer d.mu.Unlock()
d.current = BridgeMetaData{
LatestAnswer: latestAnswer,
UpdatedAt: big.NewInt(time.Now().Unix()),
}
}
117 changes: 117 additions & 0 deletions median/data_source_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package median

import (
"context"
"errors"
"math/big"
"testing"

"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-common/pkg/logger"

"github.com/smartcontractkit/chainlink-common/pkg/types"
)

type mockPipelineRunner struct {
results types.TaskResults
err error
spec string
vars types.Vars
options types.Options
}

func (m *mockPipelineRunner) ExecuteRun(ctx context.Context, spec string, vars types.Vars, options types.Options) (types.TaskResults, error) {
m.spec = spec
m.vars = vars
m.options = options
return m.results, m.err
}

func TestDataSource(t *testing.T) {
lggr := logger.Test(t)
expect := int64(3)
pr := &mockPipelineRunner{
results: types.TaskResults{
{
TaskValue: types.TaskValue{
Value: decimal.NewFromInt(expect),
Error: nil,
IsTerminal: true,
},
Index: 2,
},
{
TaskValue: types.TaskValue{
Value: decimal.NewFromInt(6),
Error: nil,
IsTerminal: false,
},
Index: 1,
},
},
}
spec := "SPEC"
ds := &DataSource{
pipelineRunner: pr,
spec: spec,
lggr: lggr,
}
res, err := ds.Observe(context.Background(), ocrtypes.ReportTimestamp{})
require.NoError(t, err)
assert.Equal(t, big.NewInt(expect), res)
assert.Equal(t, spec, pr.spec)
assert.Equal(t, big.NewInt(expect), ds.current.LatestAnswer)
}

func TestDataSource_ResultErrors(t *testing.T) {
lggr := logger.Test(t)
pr := &mockPipelineRunner{
results: types.TaskResults{
{
TaskValue: types.TaskValue{
Error: errors.New("something went wrong"),
IsTerminal: true,
},
Index: 0,
},
},
}
spec := "SPEC"
ds := &DataSource{
pipelineRunner: pr,
spec: spec,
lggr: lggr,
}
_, err := ds.Observe(context.Background(), ocrtypes.ReportTimestamp{})
assert.ErrorContains(t, err, "something went wrong")
}

func TestDataSource_ResultNotAnInt(t *testing.T) {
lggr := logger.Test(t)

expect := "string-result"
pr := &mockPipelineRunner{
results: types.TaskResults{
{
TaskValue: types.TaskValue{
Value: expect,
IsTerminal: true,
},
Index: 0,
},
},
}
spec := "SPEC"
ds := &DataSource{
pipelineRunner: pr,
spec: spec,
lggr: lggr,
}
_, err := ds.Observe(context.Background(), ocrtypes.ReportTimestamp{})
assert.ErrorContains(t, err, "cannot convert observation to decimal")
}
Loading