
Commit 029a258

Made suggested changes.
Signed-off-by: Cody Littley <[email protected]>
cody-littley committed Nov 14, 2024
1 parent 71ab20e commit 029a258
Showing 11 changed files with 114 additions and 111 deletions.
5 changes: 2 additions & 3 deletions api/proto/relay/relay.proto
@@ -59,7 +59,8 @@ message ChunkRequestByRange {
uint32 end_index = 3;
}

- // A request for chunks within a specific blob.
+ // A request for chunks within a specific blob. Requests are fulfilled in all-or-nothing fashion. If any of the
+ // requested chunks are not found or are unable to be fetched, the entire request will fail.
message ChunkRequest {
oneof request {
// Request chunks by their individual indices.
@@ -70,8 +71,6 @@ message ChunkRequest {
}

// The reply to a GetChunks request.
- // Requests are fulfilled in all-or-nothing fashion. If any of the requested chunks are not found, the entire request
- // will fail.
message GetChunksReply {
// The chunks requested. The order of these chunks will be the same as the order of the requested chunks.
// data is the raw data of the bundle (i.e. serialized byte array of the frames)
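The new comment on ChunkRequest pins down the all-or-nothing contract for GetChunks. A minimal server-side sketch of that behaviour follows; the chunkID type and fetchChunk helper are hypothetical stand-ins for illustration, not the relay's actual types.

package main

import "fmt"

// chunkID identifies a single requested chunk (hypothetical stand-in type).
type chunkID struct {
	blobKey string
	index   uint32
}

// fetchChunk is a hypothetical lookup; it returns an error if the chunk is missing.
func fetchChunk(id chunkID) ([]byte, error) {
	return nil, fmt.Errorf("chunk %v not found", id)
}

// getChunks fulfills a request in all-or-nothing fashion: if any requested
// chunk cannot be fetched, no partial reply is returned.
func getChunks(ids []chunkID) ([][]byte, error) {
	reply := make([][]byte, 0, len(ids))
	for _, id := range ids {
		data, err := fetchChunk(id)
		if err != nil {
			// One miss fails the entire request.
			return nil, fmt.Errorf("GetChunks failed: %w", err)
		}
		reply = append(reply, data)
	}
	return reply, nil
}

func main() {
	if _, err := getChunks([]chunkID{{blobKey: "blob-a", index: 0}}); err != nil {
		fmt.Println(err)
	}
}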
18 changes: 9 additions & 9 deletions relay/blob_manager.go
@@ -10,12 +10,12 @@ import (
)

// blobManager encapsulates logic for fetching blobs. Utilized by the relay Server.
- // This struct adds caching and threading on top of blobstore.BlobStore.
+ // This struct adds caching and concurrency limitation on top of blobstore.BlobStore.
type blobManager struct {
ctx context.Context
logger logging.Logger

- // blobStore can be used to read blobs from S3.
+ // blobStore is used to read blobs from S3.
blobStore *blobstore.BlobStore

// blobCache is an LRU cache of blobs.
@@ -31,20 +31,20 @@ func newBlobManager(
logger logging.Logger,
blobStore *blobstore.BlobStore,
blobCacheSize int,
- workPoolSize int) (*blobManager, error) {
+ maxIOConcurrency int) (*blobManager, error) {

server := &blobManager{
ctx: ctx,
logger: logger,
blobStore: blobStore,
- concurrencyLimiter: make(chan struct{}, workPoolSize),
+ concurrencyLimiter: make(chan struct{}, maxIOConcurrency),
}

- cache, err := cache.NewCachedAccessor[v2.BlobKey, []byte](blobCacheSize, server.fetchBlob)
+ c, err := cache.NewCachedAccessor[v2.BlobKey, []byte](blobCacheSize, server.fetchBlob)
if err != nil {
return nil, fmt.Errorf("error creating blob cache: %w", err)
}
- server.blobCache = cache
+ server.blobCache = c

return server, nil
}
@@ -63,16 +63,16 @@ func (s *blobManager) GetBlob(blobKey v2.BlobKey) ([]byte, error) {
return nil, err
}

- return *data, nil
+ return data, nil
}

// fetchBlob retrieves a single blob from the blob store.
- func (s *blobManager) fetchBlob(blobKey v2.BlobKey) (*[]byte, error) {
+ func (s *blobManager) fetchBlob(blobKey v2.BlobKey) ([]byte, error) {
data, err := s.blobStore.GetBlob(s.ctx, blobKey)
if err != nil {
s.logger.Error("Failed to fetch blob: %v", err)
return nil, err
}

- return &data, nil
+ return data, nil
}
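The workPoolSize to maxIOConcurrency rename matches what the code actually does: concurrencyLimiter is a buffered channel used as a counting semaphore that bounds concurrent S3 reads, not a pool of worker goroutines. A minimal sketch of that pattern is below; the doFetch placeholder and the exact acquire/release placement are illustrative assumptions, not the relay's code.

package main

import (
	"fmt"
	"sync"
)

func main() {
	const maxIOConcurrency = 4

	// A buffered channel of empty structs acts as a counting semaphore:
	// at most maxIOConcurrency goroutines can hold a slot at a time.
	limiter := make(chan struct{}, maxIOConcurrency)

	doFetch := func(key int) string { // hypothetical stand-in for an S3 read
		return fmt.Sprintf("blob-%d", key)
	}

	var wg sync.WaitGroup
	for key := 0; key < 16; key++ {
		wg.Add(1)
		go func(key int) {
			defer wg.Done()
			limiter <- struct{}{}        // acquire a slot (blocks once the limit is reached)
			defer func() { <-limiter }() // release the slot
			_ = doFetch(key)
		}(key)
	}
	wg.Wait()
	fmt.Println("all fetches finished with at most", maxIOConcurrency, "in flight")
}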
14 changes: 7 additions & 7 deletions relay/cache/cached_accessor.go
@@ -9,7 +9,7 @@ import (
// are expensive, and prevents multiple concurrent cache misses for the same key.
type CachedAccessor[K comparable, V any] interface {
// Get returns the value for the given key. If the value is not in the cache, it will be fetched using the Accessor.
- Get(key K) (*V, error)
+ Get(key K) (V, error)
}

// Accessor is function capable of fetching a value from a resource. Used by CachedAccessor when there is a cache miss.
@@ -20,7 +20,7 @@ type accessResult[V any] struct {
// wg.Wait() will block until the value is fetched.
wg sync.WaitGroup
// value is the value fetched by the Accessor, or nil if there was an error.
- value *V
+ value V
// err is the error returned by the Accessor, or nil if the fetch was successful.
err error
}
@@ -42,19 +42,19 @@ type cachedAccessor[K comparable, V any] struct {
lookupsInProgress map[K]*accessResult[V]

// cache is the LRU cache used to store values fetched by the accessor.
- cache *lru.Cache[K, *V]
+ cache *lru.Cache[K, V]

// lock is used to protect the cache and lookupsInProgress map.
cacheLock sync.Mutex

// accessor is the function used to fetch values that are not in the cache.
- accessor Accessor[K, *V]
+ accessor Accessor[K, V]
}

// NewCachedAccessor creates a new CachedAccessor.
- func NewCachedAccessor[K comparable, V any](cacheSize int, accessor Accessor[K, *V]) (CachedAccessor[K, V], error) {
+ func NewCachedAccessor[K comparable, V any](cacheSize int, accessor Accessor[K, V]) (CachedAccessor[K, V], error) {

- cache, err := lru.New[K, *V](cacheSize)
+ cache, err := lru.New[K, V](cacheSize)
if err != nil {
return nil, err
}
@@ -76,7 +76,7 @@ func newAccessResult[V any]() *accessResult[V] {
return result
}

- func (c *cachedAccessor[K, V]) Get(key K) (*V, error) {
+ func (c *cachedAccessor[K, V]) Get(key K) (V, error) {

c.cacheLock.Lock()

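With this change Get returns V rather than *V, so callers no longer dereference the result. The usage sketch below assumes Accessor[K, V] is a func(K) (V, error) (consistent with server.fetchBlob being passed straight to NewCachedAccessor) and that the package is importable as github.com/Layr-Labs/eigenda/relay/cache; both are assumptions about details not shown in this diff.

package main

import (
	"fmt"

	"github.com/Layr-Labs/eigenda/relay/cache" // assumed import path
)

func main() {
	// Hypothetical accessor, matching the assumed signature func(K) (V, error);
	// it is only called on a cache miss.
	fetch := func(key string) ([]byte, error) {
		return []byte("value for " + key), nil
	}

	// 1024 entries; the cache counts items, not bytes.
	c, err := cache.NewCachedAccessor[string, []byte](1024, fetch)
	if err != nil {
		panic(err)
	}

	// Get returns V directly (previously *V); concurrent misses for the
	// same key are coalesced into a single call to fetch.
	data, err := c.Get("blob-1")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data))
}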
4 changes: 2 additions & 2 deletions relay/cache/cached_accessor_test.go
@@ -183,7 +183,7 @@ func ParallelAccessTest(t *testing.T, sleepEnabled bool) {
require.Equal(t, uint64(1), cacheMissCount.Load())

// The internal lookupsInProgress map should no longer contain the key.
- require.Equal(t, 0, len(ca.(*cachedAccessor[int, string]).lookupsInProgress))
+ require.Equal(t, 0, len(ca.(*cachedAccessor[int, *string]).lookupsInProgress))
}

func TestParallelAccess(t *testing.T) {
@@ -252,5 +252,5 @@ func TestParallelAccessWithError(t *testing.T) {
require.Equal(t, count+1, cacheMissCount.Load())

// The internal lookupsInProgress map should no longer contain the key.
- require.Equal(t, 0, len(ca.(*cachedAccessor[int, string]).lookupsInProgress))
+ require.Equal(t, 0, len(ca.(*cachedAccessor[int, *string]).lookupsInProgress))
}
34 changes: 19 additions & 15 deletions relay/chunk_manager.go
@@ -17,8 +17,8 @@ type chunkManager struct {
ctx context.Context
logger logging.Logger

- // metadataCache is an LRU cache of blob metadata. Blobs that do not belong to one of the relay shards
- // assigned to this server will not be in the cache.
+ // metadataCache is an LRU cache of blob metadata. Each relay is authorized to serve data assigned to one or more
+ // relay IDs. Blobs that do not belong to one of the relay IDs assigned to this server will not be in the cache.
frameCache cache.CachedAccessor[blobKeyWithMetadata, []*encoding.Frame]

// chunkReader is used to read chunks from the chunk store.
@@ -44,13 +44,13 @@ func newChunkManager(
logger logging.Logger,
chunkReader chunkstore.ChunkReader,
cacheSize int,
- workPoolSize int) (*chunkManager, error) {
+ maxIOConcurrency int) (*chunkManager, error) {

server := &chunkManager{
ctx: ctx,
logger: logger,
chunkReader: chunkReader,
- concurrencyLimiter: make(chan struct{}, workPoolSize),
+ concurrencyLimiter: make(chan struct{}, maxIOConcurrency),
}

c, err := cache.NewCachedAccessor[blobKeyWithMetadata, []*encoding.Frame](cacheSize, server.fetchFrames)
@@ -66,10 +66,14 @@ func newChunkManager(
type frameMap map[v2.BlobKey][]*encoding.Frame

// GetFrames retrieves the frames for a blob.
- func (s *chunkManager) GetFrames(ctx context.Context, mMap *metadataMap) (*frameMap, error) {
+ func (s *chunkManager) GetFrames(ctx context.Context, mMap metadataMap) (frameMap, error) {

- keys := make([]*blobKeyWithMetadata, 0, len(*mMap))
- for k, v := range *mMap {
+ if len(mMap) == 0 {
+ return nil, fmt.Errorf("no metadata provided")
+ }
+
+ keys := make([]*blobKeyWithMetadata, 0, len(mMap))
+ for k, v := range mMap {
keys = append(keys, &blobKeyWithMetadata{blobKey: k, metadata: *v})
}

@@ -96,7 +100,7 @@ func (s *chunkManager) GetFrames(ctx context.Context, mMap *metadataMap) (*frame
} else {
completionChannel <- &framesResult{
key: boundKey.blobKey,
- data: *frames,
+ data: frames,
}
}

@@ -112,11 +116,11 @@ func (s *chunkManager) GetFrames(ctx context.Context, mMap *metadataMap) (*frame
fMap[result.key] = result.data
}

- return &fMap, nil
+ return fMap, nil
}

// fetchFrames retrieves the frames for a single blob.
- func (s *chunkManager) fetchFrames(key blobKeyWithMetadata) (*[]*encoding.Frame, error) {
+ func (s *chunkManager) fetchFrames(key blobKeyWithMetadata) ([]*encoding.Frame, error) {

wg := sync.WaitGroup{}
wg.Add(1)
@@ -153,19 +157,19 @@ func (s *chunkManager) fetchFrames(key blobKeyWithMetadata,
return nil, err
}

- return &frames, nil
+ return frames, nil
}

// assembleFrames assembles a slice of frames from its composite proofs and coefficients.
- func assembleFrames(frames []*rs.Frame, proof []*encoding.Proof) ([]*encoding.Frame, error) {
- if len(frames) != len(proof) {
- return nil, fmt.Errorf("number of frames and proofs must be equal (%d != %d)", len(frames), len(proof))
+ func assembleFrames(frames []*rs.Frame, proofs []*encoding.Proof) ([]*encoding.Frame, error) {
+ if len(frames) != len(proofs) {
+ return nil, fmt.Errorf("number of frames and proofs must be equal (%d != %d)", len(frames), len(proofs))
}

assembledFrames := make([]*encoding.Frame, len(frames))
for i := range frames {
assembledFrames[i] = &encoding.Frame{
- Proof: *proof[i],
+ Proof: *proofs[i],
Coeffs: frames[i].Coeffs,
}
}
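GetFrames now takes metadataMap and returns frameMap by value rather than through pointers. Go maps are reference-like: passing one copies only a small header, and insertions made by the callee remain visible to the caller, so the extra indirection was not buying anything. A standalone illustration of that language behaviour (not relay code):

package main

import "fmt"

func addEntry(m map[string]int) {
	// m is a copy of the map header, but it refers to the same backing store.
	m["added-by-callee"] = 42
}

func main() {
	m := map[string]int{"existing": 1}
	addEntry(m)                               // no pointer needed for the caller to see the insertion
	fmt.Println(len(m), m["added-by-callee"]) // prints: 2 42
}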
18 changes: 9 additions & 9 deletions relay/chunk_manager_test.go
@@ -57,11 +57,11 @@ func TestFetchingIndividualBlobs(t *testing.T) {
fragmentSizeBytes: fragmentInfo.FragmentSizeBytes,
}

- fMap, err := server.GetFrames(context.Background(), &mMap)
+ fMap, err := server.GetFrames(context.Background(), mMap)
require.NoError(t, err)

- require.Equal(t, 1, len(*fMap))
- readFrames := (*fMap)[key]
+ require.Equal(t, 1, len(fMap))
+ readFrames := (fMap)[key]
require.NotNil(t, readFrames)

// TODO: when I inspect this data using a debugger, the proofs are all made up of 0s... something
Expand All @@ -79,11 +79,11 @@ func TestFetchingIndividualBlobs(t *testing.T) {
fragmentSizeBytes: fragmentInfo.FragmentSizeBytes,
}

fMap, err := server.GetFrames(context.Background(), &mMap)
fMap, err := server.GetFrames(context.Background(), mMap)
require.NoError(t, err)

require.Equal(t, 1, len(*fMap))
readFrames := (*fMap)[key]
require.Equal(t, 1, len(fMap))
readFrames := (fMap)[key]
require.NotNil(t, readFrames)

require.Equal(t, frames, readFrames)
@@ -142,13 +142,13 @@ func TestFetchingBatchedBlobs(t *testing.T) {
}
}

- fMap, err := server.GetFrames(context.Background(), &mMap)
+ fMap, err := server.GetFrames(context.Background(), mMap)
require.NoError(t, err)

- require.Equal(t, batchSize, len(*fMap))
+ require.Equal(t, batchSize, len(fMap))
for key := range mMap {

- readFrames := (*fMap)[key]
+ readFrames := (fMap)[key]
require.NotNil(t, readFrames)

expectedFramesForBlob := expectedFrames[key]
40 changes: 20 additions & 20 deletions relay/config.go
@@ -5,40 +5,40 @@ import core "github.com/Layr-Labs/eigenda/core/v2"
// Config is the configuration for the relay Server.
type Config struct {

- // Shards contains the IDs of the relays that this server is willing to serve data for. If empty, the server will
+ // RelayIDs contains the IDs of the relays that this server is willing to serve data for. If empty, the server will
// serve data for any shard it can.
- Shards []core.RelayKey
+ RelayIDs []core.RelayKey

- // MetadataCacheSize is the size of the metadata cache. Current cache implementation is unaware of data sizes, and
- // so this is a total count, not a size in bytes. Default is 1024 * 1024.
+ // MetadataCacheSize is the maximum number of items in the metadata cache. Default is 1024 * 1024.
MetadataCacheSize int

- // MetadataWorkPoolSize is the size of the metadata work pool. Default is 32.
- MetadataWorkPoolSize int
+ // MetadataMaxConcurrency puts a limit on the maximum number of concurrent metadata fetches actively running on
+ // goroutines. Default is 32.
+ MetadataMaxConcurrency int

- // BlobCacheSize is the size of the blob cache. Current cache implementation is unaware of data sizes, and so
- // this is a total count, not a size in bytes. Default is 32.
+ // BlobCacheSize is the maximum number of items in the blob cache. Default is 32.
BlobCacheSize int

- // BlobWorkPoolSize is the size of the blob work pool. Default is 32.
- BlobWorkPoolSize int
+ // BlobMaxConcurrency puts a limit on the maximum number of concurrent blob fetches actively running on goroutines.
+ // Default is 32.
+ BlobMaxConcurrency int

- // ChunkCacheSize is the size of the chunk cache. Current cache implementation is unaware of data sizes, and so
- // this is a total count, not a size in bytes. Default is 32.
+ // ChunkCacheSize is the maximum number of items in the chunk cache. Default is 32.
ChunkCacheSize int

- // ChunkWorkPoolSize is the size of the chunk work pool. Default is 32.
- ChunkWorkPoolSize int
+ // ChunkMaxConcurrency is the size of the work pool for fetching chunks. Default is 32. Note that this does not
+ // impact concurrency utilized by the s3 client to upload/download fragmented files.
+ ChunkMaxConcurrency int
}

// DefaultConfig returns the default configuration for the relay Server.
func DefaultConfig() *Config {
return &Config{
- MetadataCacheSize: 1024 * 1024,
- MetadataWorkPoolSize: 32,
- BlobCacheSize: 32,
- BlobWorkPoolSize: 32,
- ChunkCacheSize: 32,
- ChunkWorkPoolSize: 32,
+ MetadataCacheSize: 1024 * 1024,
+ MetadataMaxConcurrency: 32,
+ BlobCacheSize: 32,
+ BlobMaxConcurrency: 32,
+ ChunkCacheSize: 32,
+ ChunkMaxConcurrency: 32,
}
}
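A caller would typically start from DefaultConfig and override individual knobs. The field names and defaults below come from this diff, while the github.com/Layr-Labs/eigenda/relay import path and package name are assumptions based on the repository layout.

package main

import (
	"fmt"

	"github.com/Layr-Labs/eigenda/relay" // assumed import path
)

func main() {
	cfg := relay.DefaultConfig()

	// Allow more simultaneous chunk fetches than the default of 32;
	// cache sizes are item counts, not byte sizes.
	cfg.ChunkMaxConcurrency = 64
	cfg.BlobCacheSize = 128

	fmt.Printf("metadata cache: %d items, metadata concurrency: %d\n",
		cfg.MetadataCacheSize, cfg.MetadataMaxConcurrency)
}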