From d1c657c01f96d3e6f718c85bb7a08281ed4fa0c7 Mon Sep 17 00:00:00 2001 From: pschork <354473+pschork@users.noreply.github.com> Date: Tue, 14 May 2024 15:54:41 -0700 Subject: [PATCH] Add reachability check loop to expose reachability failures in node logs (#556) --- node/Makefile | 25 ++++++++++++-- node/config.go | 18 +++++++--- node/flags/flags.go | 17 ++++++++++ node/metrics.go | 10 ++++++ node/node.go | 82 +++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 145 insertions(+), 7 deletions(-) diff --git a/node/Makefile b/node/Makefile index 604f2a4dca..5b91ed1517 100644 --- a/node/Makefile +++ b/node/Makefile @@ -1,7 +1,26 @@ -clean: - rm -rf ./bin +GITCOMMIT := $(shell git rev-parse --short HEAD) +GITDATE := $(shell git log -1 --format=%cd --date=unix) + +# GitVersion provides the semantic versioning for the project. +SEMVER := $(shell docker run --rm --volume "${PWD}/../:/repo" gittools/gitversion:5.12.0 /repo -output json -showvariable SemVer) +ifeq ($(SEMVER), ) +SEMVER = "0.0.0" # Fallback if docker is not installed or gitversion fails +endif build: clean - # cd .. && make protoc go mod tidy go build -o ./bin/node ./cmd + +clean: + rm -rf ./bin + +docker: docker-node docker-plugin + +docker-node: + cd ../ && docker build --build-arg SEMVER=${SEMVER} --build-arg GITCOMMIT=${GITCOMMIT} --build-arg GITDATE=${GITDATE} . -t opr-node:${SEMVER} -t opr-node:latest -f node/cmd/Dockerfile + +docker-plugin: + cd ../ && docker build --build-arg SEMVER=${SEMVER} --build-arg GITCOMMIT=${GITCOMMIT} --build-arg GITDATE=${GITDATE} . -t opr-nodeplugin:${SEMVER} -t opr-nodeplugin:latest -f node/plugin/cmd/Dockerfile + +semver: + echo "${SEMVER}" diff --git a/node/config.go b/node/config.go index ca5999de13..a0104e8c56 100644 --- a/node/config.go +++ b/node/config.go @@ -21,8 +21,9 @@ import ( const ( // Min number of seconds for the ExpirationPollIntervalSecFlag. 
- minExpirationPollIntervalSec = 3 - AppName = "da-node" + minExpirationPollIntervalSec = 3 + minReachabilityPollIntervalSec = 10 + AppName = "da-node" ) var ( @@ -65,9 +66,11 @@ type Config struct { PubIPProvider string PubIPCheckInterval time.Duration ChurnerUrl string + DataApiUrl string NumBatchValidators int ClientIPHeader string UseSecureGrpc bool + ReachabilityPollIntervalSec uint64 EthClientConfig geth.EthClientConfig LoggerConfig common.LoggerConfig @@ -96,8 +99,13 @@ func NewConfig(ctx *cli.Context) (*Config, error) { } expirationPollIntervalSec := ctx.GlobalUint64(flags.ExpirationPollIntervalSecFlag.Name) - if expirationPollIntervalSec <= minExpirationPollIntervalSec { - return nil, errors.New("the expiration-poll-interval flag must be greater than 3 seconds") + if expirationPollIntervalSec < minExpirationPollIntervalSec { + return nil, fmt.Errorf("the expiration-poll-interval flag must be >= %d seconds", minExpirationPollIntervalSec) + } + + reachabilityPollIntervalSec := ctx.GlobalUint64(flags.ReachabilityPollIntervalSecFlag.Name) + if reachabilityPollIntervalSec != 0 && reachabilityPollIntervalSec < minReachabilityPollIntervalSec { + return nil, fmt.Errorf("the reachability-poll-interval flag must be >= %d seconds or 0 to disable", minReachabilityPollIntervalSec) } testMode := ctx.GlobalBool(flags.EnableTestModeFlag.Name) @@ -172,6 +180,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) { Timeout: timeout, RegisterNodeAtStart: registerNodeAtStart, ExpirationPollIntervalSec: expirationPollIntervalSec, + ReachabilityPollIntervalSec: reachabilityPollIntervalSec, EnableTestMode: testMode, OverrideBlockStaleMeasure: ctx.GlobalInt64(flags.OverrideBlockStaleMeasureFlag.Name), OverrideStoreDurationBlocks: ctx.GlobalInt64(flags.OverrideStoreDurationBlocksFlag.Name), @@ -186,6 +195,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) { PubIPProvider: ctx.GlobalString(flags.PubIPProviderFlag.Name), PubIPCheckInterval: pubIPCheckInterval, ChurnerUrl: 
ctx.GlobalString(flags.ChurnerUrlFlag.Name), + DataApiUrl: ctx.GlobalString(flags.DataApiUrlFlag.Name), NumBatchValidators: ctx.GlobalInt(flags.NumBatchValidatorsFlag.Name), ClientIPHeader: ctx.GlobalString(flags.ClientIPHeaderFlag.Name), UseSecureGrpc: ctx.GlobalBoolT(flags.ChurnerUseSecureGRPC.Name), diff --git a/node/flags/flags.go b/node/flags/flags.go index f40ac22b5c..c27351a15d 100644 --- a/node/flags/flags.go +++ b/node/flags/flags.go @@ -181,6 +181,21 @@ var ( Value: "180", EnvVar: common.PrefixEnvVar(EnvVarPrefix, "EXPIRATION_POLL_INTERVAL"), } + ReachabilityPollIntervalSecFlag = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "reachability-poll-interval"), + Usage: "How often (in second) to check if node is reachabile from Disperser", + Required: false, + Value: "60", + EnvVar: common.PrefixEnvVar(EnvVarPrefix, "REACHABILITY_POLL_INTERVAL"), + } + // Optional DataAPI URL. If not set, reachability checks are disabled + DataApiUrlFlag = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "dataapi-url"), + Usage: "URL of the DataAPI", + Required: false, + Value: "", + EnvVar: common.PrefixEnvVar(EnvVarPrefix, "DATAAPI_URL"), + } // NumBatchValidators is the maximum number of parallel workers used to // validate a batch (defaults to 128). 
NumBatchValidatorsFlag = cli.IntFlag{ @@ -263,6 +278,7 @@ var requiredFlags = []cli.Flag{ var optionalFlags = []cli.Flag{ RegisterAtNodeStartFlag, ExpirationPollIntervalSecFlag, + ReachabilityPollIntervalSecFlag, EnableTestModeFlag, OverrideBlockStaleMeasureFlag, OverrideStoreDurationBlocksFlag, @@ -274,6 +290,7 @@ var optionalFlags = []cli.Flag{ ChurnerUseSecureGRPC, EcdsaKeyFileFlag, EcdsaKeyPasswordFlag, + DataApiUrlFlag, } func init() { diff --git a/node/metrics.go b/node/metrics.go index 8435948dba..212f5ddab3 100644 --- a/node/metrics.go +++ b/node/metrics.go @@ -43,6 +43,8 @@ type Metrics struct { AccuSocketUpdates prometheus.Counter // avs node spec eigen_ metrics: https://eigen.nethermind.io/docs/spec/metrics/metrics-prom-spec EigenMetrics eigenmetrics.Metrics + // Reachability gauge for monitoring the reachability of the node's retrieval/dispersal sockets + ReachabilityGauge *prometheus.GaugeVec registry *prometheus.Registry // socketAddr is the address at which the metrics server will be listening.
@@ -129,6 +131,14 @@ func NewMetrics(eigenMetrics eigenmetrics.Metrics, reg *prometheus.Registry, log
 				Help:      "the total number of node's socket address updates",
 			},
 		),
+		ReachabilityGauge: promauto.With(reg).NewGaugeVec(
+			prometheus.GaugeOpts{
+				Namespace: Namespace,
+				Name:      "reachability_status",
+				Help:      "the reachability status of the nodes retrievel/dispersal sockets",
+			},
+			[]string{"service"},
+		),
 		EigenMetrics: eigenMetrics,
 		logger:       logger.With("component", "NodeMetrics"),
 		registry:     reg,
diff --git a/node/node.go b/node/node.go
index ab56593fa2..a9ecd88726 100644
--- a/node/node.go
+++ b/node/node.go
@@ -3,10 +3,14 @@ package node
 import (
 	"context"
 	"encoding/hex"
+	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
 	"math"
 	"math/big"
+	"net/http"
+	"net/url"
 	"os"
 	"sync"
 	"time"
@@ -185,6 +189,7 @@ func (n *Node) Start(ctx context.Context) error {
 	}

 	go n.expireLoop()
+	go n.checkNodeReachability()

 	// Build the socket based on the hostname/IP provided in the CLI
 	socket := string(core.MakeOperatorSocket(n.Config.Hostname, n.Config.DispersalPort, n.Config.RetrievalPort))
@@ -451,3 +456,108 @@ func (n *Node) checkCurrentNodeIp(ctx context.Context) {
 		}
 	}
 }
+
+// OperatorReachabilityResponse is the response object for the reachability check
+type OperatorReachabilityResponse struct {
+	OperatorID      string `json:"operator_id"`
+	DispersalSocket string `json:"dispersal_socket"`
+	RetrievalSocket string `json:"retrieval_socket"`
+	DispersalOnline bool   `json:"dispersal_online"`
+	RetrievalOnline bool   `json:"retrieval_online"`
+}
+
+// checkNodeReachability polls the DataAPI port-check endpoint every
+// ReachabilityPollIntervalSec seconds and records whether the node's dispersal
+// and retrieval sockets are reported online, both in the logs and in
+// ReachabilityGauge. It returns immediately when the interval is 0, when no
+// DataAPI URL is configured, or when the check URL cannot be parsed; otherwise
+// it loops forever and is meant to be run in its own goroutine.
+func (n *Node) checkNodeReachability() {
+	if n.Config.ReachabilityPollIntervalSec == 0 {
+		n.Logger.Warn("Node reachability checks disabled!!! ReachabilityPollIntervalSec set to 0")
+		return
+	}
+
+	if n.Config.DataApiUrl == "" {
+		n.Logger.Error("Unable to perform reachability check - NODE_DATAAPI_URL is not defined in .env")
+		return
+	}
+
+	rawUrl := fmt.Sprintf("%s/api/v1/operators-info/port-check?operator_id=%s", n.Config.DataApiUrl, n.Config.ID.Hex())
+	checkUrl, err := url.Parse(rawUrl)
+	if err != nil {
+		// url.Parse returns a nil *url.URL on failure, so log the raw string
+		// rather than dereferencing checkUrl.
+		n.Logger.Error("Reachability check failed - invalid check url", "err", err, "checkUrl", rawUrl)
+		return
+	}
+
+	// Bound every request so a stalled DataAPI connection cannot block the loop
+	// indefinitely; http.Get uses a client with no timeout.
+	const requestTimeout = 10 * time.Second
+	client := &http.Client{Timeout: requestTimeout}
+
+	n.Logger.Info("Start nodeReachabilityCheck goroutine in background to check the reachability of the operator node")
+	ticker := time.NewTicker(time.Duration(n.Config.ReachabilityPollIntervalSec) * time.Second)
+	defer ticker.Stop()
+
+	for range ticker.C {
+		n.checkReachabilityOnce(client, checkUrl.String())
+	}
+}
+
+// checkReachabilityOnce issues a single port-check request to checkUrl using
+// client, logs the outcome, and updates ReachabilityGauge for both sockets when
+// a valid response is received. Failures are logged and leave the gauge
+// untouched.
+func (n *Node) checkReachabilityOnce(client *http.Client, checkUrl string) {
+	n.Logger.Debug("Calling reachability check", "url", checkUrl)
+
+	resp, err := client.Get(checkUrl)
+	if err != nil {
+		n.Logger.Error("Reachability check request failed", "err", err)
+		return
+	}
+	// Close the body on every path so the connection is released.
+	defer resp.Body.Close()
+
+	switch resp.StatusCode {
+	case http.StatusOK:
+		// Proceed to decode the body below.
+	case http.StatusNotFound:
+		n.Logger.Error("Reachability check failed - operator id not found", "status", resp.StatusCode, "operator_id", n.Config.ID.Hex())
+		return
+	default:
+		n.Logger.Error("Reachability check request failed", "status", resp.StatusCode)
+		return
+	}
+
+	data, err := io.ReadAll(resp.Body)
+	if err != nil {
+		n.Logger.Error("Failed to read reachability check response", "err", err)
+		return
+	}
+
+	var responseObject OperatorReachabilityResponse
+	if err := json.Unmarshal(data, &responseObject); err != nil {
+		n.Logger.Error("Reachability check failed to unmarshal json response", "err", err)
+		return
+	}
+
+	n.recordSocketReachability("dispersal", responseObject.DispersalSocket, responseObject.DispersalOnline)
+	n.recordSocketReachability("retrieval", responseObject.RetrievalSocket, responseObject.RetrievalOnline)
+}
+
+// recordSocketReachability logs whether the given service's socket was reported
+// online and sets the matching ReachabilityGauge series to 1 (online) or 0
+// (unreachable).
+func (n *Node) recordSocketReachability(service, socket string, online bool) {
+	gauge := n.Metrics.ReachabilityGauge.WithLabelValues(service)
+	if online {
+		n.Logger.Info("Reachability check - "+service+" socket is ONLINE", "socket", socket)
+		gauge.Set(1.0)
+		return
+	}
+	n.Logger.Error("Reachability check - "+service+" socket is UNREACHABLE", "socket", socket)
+	gauge.Set(0.0)
+}