Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement core relay functionality #869

Merged
merged 63 commits into from
Nov 15, 2024

Conversation

cody-littley
Copy link
Contributor

@cody-littley cody-littley commented Nov 7, 2024

Why are these changes needed?

This PR adds core functionality for the relay server. The following features have intentionally been omitted, and will be merged as follow-up PRs:

  • main function entrypoint
  • rate limiting
  • request timeouts
  • benchmarks
  • authentication for GetChunks() requests
  • metrics

Checks

  • I've made sure the lint is passing in this PR.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, in that case, please comment that they are not relevant.
  • I've checked the new test coverage and the coverage percentage didn't drop.
  • Testing Strategy
    • Unit tests
    • Integration tests
    • This PR is not tested :(

Signed-off-by: Cody Littley <[email protected]>
Signed-off-by: Cody Littley <[email protected]>
Signed-off-by: Cody Littley <[email protected]>
Signed-off-by: Cody Littley <[email protected]>
Signed-off-by: Cody Littley <[email protected]>
Signed-off-by: Cody Littley <[email protected]>
Signed-off-by: Cody Littley <[email protected]>
Signed-off-by: Cody Littley <[email protected]>
Signed-off-by: Cody Littley <[email protected]>
Signed-off-by: Cody Littley <[email protected]>
Signed-off-by: Cody Littley <[email protected]>
Signed-off-by: Cody Littley <[email protected]>
Signed-off-by: Cody Littley <[email protected]>
Signed-off-by: Cody Littley <[email protected]>
Signed-off-by: Cody Littley <[email protected]>
@cody-littley cody-littley self-assigned this Nov 7, 2024
Signed-off-by: Cody Littley <[email protected]>
Signed-off-by: Cody Littley <[email protected]>
Signed-off-by: Cody Littley <[email protected]>
relay/blob_manager.go Outdated Show resolved Hide resolved
relay/blob_manager.go Outdated Show resolved Hide resolved
fMap := make(frameMap, len(keys))
hadError := atomic.Bool{}

wg := sync.WaitGroup{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment about the wg and errgroup

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Constructor changed. Let me know if you'd like to move away from the errgroup abstraction, and I'll change both locations.

relay/chunk_manager.go Outdated Show resolved Hide resolved
relay/config.go Show resolved Hide resolved
relay/metadata_manager.go Outdated Show resolved Hide resolved
relay/metadata_manager.go Outdated Show resolved Hide resolved
@mooselumph mooselumph requested review from jianoaix and dmanc November 13, 2024 01:56
Copy link
Contributor

@dmanc dmanc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initial review, will give it another go tomorrow

func (s *blobManager) GetBlob(blobKey v2.BlobKey) ([]byte, error) {

// Even though we don't need extra parallelism here, we still use the work pool to ensure that we don't
// permit too many concurrent requests to the blob store.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if there are too many concurrent requests to S3? I'm wondering if it could be worth using this blob manager on the encoder (although it doesn't need the caching part).

See:

data, err := s.blobStore.GetBlob(ctx, blobKey)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing catastrophic if there is too much S3 traffic, other than a potential slowdown. After a certain point, adding more goroutines attempting to pull data will result in a net reduction in capacity. Think of this sort of code as a defensive programming in case the node is overloaded with work.

relay/config.go Outdated
// serve data for any shard it can.
Shards []core.RelayKey

// MetadataCacheSize is the size of the metadata cache. Default is 1024 * 1024.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some units to the config descriptions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Description added.

// MetadataCacheSize is the size of the metadata cache. Current cache implementation is unaware of data sizes, and 
	// so this is a total count, not a size in bytes. Default is 1024 * 1024.
	MetadataCacheSize int

@cody-littley cody-littley mentioned this pull request Nov 13, 2024
5 tasks
Signed-off-by: Cody Littley <[email protected]>
api/proto/common/common.proto Show resolved Hide resolved
relay/server.go Outdated Show resolved Hide resolved
relay/server.go Show resolved Hide resolved
relay/blob_manager.go Outdated Show resolved Hide resolved
@cody-littley
Copy link
Contributor Author

hmm don't see it yet. Has it been pushed?

@ian-shim Check again. Forgot to push. 🙃

Signed-off-by: Cody Littley <[email protected]>
Copy link
Contributor

@ian-shim ian-shim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Few more comments. LGTM otherwise

type frameMap map[v2.BlobKey][]*encoding.Frame

// GetFrames retrieves the frames for a blob.
func (s *chunkManager) GetFrames(ctx context.Context, mMap *metadataMap) (*frameMap, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maps are already reference types, so don't need to pass a pointer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All map pointers converted to non-pointer type

relay/chunk_manager.go Outdated Show resolved Hide resolved
relay/chunk_manager.go Outdated Show resolved Hide resolved
Copy link
Contributor

@ian-shim ian-shim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm, please address the remaining comments

relay/blob_manager.go Outdated Show resolved Hide resolved
relay/blob_manager.go Outdated Show resolved Hide resolved
relay/blob_manager.go Outdated Show resolved Hide resolved
relay/blob_manager.go Outdated Show resolved Hide resolved
api/proto/relay/relay.proto Outdated Show resolved Hide resolved
relay/chunk_manager.go Outdated Show resolved Hide resolved
relay/config.go Outdated Show resolved Hide resolved
relay/config.go Outdated Show resolved Hide resolved
relay/metadata_manager.go Outdated Show resolved Hide resolved
relay/metadata_manager.go Outdated Show resolved Hide resolved
Signed-off-by: Cody Littley <[email protected]>
Signed-off-by: Cody Littley <[email protected]>
Signed-off-by: Cody Littley <[email protected]>
Copy link
Contributor

@dmanc dmanc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left some minor comments but lgtm

if err != nil {
// It should not be possible for external users to force an error here since we won't
// even call this method if the blob key is invalid (so it's ok to have a noisy log here).
s.logger.Error("Failed to fetch blob: %v", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be errorf?, same comment for fetchBlob

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, fixed

go func() {
frames, err := s.frameCache.Get(*boundKey)
if err != nil {
s.logger.Error("Failed to get frames for blob %v: %v", boundKey.blobKey, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@cody-littley cody-littley merged commit 94c11ad into Layr-Labs:master Nov 15, 2024
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants