Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: PRT - QoS Manager Refactor - Part 2 #1914

Draft
wants to merge 22 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion protocol/chainlib/consumer_ws_subscription_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/lavanet/lava/v4/protocol/lavasession"
"github.com/lavanet/lava/v4/protocol/metrics"
"github.com/lavanet/lava/v4/protocol/provideroptimizer"
"github.com/lavanet/lava/v4/protocol/qos"
"github.com/lavanet/lava/v4/utils"
"github.com/lavanet/lava/v4/utils/rand"
pairingtypes "github.com/lavanet/lava/v4/x/pairing/types"
Expand Down Expand Up @@ -725,8 +726,8 @@ func CreateConsumerSessionManager(chainID, apiInterface, consumerPublicAddress s
baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better
return lavasession.NewConsumerSessionManager(
&lavasession.RPCEndpoint{NetworkAddress: "stub", ChainID: chainID, ApiInterface: apiInterface, TLSEnabled: false, HealthCheckPath: "/", Geolocation: 0},
provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1, nil, "dontcare"),
nil, nil, consumerPublicAddress,
lavasession.NewActiveSubscriptionProvidersStorage(),
qos.NewQoSManager(provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1, nil, "dontcare")),
)
}
6 changes: 3 additions & 3 deletions protocol/common/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,9 @@ type ConflictHandlerInterface interface {
}

type ProviderInfo struct {
ProviderAddress string
ProviderQoSExcellenceSummery sdk.Dec // the number represents the average qos for this provider session
ProviderStake sdk.Coin
ProviderAddress string
ProviderReputationSummery sdk.Dec // the number represents the average qos for this provider session
ProviderStake sdk.Coin
}

type RelayResult struct {
Expand Down
6 changes: 4 additions & 2 deletions protocol/integration/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/lavanet/lava/v4/protocol/metrics"
"github.com/lavanet/lava/v4/protocol/performance"
"github.com/lavanet/lava/v4/protocol/provideroptimizer"
"github.com/lavanet/lava/v4/protocol/qos"
"github.com/lavanet/lava/v4/protocol/rpcconsumer"
"github.com/lavanet/lava/v4/protocol/rpcprovider"
"github.com/lavanet/lava/v4/protocol/rpcprovider/reliabilitymanager"
Expand Down Expand Up @@ -234,7 +235,8 @@ func createRpcConsumer(t *testing.T, ctx context.Context, rpcConsumerOptions rpc
_, averageBlockTime, _, _ := chainParser.ChainBlockStats()
baseLatency := common.AverageWorldLatency / 2
optimizer := provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, averageBlockTime, baseLatency, 2, nil, "dontcare")
consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, nil, nil, "test", lavasession.NewActiveSubscriptionProvidersStorage())
qosManager := qos.NewQoSManager(optimizer)
consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, nil, nil, "test", lavasession.NewActiveSubscriptionProvidersStorage(), qosManager)
consumerSessionManager.UpdateAllProviders(rpcConsumerOptions.epoch, rpcConsumerOptions.pairingList)

