diff --git a/protocol/monitoring/alerting.go b/protocol/monitoring/alerting.go index 6cf686100f..99f429b1b5 100644 --- a/protocol/monitoring/alerting.go +++ b/protocol/monitoring/alerting.go @@ -354,10 +354,19 @@ func (al *Alerting) ProvidersAlerts(healthResults *HealthResults) { for provider, data := range healthResults.ProviderData { specId := provider.SpecId if al.allowedTimeGapVsReference > 0 { - latestBlock := healthResults.LatestBlocks[specId] + latestBlock, ok := healthResults.LatestBlocks[specId] + if !ok { + utils.LavaFormatError("Invalid spec id - missing in healthResults", nil, utils.Attribute{Key: "specId", Value: specId}) + return + } if latestBlock > data.Block { gap := latestBlock - data.Block - timeGap := time.Duration(gap*healthResults.Specs[specId].AverageBlockTime) * time.Millisecond + specHealthResult, ok := healthResults.Specs[specId] + if !ok { + utils.LavaFormatError("Invalid spec id - missing in healthResults", nil, utils.Attribute{Key: "specId", Value: specId}) + return + } + timeGap := time.Duration(gap*specHealthResult.AverageBlockTime) * time.Millisecond if timeGap > al.allowedTimeGapVsReference { attrs = append(attrs, AlertAttribute{entity: provider, data: fmt.Sprintf("block gap: %s/%s", utils.StrValue(data.Block), utils.StrValue(latestBlock))}) } diff --git a/protocol/monitoring/health.go b/protocol/monitoring/health.go index 64209c8531..fb254d9b34 100644 --- a/protocol/monitoring/health.go +++ b/protocol/monitoring/health.go @@ -2,13 +2,12 @@ package monitoring import ( "context" + "encoding/json" "fmt" "strings" "sync" "time" - "github.com/goccy/go-json" - "github.com/cosmos/cosmos-sdk/client" "github.com/gogo/status" lvutil "github.com/lavanet/lava/v2/ecosystem/lavavisor/pkg/util" @@ -19,7 +18,6 @@ import ( "github.com/lavanet/lava/v2/protocol/rpcprovider" "github.com/lavanet/lava/v2/utils" "github.com/lavanet/lava/v2/utils/rand" - dualstakingtypes "github.com/lavanet/lava/v2/x/dualstaking/types" epochstoragetypes "github.com/lavanet/lava/v2/x/epochstorage/types" pairingtypes "github.com/lavanet/lava/v2/x/pairing/types" protocoltypes "github.com/lavanet/lava/v2/x/protocol/types" @@ -78,6 +76,8 @@ func RunHealth(ctx context.Context, consumerEndpoints []*HealthRPCEndpoint, referenceEndpoints []*HealthRPCEndpoint, prometheusListenAddr string, + resultsPostGUID string, + singleProviderSpecsInterfacesData map[string][]string, ) (*HealthResults, error) { specQuerier := spectypes.NewQueryClient(clientCtx) healthResults := &HealthResults{ @@ -89,6 +89,8 @@ func RunHealth(ctx context.Context, UnhealthyProviders: map[LavaEntity]string{}, UnhealthyConsumers: map[LavaEntity]string{}, Specs: map[string]*spectypes.Spec{}, + ResultsPostGUID: resultsPostGUID, + ProviderAddresses: providerAddresses, } currentBlock := int64(0) for i := 0; i < BasicQueryRetries; i++ { @@ -125,11 +127,32 @@ func RunHealth(ctx context.Context, getAllProviders = true } + // we can limit the specs we are checking if those where given as arguments + var lookupSpecsFromArg []string + + if singleProviderSpecsInterfacesData != nil { + lookupSpecsFromArg = make([]string, 0, len(singleProviderSpecsInterfacesData)) + for k := range singleProviderSpecsInterfacesData { + k = strings.ToUpper(strings.TrimSpace(k)) + if len(k) > 2 { + lookupSpecsFromArg = append(lookupSpecsFromArg, k) + } + } + if len(lookupSpecsFromArg) == 0 { + lookupSpecsFromArg = nil + } + } else { + lookupSpecsFromArg = nil + } + errCh := make(chan error, 1) + stakeEntries := map[LavaEntity]epochstoragetypes.StakeEntry{} + // get a list of all necessary specs for the test - dualStakingQuerier := dualstakingtypes.NewQueryClient(clientCtx) + // dualStakingQuerier := dualstakingtypes.NewQueryClient(clientCtx) if getAllProviders { + // fmt.Println("line 155 getAllProviders") // var specResp *spectypes.QueryGetSpecResponse var specsResp *spectypes.QueryShowAllChainsResponse for i := 0; i < BasicQueryRetries; i++ { @@ -150,45 +173,107 @@ func RunHealth(ctx context.Context, for _, specInfo := range specsResp.ChainInfoList { healthResults.setSpec(&spectypes.Spec{Index: specInfo.ChainID}) } + } else if len(singleProviderSpecsInterfacesData) > 0 && len(providerAddresses) > 1 { + // fmt.Println("line 176 singleProviderSpecsInterfacesData", singleProviderSpecsInterfacesData) + // fmt.Println("line 176 providerAddresses", providerAddresses) + for _, providerAddress := range providerAddresses { + // fmt.Println("line 180 providerAddress", providerAddress) + for spec, apiInterfaces := range singleProviderSpecsInterfacesData { + // fmt.Println("line 182 spec", spec) + // fmt.Println("line 182 apiInterfaces", apiInterfaces) + healthResults.setSpec(&spectypes.Spec{Index: spec}) + for _, apiInterface := range apiInterfaces { + healthResults.SetProviderData(LavaEntity{ + Address: providerAddress, + SpecId: spec, + ApiInterface: apiInterface, + }, ReplyData{}) + } + } + } + } else { + // fmt.Println("line 195 else") var wgproviders sync.WaitGroup wgproviders.Add(len(providerAddresses)) processProvider := func(providerAddress string) { defer wgproviders.Done() - var err error + // var err error for i := 0; i < BasicQueryRetries; i++ { - var response *dualstakingtypes.QueryDelegatorProvidersResponse + // var response *dualstakingtypes.QueryDelegatorProvidersResponse queryCtx, cancel := context.WithTimeout(ctx, 2*time.Second) - response, err = dualStakingQuerier.DelegatorProviders(queryCtx, &dualstakingtypes.QueryDelegatorProvidersRequest{ - Delegator: providerAddress, - WithPending: false, + // response, err = dualStakingQuerier.DelegatorProviders(queryCtx, &dualstakingtypes.QueryDelegatorProvidersRequest{ + // Delegator: providerAddress, + // WithPending: false, + // }) + // fmt.Println("!!line 208 response", response) + + // responseJSON1, err := json.MarshalIndent(response, "", " ") + // if err != nil { + // fmt.Println("!!Error marshaling response to JSON:", err) + // } else { + // fmt.Println("!!line 214 response (JSON):") + // fmt.Println(string(responseJSON1)) + // } + + pairingQuerier := pairingtypes.NewQueryClient(clientCtx) + + response, err := pairingQuerier.Provider(queryCtx, &pairingtypes.QueryProviderRequest{ + Address: providerAddress, + // ShowFrozen: true, ? }) + + // Print response2 in JSON format + // responseJSON, err := json.MarshalIndent(response2, "", " ") + // if err != nil { + // fmt.Println("!!Error marshaling response2 to JSON:", err) + // } else { + // fmt.Println("!!line 214 response2 (JSON):") + // fmt.Println(string(responseJSON)) + // } + + // fmt.Println("line 214 err2", err2) + cancel() - if err != nil || response == nil { - time.Sleep(QuerySleepTime) - continue - } - delegations := response.GetDelegations() - for _, delegation := range delegations { - if delegation.Provider == providerAddress { - healthResults.setSpec(&spectypes.Spec{Index: delegation.ChainID}) - for _, apiInterface := range chainIdToApiInterfaces[delegation.ChainID] { - healthResults.SetProviderData(LavaEntity{ - Address: providerAddress, - SpecId: delegation.ChainID, - ApiInterface: apiInterface, - }, ReplyData{}) + if err != nil { + fmt.Println("Error querying provider:", err) + } else if response != nil && len(response.StakeEntries) > 0 { + for _, stakeEntry := range response.StakeEntries { + + // Set the spec + healthResults.setSpec(&spectypes.Spec{Index: stakeEntry.Chain}) + + // Process endpoints and API interfaces + for _, endpoint := range stakeEntry.Endpoints { + for _, apiInterface := range endpoint.ApiInterfaces { + providerWithSpecAndAPI := LavaEntity{ + Address: stakeEntry.Address, + SpecId: stakeEntry.Chain, + ApiInterface: apiInterface, + } + healthResults.SetProviderData(providerWithSpecAndAPI, ReplyData{}) + + // if stakeEntry.StakeAppliedBlock > uint64(currentBlock) { + if stakeEntry.StakeAppliedBlock > uint64(currentBlock) || stakeEntry.IsFrozen() { + healthResults.FreezeProvider(providerWithSpecAndAPI) + // fmt.Printf("Froze provider %+v\n", providerWithSpecAndAPI) + } else { + stakeEntries[providerWithSpecAndAPI] = stakeEntry + // fmt.Printf("Added stake entry for %+v\n", providerWithSpecAndAPI) + } + } } } - } - return - } - if err != nil { - select { - case errCh <- err: - default: + } else { + fmt.Println("No provider data received") } } + // if err != nil { + // select { + // case errCh <- err: + // default: + // } + // } } for _, providerAddress := range providerAddresses { @@ -237,7 +322,9 @@ func RunHealth(ctx context.Context, wgspecs.Add(len(specs)) // populate the specs utils.LavaFormatDebug("[+] populating specs") + // fmt.Println("line 1111 specs", specs) for specId := range specs { + // fmt.Println("line 1112 specId", specId) go processSpec(specId) } @@ -245,72 +332,151 @@ func RunHealth(ctx context.Context, if len(errCh) > 0 { return nil, utils.LavaFormatWarning("[-] populating specs", <-errCh) } - pairingQuerier := pairingtypes.NewQueryClient(clientCtx) - utils.LavaFormatDebug("[+] getting provider entries") - stakeEntries := map[LavaEntity]epochstoragetypes.StakeEntry{} - var mutex sync.Mutex // Mutex to protect concurrent access to stakeEntries - wgspecs.Add(len(healthResults.getSpecs())) - processSpecProviders := func(specId string) { - defer wgspecs.Done() - var err error - for i := 0; i < BasicQueryRetries; i++ { - var response *pairingtypes.QueryProvidersResponse - queryCtx, cancel := context.WithTimeout(ctx, 2*time.Second) - response, err = pairingQuerier.Providers(queryCtx, &pairingtypes.QueryProvidersRequest{ - ChainID: specId, - ShowFrozen: true, - }) - cancel() - if err != nil || response == nil { - time.Sleep(QuerySleepTime) - continue - } - - for _, providerEntry := range response.StakeEntry { - providerKey := LavaEntity{ - Address: providerEntry.Address, - SpecId: specId, - } - apiInterfaces := chainIdToApiInterfaces[specId] - // just to check if this is a provider we need to check we need one of the apiInterfaces - if len(apiInterfaces) == 0 { - utils.LavaFormatError("invalid state len(apiInterfaces) == 0", nil, utils.LogAttr("specId", specId)) - // shouldn't happen - continue - } - lookupKey := LavaEntity{ - Address: providerEntry.Address, - SpecId: specId, - ApiInterface: apiInterfaces[0], - } - - mutex.Lock() // Lock before updating stakeEntries - if _, ok := healthResults.getProviderData(lookupKey); ok || getAllProviders { - if providerEntry.StakeAppliedBlock > uint64(currentBlock) { - healthResults.FreezeProvider(providerKey) - } else { - stakeEntries[providerKey] = providerEntry + // pairingQuerier := pairingtypes.NewQueryClient(clientCtx) + + utils.LavaFormatDebug("[+] Starting to get provider entries") + + // var mutex sync.Mutex // Mutex to protect concurrent access to stakeEntries + if lookupSpecsFromArg == nil { + wgspecs.Add(len(healthResults.getSpecs())) + } + + // processSpecProviders := func(specId string) { + // if lookupSpecsFromArg == nil { + // defer wgspecs.Done() + // } + + // var err error + // for i := 0; i < BasicQueryRetries; i++ { + // utils.LavaFormatDebug("[+] Attempting to query providers", utils.LogAttr("attempt", i+1), utils.LogAttr("specId", specId)) + // var response *pairingtypes.QueryProvidersResponse + // queryCtx, cancel := context.WithTimeout(ctx, 2*time.Second) + // response, err = pairingQuerier.Providers(queryCtx, &pairingtypes.QueryProvidersRequest{ + // ChainID: specId, + // ShowFrozen: true, + // }) + + // fmt.Println("response", response) + + // cancel() + // if err != nil || response == nil { + // utils.LavaFormatDebug("[!] Query failed or no response", utils.LogAttr("error", err), utils.LogAttr("response", response)) + // time.Sleep(QuerySleepTime) + // continue + // } + + // for _, providerEntry := range response.StakeEntry { + // if len(providerAddresses) > 0 { + // found := false + // for _, address := range providerAddresses { + // if address == providerEntry.Address { + // found = true + // break + // } + // } + // if !found { + // continue + // } + // } + + // providerKey := LavaEntity{ + // Address: providerEntry.Address, + // SpecId: specId, + // } + + // apiInterfaces := chainIdToApiInterfaces[specId] + // // just to check if this is a provider we need to check we need one of the apiInterfaces + // if len(apiInterfaces) == 0 { + // utils.LavaFormatError("[!] invalid state len(apiInterfaces) == 0", nil, utils.LogAttr("specId", specId)) + // // shouldn't happen + // continue + // } + + // lookupKey := LavaEntity{ + // Address: providerEntry.Address, + // SpecId: specId, + // ApiInterface: apiInterfaces[0], + // } + + // mutex.Lock() // Lock before updating stakeEntries + + // if _, ok := healthResults.getProviderData(lookupKey); ok || getAllProviders { + // if providerEntry.StakeAppliedBlock > uint64(currentBlock) { + // healthResults.FreezeProvider(providerKey) + // } else { + // stakeEntries[providerKey] = providerEntry + // } + // } + + // mutex.Unlock() + // } + // break + // } + // if err != nil { + // utils.LavaFormatError("[!] Error after retries", err) + // select { + // case errCh <- err: + // utils.LavaFormatDebug("[+] Error sent to channel", utils.LogAttr("error", err)) + // default: + // utils.LavaFormatDebug("[!] Error channel full, error not sent", utils.LogAttr("error", err)) + // } + // } + // } + + // get provider stake entries for each spec or only for the ones given as arguments + // if lookupSpecsFromArg != nil { + // // fmt.Println("lookupSpecsFromArg", lookupSpecsFromArg) + // // fmt.Println("healthResults.getSpecs()", healthResults.getSpecs()) + // for specId := range healthResults.getSpecs() { + // // fmt.Println("specId", specId) + // utils.LavaFormatDebug("[+] Processing specId", utils.LogAttr("specId", specId)) // Print the specId being processed + // for _, arg := range lookupSpecsFromArg { + // // fmt.Println("arg", arg) + // if arg == strings.ToUpper(specId) { + // // fmt.Println("Match found for specId", specId) // Print when a match is found + // utils.LavaFormatDebug("[+] Match found for specId", utils.LogAttr("specId", specId)) // Print when a match is found + // processSpecProviders(specId) + // break + // } + // } + // } + // } else { + // // fmt.Println("healthResults.getSpecs()", healthResults.getSpecs()) + // for specId := range healthResults.getSpecs() { + // go processSpecProviders(specId) + // } + // } + + if lookupSpecsFromArg == nil { + wgspecs.Wait() + } + + // check for mismtaches in the pairings query and the arguments + // This flow can be triggered with the following command: + // lavap test health health_all_providers.yml --node https://public-rpc.lavanet.xyz:443 --single-provider-address lava@1czgrha7ys2698xve2gz4xaccteplrzx8s9fh7e --post-results-guid 6IJN3OroilsAB030rXIeh3PeJbRpp5Wy --run-once-and-exit --post-results-skip-spec --single-provider-specs-interfaces-data '{"ARB1": ["grpc"] }' --log_level debug --post-results-address http://localhost:6510 + // lavap test health health_all_providers.yml --node https://testnet2-rpc.lavapro.xyz:443/ --single-provider-address lava@1ggcyk4wrlluh42dmak4s2c489ldkx8zaahde84 --post-results-guid 6IJN3OroilsAB030rXIeh3PeJbRpp5Wy --run-once-and-exit --post-results-skip-spec --single-provider-specs-interfaces-data '{"FVM": ["jsonrpc"] }' --log_level debug --post-results-address http://localhost:6510 + + if len(providerAddresses) > 0 && len(singleProviderSpecsInterfacesData) > 0 { + for _, address := range providerAddresses { + for specId, apiInterfaces := range singleProviderSpecsInterfacesData { + for _, apiInterface := range apiInterfaces { + lookupKey := LavaEntity{ + Address: address, + SpecId: specId, + ApiInterface: apiInterface, + } + if _, ok := stakeEntries[lookupKey]; !ok { + healthResults.SetUnhealthyProvider(lookupKey, "No stake entry found for provider-spec-api pair") } } - mutex.Unlock() - } - return - } - if err != nil { - select { - case errCh <- err: - default: } } } - // get provider stake entries - for specId := range healthResults.getSpecs() { - go processSpecProviders(specId) - } - wgspecs.Wait() + if len(errCh) > 0 { return nil, utils.LavaFormatWarning("[-] processing providers entries", <-errCh) } + utils.LavaFormatDebug("[+] checking subscriptions") err = checkSubscriptions(ctx, clientCtx, subscriptionAddresses, healthResults) if err != nil { @@ -546,8 +712,12 @@ func CheckProviders(ctx context.Context, clientCtx client.Context, healthResults relayerClient, conn, err := cswp.ConnectRawClientWithTimeout(ctx, endpoint.IPPORT) if err != nil { utils.LavaFormatDebug("failed connecting to provider endpoint", utils.LogAttr("error", err), utils.Attribute{Key: "apiInterface", Value: apiInterface}, utils.Attribute{Key: "addon", Value: addon}, utils.Attribute{Key: "chainID", Value: providerEntry.Chain}, utils.Attribute{Key: "network address", Value: endpoint.IPPORT}) + } + + if err != nil { return 0, "", 0, err } + defer conn.Close() guid := uint64(rand.Int63()) relaySentTime := time.Now() @@ -587,17 +757,31 @@ func CheckProviders(ctx context.Context, clientCtx client.Context, healthResults } return relayLatency, versions, probeResp.GetLatestBlock(), nil } + endpointServices := endpoint.GetSupportedServices() if len(endpointServices) == 0 { utils.LavaFormatWarning("endpoint has no supported services", nil, utils.Attribute{Key: "endpoint", Value: endpoint}) } + for _, endpointService := range endpointServices { providerKey := LavaEntity{ Address: providerEntry.Address, SpecId: providerEntry.Chain, ApiInterface: endpointService.ApiInterface, } + probeLatency, version, latestBlockFromProbe, err := checkOneProvider(endpoint, endpointService.ApiInterface, endpointService.Addon, providerEntry) + + utils.LavaFormatDebug("[+] checked provider", + utils.LogAttr("endpoint", endpoint), + utils.LogAttr("apiInterface", endpointService.ApiInterface), + utils.LogAttr("addon", endpointService.Addon), + utils.LogAttr("providerEntry", providerEntry), + utils.LogAttr("probeLatency", probeLatency), + utils.LogAttr("version", version), + utils.LogAttr("latestBlockFromProbe", latestBlockFromProbe), + utils.LogAttr("error", err)) + if err != nil { errMsg := prettifyProviderError(err) healthResults.SetUnhealthyProvider(providerKey, errMsg) diff --git a/protocol/monitoring/health_cmd.go b/protocol/monitoring/health_cmd.go index 97da35b1a0..24a72e98bc 100644 --- a/protocol/monitoring/health_cmd.go +++ b/protocol/monitoring/health_cmd.go @@ -24,34 +24,39 @@ import ( ) const ( - allowedBlockTimeDefaultLag = 30 * time.Second - intervalDefaultDuration = 0 * time.Second - defaultCUPercentageThreshold = 0.2 - defaultSubscriptionLeftDays = 10 - defaultMaxProviderLatency = 200 * time.Millisecond - defaultAlertSuppressionInterval = 6 * time.Hour - DefaultSuppressionCountThreshold = 3 - DisableAlertLogging = "disable-alert-logging" - maxProviderLatencyFlagName = "max-provider-latency" - subscriptionLeftTimeFlagName = "subscription-days-left-alert" - providerAddressesFlagName = "provider_addresses" - subscriptionAddressesFlagName = "subscription_addresses" - intervalFlagName = "interval" - consumerEndpointPropertyName = "consumer_endpoints" - referenceEndpointPropertyName = "reference_endpoints" - allowedBlockTimeLagFlagName = "allowed_time_lag" - queryRetriesFlagName = "query-retries" - alertingWebHookFlagName = "alert-webhook-url" - identifierFlagName = "identifier" - percentageCUFlagName = "cu-percent-threshold" - alertSuppressionIntervalFlagName = "alert-suppression-interval" - disableAlertSuppressionFlagName = "disable-alert-suppression" - SuppressionCountThresholdFlagName = "suppression-alert-count-threshold" - resultsPostAddressFlagName = "post-results-address" - AllProvidersFlagName = "all-providers" - AllProvidersMarker = "all" - ConsumerGrpcTLSFlagName = "consumer-grpc-tls" - allowInsecureConsumerDialingFlagName = "allow-insecure-consumer-dialing" + allowedBlockTimeDefaultLag = 30 * time.Second + intervalDefaultDuration = 0 * time.Second + defaultCUPercentageThreshold = 0.2 + defaultSubscriptionLeftDays = 10 + defaultMaxProviderLatency = 200 * time.Millisecond + defaultAlertSuppressionInterval = 6 * time.Hour + DefaultSuppressionCountThreshold = 3 + DisableAlertLogging = "disable-alert-logging" + maxProviderLatencyFlagName = "max-provider-latency" + subscriptionLeftTimeFlagName = "subscription-days-left-alert" + providerAddressesFlagName = "provider_addresses" + subscriptionAddressesFlagName = "subscription_addresses" + intervalFlagName = "interval" + consumerEndpointPropertyName = "consumer_endpoints" + referenceEndpointPropertyName = "reference_endpoints" + allowedBlockTimeLagFlagName = "allowed_time_lag" + queryRetriesFlagName = "query-retries" + alertingWebHookFlagName = "alert-webhook-url" + identifierFlagName = "identifier" + percentageCUFlagName = "cu-percent-threshold" + alertSuppressionIntervalFlagName = "alert-suppression-interval" + disableAlertSuppressionFlagName = "disable-alert-suppression" + SuppressionCountThresholdFlagName = "suppression-alert-count-threshold" + resultsPostAddressFlagName = "post-results-address" + resultsPostGUIDFlagName = "post-results-guid" + resultsPostSkipSepcFlagName = "post-results-skip-spec" + AllProvidersFlagName = "all-providers" + AllProvidersMarker = "all" + ConsumerGrpcTLSFlagName = "consumer-grpc-tls" + allowInsecureConsumerDialingFlagName = "allow-insecure-consumer-dialing" + singleProviderAddressFlagName = "single-provider-address" + singleProviderSpecsInterfacesDataFlagName = "single-provider-specs-interfaces-data" + runOnceAndExitFlagName = "run-once-and-exit" ) func ParseEndpoints(keyName string, viper_endpoints *viper.Viper) (endpoints []*HealthRPCEndpoint, err error) { @@ -142,8 +147,31 @@ reference_endpoints: prometheusListenAddr := viper.GetString(metrics.MetricsListenFlagName) providerAddresses := viper.GetStringSlice(providerAddressesFlagName) allProviders := viper.GetBool(AllProvidersFlagName) - if allProviders { + singleProvider := viper.GetString(singleProviderAddressFlagName) + if singleProvider != "" { + providerAddresses = []string{singleProvider} + utils.LavaFormatInfo("Health probe provider addresses set to a single provider address", utils.Attribute{Key: "provider", Value: singleProvider}) + } else if allProviders { providerAddresses = []string{AllProvidersMarker} + utils.LavaFormatInfo("Health probe provider addresses set to all") + } + singleProviderSpecsInterfacesRawData := viper.GetString(singleProviderSpecsInterfacesDataFlagName) + var singleProviderSpecsInterfacesData map[string][]string + if singleProviderSpecsInterfacesRawData != "" { + if singleProvider == "" { + utils.LavaFormatFatal("single provider address and single provider specs interfaces data must be set together", nil) + } + err := json.Unmarshal([]byte(singleProviderSpecsInterfacesRawData), &singleProviderSpecsInterfacesData) + if err != nil { + utils.LavaFormatFatal("Failed to parse singleProviderSpecsInterfacesDataFlagName as JSON", err) + } + if len(singleProviderSpecsInterfacesData) == 0 { + utils.LavaFormatFatal("singleProviderSpecsInterfacesData is empty", nil) + } + } + runOnceAndExit := viper.GetBool(runOnceAndExitFlagName) + if runOnceAndExit { + utils.LavaFormatInfo("Run once and exit flag set") } subscriptionAddresses := viper.GetStringSlice(subscriptionAddressesFlagName) keyName := consumerEndpointPropertyName @@ -167,6 +195,8 @@ reference_endpoints: SuppressionCounterThreshold: viper.GetUint64(SuppressionCountThresholdFlagName), } resultsPostAddress := viper.GetString(resultsPostAddressFlagName) + resultsPostGUID := viper.GetString(resultsPostGUIDFlagName) + resultsPostSkipSepc := viper.GetBool(resultsPostSkipSepcFlagName) alerting := NewAlerting(alertingOptions) RunHealthCheck := func(ctx context.Context, @@ -178,24 +208,36 @@ reference_endpoints: prometheusListenAddr string, ) { utils.LavaFormatInfo("[+] starting health run") - healthResult, err := RunHealth(ctx, clientCtx, subscriptionAddresses, providerAddresses, consumerEndpoints, referenceEndpoints, prometheusListenAddr) + healthResult, err := RunHealth(ctx, clientCtx, subscriptionAddresses, providerAddresses, consumerEndpoints, referenceEndpoints, prometheusListenAddr, resultsPostGUID, singleProviderSpecsInterfacesData) if err != nil { utils.LavaFormatError("[-] invalid health run", err) + if runOnceAndExit { + os.Exit(0) + } healthMetrics.SetFailedRun(identifier) } else { if resultsPostAddress != "" { + if resultsPostSkipSepc { + healthResult.Specs = nil + } jsonData, err := json.Marshal(healthResult) - if err == nil { + if err != nil { + utils.LavaFormatError("[-] failed marshaling results", err) + } else { resp, err := http.Post(resultsPostAddress, "application/json", bytes.NewBuffer(jsonData)) if err != nil { utils.LavaFormatError("[-] failed posting health results", err, utils.LogAttr("address", resultsPostAddress)) + } else { + defer resp.Body.Close() } - defer resp.Body.Close() - } else { - utils.LavaFormatError("[-] failed marshaling results", err) } } + utils.LavaFormatInfo("[+] completed health run") + if runOnceAndExit { + os.Exit(0) + } + healthMetrics.SetLatestBlockData(identifier, healthResult.FormatForLatestBlock()) alerting.CheckHealthResults(healthResult) activeAlerts, unhealthy, healthy := alerting.ActiveAlerts() @@ -241,12 +283,19 @@ reference_endpoints: cmdTestHealth.Flags().String(alertingWebHookFlagName, "", "a url to post an alert to") cmdTestHealth.Flags().String(metrics.MetricsListenFlagName, metrics.DisabledFlagOption, "the address to expose prometheus metrics (such as localhost:7779)") cmdTestHealth.Flags().String(resultsPostAddressFlagName, "", "the address to send the raw results to") + cmdTestHealth.Flags().String(resultsPostGUIDFlagName, "", "a guid marker to add to the results posted to the results post address") + cmdTestHealth.Flags().Bool(resultsPostSkipSepcFlagName, false, "enable to send the results without the specs to the results post address") cmdTestHealth.Flags().Duration(intervalFlagName, intervalDefaultDuration, "the interval duration for the health check, (defaults to 0s) if 0 runs once") cmdTestHealth.Flags().Duration(allowedBlockTimeLagFlagName, allowedBlockTimeDefaultLag, "the amount of time one rpc can be behind the most advanced one") cmdTestHealth.Flags().Uint64Var(&QueryRetries, queryRetriesFlagName, QueryRetries, "set the amount of max queries to send every health run to consumers and references") cmdTestHealth.Flags().Bool(AllProvidersFlagName, false, "a flag to overwrite the provider addresses with all the currently staked providers") cmdTestHealth.Flags().Bool(ConsumerGrpcTLSFlagName, true, "use tls configuration for grpc connections to your consumer") cmdTestHealth.Flags().Bool(allowInsecureConsumerDialingFlagName, false, "used to test grpc, to allow insecure (self signed cert).") + cmdTestHealth.Flags().String(singleProviderAddressFlagName, "", "single provider address in bach32 to override config settings") + cmdTestHealth.Flags().String(singleProviderSpecsInterfacesDataFlagName, "", "a json of spec:[interfaces...] to make the single provider query faster") + + cmdTestHealth.Flags().Bool(runOnceAndExitFlagName, false, "exit after first run.") + viper.BindPFlag(queryRetriesFlagName, cmdTestHealth.Flags().Lookup(queryRetriesFlagName)) // bind the flag flags.AddQueryFlagsToCmd(cmdTestHealth) common.AddRollingLogConfig(cmdTestHealth) diff --git a/protocol/monitoring/health_results.go b/protocol/monitoring/health_results.go index 22bd452da2..f85ed23404 100644 --- a/protocol/monitoring/health_results.go +++ b/protocol/monitoring/health_results.go @@ -18,6 +18,8 @@ type HealthResults struct { UnhealthyProviders map[LavaEntity]string `json:"unhealthyProviders,omitempty"` UnhealthyConsumers map[LavaEntity]string `json:"unhealthyConsumers,omitempty"` Specs map[string]*spectypes.Spec `json:"specs,omitempty"` + ResultsPostGUID string `json:"resultsGUID,omitempty"` + ProviderAddresses []string `json:"providerAddresses,omitempty"` Lock sync.RWMutex `json:"-"` }