Skip to content

Commit

Permalink
final changes to optimize network/process time
Browse files Browse the repository at this point in the history
  • Loading branch information
mikecot committed Jun 19, 2024
1 parent 9d45fa5 commit 09d5ef7
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 44 deletions.
6 changes: 5 additions & 1 deletion protocol/monitoring/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,11 @@ func (al *Alerting) ProvidersAlerts(healthResults *HealthResults) {
latestBlock := healthResults.LatestBlocks[specId]
if latestBlock > data.Block {
gap := latestBlock - data.Block
timeGap := time.Duration(gap*healthResults.Specs[specId].AverageBlockTime) * time.Millisecond
specHealthResult, ok := healthResults.Specs[specId]
if !ok {
utils.LavaFormatFatal("Invalid specid - missing in healthResults", nil, utils.Attribute{Key: "specId", Value: specId})
}
timeGap := time.Duration(gap*specHealthResult.AverageBlockTime) * time.Millisecond
if timeGap > al.allowedTimeGapVsReference {
attrs = append(attrs, AlertAttribute{entity: provider, data: fmt.Sprintf("block gap: %s/%s", utils.StrValue(data.Block), utils.StrValue(latestBlock))})
}
Expand Down
114 changes: 109 additions & 5 deletions protocol/monitoring/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func RunHealth(ctx context.Context,
consumerEndpoints []*HealthRPCEndpoint,
referenceEndpoints []*HealthRPCEndpoint,
prometheusListenAddr string,
resultsPostGUID string,
singleProviderSpecsInterfacesData map[string][]string,
) (*HealthResults, error) {
specQuerier := spectypes.NewQueryClient(clientCtx)
healthResults := &HealthResults{
Expand All @@ -89,6 +91,8 @@ func RunHealth(ctx context.Context,
UnhealthyProviders: map[LavaEntity]string{},
UnhealthyConsumers: map[LavaEntity]string{},
Specs: map[string]*spectypes.Spec{},
ResultsPostGUID: resultsPostGUID,
ProviderAddresses: providerAddresses,
}
currentBlock := int64(0)
for i := 0; i < BasicQueryRetries; i++ {
Expand Down Expand Up @@ -125,6 +129,24 @@ func RunHealth(ctx context.Context,
getAllProviders = true
}

// we can limit the specs we are checking if those where given as arguments
var lookupSpecsFromArg []string

if singleProviderSpecsInterfacesData != nil {
lookupSpecsFromArg = make([]string, 0, len(singleProviderSpecsInterfacesData))
for k := range singleProviderSpecsInterfacesData {
k = strings.ToUpper(strings.TrimSpace(k))
if len(k) > 2 {
lookupSpecsFromArg = append(lookupSpecsFromArg, k)
}
}
if len(lookupSpecsFromArg) == 0 {
lookupSpecsFromArg = nil
}
} else {
lookupSpecsFromArg = nil
}

errCh := make(chan error, 1)

// get a list of all necessary specs for the test
Expand All @@ -150,6 +172,19 @@ func RunHealth(ctx context.Context,
for _, specInfo := range specsResp.ChainInfoList {
healthResults.setSpec(&spectypes.Spec{Index: specInfo.ChainID})
}
} else if len(singleProviderSpecsInterfacesData) > 0 && len(providerAddresses) > 1 {
for _, providerAddress := range providerAddresses {
for spec, apiInterfaces := range singleProviderSpecsInterfacesData {
healthResults.setSpec(&spectypes.Spec{Index: spec})
for _, apiInterface := range apiInterfaces {
healthResults.SetProviderData(LavaEntity{
Address: providerAddress,
SpecId: spec,
ApiInterface: apiInterface,
}, ReplyData{})
}
}
}
} else {
var wgproviders sync.WaitGroup
wgproviders.Add(len(providerAddresses))
Expand All @@ -163,11 +198,13 @@ func RunHealth(ctx context.Context,
Delegator: providerAddress,
WithPending: false,
})

cancel()
if err != nil || response == nil {
time.Sleep(QuerySleepTime)
continue
}

delegations := response.GetDelegations()
for _, delegation := range delegations {
if delegation.Provider == providerAddress {
Expand Down Expand Up @@ -267,10 +304,25 @@ func RunHealth(ctx context.Context,
}

for _, providerEntry := range response.StakeEntry {

Check failure on line 306 in protocol/monitoring/health.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary leading newline (whitespace)

if len(providerAddresses) > 0 {
found := false
for _, address := range providerAddresses {
if address == providerEntry.Address {
found = true
break
}
}
if !found {
continue
}
}

providerKey := LavaEntity{
Address: providerEntry.Address,
SpecId: specId,
}

apiInterfaces := chainIdToApiInterfaces[specId]
// just to check if this is a provider we need to check we need one of the apiInterfaces
if len(apiInterfaces) == 0 {
Expand Down Expand Up @@ -303,11 +355,44 @@ func RunHealth(ctx context.Context,
}
}
}
// get provider stake entries
for specId := range healthResults.getSpecs() {
go processSpecProviders(specId)
// get provider stake entries for each spec or only for the ones given as arguments
if lookupSpecsFromArg != nil {
for specId := range healthResults.getSpecs() {
for _, arg := range lookupSpecsFromArg {
if arg == strings.ToUpper(specId) {
processSpecProviders(specId)
break
}
}
}
} else {
for specId := range healthResults.getSpecs() {
go processSpecProviders(specId)
}
}

wgspecs.Wait()

// check for mismtaches in the pairings query and the arguments
// This flow can be triggered with the following command:
// lavap test health health_all_providers.yml --node https://public-rpc.lavanet.xyz:443 --single-provider-address lava@1czgrha7ys2698xve2gz4xaccteplrzx8s9fh7e --post-results-guid 6IJN3OroilsAB030rXIeh3PeJbRpp5Wy --run-once-and-exit --post-results-skip-spec --single-provider-specs-interfaces-data '{"ARB1": ["grpc"] }' --log_level debug --post-results-address http://localhost:6510
if len(providerAddresses) > 0 && len(singleProviderSpecsInterfacesData) > 0 {
for _, address := range providerAddresses {
for specId, apiInterfaces := range singleProviderSpecsInterfacesData {
for _, apiInterface := range apiInterfaces {
lookupKey := LavaEntity{
Address: address,
SpecId: specId,
ApiInterface: apiInterface,
}
if _, ok := stakeEntries[lookupKey]; !ok {
healthResults.SetUnhealthyProvider(lookupKey, "no pairings found")
}
}
}
}
}

if len(errCh) > 0 {
return nil, utils.LavaFormatWarning("[-] processing providers entries", <-errCh)
}
Expand Down Expand Up @@ -541,13 +626,26 @@ func CheckProviders(ctx context.Context, clientCtx client.Context, healthResults
checkProviderEndpoints := func(providerEntry epochstoragetypes.StakeEntry) {
defer wg.Done()
for _, endpoint := range providerEntry.Endpoints {

Check failure on line 628 in protocol/monitoring/health.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary leading newline (whitespace)

checkOneProvider := func(endpoint epochstoragetypes.Endpoint, apiInterface string, addon string, providerEntry epochstoragetypes.StakeEntry) (time.Duration, string, int64, error) {
cswp := lavasession.ConsumerSessionsWithProvider{}
relayerClientPt, conn, err := cswp.ConnectRawClientWithTimeout(ctx, endpoint.IPPORT)
if err != nil {

var relayerClientPt *pairingtypes.RelayerClient
var conn *grpc.ClientConn
var err error

for i := 0; i < 3; i++ {
relayerClientPt, conn, err = cswp.ConnectRawClientWithTimeout(ctx, endpoint.IPPORT)
if err == nil {
break
}
utils.LavaFormatDebug("failed connecting to provider endpoint", utils.LogAttr("error", err), utils.Attribute{Key: "apiInterface", Value: apiInterface}, utils.Attribute{Key: "addon", Value: addon}, utils.Attribute{Key: "chainID", Value: providerEntry.Chain}, utils.Attribute{Key: "network address", Value: endpoint.IPPORT})
}

if err != nil {
return 0, "", 0, err
}

defer conn.Close()
relayerClient := *relayerClientPt
guid := uint64(rand.Int63())
Expand Down Expand Up @@ -588,17 +686,23 @@ func CheckProviders(ctx context.Context, clientCtx client.Context, healthResults
}
return relayLatency, versions, probeResp.GetLatestBlock(), nil
}

endpointServices := endpoint.GetSupportedServices()
if len(endpointServices) == 0 {
utils.LavaFormatWarning("endpoint has no supported services", nil, utils.Attribute{Key: "endpoint", Value: endpoint})
}

for _, endpointService := range endpointServices {
providerKey := LavaEntity{
Address: providerEntry.Address,
SpecId: providerEntry.Chain,
ApiInterface: endpointService.ApiInterface,
}

probeLatency, version, latestBlockFromProbe, err := checkOneProvider(endpoint, endpointService.ApiInterface, endpointService.Addon, providerEntry)

utils.LavaFormatDebug("[+] checked provider", utils.LogAttr("endpoint", endpoint), utils.LogAttr("apiInterface", endpointService.ApiInterface), utils.LogAttr("addon", endpointService.Addon), utils.LogAttr("providerEntry", providerEntry))

if err != nil {
errMsg := prettifyProviderError(err)
healthResults.SetUnhealthyProvider(providerKey, errMsg)
Expand Down
108 changes: 70 additions & 38 deletions protocol/monitoring/health_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,36 +24,39 @@ import (
)

const (
allowedBlockTimeDefaultLag = 30 * time.Second
intervalDefaultDuration = 0 * time.Second
defaultCUPercentageThreshold = 0.2
defaultSubscriptionLeftDays = 10
defaultMaxProviderLatency = 200 * time.Millisecond
defaultAlertSuppressionInterval = 6 * time.Hour
DefaultSuppressionCountThreshold = 3
DisableAlertLogging = "disable-alert-logging"
maxProviderLatencyFlagName = "max-provider-latency"
subscriptionLeftTimeFlagName = "subscription-days-left-alert"
providerAddressesFlagName = "provider_addresses"
subscriptionAddressesFlagName = "subscription_addresses"
intervalFlagName = "interval"
consumerEndpointPropertyName = "consumer_endpoints"
referenceEndpointPropertyName = "reference_endpoints"
allowedBlockTimeLagFlagName = "allowed_time_lag"
queryRetriesFlagName = "query-retries"
alertingWebHookFlagName = "alert-webhook-url"
identifierFlagName = "identifier"
percentageCUFlagName = "cu-percent-threshold"
alertSuppressionIntervalFlagName = "alert-suppression-interval"
disableAlertSuppressionFlagName = "disable-alert-suppression"
SuppressionCountThresholdFlagName = "suppression-alert-count-threshold"
resultsPostAddressFlagName = "post-results-address"
AllProvidersFlagName = "all-providers"
AllProvidersMarker = "all"
ConsumerGrpcTLSFlagName = "consumer-grpc-tls"
allowInsecureConsumerDialingFlagName = "allow-insecure-consumer-dialing"
singleProviderAddressFlagName = "single-provider-address"
runOnceAndExitFlagName = "run-once-and-exit"
allowedBlockTimeDefaultLag = 30 * time.Second
intervalDefaultDuration = 0 * time.Second
defaultCUPercentageThreshold = 0.2
defaultSubscriptionLeftDays = 10
defaultMaxProviderLatency = 200 * time.Millisecond
defaultAlertSuppressionInterval = 6 * time.Hour
DefaultSuppressionCountThreshold = 3
DisableAlertLogging = "disable-alert-logging"
maxProviderLatencyFlagName = "max-provider-latency"
subscriptionLeftTimeFlagName = "subscription-days-left-alert"
providerAddressesFlagName = "provider_addresses"
subscriptionAddressesFlagName = "subscription_addresses"
intervalFlagName = "interval"
consumerEndpointPropertyName = "consumer_endpoints"
referenceEndpointPropertyName = "reference_endpoints"
allowedBlockTimeLagFlagName = "allowed_time_lag"
queryRetriesFlagName = "query-retries"
alertingWebHookFlagName = "alert-webhook-url"
identifierFlagName = "identifier"
percentageCUFlagName = "cu-percent-threshold"
alertSuppressionIntervalFlagName = "alert-suppression-interval"
disableAlertSuppressionFlagName = "disable-alert-suppression"
SuppressionCountThresholdFlagName = "suppression-alert-count-threshold"
resultsPostAddressFlagName = "post-results-address"
resultsPostGUIDFlagName = "post-results-guid"
resultsPostSkipSepcFlagName = "post-results-skip-spec"
AllProvidersFlagName = "all-providers"
AllProvidersMarker = "all"
ConsumerGrpcTLSFlagName = "consumer-grpc-tls"
allowInsecureConsumerDialingFlagName = "allow-insecure-consumer-dialing"
singleProviderAddressFlagName = "single-provider-address"
singleProviderSpecsInterfacesDataFlagName = "single-provider-specs-interfaces-data"
runOnceAndExitFlagName = "run-once-and-exit"
)

func ParseEndpoints(keyName string, viper_endpoints *viper.Viper) (endpoints []*HealthRPCEndpoint, err error) {
Expand Down Expand Up @@ -152,6 +155,20 @@ reference_endpoints:
providerAddresses = []string{AllProvidersMarker}
utils.LavaFormatInfo("Health probe provider addresses set to all")
}
singleProviderSpecsInterfacesRawData := viper.GetString(singleProviderSpecsInterfacesDataFlagName)
var singleProviderSpecsInterfacesData map[string][]string
if singleProviderSpecsInterfacesRawData != "" {
if singleProvider == "" {
utils.LavaFormatFatal("single provider address and single provider specs interfaces data must be set together", nil)
}
err := json.Unmarshal([]byte(singleProviderSpecsInterfacesRawData), &singleProviderSpecsInterfacesData)
if err != nil {
utils.LavaFormatFatal("Failed to parse singleProviderSpecsInterfacesDataFlagName as JSON", err)
}
if len(singleProviderSpecsInterfacesData) == 0 {
utils.LavaFormatFatal("singleProviderSpecsInterfacesData is empty", nil)
}
}
runOnceAndExit := viper.GetBool(runOnceAndExitFlagName)
if runOnceAndExit {
utils.LavaFormatInfo("Run once and exit flag set")
Expand All @@ -178,6 +195,8 @@ reference_endpoints:
SuppressionCounterThreshold: viper.GetUint64(SuppressionCountThresholdFlagName),
}
resultsPostAddress := viper.GetString(resultsPostAddressFlagName)
resultsPostGUID := viper.GetString(resultsPostGUIDFlagName)
resultsPostSkipSepc := viper.GetBool(resultsPostSkipSepcFlagName)

alerting := NewAlerting(alertingOptions)
RunHealthCheck := func(ctx context.Context,
Expand All @@ -189,33 +208,42 @@ reference_endpoints:
prometheusListenAddr string,
) {
utils.LavaFormatInfo("[+] starting health run")
healthResult, err := RunHealth(ctx, clientCtx, subscriptionAddresses, providerAddresses, consumerEndpoints, referenceEndpoints, prometheusListenAddr)
healthResult, err := RunHealth(ctx, clientCtx, subscriptionAddresses, providerAddresses, consumerEndpoints, referenceEndpoints, prometheusListenAddr, resultsPostGUID, singleProviderSpecsInterfacesData)
if err != nil {
utils.LavaFormatError("[-] invalid health run", err)
if runOnceAndExit {
os.Exit(0)
}
healthMetrics.SetFailedRun(identifier)
} else {
if resultsPostAddress != "" {
if resultsPostSkipSepc {
healthResult.Specs = nil
}
jsonData, err := json.Marshal(healthResult)
if err == nil {
if err != nil {
utils.LavaFormatError("[-] failed marshaling results", err)
} else {
resp, err := http.Post(resultsPostAddress, "application/json", bytes.NewBuffer(jsonData))
if err != nil {
utils.LavaFormatError("[-] failed posting health results", err, utils.LogAttr("address", resultsPostAddress))
} else {
defer resp.Body.Close()
}
defer resp.Body.Close()
} else {
utils.LavaFormatError("[-] failed marshaling results", err)
}
}

utils.LavaFormatInfo("[+] completed health run")
if runOnceAndExit {
os.Exit(0)
}

healthMetrics.SetLatestBlockData(identifier, healthResult.FormatForLatestBlock())
alerting.CheckHealthResults(healthResult)
activeAlerts, unhealthy, healthy := alerting.ActiveAlerts()
healthMetrics.SetSuccess(identifier)
healthMetrics.SetAlertResults(identifier, activeAlerts, unhealthy, healthy)
}
if runOnceAndExit {
os.Exit(0)
}
}

RunHealthCheck(ctx, clientCtx, subscriptionAddresses, providerAddresses, consumerEndpoints, referenceEndpoints, prometheusListenAddr)
Expand Down Expand Up @@ -255,13 +283,17 @@ reference_endpoints:
cmdTestHealth.Flags().String(alertingWebHookFlagName, "", "a url to post an alert to")
cmdTestHealth.Flags().String(metrics.MetricsListenFlagName, metrics.DisabledFlagOption, "the address to expose prometheus metrics (such as localhost:7779)")
cmdTestHealth.Flags().String(resultsPostAddressFlagName, "", "the address to send the raw results to")
cmdTestHealth.Flags().String(resultsPostGUIDFlagName, "", "a guid marker to add to the results posted to the results post address")
cmdTestHealth.Flags().Bool(resultsPostSkipSepcFlagName, false, "enable to send the results without the specs to the results post address")
cmdTestHealth.Flags().Duration(intervalFlagName, intervalDefaultDuration, "the interval duration for the health check, (defaults to 0s) if 0 runs once")
cmdTestHealth.Flags().Duration(allowedBlockTimeLagFlagName, allowedBlockTimeDefaultLag, "the amount of time one rpc can be behind the most advanced one")
cmdTestHealth.Flags().Uint64Var(&QueryRetries, queryRetriesFlagName, QueryRetries, "set the amount of max queries to send every health run to consumers and references")
cmdTestHealth.Flags().Bool(AllProvidersFlagName, false, "a flag to overwrite the provider addresses with all the currently staked providers")
cmdTestHealth.Flags().Bool(ConsumerGrpcTLSFlagName, true, "use tls configuration for grpc connections to your consumer")
cmdTestHealth.Flags().Bool(allowInsecureConsumerDialingFlagName, false, "used to test grpc, to allow insecure (self signed cert).")
cmdTestHealth.Flags().String(singleProviderAddressFlagName, "", "single provider address in bach32 to override config settings")
cmdTestHealth.Flags().String(singleProviderSpecsInterfacesDataFlagName, "", "a json of spec:[interfaces...] to make the single provider query faster")

cmdTestHealth.Flags().Bool(runOnceAndExitFlagName, false, "exit after first run.")

viper.BindPFlag(queryRetriesFlagName, cmdTestHealth.Flags().Lookup(queryRetriesFlagName)) // bind the flag
Expand Down
2 changes: 2 additions & 0 deletions protocol/monitoring/health_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type HealthResults struct {
UnhealthyProviders map[LavaEntity]string `json:"unhealthyProviders,omitempty"`
UnhealthyConsumers map[LavaEntity]string `json:"unhealthyConsumers,omitempty"`
Specs map[string]*spectypes.Spec `json:"specs,omitempty"`
ResultsPostGUID string `json:"resultsGUID,omitempty"`
ProviderAddresses []string `json:"providerAddresses,omitempty"`
Lock sync.RWMutex `json:"-"`
}

Expand Down

0 comments on commit 09d5ef7

Please sign in to comment.