Skip to content

Commit

Permalink
Rename classes.
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley committed Nov 14, 2024
1 parent ac17f6a commit ee5adab
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 48 deletions.
16 changes: 8 additions & 8 deletions relay/blob_manager.go → relay/blob_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"github.com/Layr-Labs/eigensdk-go/logging"
)

// blobManager encapsulates logic for fetching blobs. Utilized by the relay Server.
// blobProvider encapsulates logic for fetching blobs. Utilized by the relay Server.
// This struct adds caching and concurrency limitation on top of blobstore.BlobStore.
type blobManager struct {
type blobProvider struct {
ctx context.Context
logger logging.Logger

Expand All @@ -25,15 +25,15 @@ type blobManager struct {
concurrencyLimiter chan struct{}
}

// newBlobManager creates a new blobManager.
func newBlobManager(
// newBlobProvider creates a new blobProvider.
func newBlobProvider(
ctx context.Context,
logger logging.Logger,
blobStore *blobstore.BlobStore,
blobCacheSize int,
maxIOConcurrency int) (*blobManager, error) {
maxIOConcurrency int) (*blobProvider, error) {

server := &blobManager{
server := &blobProvider{
ctx: ctx,
logger: logger,
blobStore: blobStore,
Expand All @@ -50,7 +50,7 @@ func newBlobManager(
}

// GetBlob retrieves a blob from the blob store.
func (s *blobManager) GetBlob(blobKey v2.BlobKey) ([]byte, error) {
func (s *blobProvider) GetBlob(blobKey v2.BlobKey) ([]byte, error) {

s.concurrencyLimiter <- struct{}{}
data, err := s.blobCache.Get(blobKey)
Expand All @@ -67,7 +67,7 @@ func (s *blobManager) GetBlob(blobKey v2.BlobKey) ([]byte, error) {
}

// fetchBlob retrieves a single blob from the blob store.
func (s *blobManager) fetchBlob(blobKey v2.BlobKey) ([]byte, error) {
func (s *blobProvider) fetchBlob(blobKey v2.BlobKey) ([]byte, error) {
data, err := s.blobStore.GetBlob(s.ctx, blobKey)
if err != nil {
s.logger.Error("Failed to fetch blob: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions relay/blob_manager_test.go → relay/blob_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestReadWrite(t *testing.T) {
require.NoError(t, err)
}

server, err := newBlobManager(context.Background(), logger, blobStore, 10, 32)
server, err := newBlobProvider(context.Background(), logger, blobStore, 10, 32)
require.NoError(t, err)

// Read the blobs back.
Expand Down Expand Up @@ -65,7 +65,7 @@ func TestNonExistentBlob(t *testing.T) {

blobStore := buildBlobStore(t, logger)

server, err := newBlobManager(context.Background(), logger, blobStore, 10, 32)
server, err := newBlobProvider(context.Background(), logger, blobStore, 10, 32)
require.NoError(t, err)

for i := 0; i < 10; i++ {
Expand Down
14 changes: 7 additions & 7 deletions relay/chunk_manager.go → relay/chunk_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"sync"
)

type chunkManager struct {
type chunkProvider struct {
ctx context.Context
logger logging.Logger

Expand All @@ -38,15 +38,15 @@ func (m *blobKeyWithMetadata) Compare(other *blobKeyWithMetadata) int {
return bytes.Compare(m.blobKey[:], other.blobKey[:])
}

// newChunkManager creates a new chunkManager.
func newChunkManager(
// newChunkProvider creates a new chunkProvider.
func newChunkProvider(
ctx context.Context,
logger logging.Logger,
chunkReader chunkstore.ChunkReader,
cacheSize int,
maxIOConcurrency int) (*chunkManager, error) {
maxIOConcurrency int) (*chunkProvider, error) {

server := &chunkManager{
server := &chunkProvider{
ctx: ctx,
logger: logger,
chunkReader: chunkReader,
Expand All @@ -66,7 +66,7 @@ func newChunkManager(
type frameMap map[v2.BlobKey][]*encoding.Frame

// GetFrames retrieves the frames for a blob.
func (s *chunkManager) GetFrames(ctx context.Context, mMap metadataMap) (frameMap, error) {
func (s *chunkProvider) GetFrames(ctx context.Context, mMap metadataMap) (frameMap, error) {

if len(mMap) == 0 {
return nil, fmt.Errorf("no metadata provided")
Expand Down Expand Up @@ -120,7 +120,7 @@ func (s *chunkManager) GetFrames(ctx context.Context, mMap metadataMap) (frameMa
}

// fetchFrames retrieves the frames for a single blob.
func (s *chunkManager) fetchFrames(key blobKeyWithMetadata) ([]*encoding.Frame, error) {
func (s *chunkProvider) fetchFrames(key blobKeyWithMetadata) ([]*encoding.Frame, error) {

wg := sync.WaitGroup{}
wg.Add(1)
Expand Down
4 changes: 2 additions & 2 deletions relay/chunk_manager_test.go → relay/chunk_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestFetchingIndividualBlobs(t *testing.T) {
fragmentInfoMap[blobKey] = fragmentInfo
}

server, err := newChunkManager(context.Background(), logger, chunkReader, 10, 32)
server, err := newChunkProvider(context.Background(), logger, chunkReader, 10, 32)
require.NoError(t, err)

// Read it back.
Expand Down Expand Up @@ -124,7 +124,7 @@ func TestFetchingBatchedBlobs(t *testing.T) {
fragmentInfoMap[blobKey] = fragmentInfo
}

server, err := newChunkManager(context.Background(), logger, chunkReader, 10, 32)
server, err := newChunkProvider(context.Background(), logger, chunkReader, 10, 32)
require.NoError(t, err)

// Read it back.
Expand Down
16 changes: 8 additions & 8 deletions relay/metadata_manager.go → relay/metadata_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ type blobMetadata struct {
fragmentSizeBytes uint32
}

// metadataManager encapsulates logic for fetching metadata for blobs. Utilized by the relay Server.
type metadataManager struct {
// metadataProvider encapsulates logic for fetching metadata for blobs. Utilized by the relay Server.
type metadataProvider struct {
ctx context.Context
logger logging.Logger

Expand All @@ -41,21 +41,21 @@ type metadataManager struct {
concurrencyLimiter chan struct{}
}

// newMetadataManager creates a new metadataManager.
func newMetadataManager(
// newMetadataProvider creates a new metadataProvider.
func newMetadataProvider(
ctx context.Context,
logger logging.Logger,
metadataStore *blobstore.BlobMetadataStore,
metadataCacheSize int,
maxIOConcurrency int,
relayIDs []v2.RelayKey) (*metadataManager, error) {
relayIDs []v2.RelayKey) (*metadataProvider, error) {

relayIDSet := make(map[v2.RelayKey]struct{}, len(relayIDs))
for _, id := range relayIDs {
relayIDSet[id] = struct{}{}
}

server := &metadataManager{
server := &metadataProvider{
ctx: ctx,
logger: logger,
metadataStore: metadataStore,
Expand All @@ -77,7 +77,7 @@ func newMetadataManager(
type metadataMap map[v2.BlobKey]*blobMetadata

// GetMetadataForBlobs retrieves metadata about multiple blobs in parallel.
func (m *metadataManager) GetMetadataForBlobs(keys []v2.BlobKey) (metadataMap, error) {
func (m *metadataProvider) GetMetadataForBlobs(keys []v2.BlobKey) (metadataMap, error) {

// blobMetadataResult is the result of a metadata fetch operation.
type blobMetadataResult struct {
Expand Down Expand Up @@ -138,7 +138,7 @@ func (m *metadataManager) GetMetadataForBlobs(keys []v2.BlobKey) (metadataMap, e
}

// fetchMetadata retrieves metadata about a blob. Fetches from the cache if available, otherwise from the store.
func (m *metadataManager) fetchMetadata(key v2.BlobKey) (*blobMetadata, error) {
func (m *metadataProvider) fetchMetadata(key v2.BlobKey) (*blobMetadata, error) {
// Retrieve the metadata from the store.
cert, fragmentInfo, err := m.metadataStore.GetBlobCertificate(m.ctx, key)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestGetNonExistentBlob(t *testing.T) {
defer teardown()
metadataStore := buildMetadataStore(t)

server, err := newMetadataManager(context.Background(), logger, metadataStore, 1024*1024, 32, nil)
server, err := newMetadataProvider(context.Background(), logger, metadataStore, 1024*1024, 32, nil)
require.NoError(t, err)

// Try to fetch a non-existent blobs
Expand Down Expand Up @@ -80,7 +80,7 @@ func TestFetchingIndividualMetadata(t *testing.T) {
require.Equal(t, fragmentSizeMap[blobKey], fragmentInfo.FragmentSizeBytes)
}

server, err := newMetadataManager(context.Background(), logger, metadataStore, 1024*1024, 32, nil)
server, err := newMetadataProvider(context.Background(), logger, metadataStore, 1024*1024, 32, nil)
require.NoError(t, err)

// Fetch the metadata from the server.
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestBatchedFetch(t *testing.T) {
require.Equal(t, fragmentSizeMap[blobKey], fragmentInfo.FragmentSizeBytes)
}

server, err := newMetadataManager(context.Background(), logger, metadataStore, 1024*1024, 32, nil)
server, err := newMetadataProvider(context.Background(), logger, metadataStore, 1024*1024, 32, nil)
require.NoError(t, err)

// Each iteration, choose a random subset of the keys to fetch
Expand Down Expand Up @@ -247,7 +247,7 @@ func TestIndividualFetchWithSharding(t *testing.T) {
require.Equal(t, fragmentSizeMap[blobKey], fragmentInfo.FragmentSizeBytes)
}

server, err := newMetadataManager(context.Background(), logger, metadataStore, 1024*1024, 32, shardList)
server, err := newMetadataProvider(context.Background(), logger, metadataStore, 1024*1024, 32, shardList)
require.NoError(t, err)

// Fetch the metadata from the server.
Expand Down Expand Up @@ -371,7 +371,7 @@ func TestBatchedFetchWithSharding(t *testing.T) {
require.Equal(t, fragmentSizeMap[blobKey], fragmentInfo.FragmentSizeBytes)
}

server, err := newMetadataManager(context.Background(), logger, metadataStore, 1024*1024, 32, shardList)
server, err := newMetadataProvider(context.Background(), logger, metadataStore, 1024*1024, 32, shardList)
require.NoError(t, err)

// Each iteration, choose two random keys to fetch. There will be a 25% chance that both blobs map to valid shards.
Expand Down
32 changes: 16 additions & 16 deletions relay/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ var _ pb.RelayServer = &Server{}
type Server struct {
pb.UnimplementedRelayServer

// metadataServer encapsulates logic for fetching metadata for blobs.
metadataServer *metadataManager
// metadataProvider encapsulates logic for fetching metadata for blobs.
metadataProvider *metadataProvider

// blobServer encapsulates logic for fetching blobs.
blobServer *blobManager
// blobProvider encapsulates logic for fetching blobs.
blobProvider *blobProvider

// chunkServer encapsulates logic for fetching chunks.
chunkServer *chunkManager
// chunkProvider encapsulates logic for fetching chunks.
chunkProvider *chunkProvider
}

// NewServer creates a new relay Server.
Expand All @@ -37,7 +37,7 @@ func NewServer(
blobStore *blobstore.BlobStore,
chunkReader chunkstore.ChunkReader) (*Server, error) {

ms, err := newMetadataManager(
ms, err := newMetadataProvider(
ctx,
logger,
metadataStore,
Expand All @@ -48,7 +48,7 @@ func NewServer(
return nil, fmt.Errorf("error creating metadata server: %w", err)
}

bs, err := newBlobManager(
bs, err := newBlobProvider(
ctx,
logger,
blobStore,
Expand All @@ -58,7 +58,7 @@ func NewServer(
return nil, fmt.Errorf("error creating blob server: %w", err)
}

cs, err := newChunkManager(
cs, err := newChunkProvider(
ctx,
logger,
chunkReader,
Expand All @@ -69,9 +69,9 @@ func NewServer(
}

return &Server{
metadataServer: ms,
blobServer: bs,
chunkServer: cs,
metadataProvider: ms,
blobProvider: bs,
chunkProvider: cs,
}, nil
}

Expand All @@ -89,7 +89,7 @@ func (s *Server) GetBlob(ctx context.Context, request *pb.GetBlobRequest) (*pb.G
}

keys := []v2.BlobKey{key}
mMap, err := s.metadataServer.GetMetadataForBlobs(keys)
mMap, err := s.metadataProvider.GetMetadataForBlobs(keys)
if err != nil {
return nil, fmt.Errorf(
"error fetching metadata for blob, check if blob exists and is assigned to this relay: %w", err)
Expand All @@ -99,7 +99,7 @@ func (s *Server) GetBlob(ctx context.Context, request *pb.GetBlobRequest) (*pb.G
return nil, fmt.Errorf("blob not found")
}

data, err := s.blobServer.GetBlob(key)
data, err := s.blobProvider.GetBlob(key)
if err != nil {
return nil, fmt.Errorf("error fetching blob %s: %w", key.Hex(), err)
}
Expand Down Expand Up @@ -144,13 +144,13 @@ func (s *Server) GetChunks(ctx context.Context, request *pb.GetChunksRequest) (*
keys = append(keys, key)
}

mMap, err := s.metadataServer.GetMetadataForBlobs(keys)
mMap, err := s.metadataProvider.GetMetadataForBlobs(keys)
if err != nil {
return nil, fmt.Errorf(
"error fetching metadata for blob, check if blob exists and is assigned to this relay: %w", err)
}

frames, err := s.chunkServer.GetFrames(ctx, mMap)
frames, err := s.chunkProvider.GetFrames(ctx, mMap)
if err != nil {
return nil, fmt.Errorf("error fetching frames: %w", err)
}
Expand Down

0 comments on commit ee5adab

Please sign in to comment.