Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add reachability check loop to expose reachability failures in node logs #556

Merged
merged 7 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions node/Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,26 @@
clean:
rm -rf ./bin
GITCOMMIT := $(shell git rev-parse --short HEAD)
GITDATE := $(shell git log -1 --format=%cd --date=unix)

# GitVersion provides the semantic versioning for the project.
SEMVER := $(shell docker run --rm --volume "${PWD}/../:/repo" gittools/gitversion:5.12.0 /repo -output json -showvariable SemVer)
ifeq ($(SEMVER), )
SEMVER = "0.0.0" # Fallback if docker is not installed or gitversion fails
endif

build: clean
# cd .. && make protoc
go mod tidy
go build -o ./bin/node ./cmd

clean:
rm -rf ./bin

docker: docker-node docker-plugin

docker-node:
cd ../ && docker build --build-arg SEMVER=${SEMVER} --build-arg GITCOMMIT=${GITCOMMIT} --build-arg GITDATE=${GITDATE} . -t opr-node:${SEMVER} -t opr-node:latest -f node/cmd/Dockerfile

docker-plugin:
cd ../ && docker build --build-arg SEMVER=${SEMVER} --build-arg GITCOMMIT=${GITCOMMIT} --build-arg GITDATE=${GITDATE} . -t opr-nodeplugin:${SEMVER} -t opr-nodeplugin:latest -f node/plugin/cmd/Dockerfile