// Just setting the providers available extensions and policy so the consumer is aware of them
Expand Down Expand Up @@ -279,7 +281,7 @@ func createRpcConsumer(t *testing.T, ctx context.Context, rpcConsumerOptions rpc
consumerCmdFlags := common.ConsumerCmdFlags{}
rpcconsumerLogs, err := metrics.NewRPCConsumerLogs(nil, nil, nil)
require.NoError(t, err)
err = rpcConsumerServer.ServeRPCRequests(ctx, rpcEndpoint, consumerStateTracker, chainParser, finalizationConsensus, consumerSessionManager, rpcConsumerOptions.requiredResponses, rpcConsumerOptions.account.SK, rpcConsumerOptions.lavaChainID, cache, rpcconsumerLogs, rpcConsumerOptions.account.Addr, consumerConsistency, nil, consumerCmdFlags, false, nil, nil, nil)
err = rpcConsumerServer.ServeRPCRequests(ctx, rpcEndpoint, consumerStateTracker, chainParser, finalizationConsensus, consumerSessionManager, rpcConsumerOptions.requiredResponses, rpcConsumerOptions.account.SK, rpcConsumerOptions.lavaChainID, cache, rpcconsumerLogs, rpcConsumerOptions.account.Addr, consumerConsistency, nil, consumerCmdFlags, false, nil, nil, nil, consumerSessionManager)
require.NoError(t, err)

// wait for consumer to finish initialization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/lavanet/lava/v4/protocol/chainlib"
"github.com/lavanet/lava/v4/protocol/lavasession"
"github.com/lavanet/lava/v4/protocol/provideroptimizer"
"github.com/lavanet/lava/v4/protocol/qos"
pairingtypes "github.com/lavanet/lava/v4/x/pairing/types"
spectypes "github.com/lavanet/lava/v4/x/spec/types"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -202,13 +204,23 @@ func TestConsensusHashesInsertion(t *testing.T) {

func TestQoS(t *testing.T) {
decToSet, _ := sdk.NewDecFromStr("0.05") // test values fit 0.05 Availability requirements
lavasession.AvailabilityPercentage = decToSet
qos.AvailabilityPercentage = decToSet
rand.InitRandomSeed()
chainsToTest := []string{"APT1", "LAV1", "ETH1"}

waitForDoneChan := func(doneChan <-chan struct{}) {
select {
case <-doneChan:
case <-time.After(5 * time.Second):
t.Fatal("timeout waiting for qos calculation to finish")
}
}

for i := 0; i < 10; i++ {
for _, chainID := range chainsToTest {
t.Run(chainID, func(t *testing.T) {
ctx := context.Background()
qosManager := qos.NewQoSManager(&provideroptimizer.ProviderOptimizer{})
chainParser, _, _, closeServer, _, err := chainlib.CreateChainLibMocks(ctx, chainID, "0", func(http.ResponseWriter, *http.Request) {}, nil, "../../../", nil)
if closeServer != nil {
defer closeServer()
Expand Down Expand Up @@ -282,54 +294,65 @@ func TestQoS(t *testing.T) {
currentLatency := time.Millisecond
expectedLatency := time.Millisecond
latestServicedBlock := expectedBH
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1)
require.Equal(t, uint64(1), singleConsumerSession.QoSInfo.AnsweredRelays)
require.Equal(t, uint64(1), singleConsumerSession.QoSInfo.TotalRelays)
require.Equal(t, int64(1), singleConsumerSession.QoSInfo.SyncScoreSum)
require.Equal(t, int64(1), singleConsumerSession.QoSInfo.TotalSyncScore)
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Availability)
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Sync)
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Latency)
waitForDoneChan(qosManager.CalculateQoS(epoch, singleConsumerSession.SessionId, "", currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1))
require.Equal(t, uint64(1), qosManager.GetAnsweredRelays(epoch, singleConsumerSession.SessionId))
require.Equal(t, uint64(1), qosManager.GetTotalRelays(epoch, singleConsumerSession.SessionId))
require.Equal(t, int64(1), qosManager.GetSyncScoreSum(epoch, singleConsumerSession.SessionId))
require.Equal(t, int64(1), qosManager.GetTotalSyncScore(epoch, singleConsumerSession.SessionId))

lastQoSReport := qosManager.GetLastQoSReport(epoch, singleConsumerSession.SessionId)
require.Equal(t, sdk.OneDec(), lastQoSReport.Availability)
require.Equal(t, sdk.OneDec(), lastQoSReport.Sync)
require.Equal(t, sdk.OneDec(), lastQoSReport.Latency)

latestServicedBlock = expectedBH + 1
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1)
require.Equal(t, uint64(2), singleConsumerSession.QoSInfo.AnsweredRelays)
require.Equal(t, uint64(2), singleConsumerSession.QoSInfo.TotalRelays)
require.Equal(t, int64(2), singleConsumerSession.QoSInfo.SyncScoreSum)
require.Equal(t, int64(2), singleConsumerSession.QoSInfo.TotalSyncScore)
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Availability)
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Sync)
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Latency)

singleConsumerSession.QoSInfo.TotalRelays++ // this is how we add a failure
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1)
require.Equal(t, uint64(3), singleConsumerSession.QoSInfo.AnsweredRelays)
require.Equal(t, uint64(4), singleConsumerSession.QoSInfo.TotalRelays)
require.Equal(t, int64(3), singleConsumerSession.QoSInfo.SyncScoreSum)
require.Equal(t, int64(3), singleConsumerSession.QoSInfo.TotalSyncScore)

require.Equal(t, sdk.ZeroDec(), singleConsumerSession.QoSInfo.LastQoSReport.Availability) // because availability below 95% is 0
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Sync)
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Latency)
waitForDoneChan(qosManager.CalculateQoS(epoch, singleConsumerSession.SessionId, "", currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1))
require.Equal(t, uint64(2), qosManager.GetAnsweredRelays(epoch, singleConsumerSession.SessionId))
require.Equal(t, uint64(2), qosManager.GetTotalRelays(epoch, singleConsumerSession.SessionId))
require.Equal(t, int64(2), qosManager.GetSyncScoreSum(epoch, singleConsumerSession.SessionId))
require.Equal(t, int64(2), qosManager.GetTotalSyncScore(epoch, singleConsumerSession.SessionId))

