Skip to content

Commit

Permalink
Add reachability check loop to expose reachability failures in node l…
Browse files Browse the repository at this point in the history
…ogs (#556)
  • Loading branch information
pschork authored May 14, 2024
1 parent f366a3c commit d1c657c
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 7 deletions.
25 changes: 22 additions & 3 deletions node/Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,26 @@
clean:
rm -rf ./bin
GITCOMMIT := $(shell git rev-parse --short HEAD)
GITDATE := $(shell git log -1 --format=%cd --date=unix)

# GitVersion provides the semantic versioning for the project.
SEMVER := $(shell docker run --rm --volume "${PWD}/../:/repo" gittools/gitversion:5.12.0 /repo -output json -showvariable SemVer)
ifeq ($(SEMVER), )
SEMVER = "0.0.0" # Fallback if docker is not installed or gitversion fails
endif

build: clean
# cd .. && make protoc
go mod tidy
go build -o ./bin/node ./cmd

clean:
rm -rf ./bin

docker: docker-node docker-plugin

docker-node:
cd ../ && docker build --build-arg SEMVER=${SEMVER} --build-arg GITCOMMIT=${GITCOMMIT} --build-arg GITDATE=${GITDATE} . -t opr-node:${SEMVER} -t opr-node:latest -f node/cmd/Dockerfile

docker-plugin:
cd ../ && docker build --build-arg SEMVER=${SEMVER} --build-arg GITCOMMIT=${GITCOMMIT} --build-arg GITDATE=${GITDATE} . -t opr-nodeplugin:${SEMVER} -t opr-nodeplugin:latest -f node/plugin/cmd/Dockerfile

semver:
echo "${SEMVER}"
18 changes: 14 additions & 4 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (

const (
// Min number of seconds for the ExpirationPollIntervalSecFlag.
minExpirationPollIntervalSec = 3
AppName = "da-node"
minExpirationPollIntervalSec = 3
minReachabilityPollIntervalSec = 10
AppName = "da-node"
)

var (
Expand Down Expand Up @@ -65,9 +66,11 @@ type Config struct {
PubIPProvider string
PubIPCheckInterval time.Duration
ChurnerUrl string
DataApiUrl string
NumBatchValidators int
ClientIPHeader string
UseSecureGrpc bool
ReachabilityPollIntervalSec uint64

EthClientConfig geth.EthClientConfig
LoggerConfig common.LoggerConfig
Expand Down Expand Up @@ -96,8 +99,13 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
}

expirationPollIntervalSec := ctx.GlobalUint64(flags.ExpirationPollIntervalSecFlag.Name)
if expirationPollIntervalSec <= minExpirationPollIntervalSec {
return nil, errors.New("the expiration-poll-interval flag must be greater than 3 seconds")
if expirationPollIntervalSec < minExpirationPollIntervalSec {
return nil, fmt.Errorf("the expiration-poll-interval flag must be >= %d seconds", minExpirationPollIntervalSec)
}

reachabilityPollIntervalSec := ctx.GlobalUint64(flags.ReachabilityPollIntervalSecFlag.Name)
if reachabilityPollIntervalSec != 0 && reachabilityPollIntervalSec < minReachabilityPollIntervalSec {
return nil, fmt.Errorf("the reachability-poll-interval flag must be >= %d seconds or 0 to disable", minReachabilityPollIntervalSec)
}

testMode := ctx.GlobalBool(flags.EnableTestModeFlag.Name)
Expand Down Expand Up @@ -172,6 +180,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
Timeout: timeout,
RegisterNodeAtStart: registerNodeAtStart,
ExpirationPollIntervalSec: expirationPollIntervalSec,
ReachabilityPollIntervalSec: reachabilityPollIntervalSec,
EnableTestMode: testMode,
OverrideBlockStaleMeasure: ctx.GlobalInt64(flags.OverrideBlockStaleMeasureFlag.Name),
OverrideStoreDurationBlocks: ctx.GlobalInt64(flags.OverrideStoreDurationBlocksFlag.Name),
Expand All @@ -186,6 +195,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
PubIPProvider: ctx.GlobalString(flags.PubIPProviderFlag.Name),
PubIPCheckInterval: pubIPCheckInterval,
ChurnerUrl: ctx.GlobalString(flags.ChurnerUrlFlag.Name),
DataApiUrl: ctx.GlobalString(flags.DataApiUrlFlag.Name),
NumBatchValidators: ctx.GlobalInt(flags.NumBatchValidatorsFlag.Name),
ClientIPHeader: ctx.GlobalString(flags.ClientIPHeaderFlag.Name),
UseSecureGrpc: ctx.GlobalBoolT(flags.ChurnerUseSecureGRPC.Name),
Expand Down
17 changes: 17 additions & 0 deletions node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,21 @@ var (
Value: "180",
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "EXPIRATION_POLL_INTERVAL"),
}
// ReachabilityPollIntervalSecFlag sets how often, in seconds, the node asks
// whether it is reachable from the Disperser. It is declared as a string flag
// and defaults to "60".
ReachabilityPollIntervalSecFlag = cli.StringFlag{
	Name:     common.PrefixFlag(FlagPrefix, "reachability-poll-interval"),
	Usage:    "How often (in seconds) to check if node is reachable from Disperser",
	Required: false,
	Value:    "60",
	EnvVar:   common.PrefixEnvVar(EnvVarPrefix, "REACHABILITY_POLL_INTERVAL"),
}
// DataApiUrlFlag holds the URL of the DataAPI. The flag is optional and
// defaults to the empty string; when it is left unset, reachability checks
// are disabled.
DataApiUrlFlag = cli.StringFlag{
	Name:   common.PrefixFlag(FlagPrefix, "dataapi-url"),
	EnvVar: common.PrefixEnvVar(EnvVarPrefix, "DATAAPI_URL"),
	Usage:  "URL of the DataAPI",

	// Optional, with no default value.
	Value:    "",
	Required: false,
}
// NumBatchValidators is the maximum number of parallel workers used to
// validate a batch (defaults to 128).
NumBatchValidatorsFlag = cli.IntFlag{
Expand Down Expand Up @@ -263,6 +278,7 @@ var requiredFlags = []cli.Flag{
var optionalFlags = []cli.Flag{
RegisterAtNodeStartFlag,
ExpirationPollIntervalSecFlag,
ReachabilityPollIntervalSecFlag,
EnableTestModeFlag,
OverrideBlockStaleMeasureFlag,
OverrideStoreDurationBlocksFlag,
Expand All @@ -274,6 +290,7 @@ var optionalFlags = []cli.Flag{
ChurnerUseSecureGRPC,
EcdsaKeyFileFlag,
EcdsaKeyPasswordFlag,
DataApiUrlFlag,
}

func init() {
Expand Down
10 changes: 10 additions & 0 deletions node/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type Metrics struct {
AccuSocketUpdates prometheus.Counter
// avs node spec eigen_ metrics: https://eigen.nethermind.io/docs/spec/metrics/metrics-prom-spec
EigenMetrics eigenmetrics.Metrics
// Reachability gauge for monitoring the reachability of the node's retrieval/dispersal sockets
ReachabilityGauge *prometheus.GaugeVec

registry *prometheus.Registry
// socketAddr is the address at which the metrics server will be listening.
Expand Down Expand Up @@ -129,6 +131,14 @@ func NewMetrics(eigenMetrics eigenmetrics.Metrics, reg *prometheus.Registry, log
Help: "the total number of node's socket address updates",
},
),
ReachabilityGauge: promauto.With(reg).NewGaugeVec(
prometheus.GaugeOpts{
Namespace: Namespace,
Name: "reachability_status",
Help: "the reachability status of the nodes retrievel/dispersal sockets",
},
[]string{"service"},
),
EigenMetrics: eigenMetrics,
logger: logger.With("component", "NodeMetrics"),
registry: reg,
Expand Down
82 changes: 82 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ package node
import (
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"math"
"math/big"
"net/http"
"net/url"
"os"
"sync"
"time"
Expand Down Expand Up @@ -185,6 +189,7 @@ func (n *Node) Start(ctx context.Context) error {
}

go n.expireLoop()
go n.checkNodeReachability()

// Build the socket based on the hostname/IP provided in the CLI
socket := string(core.MakeOperatorSocket(n.Config.Hostname, n.Config.DispersalPort, n.Config.RetrievalPort))
Expand Down Expand Up @@ -451,3 +456,80 @@ func (n *Node) checkCurrentNodeIp(ctx context.Context) {
}
}
}

// OperatorReachabilityResponse is the response object for the reachability check.
// It is the JSON document that checkNodeReachability decodes from the DataAPI
// port-check endpoint response.
type OperatorReachabilityResponse struct {
	// OperatorID is decoded from the "operator_id" field.
	// NOTE(review): presumably echoes the operator_id query parameter - confirm against the DataAPI.
	OperatorID string `json:"operator_id"`
	// DispersalSocket is the dispersal socket the check refers to; it is only used for logging here.
	DispersalSocket string `json:"dispersal_socket"`
	// RetrievalSocket is the retrieval socket the check refers to; it is only used for logging here.
	RetrievalSocket string `json:"retrieval_socket"`
	// DispersalOnline is true when the dispersal socket was reported as online.
	DispersalOnline bool `json:"dispersal_online"`
	// RetrievalOnline is true when the retrieval socket was reported as online.
	RetrievalOnline bool `json:"retrieval_online"`
}

// checkNodeReachability periodically asks the DataAPI port-check endpoint
// whether this operator's dispersal and retrieval sockets are reachable. Each
// result is logged and recorded in Metrics.ReachabilityGauge (1 = online,
// 0 = unreachable), labelled by service.
//
// It blocks for as long as the checks run, so it is meant to be started in its
// own goroutine. It returns immediately, after logging the reason, when
// ReachabilityPollIntervalSec is 0, when DataApiUrl is empty, or when the check
// URL cannot be parsed. The first check happens one full interval after start.
func (n *Node) checkNodeReachability() {
	if n.Config.ReachabilityPollIntervalSec == 0 {
		n.Logger.Warn("Node reachability checks disabled!!! ReachabilityPollIntervalSec set to 0")
		return
	}

	if n.Config.DataApiUrl == "" {
		n.Logger.Error("Unable to perform reachability check - NODE_DATAAPI_URL is not defined in .env")
		return
	}

	rawUrl := fmt.Sprintf("%s/api/v1/operators-info/port-check?operator_id=%s", n.Config.DataApiUrl, n.Config.ID.Hex())
	checkUrl, err := url.Parse(rawUrl)
	if err != nil {
		// url.Parse returns a nil *url.URL on failure, so log the raw string
		// instead of dereferencing checkUrl.
		n.Logger.Error("Reachability check failed - invalid check url", "err", err, "checkUrl", rawUrl)
		return
	}
	target := checkUrl.String()

	n.Logger.Info("Start nodeReachabilityCheck goroutine in background to check the reachability of the operator node")

	interval := time.Duration(n.Config.ReachabilityPollIntervalSec) * time.Second
	// Bound every request by the poll interval: a hung connection must not
	// stall the loop, and a request never outlives the period it belongs to.
	client := &http.Client{Timeout: interval}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for range ticker.C {
		n.checkReachabilityOnce(client, target)
	}
}

// checkReachabilityOnce performs a single port-check request against checkUrl
// and reports the result. It is a separate method so that the response body is
// closed at the end of every poll rather than when the polling loop exits.
func (n *Node) checkReachabilityOnce(client *http.Client, checkUrl string) {
	n.Logger.Debug("Calling reachability check", "url", checkUrl)

	resp, err := client.Get(checkUrl)
	if err != nil {
		n.Logger.Error("Reachability check request failed", "err", err)
		return
	}
	// The body must be closed on every path, including non-200 responses,
	// otherwise each poll leaks a connection.
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK:
		// Success: decode the body below.
	case http.StatusNotFound:
		n.Logger.Error("Reachability check failed - operator id not found", "status", resp.StatusCode, "operator_id", n.Config.ID.Hex())
		return
	default:
		n.Logger.Error("Reachability check request failed", "status", resp.StatusCode)
		return
	}

	data, err := io.ReadAll(resp.Body)
	if err != nil {
		n.Logger.Error("Failed to read reachability check response", "err", err)
		return
	}

	var responseObject OperatorReachabilityResponse
	if err := json.Unmarshal(data, &responseObject); err != nil {
		n.Logger.Error("Reachability check failed to unmarshal json response", "err", err)
		return
	}

	n.reportSocketReachability("dispersal", responseObject.DispersalSocket, responseObject.DispersalOnline)
	n.reportSocketReachability("retrieval", responseObject.RetrievalSocket, responseObject.RetrievalOnline)
}

// reportSocketReachability logs whether the given service ("dispersal" or
// "retrieval") socket is reachable and sets the matching reachability gauge
// to 1 (online) or 0 (unreachable).
func (n *Node) reportSocketReachability(service, socket string, online bool) {
	gauge := n.Metrics.ReachabilityGauge.WithLabelValues(service)
	if online {
		n.Logger.Info(fmt.Sprintf("Reachability check - %s socket is ONLINE", service), "socket", socket)
		gauge.Set(1.0)
		return
	}
	n.Logger.Error(fmt.Sprintf("Reachability check - %s socket is UNREACHABLE", service), "socket", socket)
	gauge.Set(0.0)
}

0 comments on commit d1c657c

Please sign in to comment.