Skip to content

Commit

Permalink
Logging and misc LLO fixes
Browse files Browse the repository at this point in the history
Fixes: MERC-6626
  • Loading branch information
samsondav committed Nov 19, 2024
1 parent 3cadac2 commit 940b7e6
Show file tree
Hide file tree
Showing 57 changed files with 365 additions and 145 deletions.
5 changes: 5 additions & 0 deletions .changeset/mean-dots-move.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Add config var Mercury.Transmitter.TransmitConcurrency #added
6 changes: 6 additions & 0 deletions .changeset/shiny-owls-destroy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"chainlink": patch
---

Logging improvements for LLO
#internal
6 changes: 3 additions & 3 deletions core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,9 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
}

evmFactoryCfg := chainlink.EVMFactoryConfig{
CSAETHKeystore: keyStore,
ChainOpts: legacyevm.ChainOpts{AppConfig: cfg, MailMon: mailMon, DS: ds},
MercuryTransmitter: cfg.Mercury().Transmitter(),
CSAETHKeystore: keyStore,
ChainOpts: legacyevm.ChainOpts{AppConfig: cfg, MailMon: mailMon, DS: ds},
MercuryConfig: cfg.Mercury(),
}
// evm always enabled for backward compatibility
// TODO BCF-2510 this needs to change in order to clear the path for EVM extraction
Expand Down
4 changes: 4 additions & 0 deletions core/config/docs/core.toml
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,10 @@ TransmitQueueMaxSize = 10_000 # Default
# when sending a message to the mercury server, before aborting and considering
# the transmission to be failed.
TransmitTimeout = "5s" # Default
# TransmitConcurrency is the max number of concurrent transmits to each server.
#
# Only has effect with LLO jobs.
TransmitConcurrency = 100 # Default

