Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Created blob verifier worker. #708

Merged
merged 7 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions tools/traffic/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,16 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
RandomizeBlobs: !ctx.GlobalBool(UniformBlobsFlag.Name),
WriteTimeout: ctx.Duration(WriteTimeoutFlag.Name),

VerifierInterval: ctx.Duration(VerifierIntervalFlag.Name),
TrackerInterval: ctx.Duration(VerifierIntervalFlag.Name),
GetBlobStatusTimeout: ctx.Duration(GetBlobStatusTimeoutFlag.Name),

NumReadInstances: ctx.GlobalUint(NumReadInstancesFlag.Name),
ReadRequestInterval: ctx.Duration(ReadRequestIntervalFlag.Name),
RequiredDownloads: ctx.Float64(RequiredDownloadsFlag.Name),
ReadOverflowTableSize: ctx.Uint(ReadOverflowTableSizeFlag.Name),
FetchBatchHeaderTimeout: ctx.Duration(FetchBatchHeaderTimeoutFlag.Name),
RetrieveBlobChunksTimeout: ctx.Duration(RetrieveBlobChunksTimeoutFlag.Name),
VerificationChannelCapacity: ctx.Uint(VerificationChannelCapacityFlag.Name),
NumReadInstances: ctx.GlobalUint(NumReadInstancesFlag.Name),
ReadRequestInterval: ctx.Duration(ReadRequestIntervalFlag.Name),
RequiredDownloads: ctx.Float64(RequiredDownloadsFlag.Name),
ReadOverflowTableSize: ctx.Uint(ReadOverflowTableSizeFlag.Name),
FetchBatchHeaderTimeout: ctx.Duration(FetchBatchHeaderTimeoutFlag.Name),
RetrieveBlobChunksTimeout: ctx.Duration(RetrieveBlobChunksTimeoutFlag.Name),
StatusTrackerChannelCapacity: ctx.Uint(VerificationChannelCapacityFlag.Name),

