Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node metrics #948

Merged
merged 35 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
362a390
Add metrics to relay.
cody-littley Nov 26, 2024
3b4637e
Incremental progress.
cody-littley Nov 27, 2024
60f015e
Incremental progress.
cody-littley Nov 27, 2024
2d7e9ef
Incremental progress, need running averages.
cody-littley Nov 27, 2024
b8c7d35
Added running average metrics for GetChunks
cody-littley Nov 27, 2024
a6692c4
Merge branch 'master' into relay-metrics
cody-littley Nov 27, 2024
b9d71d6
Documentation
cody-littley Nov 27, 2024
671f0c8
Add time window to metrics doc
cody-littley Nov 27, 2024
4adb7ea
Added GetBlob metrics.
cody-littley Nov 27, 2024
5579a88
Cleanup.
cody-littley Nov 27, 2024
2b84f21
Cleanup test
cody-littley Nov 27, 2024
fb0cad5
Add locking for running average metric.
cody-littley Nov 27, 2024
a2c05cb
Merge branch 'master' into relay-metrics
cody-littley Nov 27, 2024
dfd2925
Add cache metrics.
cody-littley Nov 27, 2024
24f5f5d
Fix test bug
cody-littley Nov 27, 2024
c3adb70
Made suggested change.
cody-littley Dec 3, 2024
5c8c173
Added metrics for v2 DA node.
cody-littley Dec 3, 2024
1795654
Added metrics documentation.
cody-littley Dec 3, 2024
434c6b9
Merge branch 'master' into node-metrics
cody-littley Dec 6, 2024
5c9274c
Revert deletions.
cody-littley Dec 6, 2024
4d4bfe9
Remove documentation.
cody-littley Dec 6, 2024
d9d898c
Reimplement without metrics framework.
cody-littley Dec 6, 2024
8bd8ff1
Cleanup.
cody-littley Dec 6, 2024
2070eee
Stop background thread when metrics are stopped.
cody-littley Dec 6, 2024
cffa884
Revert unintentional change
cody-littley Dec 6, 2024
5c511c9
Made suggested changes.
cody-littley Dec 10, 2024
a15117f
Don't start two metrics servers.
cody-littley Dec 10, 2024
1076a8f
Fix compile issue.
cody-littley Dec 10, 2024
bbf9005
Merge branch 'master' into node-metrics
cody-littley Dec 11, 2024
62ec4f6
Merge branch 'master' into node-metrics
cody-littley Dec 11, 2024
143b798
Enable debug code.
cody-littley Dec 12, 2024
168ded5
Debug
cody-littley Dec 12, 2024
dd21f61
Fix inabox bug.
cody-littley Dec 12, 2024
1bb1404
Made suggested changes.
cody-littley Dec 12, 2024
864e7d0
Made suggested changes.
cody-littley Dec 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion node/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ func NodeMain(ctx *cli.Context) error {

// Creates the GRPC server.
server := nodegrpc.NewServer(config, node, logger, ratelimiter)
serverV2 := nodegrpc.NewServerV2(config, node, logger, ratelimiter)
serverV2, err := nodegrpc.NewServerV2(config, node, logger, ratelimiter)
if err != nil {
return fmt.Errorf("failed to create server v2: %v", err)
}
err = nodegrpc.RunServers(server, serverV2, config, logger)

return err
Expand Down
6 changes: 4 additions & 2 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type Config struct {
EnableNodeApi bool
NodeApiPort string
EnableMetrics bool
MetricsPort string
MetricsPort int
OnchainMetricsInterval int64
Timeout time.Duration
RegisterNodeAtStart bool
Expand All @@ -62,6 +62,7 @@ type Config struct {
OverrideStoreDurationBlocks int64
QuorumIDList []core.QuorumID
DbPath string
DBSizePollPeriod time.Duration
LogPath string
PrivateBls string
ID core.OperatorID
Expand Down Expand Up @@ -207,7 +208,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
EnableNodeApi: ctx.GlobalBool(flags.EnableNodeApiFlag.Name),
NodeApiPort: ctx.GlobalString(flags.NodeApiPortFlag.Name),
EnableMetrics: ctx.GlobalBool(flags.EnableMetricsFlag.Name),
MetricsPort: ctx.GlobalString(flags.MetricsPortFlag.Name),
MetricsPort: ctx.GlobalInt(flags.MetricsPortFlag.Name),
cody-littley marked this conversation as resolved.
Show resolved Hide resolved
OnchainMetricsInterval: ctx.GlobalInt64(flags.OnchainMetricsIntervalFlag.Name),
Timeout: timeout,
RegisterNodeAtStart: registerNodeAtStart,
Expand All @@ -218,6 +219,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
OverrideStoreDurationBlocks: ctx.GlobalInt64(flags.OverrideStoreDurationBlocksFlag.Name),
QuorumIDList: ids,
DbPath: ctx.GlobalString(flags.DbPathFlag.Name),
DBSizePollPeriod: ctx.GlobalDuration(flags.DBSizePollPeriodFlag.Name),
PrivateBls: privateBls,
EthClientConfig: ethClientConfig,
EncoderConfig: kzg.ReadCLIConfig(ctx),
Expand Down
12 changes: 10 additions & 2 deletions node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ var (
Required: true,
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "ENABLE_METRICS"),
}
MetricsPortFlag = cli.StringFlag{
MetricsPortFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "metrics-port"),
Usage: "Port at which node listens for metrics calls",
Required: false,
Value: "9091",
Value: 9091,
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "METRICS_PORT"),
}
OnchainMetricsIntervalFlag = cli.StringFlag{
Expand Down Expand Up @@ -98,6 +98,13 @@ var (
Required: true,
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "DB_PATH"),
}
DBSizePollPeriodFlag = cli.DurationFlag{
cody-littley marked this conversation as resolved.
Show resolved Hide resolved
Name: common.PrefixFlag(FlagPrefix, "db-size-poll-period"),
Usage: "The period at which the database size is polled. If set to 0, the database size is not polled.",
Required: false,
Value: 10 * time.Minute,
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "DB_SIZE_POLL_PERIOD"),
}
// The files for encrypted private keys.
BlsKeyFileFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "bls-key-file"),
Expand Down Expand Up @@ -384,6 +391,7 @@ var optionalFlags = []cli.Flag{
ChunkDownloadTimeoutFlag,
PprofHttpPort,
EnablePprof,
DBSizePollPeriodFlag,
}