lastQoSReport = qosManager.GetLastQoSReport(epoch, singleConsumerSession.SessionId)
require.Equal(t, sdk.OneDec(), lastQoSReport.Availability)
require.Equal(t, sdk.OneDec(), lastQoSReport.Sync)
require.Equal(t, sdk.OneDec(), lastQoSReport.Latency)

waitForDoneChan(qosManager.AddFailedRelay(epoch, singleConsumerSession.SessionId)) // this is how we add a failure
waitForDoneChan(qosManager.CalculateQoS(epoch, singleConsumerSession.SessionId, "", currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1))
require.Equal(t, uint64(3), qosManager.GetAnsweredRelays(epoch, singleConsumerSession.SessionId))
require.Equal(t, uint64(4), qosManager.GetTotalRelays(epoch, singleConsumerSession.SessionId))
require.Equal(t, int64(3), qosManager.GetSyncScoreSum(epoch, singleConsumerSession.SessionId))
require.Equal(t, int64(3), qosManager.GetTotalSyncScore(epoch, singleConsumerSession.SessionId))

lastQoSReport = qosManager.GetLastQoSReport(epoch, singleConsumerSession.SessionId)
require.Equal(t, sdk.ZeroDec(), lastQoSReport.Availability) // because availability below 95% is 0
require.Equal(t, sdk.OneDec(), lastQoSReport.Sync)
require.Equal(t, sdk.OneDec(), lastQoSReport.Latency)

latestServicedBlock = expectedBH - 1 // is one block below threshold
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency*2, expectedBH-latestServicedBlock, numOfProviders, 1)
require.Equal(t, uint64(4), singleConsumerSession.QoSInfo.AnsweredRelays)
require.Equal(t, uint64(5), singleConsumerSession.QoSInfo.TotalRelays)
require.Equal(t, int64(3), singleConsumerSession.QoSInfo.SyncScoreSum)
require.Equal(t, int64(4), singleConsumerSession.QoSInfo.TotalSyncScore)

require.Equal(t, sdk.ZeroDec(), singleConsumerSession.QoSInfo.LastQoSReport.Availability) // because availability below 95% is 0
require.Equal(t, sdk.MustNewDecFromStr("0.75"), singleConsumerSession.QoSInfo.LastQoSReport.Sync)
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Latency)
waitForDoneChan(qosManager.CalculateQoS(epoch, singleConsumerSession.SessionId, "", currentLatency, expectedLatency*2, expectedBH-latestServicedBlock, numOfProviders, 1))
require.Equal(t, uint64(4), qosManager.GetAnsweredRelays(epoch, singleConsumerSession.SessionId))
require.Equal(t, uint64(5), qosManager.GetTotalRelays(epoch, singleConsumerSession.SessionId))
require.Equal(t, int64(3), qosManager.GetSyncScoreSum(epoch, singleConsumerSession.SessionId))
require.Equal(t, int64(4), qosManager.GetTotalSyncScore(epoch, singleConsumerSession.SessionId))

lastQoSReport = qosManager.GetLastQoSReport(epoch, singleConsumerSession.SessionId)
require.Equal(t, sdk.ZeroDec(), lastQoSReport.Availability) // because availability below 95% is 0
require.Equal(t, sdk.MustNewDecFromStr("0.75"), lastQoSReport.Sync)
require.Equal(t, sdk.OneDec(), lastQoSReport.Latency)

latestServicedBlock = expectedBH + 1
// add in a loop so availability goes above 95%
doneChan := make(<-chan struct{})
for i := 5; i < 100; i++ {
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency*2, expectedBH-latestServicedBlock, numOfProviders, 1)
doneChan = qosManager.CalculateQoS(epoch, singleConsumerSession.SessionId, "", currentLatency, expectedLatency*2, expectedBH-latestServicedBlock, numOfProviders, 1)
}
require.Equal(t, sdk.MustNewDecFromStr("0.8"), singleConsumerSession.QoSInfo.LastQoSReport.Availability) // because availability below 95% is 0
require.Equal(t, sdk.MustNewDecFromStr("0.989898989898989898"), singleConsumerSession.QoSInfo.LastQoSReport.Sync)
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Latency)
waitForDoneChan(doneChan)