# Telemetry holds OTEL settings.
# This data includes open telemetry metrics, traces, & logs.
Expand Down
1 change: 1 addition & 0 deletions core/config/mercury_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type MercuryTLS interface {
type MercuryTransmitter interface {
TransmitQueueMaxSize() uint32
TransmitTimeout() commonconfig.Duration
TransmitConcurrency() uint32
}

type Mercury interface {
Expand Down
4 changes: 4 additions & 0 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1330,6 +1330,7 @@ func (m *MercuryTLS) ValidateConfig() (err error) {
type MercuryTransmitter struct {
TransmitQueueMaxSize *uint32
TransmitTimeout *commonconfig.Duration
TransmitConcurrency *uint32
}

func (m *MercuryTransmitter) setFrom(f *MercuryTransmitter) {
Expand All @@ -1339,6 +1340,9 @@ func (m *MercuryTransmitter) setFrom(f *MercuryTransmitter) {
if v := f.TransmitTimeout; v != nil {
m.TransmitTimeout = v
}
if v := f.TransmitConcurrency; v != nil {
m.TransmitConcurrency = v
}
}

type Mercury struct {
Expand Down
4 changes: 2 additions & 2 deletions core/internal/cltest/cltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,8 +418,8 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
MailMon: mailMon,
DS: ds,
},
CSAETHKeystore: keyStore,
MercuryTransmitter: cfg.Mercury().Transmitter(),
CSAETHKeystore: keyStore,
MercuryConfig: cfg.Mercury(),
}

if cfg.EVMEnabled() {
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ require (
github.com/smartcontractkit/chain-selectors v1.0.29 // indirect
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241118091009-43c2b4804cec // indirect
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f // indirect
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e // indirect
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241114154055-8d29ea018b57 // indirect
github.com/smartcontractkit/chainlink-feeds v0.1.1 // indirect
github.com/smartcontractkit/chainlink-protos/job-distributor v0.6.0 // indirect
github.com/smartcontractkit/chainlink-protos/orchestrator v0.3.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1413,8 +1413,8 @@ github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef06
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068/go.mod h1:ny87uTW6hLjCTLiBqBRNFEhETSXhHWevYlPclT5lSco=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e/go.mod h1:iK3BNHKCLgSgkOyiu3iE7sfZ20Qnuk7xwjV/yO/6gnQ=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241114154055-8d29ea018b57 h1:1BMTG66HnCIz+KMBWGvyzELNM6VHGwv2WKFhN7H49Sg=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241114154055-8d29ea018b57/go.mod h1:QPiorgpbLv4+Jn4YO6xxU4ftTu4T3QN8HwX3ImP59DE=
github.com/smartcontractkit/chainlink-feeds v0.1.1 h1:JzvUOM/OgGQA1sOqTXXl52R6AnNt+Wg64sVG+XSA49c=
github.com/smartcontractkit/chainlink-feeds v0.1.1/go.mod h1:55EZ94HlKCfAsUiKUTNI7QlE/3d3IwTlsU3YNa/nBb4=
github.com/smartcontractkit/chainlink-protos/job-distributor v0.6.0 h1:0ewLMbAz3rZrovdRUCgd028yOXX8KigB4FndAUdI2kM=
Expand Down
4 changes: 4 additions & 0 deletions core/services/chainlink/config_mercury.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func (m *mercuryTransmitterConfig) TransmitTimeout() commonconfig.Duration {
return *m.c.TransmitTimeout
}

func (m *mercuryTransmitterConfig) TransmitConcurrency() uint32 {
return *m.c.TransmitConcurrency
}

type mercuryConfig struct {
c toml.Mercury
s toml.MercurySecrets
Expand Down
2 changes: 2 additions & 0 deletions core/services/chainlink/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,7 @@ func TestConfig_Marshal(t *testing.T) {
Transmitter: toml.MercuryTransmitter{
TransmitQueueMaxSize: ptr(uint32(123)),
TransmitTimeout: commoncfg.MustNewDuration(234 * time.Second),
TransmitConcurrency: ptr(uint32(456)),
},
VerboseLogging: ptr(true),
}
Expand Down Expand Up @@ -1348,6 +1349,7 @@ CertFile = '/path/to/cert.pem'
[Mercury.Transmitter]
TransmitQueueMaxSize = 123
TransmitTimeout = '3m54s'
TransmitConcurrency = 456
`},
{"full", full, fullTOML},
{"multi-chain", multiChain, multiChainTOML},
Expand Down
4 changes: 2 additions & 2 deletions core/services/chainlink/relayer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (r *RelayerFactory) NewDummy(config DummyFactoryConfig) (loop.Relayer, erro
type EVMFactoryConfig struct {
legacyevm.ChainOpts
evmrelay.CSAETHKeystore
coreconfig.MercuryTransmitter
MercuryConfig coreconfig.Mercury
}

func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (map[types.RelayID]evmrelay.LOOPRelayAdapter, error) {
Expand Down Expand Up @@ -83,7 +83,7 @@ func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (m
DS: ccOpts.DS,
CSAETHKeystore: config.CSAETHKeystore,
MercuryPool: r.MercuryPool,
TransmitterConfig: config.MercuryTransmitter,
MercuryConfig: config.MercuryConfig,
CapabilitiesRegistry: r.CapabilitiesRegistry,
HTTPClient: r.HTTPClient,
RetirementReportCache: r.RetirementReportCache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ CertFile = ''
[Mercury.Transmitter]
TransmitQueueMaxSize = 10000
TransmitTimeout = '5s'
TransmitConcurrency = 100

[Capabilities]
[Capabilities.Peering]
Expand Down
1 change: 1 addition & 0 deletions core/services/chainlink/testdata/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ CertFile = '/path/to/cert.pem'
[Mercury.Transmitter]
TransmitQueueMaxSize = 123
TransmitTimeout = '3m54s'
TransmitConcurrency = 456

[Capabilities]
[Capabilities.Peering]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ CertFile = ''
[Mercury.Transmitter]
TransmitQueueMaxSize = 10000
TransmitTimeout = '5s'
TransmitConcurrency = 100

[Capabilities]
[Capabilities.Peering]
Expand Down
5 changes: 3 additions & 2 deletions core/services/llo/codecs.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
package llo

import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
"github.com/smartcontractkit/chainlink-data-streams/llo"

"github.com/smartcontractkit/chainlink/v2/core/services/llo/evm"
)

// NOTE: All supported codecs must be specified here
func NewReportCodecs() map[llotypes.ReportFormat]llo.ReportCodec {
func NewReportCodecs(lggr logger.Logger) map[llotypes.ReportFormat]llo.ReportCodec {
codecs := make(map[llotypes.ReportFormat]llo.ReportCodec)

codecs[llotypes.ReportFormatJSON] = llo.JSONReportCodec{}
codecs[llotypes.ReportFormatEVMPremiumLegacy] = evm.ReportCodecPremiumLegacy{}
codecs[llotypes.ReportFormatEVMPremiumLegacy] = evm.NewReportCodecPremiumLegacy(lggr)

return codecs
}
3 changes: 2 additions & 1 deletion core/services/llo/codecs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (
"github.com/stretchr/testify/assert"

llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

func Test_NewReportCodecs(t *testing.T) {
c := NewReportCodecs()
c := NewReportCodecs(logger.TestLogger(t))

assert.Contains(t, c, llotypes.ReportFormatJSON, "expected JSON to be supported")
assert.Contains(t, c, llotypes.ReportFormatEVMPremiumLegacy, "expected EVMPremiumLegacy to be supported")
Expand Down
61 changes: 36 additions & 25 deletions core/services/llo/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package llo
import (
"context"
"fmt"
"slices"
"sort"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -85,11 +87,7 @@ func newDataSource(lggr logger.Logger, registry Registry, t Telemeter) *dataSour

// Observe looks up all streams in the registry and populates a map of stream ID => value
func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, opts llo.DSOpts) error {
var wg sync.WaitGroup
wg.Add(len(streamValues))
var svmu sync.Mutex
var errs []ErrObservationFailed
var errmu sync.Mutex
now := time.Now()

if opts.VerboseLogging() {
streamIDs := make([]streams.StreamID, 0, len(streamValues))
Expand All @@ -100,6 +98,13 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,
d.lggr.Debugw("Observing streams", "streamIDs", streamIDs, "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr)
}

var wg sync.WaitGroup
wg.Add(len(streamValues))

var mu sync.Mutex
successfulStreamIDs := make([]streams.StreamID, 0, len(streamValues))
var errs []ErrObservationFailed

for _, streamID := range maps.Keys(streamValues) {
go func(streamID llotypes.StreamID) {
defer wg.Done()
Expand All @@ -108,17 +113,17 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,

stream, exists := d.registry.Get(streamID)
if !exists {
errmu.Lock()
mu.Lock()
errs = append(errs, ErrObservationFailed{streamID: streamID, reason: fmt.Sprintf("missing stream: %d", streamID)})
errmu.Unlock()
mu.Unlock()
promMissingStreamCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
return
}
run, trrs, err := stream.Run(ctx)
if err != nil {
errmu.Lock()
mu.Lock()
errs = append(errs, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "pipeline run failed"})
errmu.Unlock()
mu.Unlock()
promObservationErrorCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
// TODO: Consolidate/reduce telemetry. We should send all observation results in a single packet
// https://smartcontract-it.atlassian.net/browse/MERC-6290
Expand All @@ -129,44 +134,50 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,
// https://smartcontract-it.atlassian.net/browse/MERC-6290
val, err = ExtractStreamValue(trrs)
if err != nil {
errmu.Lock()
mu.Lock()
errs = append(errs, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "failed to extract big.Int"})
errmu.Unlock()
mu.Unlock()
return
}

d.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, nil)

mu.Lock()
defer mu.Unlock()

successfulStreamIDs = append(successfulStreamIDs, streamID)
if val != nil {
svmu.Lock()
defer svmu.Unlock()
streamValues[streamID] = val
}
}(streamID)
}

wg.Wait()
elapsed := time.Since(now)

// Failed observations are always logged at warn level
var failedStreamIDs []streams.StreamID
if len(errs) > 0 {
// Only log on errors or if VerboseLogging is turned on
if len(errs) > 0 || opts.VerboseLogging() {
slices.Sort(successfulStreamIDs)
sort.Slice(errs, func(i, j int) bool { return errs[i].streamID < errs[j].streamID })
failedStreamIDs = make([]streams.StreamID, len(errs))

failedStreamIDs := make([]streams.StreamID, len(errs))
errStrs := make([]string, len(errs))
for i, e := range errs {
errStrs[i] = e.String()
failedStreamIDs[i] = e.streamID
}
d.lggr.Warnw("Observation failed for streams", "failedStreamIDs", failedStreamIDs, "errs", errStrs, "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr)
}

if opts.VerboseLogging() {
successes := make([]streams.StreamID, 0, len(streamValues))
for strmID := range streamValues {
successes = append(successes, strmID)
lggr := logger.With(d.lggr, "elapsed", elapsed, "nSuccessfulStreams", len(successfulStreamIDs), "nFailedStreams", len(failedStreamIDs), "successfulStreamIDs", successfulStreamIDs, "failedStreamIDs", failedStreamIDs, "errs", errStrs, "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr)

if opts.VerboseLogging() {
lggr = logger.With(lggr, "streamValues", streamValues)
}

if len(errs) == 0 && opts.VerboseLogging() {
lggr.Infow("Observation succeeded for all streams")
} else if len(errs) > 0 {
lggr.Warnw("Observation failed for streams")
}
sort.Slice(successes, func(i, j int) bool { return successes[i] < successes[j] })
d.lggr.Debugw("Observation complete", "successfulStreamIDs", successes, "failedStreamIDs", failedStreamIDs, "configDigest", opts.ConfigDigest(), "values", streamValues, "seqNr", opts.OutCtx().SeqNr)
}

return nil
Expand Down
13 changes: 10 additions & 3 deletions core/services/llo/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/smartcontractkit/chainlink-data-streams/llo"
datastreamsllo "github.com/smartcontractkit/chainlink-data-streams/llo"

corelogger "github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/streams"
)
Expand Down Expand Up @@ -91,7 +92,13 @@ func NewDelegate(cfg DelegateConfig) (job.ServiceCtx, error) {
if cfg.ShouldRetireCache == nil {
return nil, errors.New("ShouldRetireCache must not be nil")
}
reportCodecs := NewReportCodecs()
var codecLggr logger.Logger
if cfg.ReportingPluginConfig.VerboseLogging {
codecLggr = logger.Named(lggr, "ReportCodecs")
} else {
codecLggr = corelogger.NullLogger
}
reportCodecs := NewReportCodecs(codecLggr)

var t TelemeterService
if cfg.CaptureEATelemetry {
Expand Down Expand Up @@ -126,7 +133,7 @@ func (d *delegate) Start(ctx context.Context) error {
case 1:
lggr = logger.With(lggr, "instanceType", "Green")
}
ocrLogger := logger.NewOCRWrapper(lggr, d.cfg.TraceLogging, func(msg string) {
ocrLogger := logger.NewOCRWrapper(NewSuppressedLogger(lggr, d.cfg.ReportingPluginConfig.VerboseLogging), d.cfg.TraceLogging, func(msg string) {
// TODO: do we actually need to DB-persist errors?
// MERC-3524
})
Expand All @@ -144,7 +151,7 @@ func (d *delegate) Start(ctx context.Context) error {
OffchainKeyring: d.cfg.OffchainKeyring,
OnchainKeyring: d.cfg.OnchainKeyring,
ReportingPluginFactory: datastreamsllo.NewPluginFactory(
d.cfg.ReportingPluginConfig, psrrc, d.src, d.cfg.RetirementReportCodec, d.cfg.ChannelDefinitionCache, d.ds, logger.Named(lggr, "LLOReportingPlugin"), llo.EVMOnchainConfigCodec{}, d.reportCodecs,
d.cfg.ReportingPluginConfig, psrrc, d.src, d.cfg.RetirementReportCodec, d.cfg.ChannelDefinitionCache, d.ds, logger.Named(lggr, "ReportingPlugin"), llo.EVMOnchainConfigCodec{}, d.reportCodecs,
),
MetricsRegisterer: prometheus.WrapRegistererWith(map[string]string{"job_name": d.cfg.JobName.ValueOrZero()}, prometheus.DefaultRegisterer),
})
Expand Down
10 changes: 10 additions & 0 deletions core/services/llo/evm/fees_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,14 @@ func Test_Fees(t *testing.T) {
fee := CalculateFee(tokenPriceInUSD, BaseUSDFee)
assert.Equal(t, big.NewInt(0), fee)
})

t.Run("ridiculously high value rounds down fee to zero", func(t *testing.T) {
// 20dp
tokenPriceInUSD, err := decimal.NewFromString("12984833000000000000")
require.NoError(t, err)
BaseUSDFee, err = decimal.NewFromString("0.1")
require.NoError(t, err)
fee := CalculateFee(tokenPriceInUSD, BaseUSDFee)
assert.Equal(t, big.NewInt(0), fee)
})
}
Loading

0 comments on commit 940b7e6

Please sign in to comment.