Skip to content

Commit

Permalink
Created blob verifier worker. (#708)
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley authored Sep 3, 2024
1 parent 5618e74 commit 57ef5f7
Show file tree
Hide file tree
Showing 12 changed files with 534 additions and 63 deletions.
16 changes: 8 additions & 8 deletions tools/traffic/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,16 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
RandomizeBlobs: !ctx.GlobalBool(UniformBlobsFlag.Name),
WriteTimeout: ctx.Duration(WriteTimeoutFlag.Name),

VerifierInterval: ctx.Duration(VerifierIntervalFlag.Name),
TrackerInterval: ctx.Duration(VerifierIntervalFlag.Name),
GetBlobStatusTimeout: ctx.Duration(GetBlobStatusTimeoutFlag.Name),

NumReadInstances: ctx.GlobalUint(NumReadInstancesFlag.Name),
ReadRequestInterval: ctx.Duration(ReadRequestIntervalFlag.Name),
RequiredDownloads: ctx.Float64(RequiredDownloadsFlag.Name),
ReadOverflowTableSize: ctx.Uint(ReadOverflowTableSizeFlag.Name),
FetchBatchHeaderTimeout: ctx.Duration(FetchBatchHeaderTimeoutFlag.Name),
RetrieveBlobChunksTimeout: ctx.Duration(RetrieveBlobChunksTimeoutFlag.Name),
VerificationChannelCapacity: ctx.Uint(VerificationChannelCapacityFlag.Name),
NumReadInstances: ctx.GlobalUint(NumReadInstancesFlag.Name),
ReadRequestInterval: ctx.Duration(ReadRequestIntervalFlag.Name),
RequiredDownloads: ctx.Float64(RequiredDownloadsFlag.Name),
ReadOverflowTableSize: ctx.Uint(ReadOverflowTableSizeFlag.Name),
FetchBatchHeaderTimeout: ctx.Duration(FetchBatchHeaderTimeoutFlag.Name),
RetrieveBlobChunksTimeout: ctx.Duration(RetrieveBlobChunksTimeoutFlag.Name),
StatusTrackerChannelCapacity: ctx.Uint(VerificationChannelCapacityFlag.Name),

EigenDAServiceManager: retrieverConfig.EigenDAServiceManagerAddr,
SignerPrivateKey: ctx.String(SignerPrivateKeyFlag.Name),
Expand Down
8 changes: 4 additions & 4 deletions tools/traffic/config/worker_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ type WorkerConfig struct {
// The amount of time to wait for a blob to be written.
WriteTimeout time.Duration

// The amount of time between attempts by the verifier to confirm the status of blobs.
VerifierInterval time.Duration
// The amount of time between attempts by the status tracker to confirm the status of blobs.
TrackerInterval time.Duration
// The amount of time to wait for a blob status to be fetched.
GetBlobStatusTimeout time.Duration
// The size of the channel used to communicate between the writer and verifier.
VerificationChannelCapacity uint
// The size of the channel used to communicate between the writer and status tracker.
StatusTrackerChannelCapacity uint

// The number of worker threads that generate read traffic.
NumReadInstances uint
Expand Down
5 changes: 5 additions & 0 deletions tools/traffic/table/blob_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ type BlobMetadata struct {
// BlobIndex of the blob.
BlobIndex uint

// Hash of the batch header that the blob was written in.
BatchHeaderHash [32]byte

// Checksum of the blob.
Checksum [16]byte

Expand All @@ -28,6 +31,7 @@ func NewBlobMetadata(
checksum [16]byte,
size uint,
blobIndex uint,
batchHeaderHash [32]byte,
readPermits int) (*BlobMetadata, error) {

if readPermits == 0 {
Expand All @@ -39,6 +43,7 @@ func NewBlobMetadata(
Checksum: checksum,
Size: size,
BlobIndex: blobIndex,
BatchHeaderHash: batchHeaderHash,
RemainingReadPermits: readPermits,
}, nil
}
22 changes: 17 additions & 5 deletions tools/traffic/table/blob_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type BlobStore struct {
// nextKey describes the next key to used for the blobs map.
nextKey uint64

lock sync.Mutex
lock sync.RWMutex
}

// NewBlobStore creates a new BlobStore instance.
Expand All @@ -35,8 +35,8 @@ func (store *BlobStore) Add(blob *BlobMetadata) {
// from the store if the permits are exhausted. This method makes no guarantees about the order
// in which blobs are returned. Returns nil if no blobs are available.
func (store *BlobStore) GetNext() *BlobMetadata {
store.lock.Lock()
defer store.lock.Unlock()
store.lock.RLock()
defer store.lock.RUnlock()

for key, blob := range store.blobs {
// Always return the first blob found.
Expand All @@ -55,8 +55,20 @@ func (store *BlobStore) GetNext() *BlobMetadata {

// Size returns the number of blobs currently stored.
func (store *BlobStore) Size() uint {
store.lock.Lock()
defer store.lock.Unlock()
store.lock.RLock()
defer store.lock.RUnlock()

return uint(len(store.blobs))
}

// GetAll returns all blobs currently stored. For testing purposes only.
func (store *BlobStore) GetAll() []*BlobMetadata {
store.lock.RLock()
defer store.lock.RUnlock()

blobs := make([]*BlobMetadata, 0, len(store.blobs))
for _, blob := range store.blobs {
blobs = append(blobs, blob)
}
return blobs
}
3 changes: 2 additions & 1 deletion tools/traffic/table/blob_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ func randomMetadata(t *testing.T, permits int) *BlobMetadata {
checksum := [16]byte{}
_, _ = rand.Read(key)
_, _ = rand.Read(checksum[:])
metadata, err := NewBlobMetadata(key, checksum, 1024, 0, permits)
batchHeaderHash := [32]byte{}
metadata, err := NewBlobMetadata(key, checksum, 1024, 0, batchHeaderHash, permits)
assert.Nil(t, err)

return metadata
Expand Down
256 changes: 256 additions & 0 deletions tools/traffic/workers/blob_status_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
package workers

import (
"context"
"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/api/grpc/disperser"
"github.com/Layr-Labs/eigenda/tools/traffic/config"
"github.com/Layr-Labs/eigenda/tools/traffic/metrics"
"github.com/Layr-Labs/eigenda/tools/traffic/table"
"github.com/Layr-Labs/eigensdk-go/logging"
"math/rand"
"sync"
"time"
)

// BlobStatusTracker periodically polls the disperser service to verify the status of blobs that were recently written.
// When blobs become confirmed, the status tracker updates the blob blobsToRead accordingly.
// This is a thread safe data structure.
type BlobStatusTracker struct {

// The context for the generator. All work should cease when this context is cancelled.
ctx *context.Context

// Tracks the number of active goroutines within the generator.
waitGroup *sync.WaitGroup

// All logs should be written using this logger.
logger logging.Logger

// config contains the configuration for the generator.
config *config.WorkerConfig

// Contains confirmed blobs. Blobs are added here when they are confirmed by the disperser service.
confirmedBlobs *table.BlobStore

// The disperser client used to monitor the disperser service.
disperser clients.DisperserClient

// The keys of blobs that have not yet been confirmed by the disperser service.
unconfirmedBlobs []*UnconfirmedKey

// Newly added keys that require verification.
keyChannel chan *UnconfirmedKey

blobsInFlightMetric metrics.GaugeMetric
getStatusLatencyMetric metrics.LatencyMetric
confirmationLatencyMetric metrics.LatencyMetric
getStatusErrorCountMetric metrics.CountMetric
unknownCountMetric metrics.CountMetric
processingCountMetric metrics.CountMetric
dispersingCountMetric metrics.CountMetric
failedCountMetric metrics.CountMetric
insufficientSignaturesCountMetric metrics.CountMetric
confirmedCountMetric metrics.CountMetric
finalizedCountMetric metrics.CountMetric
}

// NewBlobStatusTracker creates a new BlobStatusTracker instance.
func NewBlobStatusTracker(
ctx *context.Context,
waitGroup *sync.WaitGroup,
logger logging.Logger,
config *config.WorkerConfig,
keyChannel chan *UnconfirmedKey,
table *table.BlobStore,
disperser clients.DisperserClient,
generatorMetrics metrics.Metrics) BlobStatusTracker {

return BlobStatusTracker{
ctx: ctx,
waitGroup: waitGroup,
logger: logger,
config: config,
keyChannel: keyChannel,
confirmedBlobs: table,
disperser: disperser,
unconfirmedBlobs: make([]*UnconfirmedKey, 0),
blobsInFlightMetric: generatorMetrics.NewGaugeMetric("blobs_in_flight"),
getStatusLatencyMetric: generatorMetrics.NewLatencyMetric("get_status"),
confirmationLatencyMetric: generatorMetrics.NewLatencyMetric("confirmation"),
getStatusErrorCountMetric: generatorMetrics.NewCountMetric("get_status_ERROR"),
unknownCountMetric: generatorMetrics.NewCountMetric("get_status_UNKNOWN"),
processingCountMetric: generatorMetrics.NewCountMetric("get_status_PROCESSING"),
dispersingCountMetric: generatorMetrics.NewCountMetric("get_status_DISPERSING"),
failedCountMetric: generatorMetrics.NewCountMetric("get_status_FAILED"),
insufficientSignaturesCountMetric: generatorMetrics.NewCountMetric("get_status_INSUFFICIENT_SIGNATURES"),
confirmedCountMetric: generatorMetrics.NewCountMetric("get_status_CONFIRMED"),
finalizedCountMetric: generatorMetrics.NewCountMetric("get_status_FINALIZED"),
}
}

// Start begins the status goroutine, which periodically polls
// the disperser service to verify the status of blobs.
func (tracker *BlobStatusTracker) Start() {
tracker.waitGroup.Add(1)
go tracker.monitor()
}

// monitor periodically polls the disperser service to verify the status of blobs.
func (tracker *BlobStatusTracker) monitor() {
ticker := time.NewTicker(tracker.config.TrackerInterval)
for {
select {
case <-(*tracker.ctx).Done():
tracker.waitGroup.Done()
return
case key := <-tracker.keyChannel:
tracker.unconfirmedBlobs = append(tracker.unconfirmedBlobs, key)
case <-ticker.C:
tracker.poll()
}
}
}

// poll checks all unconfirmed keys to see if they have been confirmed by the disperser service.
// If a Key is confirmed, it is added to the blob confirmedBlobs and removed from the list of unconfirmed keys.
func (tracker *BlobStatusTracker) poll() {

// FUTURE WORK If the number of unconfirmed blobs is high and the time to confirm is high, this is not efficient.
// Revisit this method if there are performance problems.

nonFinalBlobs := make([]*UnconfirmedKey, 0)
for _, key := range tracker.unconfirmedBlobs {

blobStatus, err := tracker.getBlobStatus(key)
if err != nil {
tracker.logger.Error("failed to get blob status: ", "err:", err)
// There was an error getting status. Try again later.
nonFinalBlobs = append(nonFinalBlobs, key)
continue
}

tracker.updateStatusMetrics(blobStatus.Status)
if isBlobStatusTerminal(blobStatus.Status) {
if isBlobStatusConfirmed(blobStatus.Status) {
tracker.forwardToReader(key, blobStatus)
}
} else {
// try again later
nonFinalBlobs = append(nonFinalBlobs, key)
}
}
tracker.unconfirmedBlobs = nonFinalBlobs
tracker.blobsInFlightMetric.Set(float64(len(tracker.unconfirmedBlobs)))
}

// isBlobStatusTerminal returns true if the status is a terminal status.
func isBlobStatusTerminal(status disperser.BlobStatus) bool {
switch status {
case disperser.BlobStatus_FAILED:
return true
case disperser.BlobStatus_INSUFFICIENT_SIGNATURES:
return true
case disperser.BlobStatus_CONFIRMED:
// Technically this isn't terminal, as confirmed blobs eventually should become finalized.
// But it is terminal from the status tracker's perspective, since we stop tracking the blob
// once it becomes either confirmed or finalized.
return true
case disperser.BlobStatus_FINALIZED:
return true
default:
return false
}
}

// isBlobStatusConfirmed returns true if the status is a confirmed status.
func isBlobStatusConfirmed(status disperser.BlobStatus) bool {
switch status {
case disperser.BlobStatus_CONFIRMED:
return true
case disperser.BlobStatus_FINALIZED:
return true
default:
return false
}
}

// updateStatusMetrics updates the metrics for the reported status of a blob.
func (tracker *BlobStatusTracker) updateStatusMetrics(status disperser.BlobStatus) {
switch status {
case disperser.BlobStatus_UNKNOWN:
tracker.unknownCountMetric.Increment()
case disperser.BlobStatus_PROCESSING:
tracker.processingCountMetric.Increment()
case disperser.BlobStatus_DISPERSING:
tracker.dispersingCountMetric.Increment()
case disperser.BlobStatus_FAILED:
tracker.failedCountMetric.Increment()
case disperser.BlobStatus_INSUFFICIENT_SIGNATURES:
tracker.insufficientSignaturesCountMetric.Increment()
case disperser.BlobStatus_CONFIRMED:
tracker.confirmedCountMetric.Increment()
case disperser.BlobStatus_FINALIZED:
tracker.finalizedCountMetric.Increment()
default:
tracker.logger.Error("unknown blob status", "status:", status)
}
}

// getBlobStatus gets the status of a blob from the disperser service. Returns nil if there was an error
// getting the status.
func (tracker *BlobStatusTracker) getBlobStatus(key *UnconfirmedKey) (*disperser.BlobStatusReply, error) {
ctxTimeout, cancel := context.WithTimeout(*tracker.ctx, tracker.config.GetBlobStatusTimeout)
defer cancel()

status, err := metrics.InvokeAndReportLatency[*disperser.BlobStatusReply](tracker.getStatusLatencyMetric,
func() (*disperser.BlobStatusReply, error) {
return tracker.disperser.GetBlobStatus(ctxTimeout, key.Key)
})

if err != nil {
tracker.getStatusErrorCountMetric.Increment()
return nil, err
}

return status, nil
}

// forwardToReader forwards a blob to the reader. Only called once the blob is ready to be read.
func (tracker *BlobStatusTracker) forwardToReader(key *UnconfirmedKey, status *disperser.BlobStatusReply) {
batchHeaderHash := [32]byte(status.GetInfo().BlobVerificationProof.BatchMetadata.BatchHeaderHash)
blobIndex := status.GetInfo().BlobVerificationProof.GetBlobIndex()

confirmationTime := time.Now()
confirmationLatency := confirmationTime.Sub(key.SubmissionTime)
tracker.confirmationLatencyMetric.ReportLatency(confirmationLatency)

requiredDownloads := tracker.config.RequiredDownloads
var downloadCount int32
if requiredDownloads < 0 {
// Allow unlimited downloads.
downloadCount = -1
} else if requiredDownloads == 0 {
// Do not download blob.
return
} else if requiredDownloads < 1 {
// Download blob with probability equal to requiredDownloads.
if rand.Float64() < requiredDownloads {
// Download the blob once.
downloadCount = 1
} else {
// Do not download blob.
return
}
} else {
// Download blob requiredDownloads times.
downloadCount = int32(requiredDownloads)
}

blobMetadata, err := table.NewBlobMetadata(key.Key, key.Checksum, key.Size, uint(blobIndex), batchHeaderHash, int(downloadCount))
if err != nil {
tracker.logger.Error("failed to create blob metadata", "err:", err)
return
}
tracker.confirmedBlobs.Add(blobMetadata)
}
Loading

0 comments on commit 57ef5f7

Please sign in to comment.