Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: GROW-1241-health-command-single-provider-arg #1475

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
6 changes: 5 additions & 1 deletion protocol/monitoring/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,11 @@ func (al *Alerting) ProvidersAlerts(healthResults *HealthResults) {
latestBlock := healthResults.LatestBlocks[specId]
if latestBlock > data.Block {
gap := latestBlock - data.Block
timeGap := time.Duration(gap*healthResults.Specs[specId].AverageBlockTime) * time.Millisecond
specHealthResult, ok := healthResults.Specs[specId]
if !ok {
utils.LavaFormatFatal("Invalid specid - missing in healthResults", nil, utils.Attribute{Key: "specId", Value: specId})
}
timeGap := time.Duration(gap*specHealthResult.AverageBlockTime) * time.Millisecond
if timeGap > al.allowedTimeGapVsReference {
attrs = append(attrs, AlertAttribute{entity: provider, data: fmt.Sprintf("block gap: %s/%s", utils.StrValue(data.Block), utils.StrValue(latestBlock))})
}
Expand Down
153 changes: 142 additions & 11 deletions protocol/monitoring/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func RunHealth(ctx context.Context,
consumerEndpoints []*HealthRPCEndpoint,
referenceEndpoints []*HealthRPCEndpoint,
prometheusListenAddr string,
resultsPostGUID string,
singleProviderSpecsInterfacesData map[string][]string,
) (*HealthResults, error) {
specQuerier := spectypes.NewQueryClient(clientCtx)
healthResults := &HealthResults{
Expand All @@ -89,6 +91,8 @@ func RunHealth(ctx context.Context,
UnhealthyProviders: map[LavaEntity]string{},
UnhealthyConsumers: map[LavaEntity]string{},
Specs: map[string]*spectypes.Spec{},
ResultsPostGUID: resultsPostGUID,
ProviderAddresses: providerAddresses,
}
currentBlock := int64(0)
for i := 0; i < BasicQueryRetries; i++ {
Expand Down Expand Up @@ -125,6 +129,24 @@ func RunHealth(ctx context.Context,
getAllProviders = true
}

// we can limit the specs we are checking if those where given as arguments
var lookupSpecsFromArg []string

if singleProviderSpecsInterfacesData != nil {
lookupSpecsFromArg = make([]string, 0, len(singleProviderSpecsInterfacesData))
for k := range singleProviderSpecsInterfacesData {
k = strings.ToUpper(strings.TrimSpace(k))
if len(k) > 2 {
lookupSpecsFromArg = append(lookupSpecsFromArg, k)
}
}
if len(lookupSpecsFromArg) == 0 {
lookupSpecsFromArg = nil
}
} else {
lookupSpecsFromArg = nil
}

errCh := make(chan error, 1)

// get a list of all necessary specs for the test
Expand All @@ -150,6 +172,19 @@ func RunHealth(ctx context.Context,
for _, specInfo := range specsResp.ChainInfoList {
healthResults.setSpec(&spectypes.Spec{Index: specInfo.ChainID})
}
} else if len(singleProviderSpecsInterfacesData) > 0 && len(providerAddresses) > 1 {
for _, providerAddress := range providerAddresses {
for spec, apiInterfaces := range singleProviderSpecsInterfacesData {
healthResults.setSpec(&spectypes.Spec{Index: spec})
for _, apiInterface := range apiInterfaces {
healthResults.SetProviderData(LavaEntity{
Address: providerAddress,
SpecId: spec,
ApiInterface: apiInterface,
}, ReplyData{})
}
}
}
mikecot marked this conversation as resolved.
Show resolved Hide resolved
} else {
var wgproviders sync.WaitGroup
wgproviders.Add(len(providerAddresses))
Expand All @@ -163,11 +198,13 @@ func RunHealth(ctx context.Context,
Delegator: providerAddress,
WithPending: false,
})

cancel()
if err != nil || response == nil {
time.Sleep(QuerySleepTime)
continue
}

delegations := response.GetDelegations()
for _, delegation := range delegations {
if delegation.Provider == providerAddress {
Expand Down Expand Up @@ -246,14 +283,23 @@ func RunHealth(ctx context.Context,
return nil, utils.LavaFormatWarning("[-] populating specs", <-errCh)
}
pairingQuerier := pairingtypes.NewQueryClient(clientCtx)
utils.LavaFormatDebug("[+] getting provider entries")

utils.LavaFormatDebug("[+] Starting to get provider entries")

stakeEntries := map[LavaEntity]epochstoragetypes.StakeEntry{}
var mutex sync.Mutex // Mutex to protect concurrent access to stakeEntries
wgspecs.Add(len(healthResults.getSpecs()))
if lookupSpecsFromArg == nil {
wgspecs.Add(len(healthResults.getSpecs()))
}

processSpecProviders := func(specId string) {
defer wgspecs.Done()
if lookupSpecsFromArg == nil {
defer wgspecs.Done()
}

var err error
for i := 0; i < BasicQueryRetries; i++ {
utils.LavaFormatDebug("[+] Attempting to query providers", utils.LogAttr("attempt", i+1), utils.LogAttr("specId", specId))
var response *pairingtypes.QueryProvidersResponse
queryCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
response, err = pairingQuerier.Providers(queryCtx, &pairingtypes.QueryProvidersRequest{
Expand All @@ -262,55 +308,114 @@ func RunHealth(ctx context.Context,
})
cancel()
if err != nil || response == nil {
utils.LavaFormatDebug("[!] Query failed or no response", utils.LogAttr("error", err), utils.LogAttr("response", response))
time.Sleep(QuerySleepTime)
continue
}

for _, providerEntry := range response.StakeEntry {
if len(providerAddresses) > 0 {
found := false
for _, address := range providerAddresses {
if address == providerEntry.Address {
found = true
break
}
}
if !found {
continue
}
}

providerKey := LavaEntity{
Address: providerEntry.Address,
SpecId: specId,
}

apiInterfaces := chainIdToApiInterfaces[specId]
// just to check if this is a provider we need to check we need one of the apiInterfaces
if len(apiInterfaces) == 0 {
utils.LavaFormatError("invalid state len(apiInterfaces) == 0", nil, utils.LogAttr("specId", specId))
utils.LavaFormatError("[!] invalid state len(apiInterfaces) == 0", nil, utils.LogAttr("specId", specId))
// shouldn't happen
continue
}

lookupKey := LavaEntity{
Address: providerEntry.Address,
SpecId: specId,
ApiInterface: apiInterfaces[0],
}

mutex.Lock() // Lock before updating stakeEntries

if _, ok := healthResults.getProviderData(lookupKey); ok || getAllProviders {
if providerEntry.StakeAppliedBlock > uint64(currentBlock) {
healthResults.FreezeProvider(providerKey)
} else {
stakeEntries[providerKey] = providerEntry
}
}

mutex.Unlock()
}
return
break
}
if err != nil {
utils.LavaFormatError("[!] Error after retries", err)
select {
case errCh <- err:
utils.LavaFormatDebug("[+] Error sent to channel", utils.LogAttr("error", err))
default:
utils.LavaFormatDebug("[!] Error channel full, error not sent", utils.LogAttr("error", err))
}
}
}
// get provider stake entries
for specId := range healthResults.getSpecs() {
go processSpecProviders(specId)
// get provider stake entries for each spec or only for the ones given as arguments
if lookupSpecsFromArg != nil {
for specId := range healthResults.getSpecs() {
utils.LavaFormatDebug("[+] Processing specId", utils.LogAttr("specId", specId)) // Print the specId being processed
for _, arg := range lookupSpecsFromArg {
if arg == strings.ToUpper(specId) {
utils.LavaFormatDebug("[+] Match found for specId", utils.LogAttr("specId", specId)) // Print when a match is found
processSpecProviders(specId)
break
}
}
}
} else {
for specId := range healthResults.getSpecs() {
go processSpecProviders(specId)
}
}
wgspecs.Wait()

if lookupSpecsFromArg == nil {
wgspecs.Wait()
}

// check for mismtaches in the pairings query and the arguments
// This flow can be triggered with the following command:
// lavap test health health_all_providers.yml --node https://public-rpc.lavanet.xyz:443 --single-provider-address lava@1czgrha7ys2698xve2gz4xaccteplrzx8s9fh7e --post-results-guid 6IJN3OroilsAB030rXIeh3PeJbRpp5Wy --run-once-and-exit --post-results-skip-spec --single-provider-specs-interfaces-data '{"ARB1": ["grpc"] }' --log_level debug --post-results-address http://localhost:6510
if len(providerAddresses) > 0 && len(singleProviderSpecsInterfacesData) > 0 {
for _, address := range providerAddresses {
for specId, apiInterfaces := range singleProviderSpecsInterfacesData {
for _, apiInterface := range apiInterfaces {
lookupKey := LavaEntity{
Address: address,
SpecId: specId,
ApiInterface: apiInterface,
}
if _, ok := stakeEntries[lookupKey]; !ok {
healthResults.SetUnhealthyProvider(lookupKey, "no pairings found")
}
}
}
}
}

if len(errCh) > 0 {
return nil, utils.LavaFormatWarning("[-] processing providers entries", <-errCh)
}

utils.LavaFormatDebug("[+] checking subscriptions")
err = checkSubscriptions(ctx, clientCtx, subscriptionAddresses, healthResults)
if err != nil {
Expand Down Expand Up @@ -543,11 +648,23 @@ func CheckProviders(ctx context.Context, clientCtx client.Context, healthResults
for _, endpoint := range providerEntry.Endpoints {
checkOneProvider := func(endpoint epochstoragetypes.Endpoint, apiInterface string, addon string, providerEntry epochstoragetypes.StakeEntry) (time.Duration, string, int64, error) {
cswp := lavasession.ConsumerSessionsWithProvider{}
relayerClientPt, conn, err := cswp.ConnectRawClientWithTimeout(ctx, endpoint.IPPORT)
if err != nil {

var relayerClientPt *pairingtypes.RelayerClient
var conn *grpc.ClientConn
var err error

for i := 0; i < 3; i++ {
relayerClientPt, conn, err = cswp.ConnectRawClientWithTimeout(ctx, endpoint.IPPORT)
if err == nil {
break
}
utils.LavaFormatDebug("failed connecting to provider endpoint", utils.LogAttr("error", err), utils.Attribute{Key: "apiInterface", Value: apiInterface}, utils.Attribute{Key: "addon", Value: addon}, utils.Attribute{Key: "chainID", Value: providerEntry.Chain}, utils.Attribute{Key: "network address", Value: endpoint.IPPORT})
}

if err != nil {
return 0, "", 0, err
}

defer conn.Close()
relayerClient := *relayerClientPt
guid := uint64(rand.Int63())
Expand Down Expand Up @@ -588,17 +705,31 @@ func CheckProviders(ctx context.Context, clientCtx client.Context, healthResults
}
return relayLatency, versions, probeResp.GetLatestBlock(), nil
}

endpointServices := endpoint.GetSupportedServices()
if len(endpointServices) == 0 {
utils.LavaFormatWarning("endpoint has no supported services", nil, utils.Attribute{Key: "endpoint", Value: endpoint})
}

for _, endpointService := range endpointServices {
providerKey := LavaEntity{
Address: providerEntry.Address,
SpecId: providerEntry.Chain,
ApiInterface: endpointService.ApiInterface,
}

probeLatency, version, latestBlockFromProbe, err := checkOneProvider(endpoint, endpointService.ApiInterface, endpointService.Addon, providerEntry)

utils.LavaFormatDebug("[+] checked provider",
utils.LogAttr("endpoint", endpoint),
utils.LogAttr("apiInterface", endpointService.ApiInterface),
utils.LogAttr("addon", endpointService.Addon),
utils.LogAttr("providerEntry", providerEntry),
utils.LogAttr("probeLatency", probeLatency),
utils.LogAttr("version", version),
utils.LogAttr("latestBlockFromProbe", latestBlockFromProbe),
utils.LogAttr("error", err))

if err != nil {
errMsg := prettifyProviderError(err)
healthResults.SetUnhealthyProvider(providerKey, errMsg)
Expand Down
Loading
Loading