From eaa5bc5a40c4bdb2557cccc1a34a40bf1e08a3cc Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Mon, 19 Aug 2024 09:38:04 -0500 Subject: [PATCH 1/7] Created blob verifier worker. Signed-off-by: Cody Littley --- tools/traffic/table/blob_metadata.go | 5 + tools/traffic/table/blob_store.go | 12 ++ tools/traffic/table/blob_store_test.go | 2 +- tools/traffic/workers/blob_verifier.go | 221 ++++++++++++++++++++ tools/traffic/workers/blob_verifier_test.go | 205 ++++++++++++++++++ tools/traffic/workers/blob_writer.go | 14 +- tools/traffic/workers/blob_writer_test.go | 22 +- tools/traffic/workers/key_handler.go | 7 - tools/traffic/workers/mock_key_handler.go | 24 --- tools/traffic/workers/unconfirmed_key.go | 15 ++ 10 files changed, 481 insertions(+), 46 deletions(-) create mode 100644 tools/traffic/workers/blob_verifier.go create mode 100644 tools/traffic/workers/blob_verifier_test.go delete mode 100644 tools/traffic/workers/key_handler.go delete mode 100644 tools/traffic/workers/mock_key_handler.go create mode 100644 tools/traffic/workers/unconfirmed_key.go diff --git a/tools/traffic/table/blob_metadata.go b/tools/traffic/table/blob_metadata.go index 6307718b7a..90dc12bd0a 100644 --- a/tools/traffic/table/blob_metadata.go +++ b/tools/traffic/table/blob_metadata.go @@ -10,6 +10,9 @@ type BlobMetadata struct { // BlobIndex of the blob. BlobIndex uint + // Hash of the batch header that the blob was written in. + BatchHeaderHash []byte + // Checksum of the blob. Checksum [16]byte @@ -28,6 +31,7 @@ func NewBlobMetadata( checksum [16]byte, size uint, blobIndex uint, + batchHeaderHash []byte, readPermits int) (*BlobMetadata, error) { if readPermits == 0 { @@ -39,6 +43,7 @@ func NewBlobMetadata( Checksum: checksum, Size: size, BlobIndex: blobIndex, + BatchHeaderHash: batchHeaderHash, RemainingReadPermits: readPermits, }, nil } diff --git a/tools/traffic/table/blob_store.go b/tools/traffic/table/blob_store.go index fa28e03fcb..a127bd663e 100644 --- a/tools/traffic/table/blob_store.go +++ b/tools/traffic/table/blob_store.go @@ -60,3 +60,15 @@ func (store *BlobStore) Size() uint { return uint(len(store.blobs)) } + +// GetAll returns all blobs currently stored. For testing purposes only. 
+func (store *BlobStore) GetAll() []*BlobMetadata { + store.lock.Lock() + defer store.lock.Unlock() + + blobs := make([]*BlobMetadata, 0, len(store.blobs)) + for _, blob := range store.blobs { + blobs = append(blobs, blob) + } + return blobs +} diff --git a/tools/traffic/table/blob_store_test.go b/tools/traffic/table/blob_store_test.go index f56e54b606..82d30a815c 100644 --- a/tools/traffic/table/blob_store_test.go +++ b/tools/traffic/table/blob_store_test.go @@ -13,7 +13,7 @@ func randomMetadata(t *testing.T, permits int) *BlobMetadata { checksum := [16]byte{} _, _ = rand.Read(key) _, _ = rand.Read(checksum[:]) - metadata, err := NewBlobMetadata(key, checksum, 1024, 0, permits) + metadata, err := NewBlobMetadata(key, checksum, 1024, 0, nil, permits) assert.Nil(t, err) return metadata diff --git a/tools/traffic/workers/blob_verifier.go b/tools/traffic/workers/blob_verifier.go new file mode 100644 index 0000000000..6444aac98b --- /dev/null +++ b/tools/traffic/workers/blob_verifier.go @@ -0,0 +1,221 @@ +package workers + +import ( + "context" + "github.com/Layr-Labs/eigenda/api/clients" + "github.com/Layr-Labs/eigenda/api/grpc/disperser" + config2 "github.com/Layr-Labs/eigenda/tools/traffic/config" + "github.com/Layr-Labs/eigenda/tools/traffic/metrics" + "github.com/Layr-Labs/eigenda/tools/traffic/table" + "github.com/Layr-Labs/eigensdk-go/logging" + "math/rand" + "sync" + "time" +) + +// BlobVerifier periodically polls the disperser service to verify the status of blobs that were recently written. +// When blobs become confirmed, the status verifier updates the blob blobsToRead accordingly. +// This is a thread safe data structure. +type BlobVerifier struct { + + // The context for the generator. All work should cease when this context is cancelled. + ctx *context.Context + + // Tracks the number of active goroutines within the generator. + waitGroup *sync.WaitGroup + + // All logs should be written using this logger. + logger logging.Logger + + // config contains the configuration for the generator. + config *config2.WorkerConfig + + // A table of confirmed blobs. Blobs are added here when they are confirmed by the disperser service. + table *table.BlobStore + + // The disperser client used to monitor the disperser service. + dispenser clients.DisperserClient + + // The keys of blobs that have not yet been confirmed by the disperser service. + unconfirmedKeys []*UnconfirmedKey + + // Newly added keys that require verification. + keyChannel chan *UnconfirmedKey + + blobsInFlightMetric metrics.GaugeMetric + getStatusLatencyMetric metrics.LatencyMetric + confirmationLatencyMetric metrics.LatencyMetric + getStatusErrorCountMetric metrics.CountMetric + unknownCountMetric metrics.CountMetric + processingCountMetric metrics.CountMetric + dispersingCountMetric metrics.CountMetric + failedCountMetric metrics.CountMetric + insufficientSignaturesCountMetric metrics.CountMetric + confirmedCountMetric metrics.CountMetric + finalizedCountMetric metrics.CountMetric +} + +// NewBlobVerifier creates a new BlobVerifier instance. 
+func NewBlobVerifier( + ctx *context.Context, + waitGroup *sync.WaitGroup, + logger logging.Logger, + config *config2.WorkerConfig, + keyChannel chan *UnconfirmedKey, + table *table.BlobStore, + disperser clients.DisperserClient, + generatorMetrics metrics.Metrics) BlobVerifier { + + return BlobVerifier{ + ctx: ctx, + waitGroup: waitGroup, + logger: logger, + config: config, + keyChannel: keyChannel, + table: table, + dispenser: disperser, + unconfirmedKeys: make([]*UnconfirmedKey, 0), + blobsInFlightMetric: generatorMetrics.NewGaugeMetric("blobs_in_flight"), + getStatusLatencyMetric: generatorMetrics.NewLatencyMetric("get_status"), + confirmationLatencyMetric: generatorMetrics.NewLatencyMetric("confirmation"), + getStatusErrorCountMetric: generatorMetrics.NewCountMetric("get_status_ERROR"), + unknownCountMetric: generatorMetrics.NewCountMetric("get_status_UNKNOWN"), + processingCountMetric: generatorMetrics.NewCountMetric("get_status_PROCESSING"), + dispersingCountMetric: generatorMetrics.NewCountMetric("get_status_DISPERSING"), + failedCountMetric: generatorMetrics.NewCountMetric("get_status_FAILED"), + insufficientSignaturesCountMetric: generatorMetrics.NewCountMetric("get_status_INSUFFICIENT_SIGNATURES"), + confirmedCountMetric: generatorMetrics.NewCountMetric("get_status_CONFIRMED"), + finalizedCountMetric: generatorMetrics.NewCountMetric("get_status_FINALIZED"), + } +} + +// Start begins the status goroutine, which periodically polls +// the disperser service to verify the status of blobs. +func (verifier *BlobVerifier) Start() { + verifier.waitGroup.Add(1) + go verifier.monitor() +} + +// monitor periodically polls the disperser service to verify the status of blobs. +func (verifier *BlobVerifier) monitor() { + ticker := time.NewTicker(verifier.config.VerifierInterval) + for { + select { + case <-(*verifier.ctx).Done(): + verifier.waitGroup.Done() + return + case key := <-verifier.keyChannel: + verifier.unconfirmedKeys = append(verifier.unconfirmedKeys, key) + case <-ticker.C: + verifier.poll() + } + } +} + +// poll checks all unconfirmed keys to see if they have been confirmed by the disperser service. +// If a Key is confirmed, it is added to the blob table and removed from the list of unconfirmed keys. +func (verifier *BlobVerifier) poll() { + + // FUTURE WORK If the number of unconfirmed blobs is high and the time to confirm is high, this is not efficient. + // Revisit this method if there are performance problems. + + unconfirmedKeys := make([]*UnconfirmedKey, 0) + for _, key := range verifier.unconfirmedKeys { + confirmed := verifier.checkStatusForBlob(key) + if !confirmed { + unconfirmedKeys = append(unconfirmedKeys, key) + } + } + verifier.unconfirmedKeys = unconfirmedKeys + verifier.blobsInFlightMetric.Set(float64(len(verifier.unconfirmedKeys))) +} + +// checkStatusForBlob checks the status of a blob. Returns true if the final blob status is known, false otherwise. 
+func (verifier *BlobVerifier) checkStatusForBlob(key *UnconfirmedKey) bool { + + ctxTimeout, cancel := context.WithTimeout(*verifier.ctx, verifier.config.GetBlobStatusTimeout) + defer cancel() + + status, err := metrics.InvokeAndReportLatency[*disperser.BlobStatusReply](verifier.getStatusLatencyMetric, + func() (*disperser.BlobStatusReply, error) { + return verifier.dispenser.GetBlobStatus(ctxTimeout, key.Key) + }) + + if err != nil { + verifier.logger.Error("failed check blob status", "err:", err) + verifier.getStatusErrorCountMetric.Increment() + return false + } + + switch status.GetStatus() { + + case disperser.BlobStatus_UNKNOWN: + verifier.unknownCountMetric.Increment() + return false + case disperser.BlobStatus_PROCESSING: + verifier.processingCountMetric.Increment() + return false + case disperser.BlobStatus_DISPERSING: + verifier.dispersingCountMetric.Increment() + return false + + case disperser.BlobStatus_FAILED: + verifier.failedCountMetric.Increment() + return true + case disperser.BlobStatus_INSUFFICIENT_SIGNATURES: + verifier.insufficientSignaturesCountMetric.Increment() + return true + + case disperser.BlobStatus_CONFIRMED: + verifier.confirmedCountMetric.Increment() + verifier.forwardToReader(key, status) + return true + case disperser.BlobStatus_FINALIZED: + verifier.finalizedCountMetric.Increment() + verifier.forwardToReader(key, status) + return true + + default: + verifier.logger.Error("unknown blob status", "status:", status.GetStatus()) + return true + } +} + +// forwardToReader forwards a blob to the reader. Only called once the blob is ready to be read. +func (verifier *BlobVerifier) forwardToReader(key *UnconfirmedKey, status *disperser.BlobStatusReply) { + batchHeaderHash := status.GetInfo().BlobVerificationProof.BatchMetadata.BatchHeaderHash + blobIndex := status.GetInfo().BlobVerificationProof.GetBlobIndex() + + confirmationTime := time.Now() + confirmationLatency := confirmationTime.Sub(key.SubmissionTime) + verifier.confirmationLatencyMetric.ReportLatency(confirmationLatency) + + requiredDownloads := verifier.config.RequiredDownloads + var downloadCount int32 + if requiredDownloads <= 0 { + // Allow unlimited downloads. + downloadCount = -1 + } else if requiredDownloads == 0 { + // Do not download blob. + return + } else if requiredDownloads < 1 { + // Download blob with probability equal to requiredDownloads. + if rand.Float64() < requiredDownloads { + // Download the blob once. + downloadCount = 1 + } else { + // Do not download blob. + return + } + } else { + // Download blob requiredDownloads times. 
+ downloadCount = int32(requiredDownloads) + } + + blobMetadata, err := table.NewBlobMetadata(key.Key, key.Checksum, key.Size, uint(blobIndex), batchHeaderHash, int(downloadCount)) + if err != nil { + verifier.logger.Error("failed to create blob metadata", "err:", err) + return + } + verifier.table.Add(blobMetadata) +} diff --git a/tools/traffic/workers/blob_verifier_test.go b/tools/traffic/workers/blob_verifier_test.go new file mode 100644 index 0000000000..666aee1a83 --- /dev/null +++ b/tools/traffic/workers/blob_verifier_test.go @@ -0,0 +1,205 @@ +package workers + +import ( + "context" + "fmt" + disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser" + "github.com/Layr-Labs/eigenda/common" + tu "github.com/Layr-Labs/eigenda/common/testutils" + "github.com/Layr-Labs/eigenda/tools/traffic/config" + "github.com/Layr-Labs/eigenda/tools/traffic/metrics" + "github.com/Layr-Labs/eigenda/tools/traffic/table" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "golang.org/x/exp/rand" + "sync" + "testing" + "time" +) + +func getRandomStatus() disperser_rpc.BlobStatus { + return disperser_rpc.BlobStatus(rand.Intn(7)) +} + +func isStatusTerminal(status disperser_rpc.BlobStatus) bool { + switch status { + case disperser_rpc.BlobStatus_UNKNOWN: + return false + case disperser_rpc.BlobStatus_PROCESSING: + return false + case disperser_rpc.BlobStatus_DISPERSING: + return false + + case disperser_rpc.BlobStatus_INSUFFICIENT_SIGNATURES: + return true + case disperser_rpc.BlobStatus_FAILED: + return true + case disperser_rpc.BlobStatus_FINALIZED: + return true + case disperser_rpc.BlobStatus_CONFIRMED: + return true + default: + panic("unknown status") + } +} + +func isStatusSuccess(status disperser_rpc.BlobStatus) bool { + switch status { + case disperser_rpc.BlobStatus_CONFIRMED: + return true + case disperser_rpc.BlobStatus_FINALIZED: + return true + default: + return false + } +} + +func TestBlobVerifier(t *testing.T) { + tu.InitializeRandom() + + ctx, cancel := context.WithCancel(context.Background()) + waitGroup := sync.WaitGroup{} + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + assert.Nil(t, err) + + requiredDownloads := rand.Intn(10) + config := &config.WorkerConfig{ + RequiredDownloads: float64(requiredDownloads), + } + + blobStore := table.NewBlobStore() + + verifierMetrics := metrics.NewMockMetrics() + + disperserClient := &MockDisperserClient{} + + verifier := NewBlobVerifier( + &ctx, + &waitGroup, + logger, + config, + make(chan *UnconfirmedKey), + blobStore, + disperserClient, + verifierMetrics) + + expectedGetStatusCount := 0 + statusCounts := make(map[disperser_rpc.BlobStatus]int) + checksums := make(map[string][16]byte) + sizes := make(map[string]uint) + + statusMap := make(map[string]disperser_rpc.BlobStatus) + + for i := 0; i < 100; i++ { + + // Add some new keys to track. + newKeys := rand.Intn(10) + for j := 0; j < newKeys; j++ { + key := make([]byte, 16) + checksum := [16]byte{} + size := rand.Uint32() + + _, err = rand.Read(key) + assert.Nil(t, err) + _, err = rand.Read(checksum[:]) + assert.Nil(t, err) + + checksums[string(key)] = checksum + sizes[string(key)] = uint(size) + + stringifiedKey := string(key) + statusMap[stringifiedKey] = disperser_rpc.BlobStatus_UNKNOWN + + unconfirmedKey := &UnconfirmedKey{ + Key: key, + Checksum: checksum, + Size: uint(size), + SubmissionTime: time.Now(), + } + + verifier.unconfirmedKeys = append(verifier.unconfirmedKeys, unconfirmedKey) + } + + // Reset the mock disperser client. 
+ disperserClient.mock = mock.Mock{} + expectedGetStatusCount = 0 + + // Choose some new statuses to be returned. + // Count the number of status queries we expect to see in this iteration. + for key, status := range statusMap { + var newStatus disperser_rpc.BlobStatus + if isStatusTerminal(status) { + newStatus = status + } else { + // Blobs in a non-terminal status will be queried again. + expectedGetStatusCount += 1 + // Set the next status to be returned. + newStatus = getRandomStatus() + statusMap[key] = newStatus + + statusCounts[newStatus] += 1 + } + disperserClient.mock.On("GetBlobStatus", []byte(key)).Return( + &disperser_rpc.BlobStatusReply{ + Status: newStatus, + Info: &disperser_rpc.BlobInfo{ + BlobVerificationProof: &disperser_rpc.BlobVerificationProof{ + BatchMetadata: &disperser_rpc.BatchMetadata{ + BatchHeaderHash: make([]byte, 0), + }, + }, + }, + }, nil) + } + + // Simulate advancement of time, allowing the verifier to process the new keys. + verifier.poll() + + // Validate the number of calls made to the disperser client. + disperserClient.mock.AssertNumberOfCalls(t, "GetBlobStatus", expectedGetStatusCount) + + // Read the data in the table into a map for quick lookup. + tableData := make(map[string]*table.BlobMetadata) + for _, metadata := range blobStore.GetAll() { + tableData[string(metadata.Key)] = metadata + } + + blobsInFlight := 0 + for key, status := range statusMap { + metadata, present := tableData[key] + + if !isStatusTerminal(status) { + blobsInFlight++ + } + + if isStatusSuccess(status) { + // Successful blobs should be in the table. + assert.True(t, present) + } else { + // Non-successful blobs should not be in the table. + assert.False(t, present) + } + + // Verify metadata. + if present { + assert.Equal(t, checksums[key], metadata.Checksum) + assert.Equal(t, sizes[key], metadata.Size) + assert.Equal(t, requiredDownloads, metadata.RemainingReadPermits) + } + } + + // Verify metrics. + for status, count := range statusCounts { + metricName := fmt.Sprintf("get_status_%s", status.String()) + assert.Equal(t, float64(count), verifierMetrics.GetCount(metricName)) + } + if float64(blobsInFlight) != verifierMetrics.GetGaugeValue("blobs_in_flight") { + assert.Equal(t, float64(blobsInFlight), verifierMetrics.GetGaugeValue("blobs_in_flight")) + } + } + + cancel() + tu.ExecuteWithTimeout(func() { + waitGroup.Wait() + }, time.Second) +} diff --git a/tools/traffic/workers/blob_writer.go b/tools/traffic/workers/blob_writer.go index a30a7e5bdd..81b6c8a9ef 100644 --- a/tools/traffic/workers/blob_writer.go +++ b/tools/traffic/workers/blob_writer.go @@ -32,7 +32,7 @@ type BlobWriter struct { disperser clients.DisperserClient // Unconfirmed keys are sent here. - unconfirmedKeyHandler KeyHandler + unconfirmedKeyChannel chan *UnconfirmedKey // fixedRandomData contains random data for blobs if RandomizeBlobs is false, and nil otherwise. 
fixedRandomData []byte @@ -54,7 +54,7 @@ func NewBlobWriter( logger logging.Logger, config *config.WorkerConfig, disperser clients.DisperserClient, - unconfirmedKeyHandler KeyHandler, + unconfirmedKeyChannel chan *UnconfirmedKey, generatorMetrics metrics.Metrics) BlobWriter { var fixedRandomData []byte @@ -77,7 +77,7 @@ func NewBlobWriter( logger: logger, config: config, disperser: disperser, - unconfirmedKeyHandler: unconfirmedKeyHandler, + unconfirmedKeyChannel: unconfirmedKeyChannel, fixedRandomData: fixedRandomData, writeLatencyMetric: generatorMetrics.NewLatencyMetric("write"), writeSuccessMetric: generatorMetrics.NewCountMetric("write_success"), @@ -123,7 +123,13 @@ func (writer *BlobWriter) writeNextBlob() { writer.writeSuccessMetric.Increment() checksum := md5.Sum(data) - writer.unconfirmedKeyHandler.AddUnconfirmedKey(key, checksum, uint(len(data))) + + writer.unconfirmedKeyChannel <- &UnconfirmedKey{ + Key: key, + Checksum: checksum, + Size: uint(len(data)), + SubmissionTime: time.Now(), + } } // getRandomData returns a slice of random data to be used for a blob. diff --git a/tools/traffic/workers/blob_writer_test.go b/tools/traffic/workers/blob_writer_test.go index 723894490a..dcd70841c6 100644 --- a/tools/traffic/workers/blob_writer_test.go +++ b/tools/traffic/workers/blob_writer_test.go @@ -26,6 +26,7 @@ func TestBlobWriter(t *testing.T) { assert.Nil(t, err) dataSize := rand.Uint64()%1024 + 64 + encodedDataSize := len(codec.ConvertByPaddingEmptyByte(make([]byte, dataSize))) authenticated := rand.Intn(2) == 0 var signerPrivateKey string @@ -55,9 +56,7 @@ func TestBlobWriter(t *testing.T) { } disperserClient := &MockDisperserClient{} - unconfirmedKeyHandler := &MockKeyHandler{} - unconfirmedKeyHandler.mock.On( - "AddUnconfirmedKey", mock.Anything, mock.Anything, mock.Anything).Return(nil) + unconfirmedKeyChannel := make(chan *UnconfirmedKey, 100) generatorMetrics := metrics.NewMockMetrics() @@ -67,7 +66,7 @@ func TestBlobWriter(t *testing.T) { logger, config, disperserClient, - unconfirmedKeyHandler, + unconfirmedKeyChannel, generatorMetrics) errorCount := 0 @@ -83,7 +82,7 @@ func TestBlobWriter(t *testing.T) { errorToReturn = nil } - // This is the key that will be assigned to the next blob. + // This is the Key that will be assigned to the next blob. keyToReturn := make([]byte, 32) _, err = rand.Read(keyToReturn) assert.Nil(t, err) @@ -96,21 +95,24 @@ func TestBlobWriter(t *testing.T) { writer.writeNextBlob() disperserClient.mock.AssertNumberOfCalls(t, functionName, 1) - unconfirmedKeyHandler.mock.AssertNumberOfCalls(t, "AddUnconfirmedKey", i+1-errorCount) if errorToReturn == nil { - dataSentToDisperser := disperserClient.mock.Calls[0].Arguments.Get(0).([]byte) assert.NotNil(t, dataSentToDisperser) - // Strip away the extra encoding bytes. We should have data of the expected size. + // Strip away the extra encoding bytes. We should have data of the expected Size. decodedData := codec.RemoveEmptyByteFromPaddedBytes(dataSentToDisperser) assert.Equal(t, dataSize, uint64(len(decodedData))) - // Verify that the proper data was sent to the unconfirmed key handler. + // Verify that the proper data was sent to the unconfirmed Key handler. 
checksum := md5.Sum(dataSentToDisperser) - unconfirmedKeyHandler.mock.AssertCalled(t, "AddUnconfirmedKey", keyToReturn, checksum, uint(len(dataSentToDisperser))) + unconfirmedKey, ok := <-unconfirmedKeyChannel + + assert.True(t, ok) + assert.Equal(t, keyToReturn, unconfirmedKey.Key) + assert.Equal(t, uint(encodedDataSize), unconfirmedKey.Size) + assert.Equal(t, checksum, unconfirmedKey.Checksum) // Verify that data has the proper amount of randomness. if previousData != nil { diff --git a/tools/traffic/workers/key_handler.go b/tools/traffic/workers/key_handler.go deleted file mode 100644 index 30c8b5ed9c..0000000000 --- a/tools/traffic/workers/key_handler.go +++ /dev/null @@ -1,7 +0,0 @@ -package workers - -// KeyHandler is an interface describing an object that can accept unconfirmed keys. -type KeyHandler interface { - // AddUnconfirmedKey accepts an unconfirmed blob key, the checksum of the blob, and the size of the blob in bytes. - AddUnconfirmedKey(key []byte, checksum [16]byte, size uint) -} diff --git a/tools/traffic/workers/mock_key_handler.go b/tools/traffic/workers/mock_key_handler.go deleted file mode 100644 index 2c48de995b..0000000000 --- a/tools/traffic/workers/mock_key_handler.go +++ /dev/null @@ -1,24 +0,0 @@ -package workers - -import ( - "github.com/stretchr/testify/mock" -) - -var _ KeyHandler = (*MockKeyHandler)(nil) - -// MockKeyHandler is a stand-in for the blob verifier's UnconfirmedKeyHandler. -type MockKeyHandler struct { - mock mock.Mock - - ProvidedKey []byte - ProvidedChecksum [16]byte - ProvidedSize uint -} - -func (m *MockKeyHandler) AddUnconfirmedKey(key []byte, checksum [16]byte, size uint) { - m.mock.Called(key, checksum, size) - - m.ProvidedKey = key - m.ProvidedChecksum = checksum - m.ProvidedSize = size -} diff --git a/tools/traffic/workers/unconfirmed_key.go b/tools/traffic/workers/unconfirmed_key.go new file mode 100644 index 0000000000..c86b8f1dcd --- /dev/null +++ b/tools/traffic/workers/unconfirmed_key.go @@ -0,0 +1,15 @@ +package workers + +import "time" + +// UnconfirmedKey is a Key that has not yet been confirmed by the disperser service. +type UnconfirmedKey struct { + // The Key of the blob. + Key []byte + // The Size of the blob in bytes. + Size uint + // The Checksum of the blob. + Checksum [16]byte + // The time the blob was submitted to the disperser service. + SubmissionTime time.Time +} From 24c942c21cd1d1a94af6d543f4b73087ec7698fb Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Thu, 22 Aug 2024 09:53:41 -0500 Subject: [PATCH 2/7] Made suggested changes. Signed-off-by: Cody Littley --- tools/traffic/table/blob_store.go | 14 +- tools/traffic/workers/blob_verifier.go | 148 ++++++++++++-------- tools/traffic/workers/blob_verifier_test.go | 12 +- 3 files changed, 103 insertions(+), 71 deletions(-) diff --git a/tools/traffic/table/blob_store.go b/tools/traffic/table/blob_store.go index a127bd663e..2218b3c0bd 100644 --- a/tools/traffic/table/blob_store.go +++ b/tools/traffic/table/blob_store.go @@ -11,7 +11,7 @@ type BlobStore struct { // nextKey describes the next key to used for the blobs map. nextKey uint64 - lock sync.Mutex + lock sync.RWMutex } // NewBlobStore creates a new BlobStore instance. @@ -35,8 +35,8 @@ func (store *BlobStore) Add(blob *BlobMetadata) { // from the store if the permits are exhausted. This method makes no guarantees about the order // in which blobs are returned. Returns nil if no blobs are available. 
func (store *BlobStore) GetNext() *BlobMetadata { - store.lock.Lock() - defer store.lock.Unlock() + store.lock.RLock() + defer store.lock.RUnlock() for key, blob := range store.blobs { // Always return the first blob found. @@ -55,16 +55,16 @@ func (store *BlobStore) GetNext() *BlobMetadata { // Size returns the number of blobs currently stored. func (store *BlobStore) Size() uint { - store.lock.Lock() - defer store.lock.Unlock() + store.lock.RLock() + defer store.lock.RUnlock() return uint(len(store.blobs)) } // GetAll returns all blobs currently stored. For testing purposes only. func (store *BlobStore) GetAll() []*BlobMetadata { - store.lock.Lock() - defer store.lock.Unlock() + store.lock.RLock() + defer store.lock.RUnlock() blobs := make([]*BlobMetadata, 0, len(store.blobs)) for _, blob := range store.blobs { diff --git a/tools/traffic/workers/blob_verifier.go b/tools/traffic/workers/blob_verifier.go index 6444aac98b..5323234ccf 100644 --- a/tools/traffic/workers/blob_verifier.go +++ b/tools/traffic/workers/blob_verifier.go @@ -4,7 +4,7 @@ import ( "context" "github.com/Layr-Labs/eigenda/api/clients" "github.com/Layr-Labs/eigenda/api/grpc/disperser" - config2 "github.com/Layr-Labs/eigenda/tools/traffic/config" + "github.com/Layr-Labs/eigenda/tools/traffic/config" "github.com/Layr-Labs/eigenda/tools/traffic/metrics" "github.com/Layr-Labs/eigenda/tools/traffic/table" "github.com/Layr-Labs/eigensdk-go/logging" @@ -13,10 +13,10 @@ import ( "time" ) -// BlobVerifier periodically polls the disperser service to verify the status of blobs that were recently written. +// BlobStatusTracker periodically polls the disperser service to verify the status of blobs that were recently written. // When blobs become confirmed, the status verifier updates the blob blobsToRead accordingly. // This is a thread safe data structure. -type BlobVerifier struct { +type BlobStatusTracker struct { // The context for the generator. All work should cease when this context is cancelled. ctx *context.Context @@ -28,16 +28,16 @@ type BlobVerifier struct { logger logging.Logger // config contains the configuration for the generator. - config *config2.WorkerConfig + config *config.WorkerConfig - // A table of confirmed blobs. Blobs are added here when they are confirmed by the disperser service. - table *table.BlobStore + // Contains confirmed blobs. Blobs are added here when they are confirmed by the disperser service. + confirmedBlobs *table.BlobStore // The disperser client used to monitor the disperser service. - dispenser clients.DisperserClient + disperser clients.DisperserClient // The keys of blobs that have not yet been confirmed by the disperser service. - unconfirmedKeys []*UnconfirmedKey + unconfirmedBlobs []*UnconfirmedKey // Newly added keys that require verification. keyChannel chan *UnconfirmedKey @@ -55,26 +55,26 @@ type BlobVerifier struct { finalizedCountMetric metrics.CountMetric } -// NewBlobVerifier creates a new BlobVerifier instance. +// NewBlobVerifier creates a new BlobStatusTracker instance. 
func NewBlobVerifier( ctx *context.Context, waitGroup *sync.WaitGroup, logger logging.Logger, - config *config2.WorkerConfig, + config *config.WorkerConfig, keyChannel chan *UnconfirmedKey, table *table.BlobStore, disperser clients.DisperserClient, - generatorMetrics metrics.Metrics) BlobVerifier { + generatorMetrics metrics.Metrics) BlobStatusTracker { - return BlobVerifier{ + return BlobStatusTracker{ ctx: ctx, waitGroup: waitGroup, logger: logger, config: config, keyChannel: keyChannel, - table: table, - dispenser: disperser, - unconfirmedKeys: make([]*UnconfirmedKey, 0), + confirmedBlobs: table, + disperser: disperser, + unconfirmedBlobs: make([]*UnconfirmedKey, 0), blobsInFlightMetric: generatorMetrics.NewGaugeMetric("blobs_in_flight"), getStatusLatencyMetric: generatorMetrics.NewLatencyMetric("get_status"), confirmationLatencyMetric: generatorMetrics.NewLatencyMetric("confirmation"), @@ -91,13 +91,13 @@ func NewBlobVerifier( // Start begins the status goroutine, which periodically polls // the disperser service to verify the status of blobs. -func (verifier *BlobVerifier) Start() { +func (verifier *BlobStatusTracker) Start() { verifier.waitGroup.Add(1) go verifier.monitor() } // monitor periodically polls the disperser service to verify the status of blobs. -func (verifier *BlobVerifier) monitor() { +func (verifier *BlobStatusTracker) monitor() { ticker := time.NewTicker(verifier.config.VerifierInterval) for { select { @@ -105,7 +105,7 @@ func (verifier *BlobVerifier) monitor() { verifier.waitGroup.Done() return case key := <-verifier.keyChannel: - verifier.unconfirmedKeys = append(verifier.unconfirmedKeys, key) + verifier.unconfirmedBlobs = append(verifier.unconfirmedBlobs, key) case <-ticker.C: verifier.poll() } @@ -113,76 +113,108 @@ func (verifier *BlobVerifier) monitor() { } // poll checks all unconfirmed keys to see if they have been confirmed by the disperser service. -// If a Key is confirmed, it is added to the blob table and removed from the list of unconfirmed keys. -func (verifier *BlobVerifier) poll() { +// If a Key is confirmed, it is added to the blob confirmedBlobs and removed from the list of unconfirmed keys. +func (verifier *BlobStatusTracker) poll() { // FUTURE WORK If the number of unconfirmed blobs is high and the time to confirm is high, this is not efficient. // Revisit this method if there are performance problems. - unconfirmedKeys := make([]*UnconfirmedKey, 0) - for _, key := range verifier.unconfirmedKeys { - confirmed := verifier.checkStatusForBlob(key) - if !confirmed { - unconfirmedKeys = append(unconfirmedKeys, key) + nonFinalBlobs := make([]*UnconfirmedKey, 0) + for _, key := range verifier.unconfirmedBlobs { + + blobStatus := verifier.getBlobStatus(key) + if blobStatus == nil { + // There was an error getting status. Try again later. + nonFinalBlobs = append(nonFinalBlobs, key) + continue + } + + verifier.updateStatusMetrics(blobStatus.Status) + if isBlobStatusTerminal(blobStatus.Status) { + if isBlobStatusConfirmed(blobStatus.Status) { + verifier.forwardToReader(key, blobStatus) + } + } else { + // try again later + nonFinalBlobs = append(nonFinalBlobs, key) } } - verifier.unconfirmedKeys = unconfirmedKeys - verifier.blobsInFlightMetric.Set(float64(len(verifier.unconfirmedKeys))) + verifier.unconfirmedBlobs = nonFinalBlobs + verifier.blobsInFlightMetric.Set(float64(len(verifier.unconfirmedBlobs))) } -// checkStatusForBlob checks the status of a blob. Returns true if the final blob status is known, false otherwise. 
-func (verifier *BlobVerifier) checkStatusForBlob(key *UnconfirmedKey) bool { - - ctxTimeout, cancel := context.WithTimeout(*verifier.ctx, verifier.config.GetBlobStatusTimeout) - defer cancel() - - status, err := metrics.InvokeAndReportLatency[*disperser.BlobStatusReply](verifier.getStatusLatencyMetric, - func() (*disperser.BlobStatusReply, error) { - return verifier.dispenser.GetBlobStatus(ctxTimeout, key.Key) - }) - - if err != nil { - verifier.logger.Error("failed check blob status", "err:", err) - verifier.getStatusErrorCountMetric.Increment() +// isBlobStatusTerminal returns true if the status is a terminal status. +func isBlobStatusTerminal(status disperser.BlobStatus) bool { + switch status { + case disperser.BlobStatus_FAILED: + return true + case disperser.BlobStatus_INSUFFICIENT_SIGNATURES: + return true + case disperser.BlobStatus_CONFIRMED: + return true + case disperser.BlobStatus_FINALIZED: + return true + default: return false } +} - switch status.GetStatus() { +// isBlobStatusConfirmed returns true if the status is a confirmed status. +func isBlobStatusConfirmed(status disperser.BlobStatus) bool { + switch status { + case disperser.BlobStatus_CONFIRMED: + return true + case disperser.BlobStatus_FINALIZED: + return true + default: + return false + } +} +// updateStatusMetrics updates the metrics for the reported status of a blob. +func (verifier *BlobStatusTracker) updateStatusMetrics(status disperser.BlobStatus) { + switch status { case disperser.BlobStatus_UNKNOWN: verifier.unknownCountMetric.Increment() - return false case disperser.BlobStatus_PROCESSING: verifier.processingCountMetric.Increment() - return false case disperser.BlobStatus_DISPERSING: verifier.dispersingCountMetric.Increment() - return false - case disperser.BlobStatus_FAILED: verifier.failedCountMetric.Increment() - return true case disperser.BlobStatus_INSUFFICIENT_SIGNATURES: verifier.insufficientSignaturesCountMetric.Increment() - return true - case disperser.BlobStatus_CONFIRMED: verifier.confirmedCountMetric.Increment() - verifier.forwardToReader(key, status) - return true case disperser.BlobStatus_FINALIZED: verifier.finalizedCountMetric.Increment() - verifier.forwardToReader(key, status) - return true - default: - verifier.logger.Error("unknown blob status", "status:", status.GetStatus()) - return true + verifier.logger.Error("unknown blob status", "status:", status) } } +// getBlobStatus gets the status of a blob from the disperser service. Returns nil if there was an error +// getting the status. +func (verifier *BlobStatusTracker) getBlobStatus(key *UnconfirmedKey) *disperser.BlobStatusReply { + ctxTimeout, cancel := context.WithTimeout(*verifier.ctx, verifier.config.GetBlobStatusTimeout) + defer cancel() + + status, err := metrics.InvokeAndReportLatency[*disperser.BlobStatusReply](verifier.getStatusLatencyMetric, + func() (*disperser.BlobStatusReply, error) { + return verifier.disperser.GetBlobStatus(ctxTimeout, key.Key) + }) + + if err != nil { + verifier.logger.Error("failed check blob status", "err:", err) + verifier.getStatusErrorCountMetric.Increment() + return nil + } + + return status +} + // forwardToReader forwards a blob to the reader. Only called once the blob is ready to be read. 
-func (verifier *BlobVerifier) forwardToReader(key *UnconfirmedKey, status *disperser.BlobStatusReply) { +func (verifier *BlobStatusTracker) forwardToReader(key *UnconfirmedKey, status *disperser.BlobStatusReply) { batchHeaderHash := status.GetInfo().BlobVerificationProof.BatchMetadata.BatchHeaderHash blobIndex := status.GetInfo().BlobVerificationProof.GetBlobIndex() @@ -217,5 +249,5 @@ func (verifier *BlobVerifier) forwardToReader(key *UnconfirmedKey, status *dispe verifier.logger.Error("failed to create blob metadata", "err:", err) return } - verifier.table.Add(blobMetadata) + verifier.confirmedBlobs.Add(blobMetadata) } diff --git a/tools/traffic/workers/blob_verifier_test.go b/tools/traffic/workers/blob_verifier_test.go index 666aee1a83..7eea5a413c 100644 --- a/tools/traffic/workers/blob_verifier_test.go +++ b/tools/traffic/workers/blob_verifier_test.go @@ -117,7 +117,7 @@ func TestBlobVerifier(t *testing.T) { SubmissionTime: time.Now(), } - verifier.unconfirmedKeys = append(verifier.unconfirmedKeys, unconfirmedKey) + verifier.unconfirmedBlobs = append(verifier.unconfirmedBlobs, unconfirmedKey) } // Reset the mock disperser client. @@ -158,7 +158,7 @@ func TestBlobVerifier(t *testing.T) { // Validate the number of calls made to the disperser client. disperserClient.mock.AssertNumberOfCalls(t, "GetBlobStatus", expectedGetStatusCount) - // Read the data in the table into a map for quick lookup. + // Read the data in the confirmedBlobs into a map for quick lookup. tableData := make(map[string]*table.BlobMetadata) for _, metadata := range blobStore.GetAll() { tableData[string(metadata.Key)] = metadata @@ -173,10 +173,10 @@ func TestBlobVerifier(t *testing.T) { } if isStatusSuccess(status) { - // Successful blobs should be in the table. + // Successful blobs should be in the confirmedBlobs. assert.True(t, present) } else { - // Non-successful blobs should not be in the table. + // Non-successful blobs should not be in the confirmedBlobs. assert.False(t, present) } @@ -189,9 +189,9 @@ func TestBlobVerifier(t *testing.T) { } // Verify metrics. - for status, count := range statusCounts { + for status, count := range statusCounts { // TODO metricName := fmt.Sprintf("get_status_%s", status.String()) - assert.Equal(t, float64(count), verifierMetrics.GetCount(metricName)) + assert.Equal(t, float64(count), verifierMetrics.GetCount(metricName), "status: %s", status.String()) } if float64(blobsInFlight) != verifierMetrics.GetGaugeValue("blobs_in_flight") { assert.Equal(t, float64(blobsInFlight), verifierMetrics.GetGaugeValue("blobs_in_flight")) From 97ee14c749ae8eca50c33e1193235160f0da3839 Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Thu, 22 Aug 2024 09:54:56 -0500 Subject: [PATCH 3/7] Rename files. 
Signed-off-by: Cody Littley --- .../traffic/workers/{blob_verifier.go => blob_status_tracker.go} | 0 .../{blob_verifier_test.go => blob_status_tracker_test.go} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename tools/traffic/workers/{blob_verifier.go => blob_status_tracker.go} (100%) rename tools/traffic/workers/{blob_verifier_test.go => blob_status_tracker_test.go} (100%) diff --git a/tools/traffic/workers/blob_verifier.go b/tools/traffic/workers/blob_status_tracker.go similarity index 100% rename from tools/traffic/workers/blob_verifier.go rename to tools/traffic/workers/blob_status_tracker.go diff --git a/tools/traffic/workers/blob_verifier_test.go b/tools/traffic/workers/blob_status_tracker_test.go similarity index 100% rename from tools/traffic/workers/blob_verifier_test.go rename to tools/traffic/workers/blob_status_tracker_test.go From 60d5dad72e1c2330176caf0747942c90e3ed5882 Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Tue, 27 Aug 2024 08:45:10 -0500 Subject: [PATCH 4/7] Fix unit test flake. Signed-off-by: Cody Littley --- tools/traffic/workers/blob_status_tracker_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/traffic/workers/blob_status_tracker_test.go b/tools/traffic/workers/blob_status_tracker_test.go index 7eea5a413c..4036c1e87b 100644 --- a/tools/traffic/workers/blob_status_tracker_test.go +++ b/tools/traffic/workers/blob_status_tracker_test.go @@ -62,7 +62,7 @@ func TestBlobVerifier(t *testing.T) { logger, err := common.NewLogger(common.DefaultLoggerConfig()) assert.Nil(t, err) - requiredDownloads := rand.Intn(10) + requiredDownloads := rand.Intn(10) + 1 config := &config.WorkerConfig{ RequiredDownloads: float64(requiredDownloads), } From 51d38ad334446329b2576b84567acf08b2e0b36d Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Thu, 29 Aug 2024 14:36:08 -0500 Subject: [PATCH 5/7] Made suggested changes. Signed-off-by: Cody Littley --- tools/traffic/table/blob_metadata.go | 4 ++-- tools/traffic/workers/blob_status_tracker.go | 19 +++++++++++-------- .../workers/blob_status_tracker_test.go | 2 +- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/tools/traffic/table/blob_metadata.go b/tools/traffic/table/blob_metadata.go index 90dc12bd0a..40a0b415e9 100644 --- a/tools/traffic/table/blob_metadata.go +++ b/tools/traffic/table/blob_metadata.go @@ -11,7 +11,7 @@ type BlobMetadata struct { BlobIndex uint // Hash of the batch header that the blob was written in. - BatchHeaderHash []byte + BatchHeaderHash [32]byte // Checksum of the blob. Checksum [16]byte @@ -31,7 +31,7 @@ func NewBlobMetadata( checksum [16]byte, size uint, blobIndex uint, - batchHeaderHash []byte, + batchHeaderHash [32]byte, readPermits int) (*BlobMetadata, error) { if readPermits == 0 { diff --git a/tools/traffic/workers/blob_status_tracker.go b/tools/traffic/workers/blob_status_tracker.go index 5323234ccf..14fd2d539d 100644 --- a/tools/traffic/workers/blob_status_tracker.go +++ b/tools/traffic/workers/blob_status_tracker.go @@ -122,8 +122,9 @@ func (verifier *BlobStatusTracker) poll() { nonFinalBlobs := make([]*UnconfirmedKey, 0) for _, key := range verifier.unconfirmedBlobs { - blobStatus := verifier.getBlobStatus(key) - if blobStatus == nil { + blobStatus, err := verifier.getBlobStatus(key) + if err != nil { + verifier.logger.Error("failed to get blob status: ", "err:", err) // There was an error getting status. Try again later. 
nonFinalBlobs = append(nonFinalBlobs, key) continue @@ -151,6 +152,9 @@ func isBlobStatusTerminal(status disperser.BlobStatus) bool { case disperser.BlobStatus_INSUFFICIENT_SIGNATURES: return true case disperser.BlobStatus_CONFIRMED: + // Technically this isn't terminal, as confirmed blobs eventually should become finalized. + // But it is terminal from the status tracker's perspective, since we stop tracking the blob + // once it becomes either confirmed or finalized. return true case disperser.BlobStatus_FINALIZED: return true @@ -195,7 +199,7 @@ func (verifier *BlobStatusTracker) updateStatusMetrics(status disperser.BlobStat // getBlobStatus gets the status of a blob from the disperser service. Returns nil if there was an error // getting the status. -func (verifier *BlobStatusTracker) getBlobStatus(key *UnconfirmedKey) *disperser.BlobStatusReply { +func (verifier *BlobStatusTracker) getBlobStatus(key *UnconfirmedKey) (*disperser.BlobStatusReply, error) { ctxTimeout, cancel := context.WithTimeout(*verifier.ctx, verifier.config.GetBlobStatusTimeout) defer cancel() @@ -205,17 +209,16 @@ func (verifier *BlobStatusTracker) getBlobStatus(key *UnconfirmedKey) *disperser }) if err != nil { - verifier.logger.Error("failed check blob status", "err:", err) verifier.getStatusErrorCountMetric.Increment() - return nil + return nil, err } - return status + return status, nil } // forwardToReader forwards a blob to the reader. Only called once the blob is ready to be read. func (verifier *BlobStatusTracker) forwardToReader(key *UnconfirmedKey, status *disperser.BlobStatusReply) { - batchHeaderHash := status.GetInfo().BlobVerificationProof.BatchMetadata.BatchHeaderHash + batchHeaderHash := [32]byte(status.GetInfo().BlobVerificationProof.BatchMetadata.BatchHeaderHash) blobIndex := status.GetInfo().BlobVerificationProof.GetBlobIndex() confirmationTime := time.Now() @@ -227,7 +230,7 @@ func (verifier *BlobStatusTracker) forwardToReader(key *UnconfirmedKey, status * if requiredDownloads <= 0 { // Allow unlimited downloads. downloadCount = -1 - } else if requiredDownloads == 0 { + } else if requiredDownloads <= 0 { // Do not download blob. return } else if requiredDownloads < 1 { diff --git a/tools/traffic/workers/blob_status_tracker_test.go b/tools/traffic/workers/blob_status_tracker_test.go index 4036c1e87b..b730ca1fa3 100644 --- a/tools/traffic/workers/blob_status_tracker_test.go +++ b/tools/traffic/workers/blob_status_tracker_test.go @@ -145,7 +145,7 @@ func TestBlobVerifier(t *testing.T) { Info: &disperser_rpc.BlobInfo{ BlobVerificationProof: &disperser_rpc.BlobVerificationProof{ BatchMetadata: &disperser_rpc.BatchMetadata{ - BatchHeaderHash: make([]byte, 0), + BatchHeaderHash: make([]byte, 32), }, }, }, From aef5d55075e9c9ad265152f6450eb80993fbaac8 Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Thu, 29 Aug 2024 14:39:34 -0500 Subject: [PATCH 6/7] Bugfixes. 
Signed-off-by: Cody Littley --- tools/traffic/table/blob_store_test.go | 3 ++- tools/traffic/workers/blob_status_tracker.go | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/tools/traffic/table/blob_store_test.go b/tools/traffic/table/blob_store_test.go index 82d30a815c..99c9c91a02 100644 --- a/tools/traffic/table/blob_store_test.go +++ b/tools/traffic/table/blob_store_test.go @@ -13,7 +13,8 @@ func randomMetadata(t *testing.T, permits int) *BlobMetadata { checksum := [16]byte{} _, _ = rand.Read(key) _, _ = rand.Read(checksum[:]) - metadata, err := NewBlobMetadata(key, checksum, 1024, 0, nil, permits) + batchHeaderHash := [32]byte{} + metadata, err := NewBlobMetadata(key, checksum, 1024, 0, batchHeaderHash, permits) assert.Nil(t, err) return metadata diff --git a/tools/traffic/workers/blob_status_tracker.go b/tools/traffic/workers/blob_status_tracker.go index 14fd2d539d..f3519fa619 100644 --- a/tools/traffic/workers/blob_status_tracker.go +++ b/tools/traffic/workers/blob_status_tracker.go @@ -227,10 +227,10 @@ func (verifier *BlobStatusTracker) forwardToReader(key *UnconfirmedKey, status * requiredDownloads := verifier.config.RequiredDownloads var downloadCount int32 - if requiredDownloads <= 0 { + if requiredDownloads < 0 { // Allow unlimited downloads. downloadCount = -1 - } else if requiredDownloads <= 0 { + } else if requiredDownloads == 0 { // Do not download blob. return } else if requiredDownloads < 1 { From b7df5a2fec946f4195dc0682eb1960fc90b4e04e Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Fri, 30 Aug 2024 10:55:26 -0500 Subject: [PATCH 7/7] Made suggested changes. Signed-off-by: Cody Littley --- tools/traffic/config/config.go | 16 ++-- tools/traffic/config/worker_config.go | 8 +- tools/traffic/workers/blob_status_tracker.go | 80 +++++++++---------- .../workers/blob_status_tracker_test.go | 20 ++--- 4 files changed, 62 insertions(+), 62 deletions(-) diff --git a/tools/traffic/config/config.go b/tools/traffic/config/config.go index 63a1630dc2..82c7251b8f 100644 --- a/tools/traffic/config/config.go +++ b/tools/traffic/config/config.go @@ -93,16 +93,16 @@ func NewConfig(ctx *cli.Context) (*Config, error) { RandomizeBlobs: !ctx.GlobalBool(UniformBlobsFlag.Name), WriteTimeout: ctx.Duration(WriteTimeoutFlag.Name), - VerifierInterval: ctx.Duration(VerifierIntervalFlag.Name), + TrackerInterval: ctx.Duration(VerifierIntervalFlag.Name), GetBlobStatusTimeout: ctx.Duration(GetBlobStatusTimeoutFlag.Name), - NumReadInstances: ctx.GlobalUint(NumReadInstancesFlag.Name), - ReadRequestInterval: ctx.Duration(ReadRequestIntervalFlag.Name), - RequiredDownloads: ctx.Float64(RequiredDownloadsFlag.Name), - ReadOverflowTableSize: ctx.Uint(ReadOverflowTableSizeFlag.Name), - FetchBatchHeaderTimeout: ctx.Duration(FetchBatchHeaderTimeoutFlag.Name), - RetrieveBlobChunksTimeout: ctx.Duration(RetrieveBlobChunksTimeoutFlag.Name), - VerificationChannelCapacity: ctx.Uint(VerificationChannelCapacityFlag.Name), + NumReadInstances: ctx.GlobalUint(NumReadInstancesFlag.Name), + ReadRequestInterval: ctx.Duration(ReadRequestIntervalFlag.Name), + RequiredDownloads: ctx.Float64(RequiredDownloadsFlag.Name), + ReadOverflowTableSize: ctx.Uint(ReadOverflowTableSizeFlag.Name), + FetchBatchHeaderTimeout: ctx.Duration(FetchBatchHeaderTimeoutFlag.Name), + RetrieveBlobChunksTimeout: ctx.Duration(RetrieveBlobChunksTimeoutFlag.Name), + StatusTrackerChannelCapacity: ctx.Uint(VerificationChannelCapacityFlag.Name), EigenDAServiceManager: retrieverConfig.EigenDAServiceManagerAddr, SignerPrivateKey: 
ctx.String(SignerPrivateKeyFlag.Name), diff --git a/tools/traffic/config/worker_config.go b/tools/traffic/config/worker_config.go index 84c00060d5..322d07aac7 100644 --- a/tools/traffic/config/worker_config.go +++ b/tools/traffic/config/worker_config.go @@ -16,12 +16,12 @@ type WorkerConfig struct { // The amount of time to wait for a blob to be written. WriteTimeout time.Duration - // The amount of time between attempts by the verifier to confirm the status of blobs. - VerifierInterval time.Duration + // The amount of time between attempts by the status tracker to confirm the status of blobs. + TrackerInterval time.Duration // The amount of time to wait for a blob status to be fetched. GetBlobStatusTimeout time.Duration - // The size of the channel used to communicate between the writer and verifier. - VerificationChannelCapacity uint + // The size of the channel used to communicate between the writer and status tracker. + StatusTrackerChannelCapacity uint // The number of worker threads that generate read traffic. NumReadInstances uint diff --git a/tools/traffic/workers/blob_status_tracker.go b/tools/traffic/workers/blob_status_tracker.go index f3519fa619..cad394af44 100644 --- a/tools/traffic/workers/blob_status_tracker.go +++ b/tools/traffic/workers/blob_status_tracker.go @@ -14,7 +14,7 @@ import ( ) // BlobStatusTracker periodically polls the disperser service to verify the status of blobs that were recently written. -// When blobs become confirmed, the status verifier updates the blob blobsToRead accordingly. +// When blobs become confirmed, the status tracker updates the blob blobsToRead accordingly. // This is a thread safe data structure. type BlobStatusTracker struct { @@ -55,8 +55,8 @@ type BlobStatusTracker struct { finalizedCountMetric metrics.CountMetric } -// NewBlobVerifier creates a new BlobStatusTracker instance. -func NewBlobVerifier( +// NewBlobStatusTracker creates a new BlobStatusTracker instance. +func NewBlobStatusTracker( ctx *context.Context, waitGroup *sync.WaitGroup, logger logging.Logger, @@ -91,57 +91,57 @@ func NewBlobVerifier( // Start begins the status goroutine, which periodically polls // the disperser service to verify the status of blobs. -func (verifier *BlobStatusTracker) Start() { - verifier.waitGroup.Add(1) - go verifier.monitor() +func (tracker *BlobStatusTracker) Start() { + tracker.waitGroup.Add(1) + go tracker.monitor() } // monitor periodically polls the disperser service to verify the status of blobs. -func (verifier *BlobStatusTracker) monitor() { - ticker := time.NewTicker(verifier.config.VerifierInterval) +func (tracker *BlobStatusTracker) monitor() { + ticker := time.NewTicker(tracker.config.TrackerInterval) for { select { - case <-(*verifier.ctx).Done(): - verifier.waitGroup.Done() + case <-(*tracker.ctx).Done(): + tracker.waitGroup.Done() return - case key := <-verifier.keyChannel: - verifier.unconfirmedBlobs = append(verifier.unconfirmedBlobs, key) + case key := <-tracker.keyChannel: + tracker.unconfirmedBlobs = append(tracker.unconfirmedBlobs, key) case <-ticker.C: - verifier.poll() + tracker.poll() } } } // poll checks all unconfirmed keys to see if they have been confirmed by the disperser service. // If a Key is confirmed, it is added to the blob confirmedBlobs and removed from the list of unconfirmed keys. -func (verifier *BlobStatusTracker) poll() { +func (tracker *BlobStatusTracker) poll() { // FUTURE WORK If the number of unconfirmed blobs is high and the time to confirm is high, this is not efficient. 
// Revisit this method if there are performance problems. nonFinalBlobs := make([]*UnconfirmedKey, 0) - for _, key := range verifier.unconfirmedBlobs { + for _, key := range tracker.unconfirmedBlobs { - blobStatus, err := verifier.getBlobStatus(key) + blobStatus, err := tracker.getBlobStatus(key) if err != nil { - verifier.logger.Error("failed to get blob status: ", "err:", err) + tracker.logger.Error("failed to get blob status: ", "err:", err) // There was an error getting status. Try again later. nonFinalBlobs = append(nonFinalBlobs, key) continue } - verifier.updateStatusMetrics(blobStatus.Status) + tracker.updateStatusMetrics(blobStatus.Status) if isBlobStatusTerminal(blobStatus.Status) { if isBlobStatusConfirmed(blobStatus.Status) { - verifier.forwardToReader(key, blobStatus) + tracker.forwardToReader(key, blobStatus) } } else { // try again later nonFinalBlobs = append(nonFinalBlobs, key) } } - verifier.unconfirmedBlobs = nonFinalBlobs - verifier.blobsInFlightMetric.Set(float64(len(verifier.unconfirmedBlobs))) + tracker.unconfirmedBlobs = nonFinalBlobs + tracker.blobsInFlightMetric.Set(float64(len(tracker.unconfirmedBlobs))) } // isBlobStatusTerminal returns true if the status is a terminal status. @@ -176,40 +176,40 @@ func isBlobStatusConfirmed(status disperser.BlobStatus) bool { } // updateStatusMetrics updates the metrics for the reported status of a blob. -func (verifier *BlobStatusTracker) updateStatusMetrics(status disperser.BlobStatus) { +func (tracker *BlobStatusTracker) updateStatusMetrics(status disperser.BlobStatus) { switch status { case disperser.BlobStatus_UNKNOWN: - verifier.unknownCountMetric.Increment() + tracker.unknownCountMetric.Increment() case disperser.BlobStatus_PROCESSING: - verifier.processingCountMetric.Increment() + tracker.processingCountMetric.Increment() case disperser.BlobStatus_DISPERSING: - verifier.dispersingCountMetric.Increment() + tracker.dispersingCountMetric.Increment() case disperser.BlobStatus_FAILED: - verifier.failedCountMetric.Increment() + tracker.failedCountMetric.Increment() case disperser.BlobStatus_INSUFFICIENT_SIGNATURES: - verifier.insufficientSignaturesCountMetric.Increment() + tracker.insufficientSignaturesCountMetric.Increment() case disperser.BlobStatus_CONFIRMED: - verifier.confirmedCountMetric.Increment() + tracker.confirmedCountMetric.Increment() case disperser.BlobStatus_FINALIZED: - verifier.finalizedCountMetric.Increment() + tracker.finalizedCountMetric.Increment() default: - verifier.logger.Error("unknown blob status", "status:", status) + tracker.logger.Error("unknown blob status", "status:", status) } } // getBlobStatus gets the status of a blob from the disperser service. Returns nil if there was an error // getting the status. 
-func (verifier *BlobStatusTracker) getBlobStatus(key *UnconfirmedKey) (*disperser.BlobStatusReply, error) { - ctxTimeout, cancel := context.WithTimeout(*verifier.ctx, verifier.config.GetBlobStatusTimeout) +func (tracker *BlobStatusTracker) getBlobStatus(key *UnconfirmedKey) (*disperser.BlobStatusReply, error) { + ctxTimeout, cancel := context.WithTimeout(*tracker.ctx, tracker.config.GetBlobStatusTimeout) defer cancel() - status, err := metrics.InvokeAndReportLatency[*disperser.BlobStatusReply](verifier.getStatusLatencyMetric, + status, err := metrics.InvokeAndReportLatency[*disperser.BlobStatusReply](tracker.getStatusLatencyMetric, func() (*disperser.BlobStatusReply, error) { - return verifier.disperser.GetBlobStatus(ctxTimeout, key.Key) + return tracker.disperser.GetBlobStatus(ctxTimeout, key.Key) }) if err != nil { - verifier.getStatusErrorCountMetric.Increment() + tracker.getStatusErrorCountMetric.Increment() return nil, err } @@ -217,15 +217,15 @@ func (verifier *BlobStatusTracker) getBlobStatus(key *UnconfirmedKey) (*disperse } // forwardToReader forwards a blob to the reader. Only called once the blob is ready to be read. -func (verifier *BlobStatusTracker) forwardToReader(key *UnconfirmedKey, status *disperser.BlobStatusReply) { +func (tracker *BlobStatusTracker) forwardToReader(key *UnconfirmedKey, status *disperser.BlobStatusReply) { batchHeaderHash := [32]byte(status.GetInfo().BlobVerificationProof.BatchMetadata.BatchHeaderHash) blobIndex := status.GetInfo().BlobVerificationProof.GetBlobIndex() confirmationTime := time.Now() confirmationLatency := confirmationTime.Sub(key.SubmissionTime) - verifier.confirmationLatencyMetric.ReportLatency(confirmationLatency) + tracker.confirmationLatencyMetric.ReportLatency(confirmationLatency) - requiredDownloads := verifier.config.RequiredDownloads + requiredDownloads := tracker.config.RequiredDownloads var downloadCount int32 if requiredDownloads < 0 { // Allow unlimited downloads. 
@@ -249,8 +249,8 @@ func (verifier *BlobStatusTracker) forwardToReader(key *UnconfirmedKey, status * blobMetadata, err := table.NewBlobMetadata(key.Key, key.Checksum, key.Size, uint(blobIndex), batchHeaderHash, int(downloadCount)) if err != nil { - verifier.logger.Error("failed to create blob metadata", "err:", err) + tracker.logger.Error("failed to create blob metadata", "err:", err) return } - verifier.confirmedBlobs.Add(blobMetadata) + tracker.confirmedBlobs.Add(blobMetadata) } diff --git a/tools/traffic/workers/blob_status_tracker_test.go b/tools/traffic/workers/blob_status_tracker_test.go index b730ca1fa3..9246e2052c 100644 --- a/tools/traffic/workers/blob_status_tracker_test.go +++ b/tools/traffic/workers/blob_status_tracker_test.go @@ -54,7 +54,7 @@ func isStatusSuccess(status disperser_rpc.BlobStatus) bool { } } -func TestBlobVerifier(t *testing.T) { +func TestStatusTracker(t *testing.T) { tu.InitializeRandom() ctx, cancel := context.WithCancel(context.Background()) @@ -69,11 +69,11 @@ func TestBlobVerifier(t *testing.T) { blobStore := table.NewBlobStore() - verifierMetrics := metrics.NewMockMetrics() + trackerMetrics := metrics.NewMockMetrics() disperserClient := &MockDisperserClient{} - verifier := NewBlobVerifier( + tracker := NewBlobStatusTracker( &ctx, &waitGroup, logger, @@ -81,7 +81,7 @@ func TestBlobVerifier(t *testing.T) { make(chan *UnconfirmedKey), blobStore, disperserClient, - verifierMetrics) + trackerMetrics) expectedGetStatusCount := 0 statusCounts := make(map[disperser_rpc.BlobStatus]int) @@ -117,7 +117,7 @@ func TestBlobVerifier(t *testing.T) { SubmissionTime: time.Now(), } - verifier.unconfirmedBlobs = append(verifier.unconfirmedBlobs, unconfirmedKey) + tracker.unconfirmedBlobs = append(tracker.unconfirmedBlobs, unconfirmedKey) } // Reset the mock disperser client. @@ -152,8 +152,8 @@ func TestBlobVerifier(t *testing.T) { }, nil) } - // Simulate advancement of time, allowing the verifier to process the new keys. - verifier.poll() + // Simulate advancement of time, allowing the tracker to process the new keys. + tracker.poll() // Validate the number of calls made to the disperser client. disperserClient.mock.AssertNumberOfCalls(t, "GetBlobStatus", expectedGetStatusCount) @@ -191,10 +191,10 @@ func TestBlobVerifier(t *testing.T) { // Verify metrics. for status, count := range statusCounts { // TODO metricName := fmt.Sprintf("get_status_%s", status.String()) - assert.Equal(t, float64(count), verifierMetrics.GetCount(metricName), "status: %s", status.String()) + assert.Equal(t, float64(count), trackerMetrics.GetCount(metricName), "status: %s", status.String()) } - if float64(blobsInFlight) != verifierMetrics.GetGaugeValue("blobs_in_flight") { - assert.Equal(t, float64(blobsInFlight), verifierMetrics.GetGaugeValue("blobs_in_flight")) + if float64(blobsInFlight) != trackerMetrics.GetGaugeValue("blobs_in_flight") { + assert.Equal(t, float64(blobsInFlight), trackerMetrics.GetGaugeValue("blobs_in_flight")) } }
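
Wiring sketch (illustrative only, not part of the patch series): the components introduced above are intended to be connected inside the traffic generator as writer -> UnconfirmedKey channel -> BlobStatusTracker -> confirmed-blob store -> readers. The sketch below uses only the constructor signature, channel type, Start() method, and WorkerConfig fields that appear in the patches; the startStatusTracker helper, the "traffic" package placement, and the way the disperser client, logger, metrics, and WorkerConfig are obtained are assumptions rather than code from the repository.

// Illustrative sketch, not repository code: wiring the status tracker between the
// writer's key channel and the confirmed-blob store. Helper name and package are
// assumptions; the constructor signature, channel type, and Start() call match the
// code introduced in the patches above.
package traffic

import (
	"context"
	"sync"

	"github.com/Layr-Labs/eigenda/api/clients"
	"github.com/Layr-Labs/eigenda/tools/traffic/config"
	"github.com/Layr-Labs/eigenda/tools/traffic/metrics"
	"github.com/Layr-Labs/eigenda/tools/traffic/table"
	"github.com/Layr-Labs/eigenda/tools/traffic/workers"
	"github.com/Layr-Labs/eigensdk-go/logging"
)

// startStatusTracker is a hypothetical helper showing the intended data flow:
// writer -> keyChannel -> BlobStatusTracker -> confirmedBlobs -> readers.
func startStatusTracker(
	ctx context.Context,
	waitGroup *sync.WaitGroup,
	logger logging.Logger,
	workerConfig *config.WorkerConfig,
	disperserClient clients.DisperserClient,
	generatorMetrics metrics.Metrics) (chan *workers.UnconfirmedKey, *table.BlobStore) {

	// The writer sends one UnconfirmedKey on this channel per successful DisperseBlob call.
	keyChannel := make(chan *workers.UnconfirmedKey, workerConfig.StatusTrackerChannelCapacity)

	// Blobs land here only once the tracker observes CONFIRMED or FINALIZED status.
	confirmedBlobs := table.NewBlobStore()

	tracker := workers.NewBlobStatusTracker(
		&ctx,
		waitGroup,
		logger,
		workerConfig,
		keyChannel,
		confirmedBlobs,
		disperserClient,
		generatorMetrics)

	// Start launches the monitor goroutine, which polls the disperser every
	// workerConfig.TrackerInterval until ctx is cancelled.
	tracker.Start()

	return keyChannel, confirmedBlobs
}

After each successful DisperseBlob call the BlobWriter pushes an UnconfirmedKey onto the same channel (see blob_writer.go above), and readers drain the confirmed-blob store via BlobStore.GetNext once the tracker has seen the blob reach CONFIRMED or FINALIZED.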