func init() {
Expand Down
6 changes: 4 additions & 2 deletions node/grpc/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ func RunServers(serverV1 *Server, serverV2 *ServerV2, config *node.Config, logge
return errors.New("node V2 server is not configured")
}

serverV2.metrics.Start()

go func() {
for {
addr := fmt.Sprintf("%s:%s", localhost, config.InternalDispersalPort)
Expand All @@ -33,7 +35,7 @@ func RunServers(serverV1 *Server, serverV2 *ServerV2, config *node.Config, logge
}

opt := grpc.MaxRecvMsgSize(60 * 1024 * 1024 * 1024) // 60 GiB
gs := grpc.NewServer(opt)
gs := grpc.NewServer(opt, serverV2.metrics.GetGRPCServerOption())

// Register reflection service on gRPC server
// This makes "grpcurl -plaintext localhost:9000 list" command work
Expand All @@ -60,7 +62,7 @@ func RunServers(serverV1 *Server, serverV2 *ServerV2, config *node.Config, logge
}

opt := grpc.MaxRecvMsgSize(1024 * 1024 * 300) // 300 MiB
gs := grpc.NewServer(opt)
gs := grpc.NewServer(opt, serverV2.metrics.GetGRPCServerOption())

// Register reflection service on gRPC server
// This makes "grpcurl -plaintext localhost:9000 list" command work
Expand Down
38 changes: 32 additions & 6 deletions node/grpc/server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"encoding/hex"
"fmt"
"runtime"

"github.com/Layr-Labs/eigenda/api"
pb "github.com/Layr-Labs/eigenda/api/grpc/node/v2"
"github.com/Layr-Labs/eigenda/common"
Expand All @@ -15,6 +13,8 @@ import (
"github.com/Layr-Labs/eigenda/node"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/shirou/gopsutil/mem"
"runtime"
"time"
)

// ServerV2 implements the Node v2 proto APIs.
Expand All @@ -26,21 +26,28 @@ type ServerV2 struct {
node *node.Node
ratelimiter common.RateLimiter
logger logging.Logger
metrics *V2Metrics
}

// NewServerV2 creates a new Server instance with the provided parameters.
func NewServerV2(
config *node.Config,
node *node.Node,
logger logging.Logger,
ratelimiter common.RateLimiter,
) *ServerV2 {
ratelimiter common.RateLimiter) (*ServerV2, error) {

metrics, err := NewV2Metrics(logger, config.MetricsPort, config.DbPath, config.DBSizePollPeriod)
if err != nil {
return nil, err
}

return &ServerV2{
config: config,
node: node,
ratelimiter: ratelimiter,
logger: logger,
}
metrics: metrics,
}, nil
}

func (s *ServerV2) NodeInfo(ctx context.Context, in *pb.NodeInfoRequest) (*pb.NodeInfoReply, error) {
Expand All @@ -58,6 +65,8 @@ func (s *ServerV2) NodeInfo(ctx context.Context, in *pb.NodeInfoRequest) (*pb.No
}

func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (*pb.StoreChunksReply, error) {
start := time.Now()

if !s.config.EnableV2 {
return nil, api.NewErrorInvalidArg("v2 API is disabled")
}
Expand Down Expand Up @@ -92,7 +101,7 @@ func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (
}
storeChan := make(chan storeResult)
go func() {
keys, err := s.node.StoreV2.StoreBatch(batch, rawBundles)
keys, size, err := s.node.StoreV2.StoreBatch(batch, rawBundles)
if err != nil {
storeChan <- storeResult{
keys: nil,
Expand All @@ -101,6 +110,8 @@ func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (
return
}

s.metrics.ReportStoreChunksDataSize(size)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if the store operation gets reverted in L125?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a general rule of thumb, should we report incremental metrics if the operation as a whole fails? Or should we only report metrics for an operation if it is successful? (in another PR, you suggested that I should report latencies even when there are failures).

I can make this only report if the request ends up being valid, but I want to be consistent with the way we handle scenarios like this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline, this will be left the way it currently is.


storeChan <- storeResult{
keys: keys,
err: nil,
Expand All @@ -124,6 +135,10 @@ func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (
}

sig := s.node.KeyPair.SignMessage(batchHeaderHash).Bytes()

timeElapsed := time.Since(start)
s.metrics.ReportStoreChunksLatency(timeElapsed)
cody-littley marked this conversation as resolved.
Show resolved Hide resolved

return &pb.StoreChunksReply{
Signature: sig[:],
}, nil
Expand All @@ -144,6 +159,8 @@ func (s *ServerV2) validateStoreChunksRequest(req *pb.StoreChunksRequest) (*core
}

func (s *ServerV2) GetChunks(ctx context.Context, in *pb.GetChunksRequest) (*pb.GetChunksReply, error) {
start := time.Now()

if !s.config.EnableV2 {
return nil, api.NewErrorInvalidArg("v2 API is disabled")
}
Expand All @@ -166,6 +183,15 @@ func (s *ServerV2) GetChunks(ctx context.Context, in *pb.GetChunksRequest) (*pb.
return nil, api.NewErrorInternal(fmt.Sprintf("failed to get chunks: %v", err))
}

var size uint64
for _, chunk := range chunks {
cody-littley marked this conversation as resolved.
Show resolved Hide resolved
size += uint64(len(chunk))
}
s.metrics.ReportGetChunksDataSize(size)

elapsed := time.Since(start)
s.metrics.ReportGetChunksLatency(elapsed)
cody-littley marked this conversation as resolved.
Show resolved Hide resolved

return &pb.GetChunksReply{
Chunks: chunks,
}, nil
Expand Down
3 changes: 2 additions & 1 deletion node/grpc/server_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ func newTestComponents(t *testing.T, config *node.Config) *testComponents {
RelayClient: atomicRelayClient,
}
node.BlobVersionParams.Store(v2.NewBlobVersionParameterMap(blobParamsMap))
server := grpc.NewServerV2(config, node, logger, ratelimiter)
server, err := grpc.NewServerV2(config, node, logger, ratelimiter)
require.NoError(t, err)
return &testComponents{
server: server,
node: node,
Expand Down
Loading
Loading