-
Notifications
You must be signed in to change notification settings - Fork 193
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Created blob verifier worker. #708
Created blob verifier worker. #708
Conversation
Signed-off-by: Cody Littley <[email protected]>
tools/traffic/table/blob_store.go
Outdated
|
||
// GetAll returns all blobs currently stored. For testing purposes only. | ||
func (store *BlobStore) GetAll() []*BlobMetadata { | ||
store.lock.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use a RW lock so the read-only function can use reader lock?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
"context" | ||
"github.com/Layr-Labs/eigenda/api/clients" | ||
"github.com/Layr-Labs/eigenda/api/grpc/disperser" | ||
config2 "github.com/Layr-Labs/eigenda/tools/traffic/config" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This alias seems not needed again?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
config *config2.WorkerConfig | ||
|
||
// A table of confirmed blobs. Blobs are added here when they are confirmed by the disperser service. | ||
table *table.BlobStore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
table
-> confirmedBlobs
? It'd be more readable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamed
table *table.BlobStore | ||
|
||
// The disperser client used to monitor the disperser service. | ||
dispenser clients.DisperserClient |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: dispenser
-> disperser
Also the comment may be more accurate to say it's used to interact
rather than monitor
the disperser?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
dispenser clients.DisperserClient | ||
|
||
// The keys of blobs that have not yet been confirmed by the disperser service. | ||
unconfirmedKeys []*UnconfirmedKey |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unconfirmedBlobs
? (reads well when compared with the confirmedBlobs
above)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. Change made.
|
||
case disperser.BlobStatus_FAILED: | ||
verifier.failedCountMetric.Increment() | ||
return true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given this is returning true, it looks what you want is isBlobStatusTerminal
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is actually hard to find an accurate name for this function given that it does lots of different things. I've broken this down into a small number of more focused methods.
blobIndex := status.GetInfo().BlobVerificationProof.GetBlobIndex() | ||
|
||
confirmationTime := time.Now() | ||
confirmationLatency := confirmationTime.Sub(key.SubmissionTime) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is some inaccuracy introduced by when the status was polled
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. This is one of those situations where it's nice data to have as long as you understand the limits of its precision, but the effort required to get more precise data is cost prohibitive. How do you suggest we handle this situation? Should I remove this metric, or is ok to leave it?
// BlobVerifier periodically polls the disperser service to verify the status of blobs that were recently written. | ||
// When blobs become confirmed, the status verifier updates the blob blobsToRead accordingly. | ||
// This is a thread safe data structure. | ||
type BlobVerifier struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on the purpose of this struct, it'd probably more accurate to name it something like BlobStatusTracker
. It doesn't do any verification work from what I see.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed.
// Revisit this method if there are performance problems. | ||
|
||
unconfirmedKeys := make([]*UnconfirmedKey, 0) | ||
for _, key := range verifier.unconfirmedKeys { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perf idea suggestion: this loop may have tens of thousands blobs. I think we may add an API to Disperser for querying the statuses of a batch of blobs, which will be much more efficient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a good suggestion, although I think adding this API is probably a task that doesn't belong in this PR. Would you like me to take on that task and merge it prior to the traffic generator changes, or should this be something we circle back on later?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea just pointing out the potential issue here. We don't need to bundle all changes in a single PR.
for { | ||
select { | ||
case <-(*verifier.ctx).Done(): | ||
verifier.waitGroup.Done() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this just return? Not sure if this wait group is needed here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I use the wait group to ensure that all goroutines have exited when stopping the traffic generator, see https://github.com/Layr-Labs/eigenda/pull/666/files#diff-f597b71e94b466f93ec0cc1f7cd21e7b9230cfe074e306569b125d035c794ce5R216
If you don't think this is a necessary precaution I can remove it.
Signed-off-by: Cody Littley <[email protected]>
Signed-off-by: Cody Littley <[email protected]>
Signed-off-by: Cody Littley <[email protected]>
tools/traffic/table/blob_metadata.go
Outdated
@@ -10,6 +10,9 @@ type BlobMetadata struct { | |||
// BlobIndex of the blob. | |||
BlobIndex uint | |||
|
|||
// Hash of the batch header that the blob was written in. | |||
BatchHeaderHash []byte |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be [32]byte
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change made
for _, key := range verifier.unconfirmedBlobs { | ||
|
||
blobStatus := verifier.getBlobStatus(key) | ||
if blobStatus == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should probably log error as this case is unexpected
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added
if err != nil { | ||
verifier.logger.Error("failed check blob status", "err:", err) | ||
verifier.getStatusErrorCountMetric.Increment() | ||
return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why don't we return err here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
returned
if requiredDownloads <= 0 { | ||
// Allow unlimited downloads. | ||
downloadCount = -1 | ||
} else if requiredDownloads == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doesn't look like this block will ever be triggered
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
} else if requiredDownloads == 0 { | ||
// Do not download blob. | ||
return | ||
} else if requiredDownloads < 1 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same with this condition. requiredDownloads <= 0
includes this and the above conditions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if it's fixed. If requiredDownloads < 1
, it would have been handled on if requiredDownloads < 0
block above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if it's fixed. If requiredDownloads < 1, it would have been handled on if requiredDownloads < 0 block above
The requiredDownloads < 1
will trigger if the number of required downloads is between 0 and 1. It's a float64
value. Any number between 0 and 1 is interpreted as a probabilistic number of downloads.
return true | ||
case disperser.BlobStatus_INSUFFICIENT_SIGNATURES: | ||
return true | ||
case disperser.BlobStatus_CONFIRMED: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CONFIRMED isn't a terminal state as it can be transitioned to FINALIZED or FAILED (if reorg'd)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct, although the behavior is intentional in this context. Added a comment to clarify.
case disperser.BlobStatus_CONFIRMED:
// Technically this isn't terminal, as confirmed blobs eventually should become finalized.
// But it is terminal from the status tracker's perspective, since we stop tracking the blob
// once it becomes either confirmed or finalized.
return true
Signed-off-by: Cody Littley <[email protected]>
Signed-off-by: Cody Littley <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm, one last comment
} else if requiredDownloads == 0 { | ||
// Do not download blob. | ||
return | ||
} else if requiredDownloads < 1 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if it's fixed. If requiredDownloads < 1
, it would have been handled on if requiredDownloads < 0
block above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly look good. A few comments, as I think it's still in the middle of the renaming.
) | ||
|
||
// BlobStatusTracker periodically polls the disperser service to verify the status of blobs that were recently written. | ||
// When blobs become confirmed, the status verifier updates the blob blobsToRead accordingly. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: status verifier
-> status tracker
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamed
} | ||
|
||
// NewBlobVerifier creates a new BlobStatusTracker instance. | ||
func NewBlobVerifier( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
name it consistently as struct name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed naming
} | ||
|
||
// monitor periodically polls the disperser service to verify the status of blobs. | ||
func (verifier *BlobStatusTracker) monitor() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
verifier -> tracker. May be worth fixing others like VerifierInterval to PollInterval etc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I searched for the string "verifier" and replaced all instances.
Signed-off-by: Cody Littley <[email protected]>
Why are these changes needed?
This is a fragment of a larger PR. This particular PR adds a new worker called the "blob verifier", whose job is to check on the status of a blob until it becomes confirmed/finalized by the disperser.
Checks