semver:
echo "${SEMVER}"
18 changes: 14 additions & 4 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (

const (
// Min number of seconds for the ExpirationPollIntervalSecFlag.
minExpirationPollIntervalSec = 3
AppName = "da-node"
minExpirationPollIntervalSec = 3
minReachabilityPollIntervalSec = 10
AppName = "da-node"
)

var (
Expand Down Expand Up @@ -65,9 +66,11 @@ type Config struct {
PubIPProvider string
PubIPCheckInterval time.Duration
ChurnerUrl string
DataApiUrl string
NumBatchValidators int
ClientIPHeader string
UseSecureGrpc bool
ReachabilityPollIntervalSec uint64

EthClientConfig geth.EthClientConfig
LoggerConfig common.LoggerConfig
Expand Down Expand Up @@ -96,8 +99,13 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
}

expirationPollIntervalSec := ctx.GlobalUint64(flags.ExpirationPollIntervalSecFlag.Name)
if expirationPollIntervalSec <= minExpirationPollIntervalSec {
return nil, errors.New("the expiration-poll-interval flag must be greater than 3 seconds")
if expirationPollIntervalSec < minExpirationPollIntervalSec {
return nil, fmt.Errorf("the expiration-poll-interval flag must be >= %d seconds", minExpirationPollIntervalSec)
}

reachabilityPollIntervalSec := ctx.GlobalUint64(flags.ReachabilityPollIntervalSecFlag.Name)
if reachabilityPollIntervalSec != 0 && reachabilityPollIntervalSec < minReachabilityPollIntervalSec {
return nil, fmt.Errorf("the reachability-poll-interval flag must be >= %d seconds or 0 to disable", minReachabilityPollIntervalSec)
}

testMode := ctx.GlobalBool(flags.EnableTestModeFlag.Name)
Expand Down Expand Up @@ -172,6 +180,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
Timeout: timeout,
RegisterNodeAtStart: registerNodeAtStart,
ExpirationPollIntervalSec: expirationPollIntervalSec,
ReachabilityPollIntervalSec: reachabilityPollIntervalSec,
EnableTestMode: testMode,
OverrideBlockStaleMeasure: ctx.GlobalInt64(flags.OverrideBlockStaleMeasureFlag.Name),
OverrideStoreDurationBlocks: ctx.GlobalInt64(flags.OverrideStoreDurationBlocksFlag.Name),
Expand All @@ -186,6 +195,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
PubIPProvider: ctx.GlobalString(flags.PubIPProviderFlag.Name),
PubIPCheckInterval: pubIPCheckInterval,
ChurnerUrl: ctx.GlobalString(flags.ChurnerUrlFlag.Name),
DataApiUrl: ctx.GlobalString(flags.DataApiUrlFlag.Name),
NumBatchValidators: ctx.GlobalInt(flags.NumBatchValidatorsFlag.Name),
ClientIPHeader: ctx.GlobalString(flags.ClientIPHeaderFlag.Name),
UseSecureGrpc: ctx.GlobalBoolT(flags.ChurnerUseSecureGRPC.Name),
Expand Down
17 changes: 17 additions & 0 deletions node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,21 @@ var (
Value: "180",
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "EXPIRATION_POLL_INTERVAL"),
}
ReachabilityPollIntervalSecFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "reachability-poll-interval"),
Usage: "How often (in second) to check if node is reachabile from Disperser",
Required: false,
Value: "60",
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "REACHABILITY_POLL_INTERVAL"),
}
// Optional DataAPI URL. If not set, reachability checks are disabled
DataApiUrlFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "dataapi-url"),
Usage: "URL of the DataAPI",
Required: false,
Value: "",
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "DATAAPI_URL"),
}
// NumBatchValidators is the maximum number of parallel workers used to
// validate a batch (defaults to 128).
NumBatchValidatorsFlag = cli.IntFlag{
Expand Down Expand Up @@ -263,6 +278,7 @@ var requiredFlags = []cli.Flag{
var optionalFlags = []cli.Flag{
RegisterAtNodeStartFlag,
ExpirationPollIntervalSecFlag,
ReachabilityPollIntervalSecFlag,
EnableTestModeFlag,
OverrideBlockStaleMeasureFlag,
OverrideStoreDurationBlocksFlag,
Expand All @@ -274,6 +290,7 @@ var optionalFlags = []cli.Flag{
ChurnerUseSecureGRPC,
EcdsaKeyFileFlag,
EcdsaKeyPasswordFlag,
DataApiUrlFlag,
}

func init() {
Expand Down
10 changes: 10 additions & 0 deletions node/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type Metrics struct {
AccuSocketUpdates prometheus.Counter
// avs node spec eigen_ metrics: https://eigen.nethermind.io/docs/spec/metrics/metrics-prom-spec
EigenMetrics eigenmetrics.Metrics
// Reachability gauge to monitoring the reachability of the node's retrieval/dispersal sockets
ReachabilityGauge *prometheus.GaugeVec

registry *prometheus.Registry
// socketAddr is the address at which the metrics server will be listening.
Expand Down Expand Up @@ -129,6 +131,14 @@ func NewMetrics(eigenMetrics eigenmetrics.Metrics, reg *prometheus.Registry, log
Help: "the total number of node's socket address updates",
},
),
ReachabilityGauge: promauto.With(reg).NewGaugeVec(
prometheus.GaugeOpts{
Namespace: Namespace,
Name: "reachability_status",
Help: "the reachability status of the nodes retrievel/dispersal sockets",
},
[]string{"service"},
),
EigenMetrics: eigenMetrics,
logger: logger.With("component", "NodeMetrics"),
registry: reg,
Expand Down
82 changes: 82 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ package node
import (
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"math"
"math/big"
"net/http"
"net/url"
"os"
"sync"
"time"
Expand Down Expand Up @@ -185,6 +189,7 @@ func (n *Node) Start(ctx context.Context) error {
}

go n.expireLoop()
go n.checkNodeReachability()

// Build the socket based on the hostname/IP provided in the CLI
socket := string(core.MakeOperatorSocket(n.Config.Hostname, n.Config.DispersalPort, n.Config.RetrievalPort))
Expand Down Expand Up @@ -453,3 +458,80 @@ func (n *Node) checkCurrentNodeIp(ctx context.Context) {
}
}
}

// OperatorReachabilityResponse is the response object for the reachability check
type OperatorReachabilityResponse struct {
OperatorID string `json:"operator_id"`
DispersalSocket string `json:"dispersal_socket"`
RetrievalSocket string `json:"retrieval_socket"`
DispersalOnline bool `json:"dispersal_online"`
RetrievalOnline bool `json:"retrieval_online"`
}

func (n *Node) checkNodeReachability() {
pschork marked this conversation as resolved.
Show resolved Hide resolved
if n.Config.ReachabilityPollIntervalSec == 0 {
n.Logger.Warn("Node reachability checks disabled!!! ReachabilityPollIntervalSec set to 0")
return
}

if n.Config.DataApiUrl == "" {
n.Logger.Error("Unable to perform reachability check - NODE_DATAAPI_URL is not defined in .env")
return
}

checkUrl, err := url.Parse(fmt.Sprintf("%s/api/v1/operators-info/port-check?operator_id=%s", n.Config.DataApiUrl, n.Config.ID.Hex()))
if err != nil {
n.Logger.Error("Reachability check failed - invalid check url", err, "checkUrl", checkUrl.String())
return
}

n.Logger.Info("Start nodeReachabilityCheck goroutine in background to check the reachability of the operator node")
ticker := time.NewTicker(time.Duration(n.Config.ReachabilityPollIntervalSec) * time.Second)
defer ticker.Stop()

for {
<-ticker.C

n.Logger.Debug("Calling reachability check", "url", checkUrl.String())

resp, err := http.Get(checkUrl.String())
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we set a value in the gauge in case it gets error? @shrimalmadhur

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should, otherwise it will show last value.

n.Logger.Error("Reachability check request failed", err)
continue
} else if resp.StatusCode == 404 {
n.Logger.Error("Reachability check failed - operator id not found", "status", resp.StatusCode, "operator_id", n.Config.ID.Hex())
continue
} else if resp.StatusCode != 200 {
n.Logger.Error("Reachability check request failed", "status", resp.StatusCode)
continue
}

data, err := io.ReadAll(resp.Body)
if err != nil {
n.Logger.Error("Failed to read reachability check response", err, "data", resp.Body)
continue
}

var responseObject OperatorReachabilityResponse
err = json.Unmarshal(data, &responseObject)
if err != nil {
n.Logger.Error("Reachability check failed to unmarshal json response", err)
continue
}

if responseObject.DispersalOnline {
n.Logger.Info("Reachability check - dispersal socket is ONLINE", "socket", responseObject.DispersalSocket)
n.Metrics.ReachabilityGauge.WithLabelValues("dispersal").Set(1.0)
} else {
n.Logger.Error("Reachability check - dispersal socket is UNREACHABLE", "socket", responseObject.DispersalSocket)
n.Metrics.ReachabilityGauge.WithLabelValues("dispersal").Set(0.0)
}
if responseObject.RetrievalOnline {
n.Logger.Info("Reachability check - retrieval socket is ONLINE", "socket", responseObject.RetrievalSocket)
n.Metrics.ReachabilityGauge.WithLabelValues("retrieval").Set(1.0)
} else {
n.Logger.Error("Reachability check - retrieval socket is UNREACHABLE", "socket", responseObject.RetrievalSocket)
n.Metrics.ReachabilityGauge.WithLabelValues("retrieval").Set(0.0)
}
}
}
Loading