EigenDAServiceManager: retrieverConfig.EigenDAServiceManagerAddr,
SignerPrivateKey: ctx.String(SignerPrivateKeyFlag.Name),
Expand Down
8 changes: 4 additions & 4 deletions tools/traffic/config/worker_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ type WorkerConfig struct {
// The amount of time to wait for a blob to be written.
WriteTimeout time.Duration

// The amount of time between attempts by the verifier to confirm the status of blobs.
VerifierInterval time.Duration
// The amount of time between attempts by the status tracker to confirm the status of blobs.
TrackerInterval time.Duration
// The amount of time to wait for a blob status to be fetched.
GetBlobStatusTimeout time.Duration
// The size of the channel used to communicate between the writer and verifier.
VerificationChannelCapacity uint
// The size of the channel used to communicate between the writer and status tracker.
StatusTrackerChannelCapacity uint

// The number of worker threads that generate read traffic.
NumReadInstances uint
Expand Down
5 changes: 5 additions & 0 deletions tools/traffic/table/blob_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ type BlobMetadata struct {
// BlobIndex of the blob.
BlobIndex uint

// Hash of the batch header that the blob was written in.
BatchHeaderHash [32]byte

// Checksum of the blob.
Checksum [16]byte

Expand All @@ -28,6 +31,7 @@ func NewBlobMetadata(
checksum [16]byte,
size uint,
blobIndex uint,
batchHeaderHash [32]byte,
readPermits int) (*BlobMetadata, error) {

if readPermits == 0 {
Expand All @@ -39,6 +43,7 @@ func NewBlobMetadata(
Checksum: checksum,
Size: size,
BlobIndex: blobIndex,
BatchHeaderHash: batchHeaderHash,
RemainingReadPermits: readPermits,
}, nil
}
22 changes: 17 additions & 5 deletions tools/traffic/table/blob_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type BlobStore struct {
// nextKey describes the next key to be used for the blobs map.
nextKey uint64

lock sync.Mutex
lock sync.RWMutex
}

// NewBlobStore creates a new BlobStore instance.
Expand All @@ -35,8 +35,8 @@ func (store *BlobStore) Add(blob *BlobMetadata) {
// from the store if the permits are exhausted. This method makes no guarantees about the order
// in which blobs are returned. Returns nil if no blobs are available.
func (store *BlobStore) GetNext() *BlobMetadata {
store.lock.Lock()
defer store.lock.Unlock()
store.lock.RLock()
defer store.lock.RUnlock()

for key, blob := range store.blobs {
// Always return the first blob found.
Expand All @@ -55,8 +55,20 @@ func (store *BlobStore) GetNext() *BlobMetadata {

// Size returns the number of blobs currently stored.
func (store *BlobStore) Size() uint {
store.lock.Lock()
defer store.lock.Unlock()
store.lock.RLock()
defer store.lock.RUnlock()

return uint(len(store.blobs))
}

// GetAll copies every blob metadata pointer currently held by the store into a new slice
// and returns it. The store's read lock is held while the copy is made. The order of the
// returned elements follows the iteration order of the underlying collection and should not
// be relied upon. For testing purposes only.
func (store *BlobStore) GetAll() []*BlobMetadata {
	store.lock.RLock()
	defer store.lock.RUnlock()

	// Size the result exactly, then fill it slot by slot.
	snapshot := make([]*BlobMetadata, len(store.blobs))
	next := 0
	for _, metadata := range store.blobs {
		snapshot[next] = metadata
		next++
	}
	return snapshot
}
3 changes: 2 additions & 1 deletion tools/traffic/table/blob_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ func randomMetadata(t *testing.T, permits int) *BlobMetadata {
checksum := [16]byte{}
_, _ = rand.Read(key)
_, _ = rand.Read(checksum[:])
metadata, err := NewBlobMetadata(key, checksum, 1024, 0, permits)
batchHeaderHash := [32]byte{}
metadata, err := NewBlobMetadata(key, checksum, 1024, 0, batchHeaderHash, permits)
assert.Nil(t, err)

return metadata
Expand Down
256 changes: 256 additions & 0 deletions tools/traffic/workers/blob_status_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
package workers

import (
"context"
"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/api/grpc/disperser"
"github.com/Layr-Labs/eigenda/tools/traffic/config"
"github.com/Layr-Labs/eigenda/tools/traffic/metrics"
"github.com/Layr-Labs/eigenda/tools/traffic/table"
"github.com/Layr-Labs/eigensdk-go/logging"
"math/rand"
"sync"
"time"
)

// BlobStatusTracker periodically polls the disperser service to check the status of blobs that were recently written.
// When a blob is reported as confirmed or finalized, the tracker hands it to the confirmed blob store so that it
// can be read (subject to the RequiredDownloads setting).
// NOTE(review): the original comment describes this as a thread safe data structure. In the visible code the
// unconfirmedBlobs slice is only touched from the monitor goroutine; confirm before calling poll from elsewhere.
type BlobStatusTracker struct {

	// The context for the tracker. The monitor goroutine exits when this context is cancelled.
	ctx *context.Context

	// Tracks the tracker's monitor goroutine: Start adds one entry, and the goroutine marks it done when it exits.
	waitGroup *sync.WaitGroup

	// All logs should be written using this logger.
	logger logging.Logger

	// config contains the configuration for the tracker (poll interval, status timeout, required downloads).
	config *config.WorkerConfig

	// Contains confirmed blobs. Blobs are added here when they are confirmed by the disperser service.
	confirmedBlobs *table.BlobStore

	// The disperser client used to query blob status from the disperser service.
	disperser clients.DisperserClient

	// The keys of blobs that have not yet reached a status the tracker treats as terminal.
	unconfirmedBlobs []*UnconfirmedKey

	// Newly added keys that require tracking. The monitor goroutine receives from this channel and
	// appends each key to unconfirmedBlobs. (Presumably fed by the blob writer; verify against caller.)
	keyChannel chan *UnconfirmedKey

	// Number of blobs still being tracked, updated at the end of every poll.
	blobsInFlightMetric metrics.GaugeMetric
	// Latency of individual GetBlobStatus calls to the disperser.
	getStatusLatencyMetric metrics.LatencyMetric
	// Time between a key's SubmissionTime and the moment the tracker observes it as confirmed.
	confirmationLatencyMetric metrics.LatencyMetric
	// Number of GetBlobStatus calls that returned an error.
	getStatusErrorCountMetric metrics.CountMetric
	// Number of status replies reporting UNKNOWN.
	unknownCountMetric metrics.CountMetric
	// Number of status replies reporting PROCESSING.
	processingCountMetric metrics.CountMetric
	// Number of status replies reporting DISPERSING.
	dispersingCountMetric metrics.CountMetric
	// Number of status replies reporting FAILED.
	failedCountMetric metrics.CountMetric
	// Number of status replies reporting INSUFFICIENT_SIGNATURES.
	insufficientSignaturesCountMetric metrics.CountMetric
	// Number of status replies reporting CONFIRMED.
	confirmedCountMetric metrics.CountMetric
	// Number of status replies reporting FINALIZED.
	finalizedCountMetric metrics.CountMetric
}

// NewBlobStatusTracker assembles a BlobStatusTracker from its collaborators and creates every
// metric the tracker reports through generatorMetrics. The returned tracker starts with no
// unconfirmed blobs and does nothing until Start is called.
func NewBlobStatusTracker(
	ctx *context.Context,
	waitGroup *sync.WaitGroup,
	logger logging.Logger,
	config *config.WorkerConfig,
	keyChannel chan *UnconfirmedKey,
	table *table.BlobStore,
	disperser clients.DisperserClient,
	generatorMetrics metrics.Metrics) BlobStatusTracker {

	var tracker BlobStatusTracker

	// Collaborators handed in by the caller.
	tracker.ctx = ctx
	tracker.waitGroup = waitGroup
	tracker.logger = logger
	tracker.config = config
	tracker.keyChannel = keyChannel
	tracker.confirmedBlobs = table
	tracker.disperser = disperser

	// Nothing is being tracked yet.
	tracker.unconfirmedBlobs = make([]*UnconfirmedKey, 0)

	// Gauge and latency metrics.
	tracker.blobsInFlightMetric = generatorMetrics.NewGaugeMetric("blobs_in_flight")
	tracker.getStatusLatencyMetric = generatorMetrics.NewLatencyMetric("get_status")
	tracker.confirmationLatencyMetric = generatorMetrics.NewLatencyMetric("confirmation")

	// One counter for status-call errors and one per blob status.
	tracker.getStatusErrorCountMetric = generatorMetrics.NewCountMetric("get_status_ERROR")
	tracker.unknownCountMetric = generatorMetrics.NewCountMetric("get_status_UNKNOWN")
	tracker.processingCountMetric = generatorMetrics.NewCountMetric("get_status_PROCESSING")
	tracker.dispersingCountMetric = generatorMetrics.NewCountMetric("get_status_DISPERSING")
	tracker.failedCountMetric = generatorMetrics.NewCountMetric("get_status_FAILED")
	tracker.insufficientSignaturesCountMetric = generatorMetrics.NewCountMetric("get_status_INSUFFICIENT_SIGNATURES")
	tracker.confirmedCountMetric = generatorMetrics.NewCountMetric("get_status_CONFIRMED")
	tracker.finalizedCountMetric = generatorMetrics.NewCountMetric("get_status_FINALIZED")

	return tracker
}

// Start launches the background monitor goroutine, which periodically polls the disperser
// service for the status of tracked blobs. The goroutine is registered with the wait group
// before it is spawned.
func (tracker *BlobStatusTracker) Start() {
	tracker.waitGroup.Add(1)
	go func() {
		tracker.monitor()
	}()
}

// monitor is the tracker's main loop. It runs until the tracker's context is cancelled and
// reacts to three events:
//   - context cancellation: the loop exits;
//   - a new key arriving on keyChannel: the key is added to the list of unconfirmed blobs;
//   - a tick of the poll timer (every config.TrackerInterval): all unconfirmed blobs are polled.
//
// The wait group entry added by Start is released when the loop exits, and the ticker is
// stopped so that its resources are freed once monitoring ends.
func (tracker *BlobStatusTracker) monitor() {
	// Deferred so the wait group is released on every exit path.
	defer tracker.waitGroup.Done()

	ticker := time.NewTicker(tracker.config.TrackerInterval)
	// A ticker that is never stopped keeps its timer alive; release it on exit.
	defer ticker.Stop()

	for {
		select {
		case <-(*tracker.ctx).Done():
			return
		case key := <-tracker.keyChannel:
			tracker.unconfirmedBlobs = append(tracker.unconfirmedBlobs, key)
		case <-ticker.C:
			tracker.poll()
		}
	}
}

// poll asks the disperser service for the status of every blob that is still being tracked.
// A blob stays on the tracked list when its status could not be fetched or when its status is
// not yet terminal. A blob with a terminal status is dropped from the list, and if that status
// counts as confirmed it is first forwarded to the reader. The in-flight gauge is updated with
// the number of blobs that remain tracked.
func (tracker *BlobStatusTracker) poll() {

	// FUTURE WORK If the number of unconfirmed blobs is high and the time to confirm is high, this is not efficient.
	// Revisit this method if there are performance problems.

	stillPending := make([]*UnconfirmedKey, 0)
	for _, pendingKey := range tracker.unconfirmedBlobs {

		reply, err := tracker.getBlobStatus(pendingKey)
		if err != nil {
			// The status could not be fetched; keep the key and retry on the next poll.
			tracker.logger.Error("failed to get blob status: ", "err:", err)
			stillPending = append(stillPending, pendingKey)
			continue
		}

		status := reply.Status
		tracker.updateStatusMetrics(status)

		if !isBlobStatusTerminal(status) {
			// Still in progress; check again on the next poll.
			stillPending = append(stillPending, pendingKey)
			continue
		}

		// Terminal: the key is dropped, and confirmed blobs are handed to the reader.
		if isBlobStatusConfirmed(status) {
			tracker.forwardToReader(pendingKey, reply)
		}
	}

	tracker.unconfirmedBlobs = stillPending
	tracker.blobsInFlightMetric.Set(float64(len(stillPending)))
}

// isBlobStatusTerminal returns true if the status is a terminal status.
func isBlobStatusTerminal(status disperser.BlobStatus) bool {
switch status {
case disperser.BlobStatus_FAILED:
return true
case disperser.BlobStatus_INSUFFICIENT_SIGNATURES:
return true
case disperser.BlobStatus_CONFIRMED:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CONFIRMED isn't a terminal state as it can be transitioned to FINALIZED or FAILED (if reorg'd)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, although the behavior is intentional in this context. Added a comment to clarify.

case disperser.BlobStatus_CONFIRMED:
		// Technically this isn't terminal, as confirmed blobs eventually should become finalized.
		// But it is terminal from the status tracker's perspective, since we stop tracking the blob
		// once it becomes either confirmed or finalized.
		return true

// Technically this isn't terminal, as confirmed blobs eventually should become finalized.
// But it is terminal from the status tracker's perspective, since we stop tracking the blob
// once it becomes either confirmed or finalized.
return true
case disperser.BlobStatus_FINALIZED:
return true
default:
return false
}
}

// isBlobStatusConfirmed reports whether the status means the blob has been confirmed,
// i.e. it is either CONFIRMED or FINALIZED. Every other status yields false.
func isBlobStatusConfirmed(status disperser.BlobStatus) bool {
	return status == disperser.BlobStatus_CONFIRMED ||
		status == disperser.BlobStatus_FINALIZED
}

// updateStatusMetrics bumps the counter that corresponds to the status reported for a blob.
// A status with no matching counter is logged as an error and otherwise ignored.
func (tracker *BlobStatusTracker) updateStatusMetrics(status disperser.BlobStatus) {
	switch status {

	// Confirmed outcomes.
	case disperser.BlobStatus_FINALIZED:
		tracker.finalizedCountMetric.Increment()
	case disperser.BlobStatus_CONFIRMED:
		tracker.confirmedCountMetric.Increment()

	// Failed outcomes.
	case disperser.BlobStatus_INSUFFICIENT_SIGNATURES:
		tracker.insufficientSignaturesCountMetric.Increment()
	case disperser.BlobStatus_FAILED:
		tracker.failedCountMetric.Increment()

	// Statuses the tracker keeps polling on.
	case disperser.BlobStatus_DISPERSING:
		tracker.dispersingCountMetric.Increment()
	case disperser.BlobStatus_PROCESSING:
		tracker.processingCountMetric.Increment()
	case disperser.BlobStatus_UNKNOWN:
		tracker.unknownCountMetric.Increment()

	default:
		tracker.logger.Error("unknown blob status", "status:", status)
	}
}

// getBlobStatus asks the disperser service for the status of the blob identified by key.
// The request runs under a context derived from the tracker's context with a timeout of
// config.GetBlobStatusTimeout, and its latency is reported to the get-status latency metric.
// On failure the error counter is incremented and (nil, err) is returned.
func (tracker *BlobStatusTracker) getBlobStatus(key *UnconfirmedKey) (*disperser.BlobStatusReply, error) {
	timeoutCtx, cancel := context.WithTimeout(*tracker.ctx, tracker.config.GetBlobStatusTimeout)
	defer cancel()

	// The actual request, wrapped so that its latency can be measured.
	fetchStatus := func() (*disperser.BlobStatusReply, error) {
		return tracker.disperser.GetBlobStatus(timeoutCtx, key.Key)
	}

	reply, err := metrics.InvokeAndReportLatency[*disperser.BlobStatusReply](
		tracker.getStatusLatencyMetric, fetchStatus)
	if err != nil {
		tracker.getStatusErrorCountMetric.Increment()
		return nil, err
	}

	return reply, nil
}

// forwardToReader forwards a blob to the reader. Only called once the blob is ready to be read.
func (tracker *BlobStatusTracker) forwardToReader(key *UnconfirmedKey, status *disperser.BlobStatusReply) {
batchHeaderHash := [32]byte(status.GetInfo().BlobVerificationProof.BatchMetadata.BatchHeaderHash)
blobIndex := status.GetInfo().BlobVerificationProof.GetBlobIndex()

confirmationTime := time.Now()
confirmationLatency := confirmationTime.Sub(key.SubmissionTime)
tracker.confirmationLatencyMetric.ReportLatency(confirmationLatency)

requiredDownloads := tracker.config.RequiredDownloads
var downloadCount int32
if requiredDownloads < 0 {
// Allow unlimited downloads.
downloadCount = -1
} else if requiredDownloads == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't look like this block will ever be triggered

Copy link
Contributor Author

@cody-littley cody-littley Aug 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

// Do not download blob.
return
} else if requiredDownloads < 1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same with this condition. requiredDownloads <= 0 includes this and the above conditions?

Copy link
Contributor Author

@cody-littley cody-littley Aug 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if it's fixed. If requiredDownloads < 1, it would have been handled on if requiredDownloads < 0 block above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if it's fixed. If requiredDownloads < 1, it would have been handled on if requiredDownloads < 0 block above

The requiredDownloads < 1 will trigger if the number of required downloads is between 0 and 1. It's a float64 value. Any number between 0 and 1 is interpreted as a probabilistic number of downloads.

// Download blob with probability equal to requiredDownloads.
if rand.Float64() < requiredDownloads {
// Download the blob once.
downloadCount = 1
} else {
// Do not download blob.
return
}
} else {
// Download blob requiredDownloads times.
downloadCount = int32(requiredDownloads)
}

blobMetadata, err := table.NewBlobMetadata(key.Key, key.Checksum, key.Size, uint(blobIndex), batchHeaderHash, int(downloadCount))
if err != nil {
tracker.logger.Error("failed to create blob metadata", "err:", err)
return
}
tracker.confirmedBlobs.Add(blobMetadata)
}
Loading
Loading