From 77e4bfd75c32645152267e37a72b545246b41185 Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Mon, 15 Jul 2024 16:30:30 -0700 Subject: [PATCH 1/4] wip --- disperser/dataapi/queried_operators_handlers.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/disperser/dataapi/queried_operators_handlers.go b/disperser/dataapi/queried_operators_handlers.go index b2ff89d78b..842173901a 100644 --- a/disperser/dataapi/queried_operators_handlers.go +++ b/disperser/dataapi/queried_operators_handlers.go @@ -7,9 +7,11 @@ import ( "sort" "time" + "github.com/Layr-Labs/eigenda/api/grpc/node" "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/gammazero/workerpool" + "google.golang.org/grpc/health/grpc_health_v1" ) type OperatorOnlineStatus struct { @@ -196,9 +198,16 @@ func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*Op return portCheckResponse, nil } -// method to check if operator is online -// Note: This method is least intrusive way to check if operator is online -// AlternateSolution: Should we add an endpt to check if operator is online? +// query operator host info endpoint if available +func (s *Server) checkNodeInfo(socket string, operatorId string, timeoutSecs int, logger logging.Logger) (*node.NodeInfoReply, error) { + var client node.NodeInfoRequest + + client = grpc_health_v1.NewHealthClient(s.disperserConn) + + return client.Check(ctx, &node.NodeInfoReply{}) +} + +// method to check if operator is online via socket dial func checkIsOperatorOnline(socket string, timeoutSecs int, logger logging.Logger) bool { if !ValidOperatorIP(socket, logger) { logger.Error("port check blocked invalid operator IP", "socket", socket) From d9c30d265bb2db743f2ad2fc9480c75759f7866c Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Mon, 15 Jul 2024 16:58:27 -0700 Subject: [PATCH 2/4] Collect NodeInfo endpoint if port check was successful --- .../dataapi/queried_operators_handlers.go | 27 ++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/disperser/dataapi/queried_operators_handlers.go b/disperser/dataapi/queried_operators_handlers.go index 842173901a..5aeebfa3e6 100644 --- a/disperser/dataapi/queried_operators_handlers.go +++ b/disperser/dataapi/queried_operators_handlers.go @@ -11,7 +11,8 @@ import ( "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/gammazero/workerpool" - "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) type OperatorOnlineStatus struct { @@ -182,6 +183,11 @@ func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*Op dispersalSocket := operatorSocket.GetDispersalSocket() dispersalOnline := checkIsOperatorOnline(dispersalSocket, 3, s.logger) + if dispersalOnline { + // collect node info if online + getNodeInfo(dispersalSocket, operatorId, s.logger) + } + // Create the metadata regardless of online status portCheckResponse := &OperatorPortCheckResponse{ OperatorId: operatorId, @@ -199,12 +205,21 @@ func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*Op } // query operator host info endpoint if available -func (s *Server) checkNodeInfo(socket string, operatorId string, timeoutSecs int, logger logging.Logger) (*node.NodeInfoReply, error) { - var client node.NodeInfoRequest - - client = grpc_health_v1.NewHealthClient(s.disperserConn) +func getNodeInfo(socket string, operatorId string, logger logging.Logger) { + conn, err := grpc.Dial(socket, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + logger.Error("Failed to dial grpc operator socket", "operatorId", operatorId, "socket", socket, "error", err) + return + } + defer conn.Close() + client := node.NewDispersalClient(conn) + reply, err := client.NodeInfo(context.Background(), &node.NodeInfoRequest{}) + if err != nil { + logger.Info("NodeInfo", "operatorId", operatorId, "semver", "unknown") + return + } - return client.Check(ctx, &node.NodeInfoReply{}) + logger.Info("NodeInfo", "operatorId", operatorId, "semver", reply.Semver, "os", reply.Os, "arch", reply.Arch, "numCpu", reply.NumCpu, "memBytes", reply.MemBytes) } // method to check if operator is online via socket dial From 0a27be102bdfca18d7752095ce53885f91ecaa71 Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Tue, 16 Jul 2024 15:46:27 -0700 Subject: [PATCH 3/4] Add timeout to NodeInfo grpc client --- disperser/dataapi/queried_operators_handlers.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/disperser/dataapi/queried_operators_handlers.go b/disperser/dataapi/queried_operators_handlers.go index 5aeebfa3e6..707dc2439c 100644 --- a/disperser/dataapi/queried_operators_handlers.go +++ b/disperser/dataapi/queried_operators_handlers.go @@ -185,7 +185,7 @@ func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*Op if dispersalOnline { // collect node info if online - getNodeInfo(dispersalSocket, operatorId, s.logger) + getNodeInfo(dispersalSocket, operatorId, 3, s.logger) } // Create the metadata regardless of online status @@ -205,7 +205,7 @@ func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*Op } // query operator host info endpoint if available -func getNodeInfo(socket string, operatorId string, logger logging.Logger) { +func getNodeInfo(socket string, operatorId string, timeoutSecs int, logger logging.Logger) { conn, err := grpc.Dial(socket, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { logger.Error("Failed to dial grpc operator socket", "operatorId", operatorId, "socket", socket, "error", err) @@ -213,7 +213,9 @@ func getNodeInfo(socket string, operatorId string, logger logging.Logger) { } defer conn.Close() client := node.NewDispersalClient(conn) - reply, err := client.NodeInfo(context.Background(), &node.NodeInfoRequest{}) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutSecs)*time.Second) + defer cancel() + reply, err := client.NodeInfo(ctx, &node.NodeInfoRequest{}) if err != nil { logger.Info("NodeInfo", "operatorId", operatorId, "semver", "unknown") return From 0d027646ef1d5c1974f946fb7e286e3d8fcc3b36 Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Wed, 17 Jul 2024 13:06:27 -0700 Subject: [PATCH 4/4] Reuse upstream context --- disperser/dataapi/queried_operators_handlers.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/disperser/dataapi/queried_operators_handlers.go b/disperser/dataapi/queried_operators_handlers.go index 707dc2439c..6c2e684d51 100644 --- a/disperser/dataapi/queried_operators_handlers.go +++ b/disperser/dataapi/queried_operators_handlers.go @@ -185,7 +185,7 @@ func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*Op if dispersalOnline { // collect node info if online - getNodeInfo(dispersalSocket, operatorId, 3, s.logger) + getNodeInfo(ctx, dispersalSocket, operatorId, s.logger) } // Create the metadata regardless of online status @@ -205,7 +205,7 @@ func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*Op } // query operator host info endpoint if available -func getNodeInfo(socket string, operatorId string, timeoutSecs int, logger logging.Logger) { +func getNodeInfo(ctx context.Context, socket string, operatorId string, logger logging.Logger) { conn, err := grpc.Dial(socket, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { logger.Error("Failed to dial grpc operator socket", "operatorId", operatorId, "socket", socket, "error", err) @@ -213,8 +213,6 @@ func getNodeInfo(socket string, operatorId string, timeoutSecs int, logger loggi } defer conn.Close() client := node.NewDispersalClient(conn) - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutSecs)*time.Second) - defer cancel() reply, err := client.NodeInfo(ctx, &node.NodeInfoRequest{}) if err != nil { logger.Info("NodeInfo", "operatorId", operatorId, "semver", "unknown")