lastQoSReport = qosManager.GetLastQoSReport(epoch, singleConsumerSession.SessionId)
require.Equal(t, sdk.MustNewDecFromStr("0.8"), lastQoSReport.Availability) // because availability below 95% is 0
require.Equal(t, sdk.MustNewDecFromStr("0.989898989898989898"), lastQoSReport.Sync)
require.Equal(t, sdk.OneDec(), lastQoSReport.Latency)

finalizationInsertionsSpreadBlocks := []finalizationTestInsertion{
finalizationInsertionForProviders(chainID, epoch, 200, 0, 1, true, "", blocksInFinalizationProof, blockDistanceForFinalizedData)[0],
Expand Down
69 changes: 69 additions & 0 deletions protocol/lavaprotocol/qos_manager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 12 additions & 7 deletions protocol/lavaprotocol/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,12 @@ func NewRelayData(ctx context.Context, connectionType, apiUrl string, data []byt
return relayData
}

func ConstructRelaySession(lavaChainID string, relayRequestData *pairingtypes.RelayPrivateData, chainID, providerPublicAddress string, singleConsumerSession *lavasession.SingleConsumerSession, epoch int64, reportedProviders []*pairingtypes.ReportedProvider) *pairingtypes.RelaySession {
type QoSManager interface {
GetLastQoSReport(epoch uint64, sessionId int64) *pairingtypes.QualityOfServiceReport
GetLastReputationQoSReportRaw(epoch uint64, sessionId int64) *pairingtypes.QualityOfServiceReport
}

func ConstructRelaySession(lavaChainID string, relayRequestData *pairingtypes.RelayPrivateData, chainID, providerPublicAddress string, singleConsumerSession *lavasession.SingleConsumerSession, epoch int64, reportedProviders []*pairingtypes.ReportedProvider, qosManager QoSManager) *pairingtypes.RelaySession {
copyQoSServiceReport := func(reportToCopy *pairingtypes.QualityOfServiceReport) *pairingtypes.QualityOfServiceReport {
if reportToCopy != nil {
QOS := *reportToCopy
Expand All @@ -71,11 +76,11 @@ func ConstructRelaySession(lavaChainID string, relayRequestData *pairingtypes.Re
return nil
}

copiedQOS := copyQoSServiceReport(singleConsumerSession.QoSInfo.LastQoSReport)
copiedExcellenceQOS := copyQoSServiceReport(singleConsumerSession.QoSInfo.LastExcellenceQoSReportRaw) // copy raw report for the node
copiedQOS := copyQoSServiceReport(qosManager.GetLastQoSReport(uint64(epoch), singleConsumerSession.SessionId))
copiedReputation := copyQoSServiceReport(qosManager.GetLastReputationQoSReportRaw(uint64(epoch), singleConsumerSession.SessionId)) // copy raw report for the node

// validate and fix QoS excellence report before sending it to the node
copiedExcellenceQOS.ValidateAndFixQoSExcellence()
copiedReputation.ValidateAndFixQoSExcellence()

return &pairingtypes.RelaySession{
SpecId: chainID,
Expand All @@ -90,14 +95,14 @@ func ConstructRelaySession(lavaChainID string, relayRequestData *pairingtypes.Re
LavaChainId: lavaChainID,
Sig: nil,
Badge: nil,
QosExcellenceReport: copiedExcellenceQOS,
QosExcellenceReport: copiedReputation,
}
}

func ConstructRelayRequest(ctx context.Context, privKey *btcec.PrivateKey, lavaChainID, chainID string, relayRequestData *pairingtypes.RelayPrivateData, providerPublicAddress string, consumerSession *lavasession.SingleConsumerSession, epoch int64, reportedProviders []*pairingtypes.ReportedProvider) (*pairingtypes.RelayRequest, error) {
func ConstructRelayRequest(ctx context.Context, privKey *btcec.PrivateKey, lavaChainID, chainID string, relayRequestData *pairingtypes.RelayPrivateData, providerPublicAddress string, consumerSession *lavasession.SingleConsumerSession, epoch int64, reportedProviders []*pairingtypes.ReportedProvider, qosManager QoSManager) (*pairingtypes.RelayRequest, error) {
relayRequest := &pairingtypes.RelayRequest{
RelayData: relayRequestData,
RelaySession: ConstructRelaySession(lavaChainID, relayRequestData, chainID, providerPublicAddress, consumerSession, epoch, reportedProviders),
RelaySession: ConstructRelaySession(lavaChainID, relayRequestData, chainID, providerPublicAddress, consumerSession, epoch, reportedProviders, qosManager),
}
sig, err := sigs.Sign(privKey, *relayRequest.RelaySession)
if err != nil {
Expand Down
Loading
Loading