From 54a01366608e7276c467834d2fb9d3e164cf6c75 Mon Sep 17 00:00:00 2001 From: Oren Date: Mon, 5 Aug 2024 16:55:18 +0300 Subject: [PATCH 1/6] CNS-1004: implement qos excellence score aggregation from relay payments --- proto/lavanet/lava/pairing/relay.proto | 7 ++++ utils/convert.go | 10 ++++++ x/pairing/keeper/msg_server_relay_payment.go | 30 ++++++++++++++++ x/pairing/keeper/reputation.go | 26 ++++++++++++++ x/pairing/types/QualityOfServiceReport.go | 26 ++++++++++++++ x/pairing/types/qos_score.go | 38 ++++++++++++++++++++ x/pairing/types/reputation.go | 7 ++++ 7 files changed, 144 insertions(+) create mode 100644 utils/convert.go diff --git a/proto/lavanet/lava/pairing/relay.proto b/proto/lavanet/lava/pairing/relay.proto index 33cae38d02..3ab345ab04 100644 --- a/proto/lavanet/lava/pairing/relay.proto +++ b/proto/lavanet/lava/pairing/relay.proto @@ -94,16 +94,23 @@ message RelayReply { } message QualityOfServiceReport{ + // Latency of provider answers in milliseconds, range 0-inf, lower is better string latency = 1 [ (gogoproto.moretags) = "yaml:\"Latency\"", (gogoproto.customtype) = "github.com/cosmos/cosmos-sdk/types.Dec", (gogoproto.nullable) = false ]; + + // Percentage of times the provider returned a non-error response, range 0-1, higher is better string availability = 2 [ (gogoproto.moretags) = "yaml:\"availability\"", (gogoproto.customtype) = "github.com/cosmos/cosmos-sdk/types.Dec", (gogoproto.nullable) = false ]; + + // Amount of time the provider is not synced (have the latest block) in milliseconds, range 0-inf, lower is better. + // Example: in ETH we have 15sec block time. So sync = 15000 means that the provider is one block + // behind the actual latest block. string sync = 3 [ (gogoproto.moretags) = "yaml:\"sync\"", (gogoproto.customtype) = "github.com/cosmos/cosmos-sdk/types.Dec", diff --git a/utils/convert.go b/utils/convert.go new file mode 100644 index 0000000000..6fdd89ead4 --- /dev/null +++ b/utils/convert.go @@ -0,0 +1,10 @@ +package utils + +import "math" + +func SafeUint64ToInt64Convert(val uint64) int64 { + if val > math.MaxInt64 { + val = math.MaxInt64 + } + return int64(val) +} diff --git a/x/pairing/keeper/msg_server_relay_payment.go b/x/pairing/keeper/msg_server_relay_payment.go index 9933b18a46..559429c79d 100644 --- a/x/pairing/keeper/msg_server_relay_payment.go +++ b/x/pairing/keeper/msg_server_relay_payment.go @@ -172,6 +172,36 @@ func (k msgServer) RelayPayment(goCtx context.Context, msg *types.MsgRelayPaymen k.handleBadgeCu(ctx, badgeData, relay.Provider, relay.CuSum, newBadgeTimerExpiry) } + // update the reputation's epoch QoS score + // the excellece QoS report can be nil when the provider and consumer geolocations are not equal + if relay.QosExcellenceReport != nil { + sub, found := k.subscriptionKeeper.GetSubscription(ctx, project.Subscription) + if !found { + return nil, utils.LavaFormatError("RelayPayment: could not get cluster for reputation score update", fmt.Errorf("relay consumer's subscription not found"), + utils.LogAttr("consumer", clientAddr), + utils.LogAttr("project", project.Index), + utils.LogAttr("subscription", project.Subscription), + utils.LogAttr("chain", relay.SpecId), + utils.LogAttr("provider", relay.Provider), + ) + } + + syncFactor := k.ReputationLatencyOverSyncFactor(ctx) + score, err := relay.QosExcellenceReport.ComputeQosExcellenceForReputation(syncFactor) + if err != nil { + return nil, utils.LavaFormatWarning("RelayPayment: could not compute qos excellence score", err, + utils.LogAttr("consumer", clientAddr), + utils.LogAttr("chain", relay.SpecId), + utils.LogAttr("provider", relay.Provider), + utils.LogAttr("qos_excellence_report", relay.QosExcellenceReport.String()), + utils.LogAttr("sync_factor", syncFactor.String()), + ) + } + + // note the current weight used is by relay num. In the future, it might change + k.UpdateReputationEpochQosScore(ctx, relay.SpecId, sub.Cluster, relay.Provider, score, utils.SafeUint64ToInt64Convert(relay.RelayNum)) + } + // TODO: add support for spec changes spec, found := k.specKeeper.GetSpec(ctx, relay.SpecId) if !found || !spec.Enabled { diff --git a/x/pairing/keeper/reputation.go b/x/pairing/keeper/reputation.go index 5b46bc4fe8..c57c7dabf0 100644 --- a/x/pairing/keeper/reputation.go +++ b/x/pairing/keeper/reputation.go @@ -4,6 +4,7 @@ import ( "fmt" "cosmossdk.io/collections" + "cosmossdk.io/math" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/lavanet/lava/v2/utils" "github.com/lavanet/lava/v2/x/pairing/types" @@ -107,6 +108,31 @@ func (k Keeper) GetAllReputation(ctx sdk.Context) []types.ReputationGenesis { return entries } +// UpdateReputationEpochQosScore updates the epoch QoS score of the provider's reputation using the score from the relay +// payment's QoS excellence report +func (k Keeper) UpdateReputationEpochQosScore(ctx sdk.Context, chainID string, cluster string, provider string, score math.LegacyDec, weight int64) { + // get current reputation and get parameters for the epoch score update + r, found := k.GetReputation(ctx, chainID, cluster, provider) + truncate := false + if found { + stabilizationPeriod := k.ReputationVarianceStabilizationPeriod(ctx) + if r.ShouldTruncate(stabilizationPeriod, ctx.BlockTime().UTC().Unix()) { + truncate = true + } + } else { + // new reputation score is not truncated and its decay factor is equal to 1 + r = types.NewReputation(ctx) + } + + // calculate the updated QoS epoch score + updatedEpochScore := r.EpochScore.Update(score, truncate, weight) + + // update the reputation and set + r.EpochScore = updatedEpochScore + r.TimeLastUpdated = ctx.BlockTime().UTC().Unix() + k.SetReputation(ctx, chainID, cluster, provider, r) +} + // GetReputationScore returns the current reputation pairing score func (k Keeper) GetReputationScore(ctx sdk.Context, chainID string, cluster string, provider string) (val types.ReputationPairingScore, found bool) { block := uint64(ctx.BlockHeight()) diff --git a/x/pairing/types/QualityOfServiceReport.go b/x/pairing/types/QualityOfServiceReport.go index d5bb0b4836..e06b1d7550 100644 --- a/x/pairing/types/QualityOfServiceReport.go +++ b/x/pairing/types/QualityOfServiceReport.go @@ -3,7 +3,9 @@ package types import ( "fmt" + "cosmossdk.io/math" sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/lavanet/lava/v2/utils" ) func (qos *QualityOfServiceReport) ComputeQoS() (sdk.Dec, error) { @@ -24,3 +26,27 @@ func (qos *QualityOfServiceReport) ComputeQoSExcellence() (sdk.Dec, error) { } return qos.Availability.Quo(qos.Sync).Quo(qos.Latency).ApproxRoot(3) } + +// ComputeQosExcellenceForReputation computes the score from the QoS excellence report to update the provider's reputation +// report score = latency + sync*syncFactor + ((1/availability) - 1) * FailureCost (note: larger value is worse) +func (qos QualityOfServiceReport) ComputeQosExcellenceForReputation(syncFactor math.LegacyDec) (math.LegacyDec, error) { + if qos.Availability.LT(sdk.ZeroDec()) || + qos.Latency.LT(sdk.ZeroDec()) || + qos.Sync.LT(sdk.ZeroDec()) || syncFactor.LT(sdk.ZeroDec()) { + return sdk.ZeroDec(), utils.LavaFormatWarning("ComputeQosExcellenceForReputation: compute failed", fmt.Errorf("QoS excellence scores is below 0"), + utils.LogAttr("availability", qos.Availability.String()), + utils.LogAttr("sync", qos.Sync.String()), + utils.LogAttr("latency", qos.Latency.String()), + ) + } + + latency := qos.Latency + sync := qos.Sync.Mul(syncFactor) + availability := math.LegacyNewDec(FailureCost) + if !qos.Availability.IsZero() { + availability = availability.Mul((math.LegacyOneDec().Quo(qos.Availability)).Sub(math.LegacyOneDec())) + } else { + availability = math.LegacyMaxSortableDec.QuoInt64(2) // on qs.Availability = 0 we take the largest score possible + } + return latency.Add(sync).Add(availability), nil +} diff --git a/x/pairing/types/qos_score.go b/x/pairing/types/qos_score.go index a9b5fc9bc9..e6d2a83225 100644 --- a/x/pairing/types/qos_score.go +++ b/x/pairing/types/qos_score.go @@ -8,6 +8,9 @@ import ( ) var ( + FailureCost int64 = 3000 // failed relay cost for QoS excellence report computation in milliseconds + TruncateStdMargin int64 = 3 // number of standard deviations that determine the truncation limit + // default QoS score is: score = 1.25, var = 0 DefaultQosScore = QosScore{ Score: Frac{Num: math.LegacyNewDec(5), Denom: math.LegacyNewDec(4)}, @@ -41,3 +44,38 @@ func NewQosScore(score Frac, variance Frac) QosScore { func (qs QosScore) Equal(other QosScore) bool { return qs.Score.Equal(other.Score) && qs.Variance.Equal(other.Variance) } + +// Update updates a QosScore with a new score from the QoS excellence report. The new score is truncated by the +// current variance. Then, it's updated using the weight (which is currently the relay num) +func (qs QosScore) Update(score math.LegacyDec, truncate bool, weight int64) QosScore { + if truncate { + score = qs.truncate(score) + } + + // updated_score_num = qos_score_num + score * weight + // updated_score_denom = qos_score_denom + weight + qs.Score.Num = qs.Score.Num.Add(score.MulInt64(weight)) + qs.Score.Denom = qs.Score.Denom.Add(math.LegacyNewDec(weight)) + + // updated_variance_num = qos_variance_num + (qos_score_num - score)^2 * weight + // updated_score_denom = qos_score_denom + weight + qs.Variance.Num = qs.Variance.Num.Add((qs.Score.Num.Sub(score)).Power(2).MulInt64(weight)) + qs.Variance.Denom = qs.Variance.Denom.Add(math.LegacyNewDec(weight)) + + return qs +} + +// Truncate truncates the QoS excellece report score by the current QoS score variance +func (qs QosScore) truncate(score math.LegacyDec) math.LegacyDec { + std, err := qs.Variance.Num.ApproxSqrt() + if err != nil { + utils.LavaFormatError("QosScore truncate: truncate failed, could not calculate qos variance sqrt", err, + utils.LogAttr("qos_score_variance", qs.Variance.String()), + ) + return score + } + + // truncated score = max(min(score, qos_score_num + 3*std), qos_score_num - 3*std) + return math.LegacyMaxDec(math.LegacyMinDec(score, qs.Score.Num.Add(std.MulInt64(TruncateStdMargin))), + qs.Score.Num.Sub(std.MulInt64(TruncateStdMargin))) +} diff --git a/x/pairing/types/reputation.go b/x/pairing/types/reputation.go index 9d34cd91cc..cd4baf299f 100644 --- a/x/pairing/types/reputation.go +++ b/x/pairing/types/reputation.go @@ -54,6 +54,13 @@ func (r Reputation) Equal(other Reputation) bool { r.TimeLastUpdated == other.TimeLastUpdated && r.CreationTime == other.CreationTime } +// ShouldTruncate checks if the ReputationVarianceStabilizationPeriod has passed since +// the reputation's creation. If so, QoS score reports should be truncated before they're added to the +// reputation's epoch QoS score. +func (r Reputation) ShouldTruncate(stabilizationPeriod int64, currentTime int64) bool { + return r.CreationTime+stabilizationPeriod < currentTime +} + // ReputationScoreKey returns a key for the reputations fixation store (reputationsFS) func ReputationScoreKey(chainID string, cluster string, provider string) string { return chainID + " " + cluster + " " + provider From e3f018300048f5e40077fff543f71397c3fd5488 Mon Sep 17 00:00:00 2001 From: Oren Date: Mon, 5 Aug 2024 16:55:26 +0300 Subject: [PATCH 2/6] CNS-1004: unit tests --- testutil/common/tester.go | 9 + .../keeper/msg_server_relay_payment_test.go | 190 ++++++++++++++++++ .../types/QualityOfServiceReport_test.go | 86 ++++++-- x/pairing/types/reputation_test.go | 39 ++++ 4 files changed, 307 insertions(+), 17 deletions(-) create mode 100644 x/pairing/types/reputation_test.go diff --git a/testutil/common/tester.go b/testutil/common/tester.go index 941958f62c..3e7bad4d67 100644 --- a/testutil/common/tester.go +++ b/testutil/common/tester.go @@ -1053,6 +1053,15 @@ func (ts *Tester) GetNextMonth(from time.Time) int64 { return utils.NextMonth(from).UTC().Unix() } +func (ts *Tester) BlockTimeDefault() time.Duration { + return ts.Keepers.Downtime.GetParams(ts.Ctx).DowntimeDuration +} + +func (ts *Tester) EpochTimeDefault() time.Duration { + epochBlocks := ts.Keepers.Epochstorage.GetParams(ts.Ctx).EpochBlocks + return ts.BlockTimeDefault() * time.Duration(epochBlocks) +} + func (ts *Tester) AdvanceToBlock(block uint64) { if block < ts.BlockHeight() { panic("AdvanceToBlock: block in the past: " + diff --git a/x/pairing/keeper/msg_server_relay_payment_test.go b/x/pairing/keeper/msg_server_relay_payment_test.go index 15c1076728..3b23f1f193 100644 --- a/x/pairing/keeper/msg_server_relay_payment_test.go +++ b/x/pairing/keeper/msg_server_relay_payment_test.go @@ -1067,3 +1067,193 @@ func TestPairingCaching(t *testing.T) { require.Equal(t, totalCU*3, sub.Sub.MonthCuTotal-sub.Sub.MonthCuLeft) } } + +// TestUpdateReputationEpochQosScore tests the update of the reputation's epoch qos score +// Scenarios: +// 1. provider1 sends relay -> its reputation is updated (epoch score and time last updated), +// also, provider2 reputation is not updated +func TestUpdateReputationEpochQosScore(t *testing.T) { + ts := newTester(t) + ts.setupForPayments(2, 1, 0) // 2 providers, 1 client, default providers-to-pair + + consumerAcc, consumer := ts.GetAccount(common.CONSUMER, 0) + _, provider1 := ts.GetAccount(common.PROVIDER, 0) + _, provider2 := ts.GetAccount(common.PROVIDER, 1) + qos := &types.QualityOfServiceReport{ + Latency: sdk.ZeroDec(), + Availability: sdk.OneDec(), + Sync: sdk.ZeroDec(), + } + + res, err := ts.QuerySubscriptionCurrent(consumer) + require.NoError(t, err) + cluster := res.Sub.Cluster + + // set default reputations for both providers. Advance epoch to change the current block time + ts.Keepers.Pairing.SetReputation(ts.Ctx, ts.spec.Index, cluster, provider1, types.NewReputation(ts.Ctx)) + ts.Keepers.Pairing.SetReputation(ts.Ctx, ts.spec.Index, cluster, provider2, types.NewReputation(ts.Ctx)) + ts.AdvanceEpoch() + + // send relay payment msg from provider1 + relaySession := ts.newRelaySession(provider1, 0, 100, ts.BlockHeight(), 1) + relaySession.QosExcellenceReport = qos + sig, err := sigs.Sign(consumerAcc.SK, *relaySession) + require.NoError(t, err) + relaySession.Sig = sig + + payment := types.MsgRelayPayment{ + Creator: provider1, + Relays: []*types.RelaySession{relaySession}, + } + ts.relayPaymentWithoutPay(payment, true) + + // get both providers reputation: provider1 should have its epoch score and time last updated changed, + // provider2 should have nothing change from the default + r1, found := ts.Keepers.Pairing.GetReputation(ts.Ctx, ts.spec.Index, cluster, provider1) + require.True(t, found) + r2, found := ts.Keepers.Pairing.GetReputation(ts.Ctx, ts.spec.Index, cluster, provider2) + require.True(t, found) + + require.Greater(t, r1.TimeLastUpdated, r2.TimeLastUpdated) + epochScore1, err := r1.EpochScore.Score.Resolve() + require.NoError(t, err) + epochScore2, err := r2.EpochScore.Score.Resolve() + require.NoError(t, err) + variance1, err := r1.EpochScore.Variance.Resolve() + require.NoError(t, err) + variance2, err := r2.EpochScore.Variance.Resolve() + require.NoError(t, err) + require.True(t, epochScore1.LT(epochScore2)) // score is lower because QoS is excellent + require.True(t, variance1.GT(variance2)) // variance is higher because the QoS is significantly differnet from DefaultQos +} + +// TestUpdateReputationEpochQosScoreTruncation tests the following scenarios: +// 1. stabilization period has not passed -> no truncation +// 2. stabilization period passed -> with truncation (score update is smaller than the first one) +// note, this test works since we use a bad QoS report (compared to default) so we know that the score should +// increase (which is considered worse) +func TestUpdateReputationEpochQosScoreTruncation(t *testing.T) { + // these will be used to compare the score change with/without truncation + scoreUpdates := []sdk.Dec{} + + // we set the stabilization period to 2 epochs time. Advancing one epoch means we won't truncate, + // advancing 3 means we will truncate. + epochsToAdvance := []uint64{1, 3} + + for i := range epochsToAdvance { + ts := newTester(t) + ts.setupForPayments(1, 1, 0) // 1 provider, 1 client, default providers-to-pair + + consumerAcc, consumer := ts.GetAccount(common.CONSUMER, 0) + _, provider1 := ts.GetAccount(common.PROVIDER, 0) + qos := &types.QualityOfServiceReport{ + Latency: sdk.NewDec(1000), + Availability: sdk.OneDec(), + Sync: sdk.NewDec(1000), + } + + resQCurrent, err := ts.QuerySubscriptionCurrent(consumer) + require.NoError(t, err) + cluster := resQCurrent.Sub.Cluster + + // set stabilization period to be 2*epoch time + resQParams, err := ts.Keepers.Pairing.Params(ts.GoCtx, &types.QueryParamsRequest{}) + require.NoError(t, err) + resQParams.Params.ReputationVarianceStabilizationPeriod = int64(ts.EpochTimeDefault().Seconds()) + ts.Keepers.Pairing.SetParams(ts.Ctx, resQParams.Params) + + // set default reputation + ts.Keepers.Pairing.SetReputation(ts.Ctx, ts.spec.Index, cluster, provider1, types.NewReputation(ts.Ctx)) + + // advance epochs + ts.AdvanceEpochs(epochsToAdvance[i]) + + // send relay payment msg from provider1 + relaySession := ts.newRelaySession(provider1, 0, 100, ts.BlockHeight(), 1) + relaySession.QosExcellenceReport = qos + sig, err := sigs.Sign(consumerAcc.SK, *relaySession) + require.NoError(t, err) + relaySession.Sig = sig + + payment := types.MsgRelayPayment{ + Creator: provider1, + Relays: []*types.RelaySession{relaySession}, + } + ts.relayPaymentWithoutPay(payment, true) + + // get update of epoch score + r, found := ts.Keepers.Pairing.GetReputation(ts.Ctx, ts.spec.Index, cluster, provider1) + require.True(t, found) + epochScoreNoTruncation, err := r.EpochScore.Score.Resolve() + require.NoError(t, err) + defaultEpochScore, err := types.DefaultQosScore.Score.Resolve() + require.NoError(t, err) + scoreUpdates = append(scoreUpdates, epochScoreNoTruncation.Sub(defaultEpochScore)) + } + + // require that the score update that was not truncated is larger than the one that was truncated + require.True(t, scoreUpdates[0].GT(scoreUpdates[1])) +} + +// TestUpdateReputationEpochQosScoreTruncation tests the following scenario: +// 1. relay num is the reputation update weight. More relays = bigger update +func TestUpdateReputationEpochQosScoreRelayNumWeight(t *testing.T) { + // these will be used to compare the score change with high/low relay numbers + scoreUpdates := []sdk.Dec{} + + // we set the stabilization period to 2 epochs time. Advancing one epoch means we won't truncate, + // advancing 3 means we will truncate. + relayNums := []uint64{100, 1} + + for i := range relayNums { + ts := newTester(t) + ts.setupForPayments(1, 1, 0) // 1 provider, 1 client, default providers-to-pair + + consumerAcc, consumer := ts.GetAccount(common.CONSUMER, 0) + _, provider1 := ts.GetAccount(common.PROVIDER, 0) + qos := &types.QualityOfServiceReport{ + Latency: sdk.NewDec(1000), + Availability: sdk.OneDec(), + Sync: sdk.NewDec(1000), + } + + resQCurrent, err := ts.QuerySubscriptionCurrent(consumer) + require.NoError(t, err) + cluster := resQCurrent.Sub.Cluster + + // set stabilization period to be 2*epoch time to avoid truncation + resQParams, err := ts.Keepers.Pairing.Params(ts.GoCtx, &types.QueryParamsRequest{}) + require.NoError(t, err) + resQParams.Params.ReputationVarianceStabilizationPeriod = int64(ts.EpochTimeDefault().Seconds()) + ts.Keepers.Pairing.SetParams(ts.Ctx, resQParams.Params) + + // set default reputation + ts.Keepers.Pairing.SetReputation(ts.Ctx, ts.spec.Index, cluster, provider1, types.NewReputation(ts.Ctx)) + ts.AdvanceEpoch() + + // send relay payment msg from provider1 + relaySession := ts.newRelaySession(provider1, 0, 100, ts.BlockHeight(), relayNums[i]) + relaySession.QosExcellenceReport = qos + sig, err := sigs.Sign(consumerAcc.SK, *relaySession) + require.NoError(t, err) + relaySession.Sig = sig + + payment := types.MsgRelayPayment{ + Creator: provider1, + Relays: []*types.RelaySession{relaySession}, + } + ts.relayPaymentWithoutPay(payment, true) + + // get update of epoch score + r, found := ts.Keepers.Pairing.GetReputation(ts.Ctx, ts.spec.Index, cluster, provider1) + require.True(t, found) + epochScoreNoTruncation, err := r.EpochScore.Score.Resolve() + require.NoError(t, err) + defaultEpochScore, err := types.DefaultQosScore.Score.Resolve() + require.NoError(t, err) + scoreUpdates = append(scoreUpdates, epochScoreNoTruncation.Sub(defaultEpochScore)) + } + + // require that the score update that was with 1000 relay num is larger than the one with one relay num + require.True(t, scoreUpdates[0].GT(scoreUpdates[1])) +} diff --git a/x/pairing/types/QualityOfServiceReport_test.go b/x/pairing/types/QualityOfServiceReport_test.go index 83a15ce6d6..9ed38e134d 100644 --- a/x/pairing/types/QualityOfServiceReport_test.go +++ b/x/pairing/types/QualityOfServiceReport_test.go @@ -3,11 +3,12 @@ package types import ( "testing" + "cosmossdk.io/math" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/stretchr/testify/require" ) -func TestQosReport(t *testing.T) { +func createTestQosReportScores(forReputation bool) ([]math.LegacyDec, error) { qos1 := &QualityOfServiceReport{ Latency: sdk.MustNewDecFromStr("1.5"), Availability: sdk.MustNewDecFromStr("1"), @@ -29,20 +30,71 @@ func TestQosReport(t *testing.T) { Sync: sdk.MustNewDecFromStr("0.5"), } - qos1Res, errQos1 := qos1.ComputeQoSExcellence() - qos2Res, errQos2 := qos2.ComputeQoSExcellence() - qos3Res, errQos3 := qos3.ComputeQoSExcellence() - qos4Res, errQos4 := qos4.ComputeQoSExcellence() - require.NoError(t, errQos1) - require.NoError(t, errQos2) - require.NoError(t, errQos3) - require.NoError(t, errQos4) - require.True(t, qos1Res.LT(qos2Res)) - require.True(t, qos1Res.LT(qos3Res)) - require.True(t, qos1Res.LT(qos4Res)) - - require.True(t, qos2Res.GT(qos3Res)) - require.True(t, qos2Res.GT(qos4Res)) - - require.True(t, qos4Res.LT(qos3Res)) + res := []math.LegacyDec{} + if forReputation { + syncFactor := sdk.MustNewDecFromStr("0.5") + qos1Res, errQos1 := qos1.ComputeQosExcellenceForReputation(syncFactor) + if errQos1 != nil { + return nil, errQos1 + } + qos2Res, errQos2 := qos2.ComputeQosExcellenceForReputation(syncFactor) + if errQos2 != nil { + return nil, errQos2 + } + qos3Res, errQos3 := qos3.ComputeQosExcellenceForReputation(syncFactor) + if errQos3 != nil { + return nil, errQos3 + } + qos4Res, errQos4 := qos4.ComputeQosExcellenceForReputation(syncFactor) + if errQos4 != nil { + return nil, errQos4 + } + res = append(res, qos1Res, qos2Res, qos3Res, qos4Res) + } else { + qos1Res, errQos1 := qos1.ComputeQoSExcellence() + if errQos1 != nil { + return nil, errQos1 + } + qos2Res, errQos2 := qos2.ComputeQoSExcellence() + if errQos2 != nil { + return nil, errQos2 + } + qos3Res, errQos3 := qos3.ComputeQoSExcellence() + if errQos3 != nil { + return nil, errQos3 + } + qos4Res, errQos4 := qos4.ComputeQoSExcellence() + if errQos4 != nil { + return nil, errQos4 + } + res = append(res, qos1Res, qos2Res, qos3Res, qos4Res) + } + + return res, nil +} + +func TestQosReport(t *testing.T) { + res, err := createTestQosReportScores(false) + require.NoError(t, err) + require.True(t, res[0].LT(res[1])) + require.True(t, res[0].LT(res[2])) + require.True(t, res[0].LT(res[3])) + + require.True(t, res[1].GT(res[2])) + require.True(t, res[1].GT(res[3])) + + require.True(t, res[3].LT(res[2])) +} + +func TestQosReportForReputation(t *testing.T) { + res, err := createTestQosReportScores(true) + require.NoError(t, err) + require.True(t, res[0].GT(res[1])) + require.True(t, res[0].GT(res[2])) + require.True(t, res[0].LT(res[3])) + + require.True(t, res[1].LT(res[2])) + require.True(t, res[1].LT(res[3])) + + require.True(t, res[3].GT(res[2])) } diff --git a/x/pairing/types/reputation_test.go b/x/pairing/types/reputation_test.go new file mode 100644 index 0000000000..944b633874 --- /dev/null +++ b/x/pairing/types/reputation_test.go @@ -0,0 +1,39 @@ +package types + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +// TestShouldTruncate tests the should truncate method +func TestShouldTruncate(t *testing.T) { + tests := []struct { + name string + creationTime int64 + stabilizationPeriod int64 + currentTime int64 + truncate bool + }{ + { + name: "stabilization time not passed", + creationTime: 1, + stabilizationPeriod: 1, + currentTime: 3, + truncate: true, + }, + { + name: "stabilization time passed", + creationTime: 3, + stabilizationPeriod: 1, + currentTime: 3, + truncate: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + reputation := Reputation{CreationTime: tt.creationTime} + require.Equal(t, tt.truncate, reputation.ShouldTruncate(tt.stabilizationPeriod, tt.currentTime)) + }) + } +} From 32a544c6880eae5e2f3449ad8bfc684aaf7081f9 Mon Sep 17 00:00:00 2001 From: Oren Date: Tue, 6 Aug 2024 19:46:30 +0300 Subject: [PATCH 3/6] CNS-1004: add stake to reputation --- x/pairing/keeper/msg_server_relay_payment.go | 12 +++++++++++- x/pairing/keeper/reputation.go | 3 ++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/x/pairing/keeper/msg_server_relay_payment.go b/x/pairing/keeper/msg_server_relay_payment.go index 559429c79d..5a5e6fad5a 100644 --- a/x/pairing/keeper/msg_server_relay_payment.go +++ b/x/pairing/keeper/msg_server_relay_payment.go @@ -198,8 +198,18 @@ func (k msgServer) RelayPayment(goCtx context.Context, msg *types.MsgRelayPaymen ) } + stakeEntry, found := k.epochStorageKeeper.GetStakeEntryByAddressCurrent(ctx, relay.SpecId, relay.Provider) + if !found { + return nil, utils.LavaFormatWarning("RelayPayment: could not get stake entry for reputation", fmt.Errorf("stake entry not found"), + utils.LogAttr("consumer", clientAddr), + utils.LogAttr("chain", relay.SpecId), + utils.LogAttr("provider", relay.Provider), + ) + } + effectiveStake := sdk.NewCoin(stakeEntry.Stake.Denom, stakeEntry.EffectiveStake()) + // note the current weight used is by relay num. In the future, it might change - k.UpdateReputationEpochQosScore(ctx, relay.SpecId, sub.Cluster, relay.Provider, score, utils.SafeUint64ToInt64Convert(relay.RelayNum)) + k.UpdateReputationEpochQosScore(ctx, relay.SpecId, sub.Cluster, relay.Provider, score, utils.SafeUint64ToInt64Convert(relay.RelayNum), effectiveStake) } // TODO: add support for spec changes diff --git a/x/pairing/keeper/reputation.go b/x/pairing/keeper/reputation.go index 2facff6914..b982ea91c1 100644 --- a/x/pairing/keeper/reputation.go +++ b/x/pairing/keeper/reputation.go @@ -94,7 +94,7 @@ func (k Keeper) GetAllReputation(ctx sdk.Context) []types.ReputationGenesis { // UpdateReputationEpochQosScore updates the epoch QoS score of the provider's reputation using the score from the relay // payment's QoS excellence report -func (k Keeper) UpdateReputationEpochQosScore(ctx sdk.Context, chainID string, cluster string, provider string, score math.LegacyDec, weight int64) { +func (k Keeper) UpdateReputationEpochQosScore(ctx sdk.Context, chainID string, cluster string, provider string, score math.LegacyDec, weight int64, stake sdk.Coin) { // get current reputation and get parameters for the epoch score update r, found := k.GetReputation(ctx, chainID, cluster, provider) truncate := false @@ -114,6 +114,7 @@ func (k Keeper) UpdateReputationEpochQosScore(ctx sdk.Context, chainID string, c // update the reputation and set r.EpochScore = updatedEpochScore r.TimeLastUpdated = ctx.BlockTime().UTC().Unix() + r.Stake = stake k.SetReputation(ctx, chainID, cluster, provider, r) } From 1a599a017ebc67d4d9f673ff4329daa4ec3f7110 Mon Sep 17 00:00:00 2001 From: Oren Date: Wed, 7 Aug 2024 11:35:54 +0300 Subject: [PATCH 4/6] CNS-1004: add stake check in unit test --- x/pairing/keeper/msg_server_relay_payment_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/x/pairing/keeper/msg_server_relay_payment_test.go b/x/pairing/keeper/msg_server_relay_payment_test.go index 3b23f1f193..9de1ca65b6 100644 --- a/x/pairing/keeper/msg_server_relay_payment_test.go +++ b/x/pairing/keeper/msg_server_relay_payment_test.go @@ -1125,6 +1125,10 @@ func TestUpdateReputationEpochQosScore(t *testing.T) { require.NoError(t, err) require.True(t, epochScore1.LT(epochScore2)) // score is lower because QoS is excellent require.True(t, variance1.GT(variance2)) // variance is higher because the QoS is significantly differnet from DefaultQos + + entry, found := ts.Keepers.Epochstorage.GetStakeEntryByAddressCurrent(ts.Ctx, ts.spec.Index, provider1) + require.True(t, found) + require.True(t, entry.Stake.IsEqual(r1.Stake)) } // TestUpdateReputationEpochQosScoreTruncation tests the following scenarios: From 994e762331f8196bf3e3f3d8cfdbbd817f8e72c5 Mon Sep 17 00:00:00 2001 From: Oren Date: Tue, 13 Aug 2024 11:18:26 +0300 Subject: [PATCH 5/6] CNS-1004: minor adds --- x/pairing/keeper/msg_server_relay_payment.go | 2 +- x/pairing/types/qos_score.go | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/x/pairing/keeper/msg_server_relay_payment.go b/x/pairing/keeper/msg_server_relay_payment.go index 44d6daa29e..c36f6741cf 100644 --- a/x/pairing/keeper/msg_server_relay_payment.go +++ b/x/pairing/keeper/msg_server_relay_payment.go @@ -198,7 +198,7 @@ func (k msgServer) RelayPayment(goCtx context.Context, msg *types.MsgRelayPaymen ) } - stakeEntry, found := k.epochStorageKeeper.GetStakeEntryByAddressCurrent(ctx, relay.SpecId, relay.Provider) + stakeEntry, found := k.epochStorageKeeper.GetStakeEntryCurrent(ctx, relay.SpecId, relay.Provider) if !found { return nil, utils.LavaFormatWarning("RelayPayment: could not get stake entry for reputation", fmt.Errorf("stake entry not found"), utils.LogAttr("consumer", clientAddr), diff --git a/x/pairing/types/qos_score.go b/x/pairing/types/qos_score.go index 68a77bd379..746ee1ecfb 100644 --- a/x/pairing/types/qos_score.go +++ b/x/pairing/types/qos_score.go @@ -45,6 +45,15 @@ func (qs QosScore) Equal(other QosScore) bool { return qs.Score.Equal(other.Score) && qs.Variance.Equal(other.Variance) } +// Validate validates that both nums are non-negative and both denoms are strictly positive (non-zero) +func (qs QosScore) Validate() bool { + if qs.Score.Num.IsNegative() || !qs.Score.Denom.IsPositive() || qs.Variance.Num.IsNegative() || + !qs.Variance.Denom.IsPositive() { + return false + } + return true +} + // Update updates a QosScore with a new score from the QoS excellence report. The new score is truncated by the // current variance. Then, it's updated using the weight (which is currently the relay num) func (qs QosScore) Update(score math.LegacyDec, truncate bool, weight int64) QosScore { From 8a34d7476fa702b8c2b5e166964e13b1b91d66e4 Mon Sep 17 00:00:00 2001 From: Oren Date: Wed, 4 Sep 2024 12:25:14 +0300 Subject: [PATCH 6/6] CNS-1004: fix after merge --- x/pairing/types/QualityOfServiceReport.go | 2 +- x/pairing/types/relay.pb.go | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/x/pairing/types/QualityOfServiceReport.go b/x/pairing/types/QualityOfServiceReport.go index e06b1d7550..51b836e967 100644 --- a/x/pairing/types/QualityOfServiceReport.go +++ b/x/pairing/types/QualityOfServiceReport.go @@ -5,7 +5,7 @@ import ( "cosmossdk.io/math" sdk "github.com/cosmos/cosmos-sdk/types" - "github.com/lavanet/lava/v2/utils" + "github.com/lavanet/lava/v3/utils" ) func (qos *QualityOfServiceReport) ComputeQoS() (sdk.Dec, error) { diff --git a/x/pairing/types/relay.pb.go b/x/pairing/types/relay.pb.go index df0b7b802d..c9eb9831bb 100644 --- a/x/pairing/types/relay.pb.go +++ b/x/pairing/types/relay.pb.go @@ -765,9 +765,14 @@ func (m *RelayReply) GetMetadata() []Metadata { } type QualityOfServiceReport struct { - Latency github_com_cosmos_cosmos_sdk_types.Dec `protobuf:"bytes,1,opt,name=latency,proto3,customtype=github.com/cosmos/cosmos-sdk/types.Dec" json:"latency" yaml:"Latency"` + // Latency of provider answers in milliseconds, range 0-inf, lower is better + Latency github_com_cosmos_cosmos_sdk_types.Dec `protobuf:"bytes,1,opt,name=latency,proto3,customtype=github.com/cosmos/cosmos-sdk/types.Dec" json:"latency" yaml:"Latency"` + // Percentage of times the provider returned a non-error response, range 0-1, higher is better Availability github_com_cosmos_cosmos_sdk_types.Dec `protobuf:"bytes,2,opt,name=availability,proto3,customtype=github.com/cosmos/cosmos-sdk/types.Dec" json:"availability" yaml:"availability"` - Sync github_com_cosmos_cosmos_sdk_types.Dec `protobuf:"bytes,3,opt,name=sync,proto3,customtype=github.com/cosmos/cosmos-sdk/types.Dec" json:"sync" yaml:"sync"` + // Amount of time the provider is not synced (have the latest block) in milliseconds, range 0-inf, lower is better. + // Example: in ETH we have 15sec block time. So sync = 15000 means that the provider is one block + // behind the actual latest block. + Sync github_com_cosmos_cosmos_sdk_types.Dec `protobuf:"bytes,3,opt,name=sync,proto3,customtype=github.com/cosmos/cosmos-sdk/types.Dec" json:"sync" yaml:"sync"` } func (m *QualityOfServiceReport) Reset() { *m = QualityOfServiceReport{} }