From f9ebf2c060bef520395aa47b70517307971d002b Mon Sep 17 00:00:00 2001 From: Patrick Schork Date: Fri, 10 May 2024 17:25:00 -0700 Subject: [PATCH 1/7] Add reachability check loop to node to expose reachability failures Default check interval in 5min Minimum check interval is 10sec For backwards compatibility, the reachability check loop is disabled if 1. NODE_DATA_URL is undefined (ie operator did not update their .env) 1. NODE_REACHABILITY_POLL_INTERVAL is set to 0 --- node/Makefile | 15 +++++++--- node/config.go | 18 +++++++++--- node/flags/flags.go | 17 +++++++++++ node/node.go | 72 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 114 insertions(+), 8 deletions(-) diff --git a/node/Makefile b/node/Makefile index 604f2a4dca..e801a1c296 100644 --- a/node/Makefile +++ b/node/Makefile @@ -1,7 +1,14 @@ -clean: - rm -rf ./bin - build: clean - # cd .. && make protoc go mod tidy go build -o ./bin/node ./cmd + +clean: + rm -rf ./bin + +docker: docker-node docker-plugin + +docker-node: + cd ../ && docker build . -t opr-node -f node/cmd/Dockerfile + +docker-plugin: + cd ../ && docker build . -t opr-nodeplugin -f node/plugin/cmd/Dockerfile diff --git a/node/config.go b/node/config.go index ca5999de13..a0104e8c56 100644 --- a/node/config.go +++ b/node/config.go @@ -21,8 +21,9 @@ import ( const ( // Min number of seconds for the ExpirationPollIntervalSecFlag. - minExpirationPollIntervalSec = 3 - AppName = "da-node" + minExpirationPollIntervalSec = 3 + minReachabilityPollIntervalSec = 10 + AppName = "da-node" ) var ( @@ -65,9 +66,11 @@ type Config struct { PubIPProvider string PubIPCheckInterval time.Duration ChurnerUrl string + DataApiUrl string NumBatchValidators int ClientIPHeader string UseSecureGrpc bool + ReachabilityPollIntervalSec uint64 EthClientConfig geth.EthClientConfig LoggerConfig common.LoggerConfig @@ -96,8 +99,13 @@ func NewConfig(ctx *cli.Context) (*Config, error) { } expirationPollIntervalSec := ctx.GlobalUint64(flags.ExpirationPollIntervalSecFlag.Name) - if expirationPollIntervalSec <= minExpirationPollIntervalSec { - return nil, errors.New("the expiration-poll-interval flag must be greater than 3 seconds") + if expirationPollIntervalSec < minExpirationPollIntervalSec { + return nil, fmt.Errorf("the expiration-poll-interval flag must be >= %d seconds", minExpirationPollIntervalSec) + } + + reachabilityPollIntervalSec := ctx.GlobalUint64(flags.ReachabilityPollIntervalSecFlag.Name) + if reachabilityPollIntervalSec != 0 && reachabilityPollIntervalSec < minReachabilityPollIntervalSec { + return nil, fmt.Errorf("the reachability-poll-interval flag must be >= %d seconds or 0 to disable", minReachabilityPollIntervalSec) } testMode := ctx.GlobalBool(flags.EnableTestModeFlag.Name) @@ -172,6 +180,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) { Timeout: timeout, RegisterNodeAtStart: registerNodeAtStart, ExpirationPollIntervalSec: expirationPollIntervalSec, + ReachabilityPollIntervalSec: reachabilityPollIntervalSec, EnableTestMode: testMode, OverrideBlockStaleMeasure: ctx.GlobalInt64(flags.OverrideBlockStaleMeasureFlag.Name), OverrideStoreDurationBlocks: ctx.GlobalInt64(flags.OverrideStoreDurationBlocksFlag.Name), @@ -186,6 +195,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) { PubIPProvider: ctx.GlobalString(flags.PubIPProviderFlag.Name), PubIPCheckInterval: pubIPCheckInterval, ChurnerUrl: ctx.GlobalString(flags.ChurnerUrlFlag.Name), + DataApiUrl: ctx.GlobalString(flags.DataApiUrlFlag.Name), NumBatchValidators: ctx.GlobalInt(flags.NumBatchValidatorsFlag.Name), ClientIPHeader: ctx.GlobalString(flags.ClientIPHeaderFlag.Name), UseSecureGrpc: ctx.GlobalBoolT(flags.ChurnerUseSecureGRPC.Name), diff --git a/node/flags/flags.go b/node/flags/flags.go index f40ac22b5c..13a9b47563 100644 --- a/node/flags/flags.go +++ b/node/flags/flags.go @@ -181,6 +181,21 @@ var ( Value: "180", EnvVar: common.PrefixEnvVar(EnvVarPrefix, "EXPIRATION_POLL_INTERVAL"), } + ReachabilityPollIntervalSecFlag = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "reachability-poll-interval"), + Usage: "How often (in second) to check if node is reachabile from DA backend", + Required: false, + Value: "300", + EnvVar: common.PrefixEnvVar(EnvVarPrefix, "REACHABILITY_POLL_INTERVAL"), + } + // Optional DataAPI URL. If not set, reachability checks are disabled + DataApiUrlFlag = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "dataapi-url"), + Usage: "URL of the DataAPI", + Required: false, + Value: "", + EnvVar: common.PrefixEnvVar(EnvVarPrefix, "DATAAPI_URL"), + } // NumBatchValidators is the maximum number of parallel workers used to // validate a batch (defaults to 128). NumBatchValidatorsFlag = cli.IntFlag{ @@ -263,6 +278,7 @@ var requiredFlags = []cli.Flag{ var optionalFlags = []cli.Flag{ RegisterAtNodeStartFlag, ExpirationPollIntervalSecFlag, + ReachabilityPollIntervalSecFlag, EnableTestModeFlag, OverrideBlockStaleMeasureFlag, OverrideStoreDurationBlocksFlag, @@ -274,6 +290,7 @@ var optionalFlags = []cli.Flag{ ChurnerUseSecureGRPC, EcdsaKeyFileFlag, EcdsaKeyPasswordFlag, + DataApiUrlFlag, } func init() { diff --git a/node/node.go b/node/node.go index 3c1e37c25e..393408483e 100644 --- a/node/node.go +++ b/node/node.go @@ -3,10 +3,14 @@ package node import ( "context" "encoding/hex" + "encoding/json" "errors" "fmt" + "io" "math" "math/big" + "net/http" + "net/url" "os" "sync" "time" @@ -185,6 +189,7 @@ func (n *Node) Start(ctx context.Context) error { } go n.expireLoop() + go n.checkNodeReachability() // Build the socket based on the hostname/IP provided in the CLI socket := string(core.MakeOperatorSocket(n.Config.Hostname, n.Config.DispersalPort, n.Config.RetrievalPort)) @@ -453,3 +458,70 @@ func (n *Node) checkCurrentNodeIp(ctx context.Context) { } } } + +type OperatorReachabilityResponse struct { + OperatorId string `json:"operator_id"` + DispersalSocket string `json:"dispersal_socket"` + RetrievalSocket string `json:"retrieval_socket"` + DispersalOnline bool `json:"dispersal_online"` + RetrievalOnline bool `json:"retrieval_online"` +} + +func (n *Node) checkNodeReachability() { + if n.Config.ReachabilityPollIntervalSec == 0 { + n.Logger.Warn("Node reachability checks disabled!!! ReachabilityPollIntervalSec set to 0") + return + } + if n.Config.DataApiUrl == "" { + n.Logger.Error("Node reachability checks disabled!!! DataAPI URL is not configured") + return + } + + checkUrl, err := url.Parse(fmt.Sprintf("%s/api/v1/operators-info/port-check?operator_id=%s", n.Config.DataApiUrl, n.Config.ID.Hex())) + if err != nil { + n.Logger.Error("Node reachability checks disabled!!! Failed to parse reachability check url", err) + return + } + + n.Logger.Info("Start nodeReachabilityCheck goroutine in background to check the reachability of the operator node") + ticker := time.NewTicker(time.Duration(n.Config.ReachabilityPollIntervalSec) * time.Second) + defer ticker.Stop() + + for { + <-ticker.C + + n.Logger.Info("Calling reachability check", "url", checkUrl.String()) + + resp, err := http.Get(checkUrl.String()) + if err != nil { + n.Logger.Error("Reachability check failed", err) + continue + } else if resp.StatusCode == 404 { + n.Logger.Error("Reachability check failed - operator id not found", "status", resp.StatusCode, "operator_id", n.Config.ID.Hex()) + continue + } else if resp.StatusCode != 200 { + n.Logger.Error("Reachability check failed", "status", resp.StatusCode) + continue + } + + data, err := io.ReadAll(resp.Body) + if err != nil { + n.Logger.Error("Failed to read reachability check response", err, "data", resp.Body) + continue + } + + var responseObject OperatorReachabilityResponse + json.Unmarshal(data, &responseObject) + if responseObject.DispersalOnline { + n.Logger.Info("Reachability check - dispersal socket is ONLINE", "socket", responseObject.DispersalSocket) + } else { + n.Logger.Error("Reachability check - dispersal socket is UNREACHABLE", "socket", responseObject.DispersalSocket) + } + + if responseObject.RetrievalOnline { + n.Logger.Info("Reachability check - retrieval socket is ONLINE", "socket", responseObject.RetrievalSocket) + } else { + n.Logger.Error("Reachability check - retrieval socket is UNREACHABLE", "socket", responseObject.RetrievalSocket) + } + } +} From bacebc770de7ee4d2b945682281695292add2ab5 Mon Sep 17 00:00:00 2001 From: Patrick Schork Date: Fri, 10 May 2024 18:09:54 -0700 Subject: [PATCH 2/7] Lint --- node/node.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/node/node.go b/node/node.go index 393408483e..da70973f2c 100644 --- a/node/node.go +++ b/node/node.go @@ -459,8 +459,9 @@ func (n *Node) checkCurrentNodeIp(ctx context.Context) { } } +// OperatorReachabilityResponse is the response object for the reachability check type OperatorReachabilityResponse struct { - OperatorId string `json:"operator_id"` + OperatorID string `json:"operator_id"` DispersalSocket string `json:"dispersal_socket"` RetrievalSocket string `json:"retrieval_socket"` DispersalOnline bool `json:"dispersal_online"` @@ -511,13 +512,17 @@ func (n *Node) checkNodeReachability() { } var responseObject OperatorReachabilityResponse - json.Unmarshal(data, &responseObject) + err = json.Unmarshal(data, &responseObject) + if err != nil { + n.Logger.Error("Reachability check failed to unmarshal json response", err) + continue + } + if responseObject.DispersalOnline { n.Logger.Info("Reachability check - dispersal socket is ONLINE", "socket", responseObject.DispersalSocket) } else { n.Logger.Error("Reachability check - dispersal socket is UNREACHABLE", "socket", responseObject.DispersalSocket) } - if responseObject.RetrievalOnline { n.Logger.Info("Reachability check - retrieval socket is ONLINE", "socket", responseObject.RetrievalSocket) } else { From dde0f4b0826dae3795f0cb7665712f9a85ce36ff Mon Sep 17 00:00:00 2001 From: Patrick Schork Date: Fri, 10 May 2024 19:23:13 -0700 Subject: [PATCH 3/7] Add semver to node makefile --- node/Makefile | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/node/Makefile b/node/Makefile index e801a1c296..0bd4580866 100644 --- a/node/Makefile +++ b/node/Makefile @@ -1,3 +1,12 @@ +GITCOMMIT := $(shell git rev-parse --short HEAD) +GITDATE := $(shell git log -1 --format=%cd --date=unix) + +# GitVersion provides the semantic versioning for the project +SEMVER := $(shell docker run --rm --volume "${PWD}/../:/repo" gittools/gitversion:5.12.0 /repo -output json -showvariable SemVer) +ifeq ($(SEMVER), ) +SEMVER = "0.0.0" +endif + build: clean go mod tidy go build -o ./bin/node ./cmd @@ -8,7 +17,10 @@ clean: docker: docker-node docker-plugin docker-node: - cd ../ && docker build . -t opr-node -f node/cmd/Dockerfile + cd ../ && docker build --build-arg SEMVER=${SEMVER} --build-arg GITCOMMIT=${GITCOMMIT} --build-arg GITDATE=${GITDATE} . -t opr-node:${SEMVER} -t opr-node:latest -f node/cmd/Dockerfile + +docker-nodeplugin: + cd ../ && docker build --build-arg SEMVER=${SEMVER} --build-arg GITCOMMIT=${GITCOMMIT} --build-arg GITDATE=${GITDATE} . -t opr-nodeplugin:${SEMVER} -t opr-nodeplugin:latest -f node/plugin/cmd/Dockerfile -docker-plugin: - cd ../ && docker build . -t opr-nodeplugin -f node/plugin/cmd/Dockerfile +semver: + echo "${SEMVER}" From 5ea2fc51a552aa85d9564ee81eaf4f8379d8e9f7 Mon Sep 17 00:00:00 2001 From: Patrick Schork Date: Sat, 11 May 2024 20:59:10 -0700 Subject: [PATCH 4/7] Refactor error handling to ensure that .env misconfigurations do not go unnoticed. - If NODE_DATAAPI_URL is not defined, we will continue to log error every interval. - If the constructed checkUrl is invalid, we will continue to log error every interval --- node/Makefile | 4 ++-- node/node.go | 25 +++++++++++++------------ 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/node/Makefile b/node/Makefile index 0bd4580866..9f2c0633e0 100644 --- a/node/Makefile +++ b/node/Makefile @@ -1,7 +1,7 @@ GITCOMMIT := $(shell git rev-parse --short HEAD) GITDATE := $(shell git log -1 --format=%cd --date=unix) -# GitVersion provides the semantic versioning for the project +# GitVersion provides the semantic versioning for the project. If docker is not installed, semver fallsback to 0.0.0 SEMVER := $(shell docker run --rm --volume "${PWD}/../:/repo" gittools/gitversion:5.12.0 /repo -output json -showvariable SemVer) ifeq ($(SEMVER), ) SEMVER = "0.0.0" @@ -19,7 +19,7 @@ docker: docker-node docker-plugin docker-node: cd ../ && docker build --build-arg SEMVER=${SEMVER} --build-arg GITCOMMIT=${GITCOMMIT} --build-arg GITDATE=${GITDATE} . -t opr-node:${SEMVER} -t opr-node:latest -f node/cmd/Dockerfile -docker-nodeplugin: +docker-plugin: cd ../ && docker build --build-arg SEMVER=${SEMVER} --build-arg GITCOMMIT=${GITCOMMIT} --build-arg GITDATE=${GITDATE} . -t opr-nodeplugin:${SEMVER} -t opr-nodeplugin:latest -f node/plugin/cmd/Dockerfile semver: diff --git a/node/node.go b/node/node.go index da70973f2c..95e0cb4e22 100644 --- a/node/node.go +++ b/node/node.go @@ -473,16 +473,6 @@ func (n *Node) checkNodeReachability() { n.Logger.Warn("Node reachability checks disabled!!! ReachabilityPollIntervalSec set to 0") return } - if n.Config.DataApiUrl == "" { - n.Logger.Error("Node reachability checks disabled!!! DataAPI URL is not configured") - return - } - - checkUrl, err := url.Parse(fmt.Sprintf("%s/api/v1/operators-info/port-check?operator_id=%s", n.Config.DataApiUrl, n.Config.ID.Hex())) - if err != nil { - n.Logger.Error("Node reachability checks disabled!!! Failed to parse reachability check url", err) - return - } n.Logger.Info("Start nodeReachabilityCheck goroutine in background to check the reachability of the operator node") ticker := time.NewTicker(time.Duration(n.Config.ReachabilityPollIntervalSec) * time.Second) @@ -491,17 +481,28 @@ func (n *Node) checkNodeReachability() { for { <-ticker.C + if n.Config.DataApiUrl == "" { + n.Logger.Error("Unable to perform reachability check - NODE_DATAAPI_URL is not defined in .env") + continue + } + + checkUrl, err := url.Parse(fmt.Sprintf("%s/api/v1/operators-info/port-check?operator_id=%s", n.Config.DataApiUrl, n.Config.ID.Hex())) + if err != nil { + n.Logger.Error("Reachability check failed - invalid check url", err, "checkUrl", checkUrl.String()) + return + } + n.Logger.Info("Calling reachability check", "url", checkUrl.String()) resp, err := http.Get(checkUrl.String()) if err != nil { - n.Logger.Error("Reachability check failed", err) + n.Logger.Error("Reachability check request failed", err) continue } else if resp.StatusCode == 404 { n.Logger.Error("Reachability check failed - operator id not found", "status", resp.StatusCode, "operator_id", n.Config.ID.Hex()) continue } else if resp.StatusCode != 200 { - n.Logger.Error("Reachability check failed", "status", resp.StatusCode) + n.Logger.Error("Reachability check request failed", "status", resp.StatusCode) continue } From 7e2eec57766f52cdaf238e1c89ea711d1c27a80d Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Sat, 11 May 2024 21:07:19 -0700 Subject: [PATCH 5/7] Document semver fallback --- node/Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/Makefile b/node/Makefile index 9f2c0633e0..5b91ed1517 100644 --- a/node/Makefile +++ b/node/Makefile @@ -1,10 +1,10 @@ GITCOMMIT := $(shell git rev-parse --short HEAD) GITDATE := $(shell git log -1 --format=%cd --date=unix) -# GitVersion provides the semantic versioning for the project. If docker is not installed, semver fallsback to 0.0.0 +# GitVersion provides the semantic versioning for the project. SEMVER := $(shell docker run --rm --volume "${PWD}/../:/repo" gittools/gitversion:5.12.0 /repo -output json -showvariable SemVer) ifeq ($(SEMVER), ) -SEMVER = "0.0.0" +SEMVER = "0.0.0" # Fallback if docker is not installed or gitversion fails endif build: clean From 6f158961caacb322dfbbee0bf4c1f77a4214a793 Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Tue, 14 May 2024 09:51:27 -0700 Subject: [PATCH 6/7] Add metrics for reachability status Disable reachability goroutine if configuration is broken --- node/metrics.go | 10 ++++++++++ node/node.go | 28 ++++++++++++++++------------ 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/node/metrics.go b/node/metrics.go index 5b7fc53ce7..65bf149fbb 100644 --- a/node/metrics.go +++ b/node/metrics.go @@ -43,6 +43,8 @@ type Metrics struct { AccuSocketUpdates prometheus.Counter // avs node spec eigen_ metrics: https://eigen.nethermind.io/docs/spec/metrics/metrics-prom-spec EigenMetrics eigenmetrics.Metrics + // Reachability gauge to monitoring the reachability of the node's retrieval/dispersal sockets + ReachabilityGauge *prometheus.GaugeVec registry *prometheus.Registry // socketAddr is the address at which the metrics server will be listening. @@ -129,6 +131,14 @@ func NewMetrics(eigenMetrics eigenmetrics.Metrics, reg *prometheus.Registry, log Help: "the total number of node's socket address updates", }, ), + ReachabilityGauge: promauto.With(reg).NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "reachability_status", + Help: "the reachability status of the nodes retrievel/dispersal sockets", + }, + []string{"service"}, + ), EigenMetrics: eigenMetrics, logger: logger.With("component", "NodeMetrics"), registry: reg, diff --git a/node/node.go b/node/node.go index 95e0cb4e22..bde3732466 100644 --- a/node/node.go +++ b/node/node.go @@ -474,6 +474,17 @@ func (n *Node) checkNodeReachability() { return } + if n.Config.DataApiUrl == "" { + n.Logger.Error("Unable to perform reachability check - NODE_DATAAPI_URL is not defined in .env") + return + } + + checkUrl, err := url.Parse(fmt.Sprintf("%s/api/v1/operators-info/port-check?operator_id=%s", n.Config.DataApiUrl, n.Config.ID.Hex())) + if err != nil { + n.Logger.Error("Reachability check failed - invalid check url", err, "checkUrl", checkUrl.String()) + return + } + n.Logger.Info("Start nodeReachabilityCheck goroutine in background to check the reachability of the operator node") ticker := time.NewTicker(time.Duration(n.Config.ReachabilityPollIntervalSec) * time.Second) defer ticker.Stop() @@ -481,18 +492,7 @@ func (n *Node) checkNodeReachability() { for { <-ticker.C - if n.Config.DataApiUrl == "" { - n.Logger.Error("Unable to perform reachability check - NODE_DATAAPI_URL is not defined in .env") - continue - } - - checkUrl, err := url.Parse(fmt.Sprintf("%s/api/v1/operators-info/port-check?operator_id=%s", n.Config.DataApiUrl, n.Config.ID.Hex())) - if err != nil { - n.Logger.Error("Reachability check failed - invalid check url", err, "checkUrl", checkUrl.String()) - return - } - - n.Logger.Info("Calling reachability check", "url", checkUrl.String()) + n.Logger.Debug("Calling reachability check", "url", checkUrl.String()) resp, err := http.Get(checkUrl.String()) if err != nil { @@ -521,13 +521,17 @@ func (n *Node) checkNodeReachability() { if responseObject.DispersalOnline { n.Logger.Info("Reachability check - dispersal socket is ONLINE", "socket", responseObject.DispersalSocket) + n.Metrics.ReachabilityGauge.WithLabelValues("dispersal").Set(1.0) } else { n.Logger.Error("Reachability check - dispersal socket is UNREACHABLE", "socket", responseObject.DispersalSocket) + n.Metrics.ReachabilityGauge.WithLabelValues("dispersal").Set(0.0) } if responseObject.RetrievalOnline { n.Logger.Info("Reachability check - retrieval socket is ONLINE", "socket", responseObject.RetrievalSocket) + n.Metrics.ReachabilityGauge.WithLabelValues("retrieval").Set(1.0) } else { n.Logger.Error("Reachability check - retrieval socket is UNREACHABLE", "socket", responseObject.RetrievalSocket) + n.Metrics.ReachabilityGauge.WithLabelValues("retrieval").Set(0.0) } } } From 280785c9eb4db86a8f710a77c4ae620f2b1fc8f5 Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Tue, 14 May 2024 10:48:56 -0700 Subject: [PATCH 7/7] Drop default reachability poll interval to 60 secs --- node/flags/flags.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/flags/flags.go b/node/flags/flags.go index 13a9b47563..c27351a15d 100644 --- a/node/flags/flags.go +++ b/node/flags/flags.go @@ -183,9 +183,9 @@ var ( } ReachabilityPollIntervalSecFlag = cli.StringFlag{ Name: common.PrefixFlag(FlagPrefix, "reachability-poll-interval"), - Usage: "How often (in second) to check if node is reachabile from DA backend", + Usage: "How often (in second) to check if node is reachabile from Disperser", Required: false, - Value: "300", + Value: "60", EnvVar: common.PrefixEnvVar(EnvVarPrefix, "REACHABILITY_POLL_INTERVAL"), } // Optional DataAPI URL. If not set, reachability checks are disabled