diff --git a/api/clients/v2/accountant.go b/api/clients/v2/accountant.go index 02dd58d9f6..99605e848c 100644 --- a/api/clients/v2/accountant.go +++ b/api/clients/v2/accountant.go @@ -107,6 +107,7 @@ func (a *Accountant) BlobPaymentInfo(ctx context.Context, numSymbols uint32, quo } return 0, a.cumulativePayment, nil } + return 0, big.NewInt(0), fmt.Errorf("neither reservation nor on-demand payment is available") } diff --git a/api/clients/v2/disperser_client.go b/api/clients/v2/disperser_client.go index 3ef20404be..4f4415fea4 100644 --- a/api/clients/v2/disperser_client.go +++ b/api/clients/v2/disperser_client.go @@ -103,6 +103,7 @@ func (c *disperserClient) PopulateAccountant(ctx context.Context) error { if err != nil { return fmt.Errorf("error setting payment state for accountant: %w", err) } + return nil } diff --git a/docker-bake.hcl b/docker-bake.hcl index ecee0f2aef..2c7f04e4b6 100644 --- a/docker-bake.hcl +++ b/docker-bake.hcl @@ -44,6 +44,7 @@ group "all" { "churner", "dataapi", "traffic-generator", + "traffic-generator-v2", "controller", "relay" ] @@ -84,6 +85,7 @@ group "internal-release" { "churner-internal", "dataapi-internal", "traffic-generator-internal", + "traffic-generator-v2-internal", "controller-internal", "relay-internal" ] @@ -201,19 +203,19 @@ target "traffic-generator-internal" { ] } -target "traffic-generator2" { +target "traffic-generator-v2" { context = "." 
- dockerfile = "./trafficgenerator2.Dockerfile" + dockerfile = "./trafficgenerator-v2.Dockerfile" target = "generator2" - tags = ["${REGISTRY}/${REPO}/traffic-generator2:${BUILD_TAG}"] + tags = ["${REGISTRY}/${REPO}/traffic-generator-v2:${BUILD_TAG}"] } -target "traffic-generator2-internal" { - inherits = ["traffic-generator2"] +target "traffic-generator-v2-internal" { + inherits = ["traffic-generator-v2"] tags = [ - "${REGISTRY}/eigenda-traffic-generator2:${BUILD_TAG}", - "${REGISTRY}/eigenda-traffic-generator2:${GIT_SHA}", - "${REGISTRY}/eigenda-traffic-generator2:sha-${GIT_SHORT_SHA}" + "${REGISTRY}/eigenda-traffic-generator-v2:${BUILD_TAG}", + "${REGISTRY}/eigenda-traffic-generator-v2:${GIT_SHA}", + "${REGISTRY}/eigenda-traffic-generator-v2:sha-${GIT_SHORT_SHA}" ] } diff --git a/tools/traffic/Makefile b/tools/traffic/Makefile index fd2893a709..5c7c9cdd7b 100644 --- a/tools/traffic/Makefile +++ b/tools/traffic/Makefile @@ -23,25 +23,13 @@ build2: clean run2: build2 TRAFFIC_GENERATOR_LOG_FORMAT=text \ - TRAFFIC_GENERATOR_DISPERSER_HOSTNAME=localhost \ - TRAFFIC_GENERATOR_DISPERSER_PORT=32003 \ - RETRIEVER_HOSTNAME=localhost \ - RETRIEVER_GRPC_PORT=32003 \ - RETRIEVER_BLS_OPERATOR_STATE_RETRIVER=0x5f3f1dBD7B74C6B46e8c44f98792A1dAf8d69154 \ - RETRIEVER_EIGENDA_SERVICE_MANAGER=0x851356ae760d987E095750cCeb3bC6014560891C \ - RETRIEVER_TIMEOUT=10s \ + TRAFFIC_GENERATOR_DISPERSER_HOSTNAME=disperser-preprod-holesky.eigenda.xyz \ + TRAFFIC_GENERATOR_DISPERSER_PORT=443 \ + TRAFFIC_GENERATOR_DISPERSER_USE_SECURE_GRPC=true \ TRAFFIC_GENERATOR_NUM_WRITE_INSTANCES=1 \ - TRAFFIC_GENERATOR_WRITE_REQUEST_INTERVAL=1s \ + TRAFFIC_GENERATOR_WRITE_REQUEST_INTERVAL=5s \ TRAFFIC_GENERATOR_DATA_SIZE=1000 \ - TRAFFIC_GENERATOR_SIGNER_PRIVATE_KEY_HEX=AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA \ - TRAFFIC_GENERATOR_PRIVATE_KEY=AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA \ + 
TRAFFIC_GENERATOR_SIGNER_PRIVATE_KEY_HEX=73ae7e3a40b59caacb1cda8fa04f4e7fa5bb2b37101f9f3506290c201f57bf7b \ TRAFFIC_GENERATOR_METRICS_HTTP_PORT=9101 \ - TRAFFIC_GENERATOR_GRAPH_URL=http://localhost:8000/subgraphs/name/Layr-Labs/eigenda-operator-state \ - TRAFFIC_GENERATOR_THE_GRAPH_URL=http://localhost:8000/subgraphs/name/Layr-Labs/eigenda-operator-state \ - TRAFFIC_GENERATOR_G1_PATH=../../inabox/resources/kzg/g1.point \ - TRAFFIC_GENERATOR_G2_PATH=../../inabox/resources/kzg/g2.point \ - TRAFFIC_GENERATOR_CACHE_PATH=../../inabox/resources/kzg/SRSTables \ - TRAFFIC_GENERATOR_SRS_ORDER=3000 \ - TRAFFIC_GENERATOR_SRS_LOAD=3000 \ - TRAFFIC_GENERATOR_CHAIN_RPC=http://localhost:8545 \ + TRAFFIC_GENERATOR_CUSTOM_QUORUM_NUMBERS=1 \ ./bin/server2 diff --git a/tools/traffic/config/config.go b/tools/traffic/config/config.go index 2b147f1a49..c1cd26bd6f 100644 --- a/tools/traffic/config/config.go +++ b/tools/traffic/config/config.go @@ -2,13 +2,11 @@ package config import ( "errors" - "fmt" - "github.com/Layr-Labs/eigenda/api/clients" - "github.com/Layr-Labs/eigenda/core/thegraph" - "github.com/Layr-Labs/eigenda/retriever" "time" + "github.com/Layr-Labs/eigenda/api/clients/v2" "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/core/thegraph" "github.com/urfave/cli" ) @@ -19,17 +17,14 @@ type Config struct { LoggingConfig common.LoggerConfig // Configuration for the disperser client. - DisperserClientConfig *clients.Config + DisperserClientConfig *clients.DisperserClientConfig - // Configuration for the retriever client. - RetrievalClientConfig *retriever.Config + // Signer private key + SignerPrivateKey string // Configuration for the graph. TheGraphConfig *thegraph.Config - // Configuration for the EigenDA client. - EigenDAClientConfig *clients.EigenDAClientConfig - // Configures the traffic generator workers. 
WorkerConfig WorkerConfig @@ -47,6 +42,10 @@ func NewConfig(ctx *cli.Context) (*Config, error) { return nil, err } customQuorums := ctx.GlobalIntSlice(CustomQuorumNumbersFlag.Name) + if len(customQuorums) == 0 { + return nil, errors.New("no custom quorum numbers provided") + } + customQuorumsUint8 := make([]uint8, len(customQuorums)) for i, q := range customQuorums { if q < 0 || q > 255 { @@ -55,17 +54,14 @@ func NewConfig(ctx *cli.Context) (*Config, error) { customQuorumsUint8[i] = uint8(q) } - retrieverConfig := retriever.ReadRetrieverConfig(ctx) - config := &Config{ - DisperserClientConfig: &clients.Config{ + DisperserClientConfig: &clients.DisperserClientConfig{ Hostname: ctx.GlobalString(HostnameFlag.Name), Port: ctx.GlobalString(GrpcPortFlag.Name), - Timeout: ctx.Duration(TimeoutFlag.Name), UseSecureGrpcFlag: ctx.GlobalBool(UseSecureGrpcFlag.Name), }, - RetrievalClientConfig: retrieverConfig, + SignerPrivateKey: ctx.String(SignerPrivateKeyFlag.Name), TheGraphConfig: &thegraph.Config{ Endpoint: ctx.String(TheGraphUrlFlag.Name), @@ -73,12 +69,6 @@ func NewConfig(ctx *cli.Context) (*Config, error) { MaxRetries: ctx.Int(TheGraphRetriesFlag.Name), }, - EigenDAClientConfig: &clients.EigenDAClientConfig{ - RPC: fmt.Sprintf("%s:%s", ctx.GlobalString(HostnameFlag.Name), ctx.GlobalString(GrpcPortFlag.Name)), - SignerPrivateKeyHex: ctx.String(SignerPrivateKeyFlag.Name), - DisableTLS: ctx.GlobalBool(DisableTLSFlag.Name), - }, - LoggingConfig: *loggerConfig, MetricsHTTPPort: ctx.GlobalString(MetricsHTTPPortFlag.Name), @@ -103,19 +93,12 @@ func NewConfig(ctx *cli.Context) (*Config, error) { RetrieveBlobChunksTimeout: ctx.Duration(RetrieveBlobChunksTimeoutFlag.Name), StatusTrackerChannelCapacity: ctx.Uint(VerificationChannelCapacityFlag.Name), - EigenDAServiceManager: retrieverConfig.EigenDAServiceManagerAddr, - SignerPrivateKey: ctx.String(SignerPrivateKeyFlag.Name), - CustomQuorums: customQuorumsUint8, + CustomQuorums: customQuorumsUint8, MetricsBlacklist: 
ctx.StringSlice(MetricsBlacklistFlag.Name), MetricsFuzzyBlacklist: ctx.StringSlice(MetricsFuzzyBlacklistFlag.Name), }, } - err = config.EigenDAClientConfig.CheckAndSetDefaults() - if err != nil { - return nil, err - } - return config, nil } diff --git a/tools/traffic/config/flags.go b/tools/traffic/config/flags.go index c4218e52cf..fc0ec776d9 100644 --- a/tools/traffic/config/flags.go +++ b/tools/traffic/config/flags.go @@ -1,14 +1,10 @@ package config import ( - "github.com/Layr-Labs/eigenda/common/geth" - "github.com/Layr-Labs/eigenda/core/thegraph" - "github.com/Layr-Labs/eigenda/encoding/kzg" - "github.com/Layr-Labs/eigenda/indexer" - "github.com/Layr-Labs/eigenda/retriever/flags" "time" "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/indexer" "github.com/urfave/cli" ) @@ -133,7 +129,7 @@ var ( TheGraphUrlFlag = cli.StringFlag{ Name: common.PrefixFlag(FlagPrefix, "the-graph-url"), Usage: "URL of the subgraph instance.", - Required: true, + Required: false, EnvVar: common.PrefixEnvVar(envPrefix, "THE_GRAPH_URL"), } TheGraphPullIntervalFlag = cli.DurationFlag{ @@ -225,7 +221,6 @@ var ( var requiredFlags = []cli.Flag{ HostnameFlag, GrpcPortFlag, - TheGraphUrlFlag, } var optionalFlags = []cli.Flag{ @@ -243,6 +238,7 @@ var optionalFlags = []cli.Flag{ RequiredDownloadsFlag, DisableTLSFlag, MetricsHTTPPortFlag, + TheGraphUrlFlag, TheGraphPullIntervalFlag, TheGraphRetriesFlag, VerifierIntervalFlag, @@ -261,10 +257,6 @@ var Flags []cli.Flag func init() { Flags = append(requiredFlags, optionalFlags...) - Flags = append(Flags, flags.RetrieverFlags(envPrefix)...) - Flags = append(Flags, kzg.CLIFlags(envPrefix)...) Flags = append(Flags, common.LoggerCLIFlags(envPrefix, FlagPrefix)...) - Flags = append(Flags, geth.EthClientFlags(envPrefix)...) Flags = append(Flags, indexer.CLIFlags(envPrefix)...) - Flags = append(Flags, thegraph.CLIFlags(envPrefix)...) 
} diff --git a/tools/traffic/config/worker_config.go b/tools/traffic/config/worker_config.go index 78eb7a6fc2..0c6ee0eeb2 100644 --- a/tools/traffic/config/worker_config.go +++ b/tools/traffic/config/worker_config.go @@ -38,8 +38,7 @@ type WorkerConfig struct { // The address of the EigenDA service manager smart contract, in hex. EigenDAServiceManager string - // The private key to use for signing requests. - SignerPrivateKey string + // Custom quorum numbers to use for the traffic generator. CustomQuorums []uint8 diff --git a/tools/traffic/generator_v2.go b/tools/traffic/generator_v2.go index beec5e393b..b29971401d 100644 --- a/tools/traffic/generator_v2.go +++ b/tools/traffic/generator_v2.go @@ -9,23 +9,14 @@ import ( "syscall" "time" - "github.com/Layr-Labs/eigenda/common/geth" - "github.com/Layr-Labs/eigenda/core/auth" - "github.com/Layr-Labs/eigenda/core/eth" - "github.com/Layr-Labs/eigenda/core/thegraph" - "github.com/Layr-Labs/eigenda/encoding/kzg/verifier" - retrivereth "github.com/Layr-Labs/eigenda/retriever/eth" + "github.com/Layr-Labs/eigenda/api/clients/v2" + "github.com/Layr-Labs/eigenda/common" + auth "github.com/Layr-Labs/eigenda/core/auth/v2" "github.com/Layr-Labs/eigenda/tools/traffic/config" "github.com/Layr-Labs/eigenda/tools/traffic/metrics" - "github.com/Layr-Labs/eigenda/tools/traffic/table" "github.com/Layr-Labs/eigenda/tools/traffic/workers" "github.com/Layr-Labs/eigensdk-go/logging" gethcommon "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" - - "github.com/Layr-Labs/eigenda/api/clients" - "github.com/Layr-Labs/eigenda/common" - "github.com/Layr-Labs/eigenda/core" ) // Generator simulates read/write traffic to the DA service. 
@@ -37,7 +28,7 @@ import ( // └------------┘ └------------┘ // // The traffic generator is built from three principal components: one or more writers -// that write blobs, a statusTracker that polls the dispenser service until blobs are confirmed, +// that write blobs, a statusTracker that polls the disperser service until blobs are confirmed, // and one or more readers that read blobs. // // When a writer finishes writing a blob, it sends information about that blob to the statusTracker. @@ -50,12 +41,10 @@ type Generator struct { generatorMetrics metrics.Metrics logger *logging.Logger disperserClient clients.DisperserClient - eigenDAClient *clients.EigenDAClient - config *config.Config + // eigenDAClient *clients.EigenDAClient #TODO: Add this back in when the client is implemented + config *config.Config - writers []*workers.BlobWriter - statusTracker *workers.BlobStatusTracker - readers []*workers.BlobReader + writers []*workers.BlobWriter } func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { @@ -64,16 +53,20 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { return nil, err } - var signer core.BlobRequestSigner - if config.EigenDAClientConfig.SignerPrivateKeyHex != "" { - signer = auth.NewLocalBlobRequestSigner(config.EigenDAClientConfig.SignerPrivateKeyHex) + var signer *auth.LocalBlobRequestSigner + if config.SignerPrivateKey != "" { + signer = auth.NewLocalBlobRequestSigner(config.SignerPrivateKey) + } else { + logger.Error("signer private key is required") + return nil, fmt.Errorf("signer private key is required") } - logger2 := log.NewLogger(log.NewTerminalHandler(os.Stderr, true)) - client, err := clients.NewEigenDAClient(logger2, *config.EigenDAClientConfig) + signerAccountId, err := signer.GetAccountID() if err != nil { - return nil, err + return nil, fmt.Errorf("error getting account ID: %w", err) } + accountId := gethcommon.HexToAddress(signerAccountId) + logger.Info("Initializing traffic generator", 
"accountId", accountId) ctx, cancel := context.WithCancel(context.Background()) waitGroup := sync.WaitGroup{} @@ -84,25 +77,14 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { config.WorkerConfig.MetricsBlacklist, config.WorkerConfig.MetricsFuzzyBlacklist) - blobTable := table.NewBlobStore() - - unconfirmedKeyChannel := make(chan *workers.UnconfirmedKey, 100) + uncertifiedKeyChannel := make(chan *workers.UncertifiedKey, 100) // TODO: create a dedicated reservation for traffic generator - disperserClient, err := clients.NewDisperserClient(config.DisperserClientConfig, signer) + disperserClient, err := clients.NewDisperserClient(config.DisperserClientConfig, signer, nil, nil) if err != nil { cancel() return nil, fmt.Errorf("new disperser-client: %w", err) } - statusVerifier := workers.NewBlobStatusTracker( - &ctx, - &waitGroup, - logger, - &config.WorkerConfig, - unconfirmedKeyChannel, - blobTable, - disperserClient, - generatorMetrics) writers := make([]*workers.BlobWriter, 0) for i := 0; i < int(config.WorkerConfig.NumWriteInstances); i++ { @@ -112,27 +94,11 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { logger, &config.WorkerConfig, disperserClient, - unconfirmedKeyChannel, + uncertifiedKeyChannel, generatorMetrics) writers = append(writers, &writer) } - retriever, chainClient := buildRetriever(config) - - readers := make([]*workers.BlobReader, 0) - for i := 0; i < int(config.WorkerConfig.NumReadInstances); i++ { - reader := workers.NewBlobReader( - &ctx, - &waitGroup, - logger, - &config.WorkerConfig, - retriever, - chainClient, - blobTable, - generatorMetrics) - readers = append(readers, &reader) - } - return &Generator{ ctx: &ctx, cancel: &cancel, @@ -140,83 +106,27 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { generatorMetrics: generatorMetrics, logger: &logger, disperserClient: disperserClient, - eigenDAClient: client, config: config, writers: writers, - statusTracker: 
&statusVerifier, - readers: readers, }, nil } -// buildRetriever creates a retriever client for the traffic generator. -func buildRetriever(config *config.Config) (clients.RetrievalClient, retrivereth.ChainClient) { - loggerConfig := common.DefaultLoggerConfig() - - logger, err := common.NewLogger(loggerConfig) - if err != nil { - panic(fmt.Sprintf("Unable to instantiate logger: %s", err)) - } - - gethClient, err := geth.NewMultiHomingClient(config.RetrievalClientConfig.EthClientConfig, gethcommon.Address{}, logger) - if err != nil { - panic(fmt.Sprintf("Unable to instantiate geth client: %s", err)) - } - - tx, err := eth.NewReader( - logger, - gethClient, - config.RetrievalClientConfig.BLSOperatorStateRetrieverAddr, - config.RetrievalClientConfig.EigenDAServiceManagerAddr) - if err != nil { - panic(fmt.Sprintf("Unable to instantiate transactor: %s", err)) - } - - cs := eth.NewChainState(tx, gethClient) - - chainState := thegraph.MakeIndexedChainState(*config.TheGraphConfig, cs, logger) - - var assignmentCoordinator core.AssignmentCoordinator = &core.StdAssignmentCoordinator{} - - nodeClient := clients.NewNodeClient(config.NodeClientTimeout) - - config.RetrievalClientConfig.EncoderConfig.LoadG2Points = true - v, err := verifier.NewVerifier(&config.RetrievalClientConfig.EncoderConfig, nil) - if err != nil { - panic(fmt.Sprintf("Unable to build statusTracker: %s", err)) - } - - retriever, err := clients.NewRetrievalClient( - logger, - chainState, - assignmentCoordinator, - nodeClient, - v, - config.RetrievalClientConfig.NumConnections) - - if err != nil { - panic(fmt.Sprintf("Unable to build retriever: %s", err)) - } - - chainClient := retrivereth.NewChainClient(gethClient, logger) - - return retriever, chainClient -} - // Start instantiates goroutines that generate read/write traffic, continues until a SIGTERM is observed. 
func (generator *Generator) Start() error { generator.generatorMetrics.Start() - generator.statusTracker.Start() + + // generator.statusTracker.Start() for _, writer := range generator.writers { writer.Start() time.Sleep(generator.config.InstanceLaunchInterval) } - for _, reader := range generator.readers { - reader.Start() - time.Sleep(generator.config.InstanceLaunchInterval) - } + // for _, reader := range generator.readers { + // reader.Start() + // time.Sleep(generator.config.InstanceLaunchInterval) + // } signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt, syscall.SIGTERM) diff --git a/tools/traffic/table/blob_metadata.go b/tools/traffic/table/blob_metadata.go index 40a0b415e9..24ee9dc81e 100644 --- a/tools/traffic/table/blob_metadata.go +++ b/tools/traffic/table/blob_metadata.go @@ -1,6 +1,10 @@ package table -import "errors" +import ( + "errors" + + corev2 "github.com/Layr-Labs/eigenda/core/v2" +) // BlobMetadata encapsulates various information about a blob written by the traffic generator. type BlobMetadata struct { @@ -13,6 +17,9 @@ type BlobMetadata struct { // Hash of the batch header that the blob was written in. BatchHeaderHash [32]byte + // Blob header of the blob. + BlobHeader *corev2.BlobHeader + // Checksum of the blob. Checksum [16]byte diff --git a/tools/traffic/table/blob_store_test.go b/tools/traffic/table/blob_store_test.go index 99c9c91a02..37992675be 100644 --- a/tools/traffic/table/blob_store_test.go +++ b/tools/traffic/table/blob_store_test.go @@ -1,10 +1,11 @@ package table import ( + "testing" + tu "github.com/Layr-Labs/eigenda/common/testutils" "github.com/stretchr/testify/assert" "golang.org/x/exp/rand" - "testing" ) // randomMetadata generates a random BlobMetadata instance. 
diff --git a/tools/traffic/workers/blob_reader.go b/tools/traffic/workers/blob_reader.go deleted file mode 100644 index 033eb9ee64..0000000000 --- a/tools/traffic/workers/blob_reader.go +++ /dev/null @@ -1,233 +0,0 @@ -package workers - -import ( - "context" - "crypto/md5" - "fmt" - "github.com/Layr-Labs/eigenda/api/clients" - "github.com/Layr-Labs/eigenda/core" - "github.com/Layr-Labs/eigenda/encoding" - "github.com/Layr-Labs/eigenda/retriever/eth" - "github.com/Layr-Labs/eigenda/tools/traffic/config" - "github.com/Layr-Labs/eigenda/tools/traffic/metrics" - "github.com/Layr-Labs/eigenda/tools/traffic/table" - "github.com/Layr-Labs/eigensdk-go/logging" - gcommon "github.com/ethereum/go-ethereum/common" - "math/big" - "sync" - "time" -) - -// BlobReader reads blobs from the DA network at a configured rate. -type BlobReader struct { - // The context for the generator. All work should cease when this context is cancelled. - ctx *context.Context - - // Tracks the number of active goroutines within the generator. - waitGroup *sync.WaitGroup - - // All logs should be written using this logger. - logger logging.Logger - - // config contains the configuration for the generator. - config *config.WorkerConfig - - retriever clients.RetrievalClient - chainClient eth.ChainClient - - // blobsToRead blobs we are required to read a certain number of times. - blobsToRead *table.BlobStore - - // metrics for the blob reader. 
- metrics *blobReaderMetrics -} - -type blobReaderMetrics struct { - generatorMetrics metrics.Metrics - fetchBatchHeaderMetric metrics.LatencyMetric - fetchBatchHeaderSuccess metrics.CountMetric - fetchBatchHeaderFailure metrics.CountMetric - readLatencyMetric metrics.LatencyMetric - readSuccessMetric metrics.CountMetric - readFailureMetric metrics.CountMetric - recombinationSuccessMetric metrics.CountMetric - recombinationFailureMetric metrics.CountMetric - validBlobMetric metrics.CountMetric - invalidBlobMetric metrics.CountMetric - operatorSuccessMetrics map[core.OperatorID]metrics.CountMetric - operatorFailureMetrics map[core.OperatorID]metrics.CountMetric - requiredReadPoolSizeMetric metrics.GaugeMetric - optionalReadPoolSizeMetric metrics.GaugeMetric -} - -// NewBlobReader creates a new BlobReader instance. -func NewBlobReader( - ctx *context.Context, - waitGroup *sync.WaitGroup, - logger logging.Logger, - config *config.WorkerConfig, - retriever clients.RetrievalClient, - chainClient eth.ChainClient, - blobStore *table.BlobStore, - generatorMetrics metrics.Metrics) BlobReader { - - return BlobReader{ - ctx: ctx, - waitGroup: waitGroup, - logger: logger, - config: config, - retriever: retriever, - chainClient: chainClient, - blobsToRead: blobStore, - metrics: &blobReaderMetrics{ - generatorMetrics: generatorMetrics, - fetchBatchHeaderMetric: generatorMetrics.NewLatencyMetric("fetch_batch_header"), - fetchBatchHeaderSuccess: generatorMetrics.NewCountMetric("fetch_batch_header_success"), - fetchBatchHeaderFailure: generatorMetrics.NewCountMetric("fetch_batch_header_failure"), - recombinationSuccessMetric: generatorMetrics.NewCountMetric("recombination_success"), - recombinationFailureMetric: generatorMetrics.NewCountMetric("recombination_failure"), - readLatencyMetric: generatorMetrics.NewLatencyMetric("read"), - validBlobMetric: generatorMetrics.NewCountMetric("valid_blob"), - invalidBlobMetric: generatorMetrics.NewCountMetric("invalid_blob"), - 
readSuccessMetric: generatorMetrics.NewCountMetric("read_success"), - readFailureMetric: generatorMetrics.NewCountMetric("read_failure"), - operatorSuccessMetrics: make(map[core.OperatorID]metrics.CountMetric), - operatorFailureMetrics: make(map[core.OperatorID]metrics.CountMetric), - requiredReadPoolSizeMetric: generatorMetrics.NewGaugeMetric("required_read_pool_size"), - optionalReadPoolSizeMetric: generatorMetrics.NewGaugeMetric("optional_read_pool_size"), - }, - } -} - -// Start begins a blob reader goroutine. -func (r *BlobReader) Start() { - r.waitGroup.Add(1) - ticker := time.NewTicker(r.config.ReadRequestInterval) - go func() { - defer r.waitGroup.Done() - for { - select { - case <-(*r.ctx).Done(): - err := (*r.ctx).Err() - if err != nil { - r.logger.Info("blob reader context closed", "err:", err) - } - return - case <-ticker.C: - r.randomRead() - } - } - }() -} - -// randomRead reads a random blob. -func (r *BlobReader) randomRead() { - metadata := r.blobsToRead.GetNext() - if metadata == nil { - // There are no blobs that we are required to read. 
- return - } - - r.metrics.requiredReadPoolSizeMetric.Set(float64(r.blobsToRead.Size())) - - ctxTimeout, cancel := context.WithTimeout(*r.ctx, r.config.FetchBatchHeaderTimeout) - defer cancel() - - start := time.Now() - batchHeader, err := r.chainClient.FetchBatchHeader( - ctxTimeout, - gcommon.HexToAddress(r.config.EigenDAServiceManager), - metadata.BatchHeaderHash[:], - big.NewInt(int64(0)), - nil) - if err != nil { - r.logger.Error("failed to get batch header", "err:", err) - r.metrics.fetchBatchHeaderFailure.Increment() - return - } - r.metrics.fetchBatchHeaderMetric.ReportLatency(time.Since(start)) - - r.metrics.fetchBatchHeaderSuccess.Increment() - - ctxTimeout, cancel = context.WithTimeout(*r.ctx, r.config.RetrieveBlobChunksTimeout) - defer cancel() - - start = time.Now() - chunks, err := r.retriever.RetrieveBlobChunks( - ctxTimeout, - metadata.BatchHeaderHash, - uint32(metadata.BlobIndex), - uint(batchHeader.ReferenceBlockNumber), - batchHeader.BlobHeadersRoot, - core.QuorumID(0)) - if err != nil { - r.logger.Error("failed to read chunks", "err:", err) - r.metrics.readFailureMetric.Increment() - return - } - r.metrics.readLatencyMetric.ReportLatency(time.Since(start)) - - r.metrics.readSuccessMetric.Increment() - - assignments := chunks.Assignments - - data, err := r.retriever.CombineChunks(chunks) - if err != nil { - r.logger.Error("failed to combine chunks", "err:", err) - r.metrics.recombinationFailureMetric.Increment() - return - } - r.metrics.recombinationSuccessMetric.Increment() - - r.verifyBlob(metadata, &data) - - indexSet := make(map[encoding.ChunkNumber]bool) - for index := range chunks.Indices { - indexSet[chunks.Indices[index]] = true - } - - for id, assignment := range assignments { - for index := assignment.StartIndex; index < assignment.StartIndex+assignment.NumChunks; index++ { - if indexSet[index] { - r.reportChunk(id) - } else { - r.reportMissingChunk(id) - } - } - } -} - -// reportChunk reports a successful chunk read. 
-func (r *BlobReader) reportChunk(operatorId core.OperatorID) { - metric, exists := r.metrics.operatorSuccessMetrics[operatorId] - if !exists { - metric = r.metrics.generatorMetrics.NewCountMetric(fmt.Sprintf("operator_%x_returned_chunk", operatorId)) - r.metrics.operatorSuccessMetrics[operatorId] = metric - } - - metric.Increment() -} - -// reportMissingChunk reports a missing chunk. -func (r *BlobReader) reportMissingChunk(operatorId core.OperatorID) { - metric, exists := r.metrics.operatorFailureMetrics[operatorId] - if !exists { - metric = r.metrics.generatorMetrics.NewCountMetric(fmt.Sprintf("operator_%x_witheld_chunk", operatorId)) - r.metrics.operatorFailureMetrics[operatorId] = metric - } - - metric.Increment() -} - -// verifyBlob performs sanity checks on the blob. -func (r *BlobReader) verifyBlob(metadata *table.BlobMetadata, blob *[]byte) { - // Trim off the padding. - truncatedBlob := (*blob)[:metadata.Size] - recomputedChecksum := md5.Sum(truncatedBlob) - - if metadata.Checksum == recomputedChecksum { - r.metrics.validBlobMetric.Increment() - } else { - r.metrics.invalidBlobMetric.Increment() - } -} diff --git a/tools/traffic/workers/blob_reader_test.go b/tools/traffic/workers/blob_reader_test.go deleted file mode 100644 index ea9d5f0773..0000000000 --- a/tools/traffic/workers/blob_reader_test.go +++ /dev/null @@ -1,151 +0,0 @@ -package workers - -import ( - "context" - "crypto/md5" - "sync" - "testing" - "time" - - "github.com/Layr-Labs/eigenda/api/clients" - apiMock "github.com/Layr-Labs/eigenda/api/clients/mock" - "github.com/Layr-Labs/eigenda/common" - tu "github.com/Layr-Labs/eigenda/common/testutils" - binding "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDAServiceManager" - retrieverMock "github.com/Layr-Labs/eigenda/retriever/mock" - "github.com/Layr-Labs/eigenda/tools/traffic/config" - "github.com/Layr-Labs/eigenda/tools/traffic/metrics" - "github.com/Layr-Labs/eigenda/tools/traffic/table" - "github.com/stretchr/testify/assert" - 
"github.com/stretchr/testify/mock" - "golang.org/x/exp/rand" -) - -// TestBlobReaderNoOptionalReads tests the BlobReader's basic functionality' -func TestBlobReader(t *testing.T) { - tu.InitializeRandom() - - ctx, cancel := context.WithCancel(context.Background()) - waitGroup := sync.WaitGroup{} - logger, err := common.NewLogger(common.DefaultLoggerConfig()) - assert.Nil(t, err) - - blobTable := table.NewBlobStore() - - readerMetrics := metrics.NewMockMetrics() - - chainClient := &retrieverMock.MockChainClient{} - chainClient.On( - "FetchBatchHeader", - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything).Return(&binding.BatchHeader{}, nil) - retrievalClient := &apiMock.MockRetrievalClient{} - - blobReader := NewBlobReader( - &ctx, - &waitGroup, - logger, - &config.WorkerConfig{}, - retrievalClient, - chainClient, - blobTable, - readerMetrics) - - blobSize := 1024 - readPermits := 2 - blobCount := 100 - - invalidBlobCount := 0 - - // Insert some blobs into the table. - for i := 0; i < blobCount; i++ { - - key := make([]byte, 32) - _, err = rand.Read(key) - assert.Nil(t, err) - - blobData := make([]byte, blobSize) - _, err = rand.Read(blobData) - assert.Nil(t, err) - - var checksum [16]byte - if i%10 == 0 { - // Simulate an invalid blob - invalidBlobCount++ - _, err = rand.Read(checksum[:]) - assert.Nil(t, err) - } else { - checksum = md5.Sum(blobData) - } - - batchHeaderHash := [32]byte{} - _, err = rand.Read(batchHeaderHash[:]) - assert.Nil(t, err) - - blobMetadata, err := table.NewBlobMetadata( - key, - checksum, - uint(blobSize), - uint(i), - batchHeaderHash, - readPermits) - assert.Nil(t, err) - - // Simplify tracking by hijacking the BlobHeaderLength field to store the blob index, - // which is used as a unique identifier within this test. 
- chunks := &clients.BlobChunks{BlobHeaderLength: blobMetadata.BlobIndex} - retrievalClient.On("RetrieveBlobChunks", - blobMetadata.BatchHeaderHash, - uint32(blobMetadata.BlobIndex), - mock.Anything, - mock.Anything, - mock.Anything).Return(chunks, nil) - retrievalClient.On("CombineChunks", chunks).Return(blobData, nil) - - blobTable.Add(blobMetadata) - } - - // Do a bunch of reads. - expectedTotalReads := uint(readPermits * blobCount) - for i := uint(0); i < expectedTotalReads; i++ { - blobReader.randomRead() - - chainClient.AssertNumberOfCalls(t, "FetchBatchHeader", int(i+1)) - retrievalClient.AssertNumberOfCalls(t, "RetrieveBlobChunks", int(i+1)) - retrievalClient.AssertNumberOfCalls(t, "CombineChunks", int(i+1)) - - remainingPermits := uint(0) - for _, metadata := range blobTable.GetAll() { - remainingPermits += uint(metadata.RemainingReadPermits) - } - assert.Equal(t, remainingPermits, expectedTotalReads-i-1) - - assert.Equal(t, i+1, uint(readerMetrics.GetCount("read_success"))) - assert.Equal(t, i+1, uint(readerMetrics.GetCount("fetch_batch_header_success"))) - assert.Equal(t, i+1, uint(readerMetrics.GetCount("recombination_success"))) - } - - expectedInvalidBlobs := uint(invalidBlobCount * readPermits) - expectedValidBlobs := expectedTotalReads - expectedInvalidBlobs - - assert.Equal(t, expectedValidBlobs, uint(readerMetrics.GetCount("valid_blob"))) - assert.Equal(t, expectedInvalidBlobs, uint(readerMetrics.GetCount("invalid_blob"))) - assert.Equal(t, uint(0), uint(readerMetrics.GetGaugeValue("required_read_pool_size"))) - assert.Equal(t, uint(0), uint(readerMetrics.GetGaugeValue("optional_read_pool_size"))) - - // Table is empty, so doing a random read should have no effect. - blobReader.randomRead() - - // Give the system a moment to attempt to do work. This should not result in any reads. 
- time.Sleep(time.Second / 10) - assert.Equal(t, expectedTotalReads, uint(readerMetrics.GetCount("read_success"))) - assert.Equal(t, expectedTotalReads, uint(readerMetrics.GetCount("fetch_batch_header_success"))) - assert.Equal(t, expectedTotalReads, uint(readerMetrics.GetCount("recombination_success"))) - assert.Equal(t, expectedValidBlobs, uint(readerMetrics.GetCount("valid_blob"))) - assert.Equal(t, expectedInvalidBlobs, uint(readerMetrics.GetCount("invalid_blob"))) - - cancel() -} diff --git a/tools/traffic/workers/blob_status_tracker.go b/tools/traffic/workers/blob_status_tracker.go deleted file mode 100644 index 15bcca84bf..0000000000 --- a/tools/traffic/workers/blob_status_tracker.go +++ /dev/null @@ -1,256 +0,0 @@ -package workers - -import ( - "context" - "math/rand" - "sync" - "time" - - "github.com/Layr-Labs/eigenda/api/clients" - "github.com/Layr-Labs/eigenda/api/grpc/disperser" - "github.com/Layr-Labs/eigenda/tools/traffic/config" - "github.com/Layr-Labs/eigenda/tools/traffic/metrics" - "github.com/Layr-Labs/eigenda/tools/traffic/table" - "github.com/Layr-Labs/eigensdk-go/logging" -) - -// BlobStatusTracker periodically polls the disperser service to verify the status of blobs that were recently written. -// When blobs become confirmed, the status tracker updates the blob blobsToRead accordingly. -// This is a thread safe data structure. -type BlobStatusTracker struct { - - // The context for the generator. All work should cease when this context is cancelled. - ctx *context.Context - - // Tracks the number of active goroutines within the generator. - waitGroup *sync.WaitGroup - - // All logs should be written using this logger. - logger logging.Logger - - // config contains the configuration for the generator. - config *config.WorkerConfig - - // Contains confirmed blobs. Blobs are added here when they are confirmed by the disperser service. - confirmedBlobs *table.BlobStore - - // The disperser client used to monitor the disperser service. 
- disperser clients.DisperserClient - - // The keys of blobs that have not yet been confirmed by the disperser service. - unconfirmedBlobs []*UnconfirmedKey - - // Newly added keys that require verification. - keyChannel chan *UnconfirmedKey - - blobsInFlightMetric metrics.GaugeMetric - getStatusLatencyMetric metrics.LatencyMetric - confirmationLatencyMetric metrics.LatencyMetric - getStatusErrorCountMetric metrics.CountMetric - unknownCountMetric metrics.CountMetric - processingCountMetric metrics.CountMetric - dispersingCountMetric metrics.CountMetric - failedCountMetric metrics.CountMetric - insufficientSignaturesCountMetric metrics.CountMetric - confirmedCountMetric metrics.CountMetric - finalizedCountMetric metrics.CountMetric -} - -// NewBlobStatusTracker creates a new BlobStatusTracker instance. -func NewBlobStatusTracker( - ctx *context.Context, - waitGroup *sync.WaitGroup, - logger logging.Logger, - config *config.WorkerConfig, - keyChannel chan *UnconfirmedKey, - table *table.BlobStore, - disperser clients.DisperserClient, - generatorMetrics metrics.Metrics) BlobStatusTracker { - - return BlobStatusTracker{ - ctx: ctx, - waitGroup: waitGroup, - logger: logger, - config: config, - keyChannel: keyChannel, - confirmedBlobs: table, - disperser: disperser, - unconfirmedBlobs: make([]*UnconfirmedKey, 0), - blobsInFlightMetric: generatorMetrics.NewGaugeMetric("blobs_in_flight"), - getStatusLatencyMetric: generatorMetrics.NewLatencyMetric("get_status"), - confirmationLatencyMetric: generatorMetrics.NewLatencyMetric("confirmation"), - getStatusErrorCountMetric: generatorMetrics.NewCountMetric("get_status_ERROR"), - unknownCountMetric: generatorMetrics.NewCountMetric("get_status_UNKNOWN"), - processingCountMetric: generatorMetrics.NewCountMetric("get_status_PROCESSING"), - dispersingCountMetric: generatorMetrics.NewCountMetric("get_status_DISPERSING"), - failedCountMetric: generatorMetrics.NewCountMetric("get_status_FAILED"), - insufficientSignaturesCountMetric: 
generatorMetrics.NewCountMetric("get_status_INSUFFICIENT_SIGNATURES"), - confirmedCountMetric: generatorMetrics.NewCountMetric("get_status_CONFIRMED"), - finalizedCountMetric: generatorMetrics.NewCountMetric("get_status_FINALIZED"), - } -} - -// Start begins the status goroutine, which periodically polls -// the disperser service to verify the status of blobs. -func (tracker *BlobStatusTracker) Start() { - tracker.waitGroup.Add(1) - go tracker.monitor() -} - -// monitor periodically polls the disperser service to verify the status of blobs. -func (tracker *BlobStatusTracker) monitor() { - ticker := time.NewTicker(tracker.config.TrackerInterval) - for { - select { - case <-(*tracker.ctx).Done(): - tracker.waitGroup.Done() - return - case key := <-tracker.keyChannel: - tracker.unconfirmedBlobs = append(tracker.unconfirmedBlobs, key) - case <-ticker.C: - tracker.poll() - } - } -} - -// poll checks all unconfirmed keys to see if they have been confirmed by the disperser service. -// If a Key is confirmed, it is added to the blob confirmedBlobs and removed from the list of unconfirmed keys. -func (tracker *BlobStatusTracker) poll() { - - // FUTURE WORK If the number of unconfirmed blobs is high and the time to confirm is high, this is not efficient. - // Revisit this method if there are performance problems. - - nonFinalBlobs := make([]*UnconfirmedKey, 0) - for _, key := range tracker.unconfirmedBlobs { - - blobStatus, err := tracker.getBlobStatus(key) - if err != nil { - tracker.logger.Error("failed to get blob status: ", "err:", err) - // There was an error getting status. Try again later. 
- nonFinalBlobs = append(nonFinalBlobs, key) - continue - } - - tracker.updateStatusMetrics(blobStatus.Status) - if isBlobStatusTerminal(blobStatus.Status) { - if isBlobStatusConfirmed(blobStatus.Status) { - tracker.forwardToReader(key, blobStatus) - } - } else { - // try again later - nonFinalBlobs = append(nonFinalBlobs, key) - } - } - tracker.unconfirmedBlobs = nonFinalBlobs - tracker.blobsInFlightMetric.Set(float64(len(tracker.unconfirmedBlobs))) -} - -// isBlobStatusTerminal returns true if the status is a terminal status. -func isBlobStatusTerminal(status disperser.BlobStatus) bool { - switch status { - case disperser.BlobStatus_FAILED: - return true - case disperser.BlobStatus_INSUFFICIENT_SIGNATURES: - return true - case disperser.BlobStatus_CONFIRMED: - // Technically this isn't terminal, as confirmed blobs eventually should become finalized. - // But it is terminal from the status tracker's perspective, since we stop tracking the blob - // once it becomes either confirmed or finalized. - return true - case disperser.BlobStatus_FINALIZED: - return true - default: - return false - } -} - -// isBlobStatusConfirmed returns true if the status is a confirmed status. -func isBlobStatusConfirmed(status disperser.BlobStatus) bool { - switch status { - case disperser.BlobStatus_CONFIRMED: - return true - case disperser.BlobStatus_FINALIZED: - return true - default: - return false - } -} - -// updateStatusMetrics updates the metrics for the reported status of a blob. 
-func (tracker *BlobStatusTracker) updateStatusMetrics(status disperser.BlobStatus) { - switch status { - case disperser.BlobStatus_UNKNOWN: - tracker.unknownCountMetric.Increment() - case disperser.BlobStatus_PROCESSING: - tracker.processingCountMetric.Increment() - case disperser.BlobStatus_DISPERSING: - tracker.dispersingCountMetric.Increment() - case disperser.BlobStatus_FAILED: - tracker.failedCountMetric.Increment() - case disperser.BlobStatus_INSUFFICIENT_SIGNATURES: - tracker.insufficientSignaturesCountMetric.Increment() - case disperser.BlobStatus_CONFIRMED: - tracker.confirmedCountMetric.Increment() - case disperser.BlobStatus_FINALIZED: - tracker.finalizedCountMetric.Increment() - default: - tracker.logger.Error("unknown blob status", "status:", status) - } -} - -// getBlobStatus gets the status of a blob from the disperser service. Returns nil if there was an error -// getting the status. -func (tracker *BlobStatusTracker) getBlobStatus(key *UnconfirmedKey) (*disperser.BlobStatusReply, error) { - ctxTimeout, cancel := context.WithTimeout(*tracker.ctx, tracker.config.GetBlobStatusTimeout) - defer cancel() - - start := time.Now() - status, err := tracker.disperser.GetBlobStatus(ctxTimeout, key.Key) - - if err != nil { - tracker.getStatusErrorCountMetric.Increment() - return nil, err - } - tracker.getStatusLatencyMetric.ReportLatency(time.Since(start)) - - return status, nil -} - -// forwardToReader forwards a blob to the reader. Only called once the blob is ready to be read. 
-func (tracker *BlobStatusTracker) forwardToReader(key *UnconfirmedKey, status *disperser.BlobStatusReply) { - batchHeaderHash := [32]byte(status.GetInfo().BlobVerificationProof.BatchMetadata.BatchHeaderHash) - blobIndex := status.GetInfo().BlobVerificationProof.GetBlobIndex() - - confirmationTime := time.Now() - confirmationLatency := confirmationTime.Sub(key.SubmissionTime) - tracker.confirmationLatencyMetric.ReportLatency(confirmationLatency) - - requiredDownloads := tracker.config.RequiredDownloads - var downloadCount int32 - if requiredDownloads < 0 { - // Allow unlimited downloads. - downloadCount = -1 - } else if requiredDownloads == 0 { - // Do not download blob. - return - } else if requiredDownloads < 1 { - // Download blob with probability equal to requiredDownloads. - if rand.Float64() < requiredDownloads { - // Download the blob once. - downloadCount = 1 - } else { - // Do not download blob. - return - } - } else { - // Download blob requiredDownloads times. - downloadCount = int32(requiredDownloads) - } - - blobMetadata, err := table.NewBlobMetadata(key.Key, key.Checksum, key.Size, uint(blobIndex), batchHeaderHash, int(downloadCount)) - if err != nil { - tracker.logger.Error("failed to create blob metadata", "err:", err) - return - } - tracker.confirmedBlobs.Add(blobMetadata) -} diff --git a/tools/traffic/workers/blob_status_tracker_test.go b/tools/traffic/workers/blob_status_tracker_test.go deleted file mode 100644 index 9246e2052c..0000000000 --- a/tools/traffic/workers/blob_status_tracker_test.go +++ /dev/null @@ -1,205 +0,0 @@ -package workers - -import ( - "context" - "fmt" - disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser" - "github.com/Layr-Labs/eigenda/common" - tu "github.com/Layr-Labs/eigenda/common/testutils" - "github.com/Layr-Labs/eigenda/tools/traffic/config" - "github.com/Layr-Labs/eigenda/tools/traffic/metrics" - "github.com/Layr-Labs/eigenda/tools/traffic/table" - "github.com/stretchr/testify/assert" - 
"github.com/stretchr/testify/mock" - "golang.org/x/exp/rand" - "sync" - "testing" - "time" -) - -func getRandomStatus() disperser_rpc.BlobStatus { - return disperser_rpc.BlobStatus(rand.Intn(7)) -} - -func isStatusTerminal(status disperser_rpc.BlobStatus) bool { - switch status { - case disperser_rpc.BlobStatus_UNKNOWN: - return false - case disperser_rpc.BlobStatus_PROCESSING: - return false - case disperser_rpc.BlobStatus_DISPERSING: - return false - - case disperser_rpc.BlobStatus_INSUFFICIENT_SIGNATURES: - return true - case disperser_rpc.BlobStatus_FAILED: - return true - case disperser_rpc.BlobStatus_FINALIZED: - return true - case disperser_rpc.BlobStatus_CONFIRMED: - return true - default: - panic("unknown status") - } -} - -func isStatusSuccess(status disperser_rpc.BlobStatus) bool { - switch status { - case disperser_rpc.BlobStatus_CONFIRMED: - return true - case disperser_rpc.BlobStatus_FINALIZED: - return true - default: - return false - } -} - -func TestStatusTracker(t *testing.T) { - tu.InitializeRandom() - - ctx, cancel := context.WithCancel(context.Background()) - waitGroup := sync.WaitGroup{} - logger, err := common.NewLogger(common.DefaultLoggerConfig()) - assert.Nil(t, err) - - requiredDownloads := rand.Intn(10) + 1 - config := &config.WorkerConfig{ - RequiredDownloads: float64(requiredDownloads), - } - - blobStore := table.NewBlobStore() - - trackerMetrics := metrics.NewMockMetrics() - - disperserClient := &MockDisperserClient{} - - tracker := NewBlobStatusTracker( - &ctx, - &waitGroup, - logger, - config, - make(chan *UnconfirmedKey), - blobStore, - disperserClient, - trackerMetrics) - - expectedGetStatusCount := 0 - statusCounts := make(map[disperser_rpc.BlobStatus]int) - checksums := make(map[string][16]byte) - sizes := make(map[string]uint) - - statusMap := make(map[string]disperser_rpc.BlobStatus) - - for i := 0; i < 100; i++ { - - // Add some new keys to track. 
- newKeys := rand.Intn(10) - for j := 0; j < newKeys; j++ { - key := make([]byte, 16) - checksum := [16]byte{} - size := rand.Uint32() - - _, err = rand.Read(key) - assert.Nil(t, err) - _, err = rand.Read(checksum[:]) - assert.Nil(t, err) - - checksums[string(key)] = checksum - sizes[string(key)] = uint(size) - - stringifiedKey := string(key) - statusMap[stringifiedKey] = disperser_rpc.BlobStatus_UNKNOWN - - unconfirmedKey := &UnconfirmedKey{ - Key: key, - Checksum: checksum, - Size: uint(size), - SubmissionTime: time.Now(), - } - - tracker.unconfirmedBlobs = append(tracker.unconfirmedBlobs, unconfirmedKey) - } - - // Reset the mock disperser client. - disperserClient.mock = mock.Mock{} - expectedGetStatusCount = 0 - - // Choose some new statuses to be returned. - // Count the number of status queries we expect to see in this iteration. - for key, status := range statusMap { - var newStatus disperser_rpc.BlobStatus - if isStatusTerminal(status) { - newStatus = status - } else { - // Blobs in a non-terminal status will be queried again. - expectedGetStatusCount += 1 - // Set the next status to be returned. - newStatus = getRandomStatus() - statusMap[key] = newStatus - - statusCounts[newStatus] += 1 - } - disperserClient.mock.On("GetBlobStatus", []byte(key)).Return( - &disperser_rpc.BlobStatusReply{ - Status: newStatus, - Info: &disperser_rpc.BlobInfo{ - BlobVerificationProof: &disperser_rpc.BlobVerificationProof{ - BatchMetadata: &disperser_rpc.BatchMetadata{ - BatchHeaderHash: make([]byte, 32), - }, - }, - }, - }, nil) - } - - // Simulate advancement of time, allowing the tracker to process the new keys. - tracker.poll() - - // Validate the number of calls made to the disperser client. - disperserClient.mock.AssertNumberOfCalls(t, "GetBlobStatus", expectedGetStatusCount) - - // Read the data in the confirmedBlobs into a map for quick lookup. 
- tableData := make(map[string]*table.BlobMetadata) - for _, metadata := range blobStore.GetAll() { - tableData[string(metadata.Key)] = metadata - } - - blobsInFlight := 0 - for key, status := range statusMap { - metadata, present := tableData[key] - - if !isStatusTerminal(status) { - blobsInFlight++ - } - - if isStatusSuccess(status) { - // Successful blobs should be in the confirmedBlobs. - assert.True(t, present) - } else { - // Non-successful blobs should not be in the confirmedBlobs. - assert.False(t, present) - } - - // Verify metadata. - if present { - assert.Equal(t, checksums[key], metadata.Checksum) - assert.Equal(t, sizes[key], metadata.Size) - assert.Equal(t, requiredDownloads, metadata.RemainingReadPermits) - } - } - - // Verify metrics. - for status, count := range statusCounts { // TODO - metricName := fmt.Sprintf("get_status_%s", status.String()) - assert.Equal(t, float64(count), trackerMetrics.GetCount(metricName), "status: %s", status.String()) - } - if float64(blobsInFlight) != trackerMetrics.GetGaugeValue("blobs_in_flight") { - assert.Equal(t, float64(blobsInFlight), trackerMetrics.GetGaugeValue("blobs_in_flight")) - } - } - - cancel() - tu.ExecuteWithTimeout(func() { - waitGroup.Wait() - }, time.Second) -} diff --git a/tools/traffic/workers/blob_writer.go b/tools/traffic/workers/blob_writer.go index e0add4b355..52d95caa6e 100644 --- a/tools/traffic/workers/blob_writer.go +++ b/tools/traffic/workers/blob_writer.go @@ -8,7 +8,8 @@ import ( "sync" "time" - "github.com/Layr-Labs/eigenda/api/clients" + "github.com/Layr-Labs/eigenda/api/clients/v2" + v2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/encoding/utils/codec" "github.com/Layr-Labs/eigenda/tools/traffic/config" "github.com/Layr-Labs/eigenda/tools/traffic/metrics" @@ -32,8 +33,8 @@ type BlobWriter struct { // disperser is the client used to send blobs to the disperser. disperser clients.DisperserClient - // Unconfirmed keys are sent here. 
- unconfirmedKeyChannel chan *UnconfirmedKey + // Uncertified keys are sent here. + uncertifiedKeyChannel chan *UncertifiedKey // fixedRandomData contains random data for blobs if RandomizeBlobs is false, and nil otherwise. fixedRandomData []byte @@ -55,7 +56,7 @@ func NewBlobWriter( logger logging.Logger, config *config.WorkerConfig, disperser clients.DisperserClient, - unconfirmedKeyChannel chan *UnconfirmedKey, + uncertifiedKeyChannel chan *UncertifiedKey, generatorMetrics metrics.Metrics) BlobWriter { var fixedRandomData []byte @@ -78,7 +79,7 @@ func NewBlobWriter( logger: logger, config: config, disperser: disperser, - unconfirmedKeyChannel: unconfirmedKeyChannel, + uncertifiedKeyChannel: uncertifiedKeyChannel, fixedRandomData: fixedRandomData, writeLatencyMetric: generatorMetrics.NewLatencyMetric("write"), writeSuccessMetric: generatorMetrics.NewCountMetric("write_success"), @@ -128,7 +129,7 @@ func (writer *BlobWriter) writeNextBlob() { checksum := md5.Sum(data) - writer.unconfirmedKeyChannel <- &UnconfirmedKey{ + writer.uncertifiedKeyChannel <- &UncertifiedKey{ Key: key, Checksum: checksum, Size: uint(len(data)), @@ -153,20 +154,24 @@ func (writer *BlobWriter) getRandomData() ([]byte, error) { } // sendRequest sends a blob to a disperser. 
-func (writer *BlobWriter) sendRequest(data []byte) (key []byte, err error) { +func (writer *BlobWriter) sendRequest(data []byte) (key v2.BlobKey, err error) { ctxTimeout, cancel := context.WithTimeout(*writer.ctx, writer.config.WriteTimeout) defer cancel() - if writer.config.SignerPrivateKey != "" { - _, key, err = writer.disperser.DisperseBlobAuthenticated( - ctxTimeout, - data, - writer.config.CustomQuorums) - } else { - _, key, err = writer.disperser.DisperseBlob( - ctxTimeout, - data, - writer.config.CustomQuorums) + writer.logger.Info("sending blob request", "size", len(data)) + status, key, err := writer.disperser.DisperseBlob( + ctxTimeout, + data, + 0, + writer.config.CustomQuorums, + 0, + ) + if err != nil { + writer.logger.Error("failed to send blob request", "err", err) + return } + + writer.logger.Info("blob request sent", "key", key.Hex(), "status", status.String()) + return } diff --git a/tools/traffic/workers/blob_writer_test.go b/tools/traffic/workers/blob_writer_test.go index dcd70841c6..23a494b124 100644 --- a/tools/traffic/workers/blob_writer_test.go +++ b/tools/traffic/workers/blob_writer_test.go @@ -4,6 +4,9 @@ import ( "context" "crypto/md5" "fmt" + "sync" + "testing" + "github.com/Layr-Labs/eigenda/common" tu "github.com/Layr-Labs/eigenda/common/testutils" "github.com/Layr-Labs/eigenda/disperser" @@ -13,8 +16,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "golang.org/x/exp/rand" - "sync" - "testing" ) func TestBlobWriter(t *testing.T) { @@ -56,7 +57,7 @@ func TestBlobWriter(t *testing.T) { } disperserClient := &MockDisperserClient{} - unconfirmedKeyChannel := make(chan *UnconfirmedKey, 100) + unconfirmedKeyChannel := make(chan *UncertifiedKey, 100) generatorMetrics := metrics.NewMockMetrics() diff --git a/tools/traffic/workers/unconfirmed_key.go b/tools/traffic/workers/unconfirmed_key.go index c86b8f1dcd..9523cd04c1 100644 --- a/tools/traffic/workers/unconfirmed_key.go +++ 
b/tools/traffic/workers/unconfirmed_key.go @@ -1,11 +1,15 @@ package workers -import "time" +import ( + "time" -// UnconfirmedKey is a Key that has not yet been confirmed by the disperser service. -type UnconfirmedKey struct { + v2 "github.com/Layr-Labs/eigenda/core/v2" +) + +// UncertifiedKey is a Key that has not yet been certified by the disperser service. +type UncertifiedKey struct { // The Key of the blob. - Key []byte + Key v2.BlobKey // The Size of the blob in bytes. Size uint // The Checksum of the blob. diff --git a/trafficgenerator2.Dockerfile b/trafficgenerator-v2.Dockerfile similarity index 77% rename from trafficgenerator2.Dockerfile rename to trafficgenerator-v2.Dockerfile index 8f9dc149b7..092d0e6e71 100644 --- a/trafficgenerator2.Dockerfile +++ b/trafficgenerator-v2.Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.21.1-alpine3.18 as builder +FROM golang:1.21.1-alpine3.18 AS builder RUN apk add --no-cache make musl-dev linux-headers gcc git jq bash @@ -13,9 +13,8 @@ RUN --mount=type=cache,target=/go/pkg/mod \ --mount=type=cache,target=/root/.cache/go-build \ go build -o ./bin/generator ./cmd2 -FROM alpine:3.18 as generator2 +FROM alpine:3.18 AS generator2 COPY --from=builder /app/tools/traffic/bin/generator /usr/local/bin -COPY --from=builder /app/inabox/resources /resources ENTRYPOINT ["generator"] diff --git a/trafficgenerator.Dockerfile b/trafficgenerator.Dockerfile index 8d39eaf337..5ffc767c29 100644 --- a/trafficgenerator.Dockerfile +++ b/trafficgenerator.Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.21.1-alpine3.18 as builder +FROM golang:1.21.1-alpine3.18 AS builder RUN apk add --no-cache make musl-dev linux-headers gcc git jq bash @@ -13,7 +13,7 @@ RUN --mount=type=cache,target=/go/pkg/mod \ --mount=type=cache,target=/root/.cache/go-build \ go build -o ./bin/generator ./cmd -FROM alpine:3.18 as generator +FROM alpine:3.18 AS generator COPY --from=builder /app/tools/traffic/bin/generator /usr/local/bin