Concurrency limiting cache (#903)
Signed-off-by: Cody Littley <[email protected]>
cody-littley authored Nov 18, 2024
1 parent c70f576 commit d941c43
Showing 5 changed files with 109 additions and 43 deletions.
14 changes: 4 additions & 10 deletions relay/blob_provider.go
@@ -20,9 +20,6 @@ type blobProvider struct {
 
     // blobCache is an LRU cache of blobs.
     blobCache cache.CachedAccessor[v2.BlobKey, []byte]
-
-    // concurrencyLimiter is a channel that limits the number of concurrent operations.
-    concurrencyLimiter chan struct{}
 }
 
 // newBlobProvider creates a new blobProvider.
@@ -34,13 +31,12 @@ func newBlobProvider(
     maxIOConcurrency int) (*blobProvider, error) {
 
     server := &blobProvider{
-        ctx:                ctx,
-        logger:             logger,
-        blobStore:          blobStore,
-        concurrencyLimiter: make(chan struct{}, maxIOConcurrency),
+        ctx:       ctx,
+        logger:    logger,
+        blobStore: blobStore,
     }
 
-    c, err := cache.NewCachedAccessor[v2.BlobKey, []byte](blobCacheSize, server.fetchBlob)
+    c, err := cache.NewCachedAccessor[v2.BlobKey, []byte](blobCacheSize, maxIOConcurrency, server.fetchBlob)
     if err != nil {
         return nil, fmt.Errorf("error creating blob cache: %w", err)
     }
@@ -52,9 +48,7 @@ func newBlobProvider(
 // GetBlob retrieves a blob from the blob store.
 func (s *blobProvider) GetBlob(blobKey v2.BlobKey) ([]byte, error) {
 
-    s.concurrencyLimiter <- struct{}{}
     data, err := s.blobCache.Get(blobKey)
-    <-s.concurrencyLimiter
 
     if err != nil {
         // It should not be possible for external users to force an error here since we won't
35 changes: 30 additions & 5 deletions relay/cache/cached_accessor.go
@@ -44,15 +44,25 @@ type cachedAccessor[K comparable, V any] struct {
     // cache is the LRU cache used to store values fetched by the accessor.
     cache *lru.Cache[K, V]
 
+    // concurrencyLimiter is a channel used to limit the number of concurrent lookups that can be in progress.
+    concurrencyLimiter chan struct{}
+
     // lock is used to protect the cache and lookupsInProgress map.
     cacheLock sync.Mutex
 
     // accessor is the function used to fetch values that are not in the cache.
     accessor Accessor[K, V]
 }
 
-// NewCachedAccessor creates a new CachedAccessor.
-func NewCachedAccessor[K comparable, V any](cacheSize int, accessor Accessor[K, V]) (CachedAccessor[K, V], error) {
+// NewCachedAccessor creates a new CachedAccessor. The cacheSize parameter specifies the maximum number of items
+// that can be stored in the cache. The concurrencyLimit parameter specifies the maximum number of concurrent
+// lookups that can be in progress at any given time. If a greater number of lookups are requested, the excess
+// lookups will block until a lookup completes. If concurrencyLimit is zero, then no limits are imposed. The accessor
+// parameter is the function used to fetch values that are not in the cache.
+func NewCachedAccessor[K comparable, V any](
+    cacheSize int,
+    concurrencyLimit int,
+    accessor Accessor[K, V]) (CachedAccessor[K, V], error) {
 
     cache, err := lru.New[K, V](cacheSize)
     if err != nil {
@@ -61,10 +71,16 @@ func NewCachedAccessor[K comparable, V any](cacheSize int, accessor Accessor[K,
 
     lookupsInProgress := make(map[K]*accessResult[V])
 
+    var concurrencyLimiter chan struct{}
+    if concurrencyLimit > 0 {
+        concurrencyLimiter = make(chan struct{}, concurrencyLimit)
+    }
+
     return &cachedAccessor[K, V]{
-        cache:             cache,
-        accessor:          accessor,
-        lookupsInProgress: lookupsInProgress,
+        cache:              cache,
+        concurrencyLimiter: concurrencyLimiter,
+        accessor:           accessor,
+        lookupsInProgress:  lookupsInProgress,
     }, nil
 }
 
@@ -102,8 +118,17 @@ func (c *cachedAccessor[K, V]) Get(key K) (V, error) {
         return result.value, result.err
     } else {
         // We are the first goroutine to request this key.
+
+        if c.concurrencyLimiter != nil {
+            c.concurrencyLimiter <- struct{}{}
+        }
+
         value, err := c.accessor(key)
 
+        if c.concurrencyLimiter != nil {
+            <-c.concurrencyLimiter
+        }
+
         c.cacheLock.Lock()
 
         // Update the cache if the fetch was successful.
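The limiter above is the standard Go counting-semaphore idiom: a buffered channel whose capacity is the concurrency limit, where a send acquires a slot and a receive releases it. Because the acquire/release pair brackets only the accessor call, cache hits and goroutines that piggyback on an in-flight lookup never consume a slot, whereas the old call-site limiters (see blob_provider.go above) throttled hits and misses alike; a zero concurrencyLimit leaves the channel nil and skips the send/receive entirely. What follows is a minimal, self-contained sketch of the idiom; all names are illustrative and not taken from this diff:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    const limit = 4
    limiter := make(chan struct{}, limit) // buffered channel used as a counting semaphore

    var active, peak atomic.Int64
    fetch := func(key int) int {
        limiter <- struct{}{}        // acquire: blocks once `limit` fetches are in flight
        defer func() { <-limiter }() // release the slot

        // Track the high-water mark of concurrent fetches.
        n := active.Add(1)
        defer active.Add(-1)
        for p := peak.Load(); n > p; p = peak.Load() {
            if peak.CompareAndSwap(p, n) {
                break
            }
        }

        return key * 2 // stand-in for a slow I/O operation
    }

    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            _ = fetch(i)
        }(i)
    }
    wg.Wait()

    fmt.Printf("peak concurrent fetches: %d (limit %d)\n", peak.Load(), limit)
}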
64 changes: 60 additions & 4 deletions relay/cache/cached_accessor_test.go
@@ -32,7 +32,7 @@ func TestRandomOperationsSingleThread(t *testing.T) {
     }
     cacheSize := rand.Intn(dataSize) + 1
 
-    ca, err := NewCachedAccessor(cacheSize, accessor)
+    ca, err := NewCachedAccessor(cacheSize, 0, accessor)
     require.NoError(t, err)
 
     for i := 0; i < dataSize; i++ {
@@ -79,7 +79,7 @@ func TestCacheMisses(t *testing.T) {
         return &str, nil
     }
 
-    ca, err := NewCachedAccessor(cacheSize, accessor)
+    ca, err := NewCachedAccessor(cacheSize, 0, accessor)
     require.NoError(t, err)
 
     // Get the first cacheSize keys. This should fill the cache.
@@ -142,7 +142,7 @@ func ParallelAccessTest(t *testing.T, sleepEnabled bool) {
     }
     cacheSize := rand.Intn(dataSize) + 1
 
-    ca, err := NewCachedAccessor(cacheSize, accessor)
+    ca, err := NewCachedAccessor(cacheSize, 0, accessor)
     require.NoError(t, err)
 
     // Lock the accessor. This will cause all cache misses to block.
@@ -211,7 +211,7 @@ func TestParallelAccessWithError(t *testing.T) {
     }
     cacheSize := 100
 
-    ca, err := NewCachedAccessor(cacheSize, accessor)
+    ca, err := NewCachedAccessor(cacheSize, 0, accessor)
     require.NoError(t, err)
 
     // Lock the accessor. This will cause all cache misses to block.
@@ -254,3 +254,59 @@ func TestParallelAccessWithError(t *testing.T) {
     // The internal lookupsInProgress map should no longer contain the key.
     require.Equal(t, 0, len(ca.(*cachedAccessor[int, *string]).lookupsInProgress))
 }
+
+func TestConcurrencyLimiter(t *testing.T) {
+    tu.InitializeRandom()
+
+    dataSize := 1024
+
+    baseData := make(map[int]string)
+    for i := 0; i < dataSize; i++ {
+        baseData[i] = tu.RandomString(10)
+    }
+
+    maxConcurrency := 10 + rand.Intn(10)
+
+    accessorLock := sync.RWMutex{}
+    accessorLock.Lock()
+    activeAccessors := atomic.Int64{}
+    accessor := func(key int) (*string, error) {
+        activeAccessors.Add(1)
+        accessorLock.Lock()
+        defer func() {
+            activeAccessors.Add(-1)
+        }()
+        accessorLock.Unlock()
+
+        value := baseData[key]
+        return &value, nil
+    }
+
+    cacheSize := 100
+    ca, err := NewCachedAccessor(cacheSize, maxConcurrency, accessor)
+    require.NoError(t, err)
+
+    wg := sync.WaitGroup{}
+    wg.Add(dataSize)
+    for i := 0; i < dataSize; i++ {
+        boundI := i
+        go func() {
+            value, err := ca.Get(boundI)
+            require.NoError(t, err)
+            require.Equal(t, baseData[boundI], *value)
+            wg.Done()
+        }()
+    }
+
+    // Wait for the goroutines to start. We want to give the goroutines a chance to do naughty things if they want.
+    // Eliminating this sleep will not cause the test to fail, but it may cause the test not to exercise the
+    // desired race condition.
+    time.Sleep(100 * time.Millisecond)
+
+    // The number of active accessors should be less than or equal to the maximum concurrency.
+    require.True(t, activeAccessors.Load() <= int64(maxConcurrency))
+
+    // Unlock the accessor. This will allow the goroutines to proceed.
+    accessorLock.Unlock()
+    wg.Wait()
+}
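For reference, a usage sketch of the updated constructor, written against the signature introduced in this diff; the import path is assumed from the repository layout, and the accessor body is a stand-in for real I/O:

package main

import (
    "fmt"

    // Import path assumed from the repository layout shown in this diff.
    "github.com/Layr-Labs/eigenda/relay/cache"
)

func main() {
    // Stand-in for a slow lookup, e.g. a read from remote storage.
    accessor := func(key int) (*string, error) {
        s := fmt.Sprintf("value-%d", key)
        return &s, nil
    }

    // Cache up to 1024 entries; allow at most 32 accessor calls in flight.
    // A concurrency limit of 0 disables throttling entirely.
    ca, err := cache.NewCachedAccessor[int, *string](1024, 32, accessor)
    if err != nil {
        panic(err)
    }

    // A miss blocks only while all 32 slots are busy; a hit returns
    // immediately without touching the limiter.
    value, err := ca.Get(7)
    if err != nil {
        panic(err)
    }
    fmt.Println(*value)
}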
17 changes: 7 additions & 10 deletions relay/chunk_provider.go
@@ -23,9 +23,6 @@ type chunkProvider struct {
 
     // chunkReader is used to read chunks from the chunk store.
     chunkReader chunkstore.ChunkReader
-
-    // concurrencyLimiter is a channel that limits the number of concurrent operations.
-    concurrencyLimiter chan struct{}
 }
 
 // blobKeyWithMetadata attaches some additional metadata to a blobKey.
@@ -47,13 +44,15 @@ func newChunkProvider(
     maxIOConcurrency int) (*chunkProvider, error) {
 
     server := &chunkProvider{
-        ctx:                ctx,
-        logger:             logger,
-        chunkReader:        chunkReader,
-        concurrencyLimiter: make(chan struct{}, maxIOConcurrency),
+        ctx:         ctx,
+        logger:      logger,
+        chunkReader: chunkReader,
     }
 
-    c, err := cache.NewCachedAccessor[blobKeyWithMetadata, []*encoding.Frame](cacheSize, server.fetchFrames)
+    c, err := cache.NewCachedAccessor[blobKeyWithMetadata, []*encoding.Frame](
+        cacheSize,
+        maxIOConcurrency,
+        server.fetchFrames)
     if err != nil {
         return nil, err
     }
@@ -128,11 +127,9 @@ func (s *chunkProvider) fetchFrames(key blobKeyWithMetadata) ([]*encoding.Frame,
     var proofs []*encoding.Proof
     var proofsErr error
 
-    s.concurrencyLimiter <- struct{}{}
     go func() {
         defer func() {
             wg.Done()
-            <-s.concurrencyLimiter
         }()
         proofs, proofsErr = s.chunkReader.GetChunkProofs(s.ctx, key.blobKey)
     }()
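A nuance of this hunk: the old limiter bracketed only the proof-reading goroutine, while the new cache-level limiter brackets the entire fetchFrames call, so the proof read and the rest of the fetch proceed in parallel under a single slot. Below is a compressed, runnable sketch of that overlap pattern; the coefficient read is an assumption, since only GetChunkProofs appears in this diff:

package main

import (
    "fmt"
    "sync"
)

// fetchFramesSketch mirrors the shape of fetchFrames: one read runs on a
// second goroutine while the calling goroutine performs the other, and the
// cache's limiter treats the whole call as a single unit of concurrency.
func fetchFramesSketch() (string, string, error) {
    var wg sync.WaitGroup
    wg.Add(1)

    var proofs string
    var proofsErr error
    go func() {
        defer wg.Done()
        proofs, proofsErr = "proofs", nil // stand-in for chunkReader.GetChunkProofs
    }()

    coefficients := "coefficients" // stand-in for the coefficient read (assumed API)

    wg.Wait()
    if proofsErr != nil {
        return "", "", proofsErr
    }
    return proofs, coefficients, nil
}

func main() {
    proofs, coefficients, err := fetchFramesSketch()
    if err != nil {
        panic(err)
    }
    fmt.Println(proofs, coefficients)
}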
22 changes: 8 additions & 14 deletions relay/metadata_provider.go
@@ -36,9 +36,6 @@ type metadataProvider struct {
     // relayIDSet is the set of relay IDs assigned to this relay. This relay will refuse to serve metadata for blobs
     // that are not assigned to one of these IDs.
     relayIDSet map[v2.RelayKey]struct{}
-
-    // concurrencyLimiter is a channel that limits the number of concurrent operations.
-    concurrencyLimiter chan struct{}
 }
 
 // newMetadataProvider creates a new metadataProvider.
@@ -56,14 +53,16 @@ func newMetadataProvider(
     }
 
     server := &metadataProvider{
-        ctx:                ctx,
-        logger:             logger,
-        metadataStore:      metadataStore,
-        relayIDSet:         relayIDSet,
-        concurrencyLimiter: make(chan struct{}, maxIOConcurrency),
+        ctx:           ctx,
+        logger:        logger,
+        metadataStore: metadataStore,
+        relayIDSet:    relayIDSet,
     }
 
-    metadataCache, err := cache.NewCachedAccessor[v2.BlobKey, *blobMetadata](metadataCacheSize, server.fetchMetadata)
+    metadataCache, err := cache.NewCachedAccessor[v2.BlobKey, *blobMetadata](
+        metadataCacheSize,
+        maxIOConcurrency,
+        server.fetchMetadata)
     if err != nil {
         return nil, fmt.Errorf("error creating metadata cache: %w", err)
     }
@@ -99,12 +98,7 @@ func (m *metadataProvider) GetMetadataForBlobs(keys []v2.BlobKey) (metadataMap,
         }
 
         boundKey := key
-        m.concurrencyLimiter <- struct{}{}
         go func() {
-            defer func() {
-                <-m.concurrencyLimiter
-            }()
-
             metadata, err := m.metadataCache.Get(boundKey)
             if err != nil {
                 // Intentionally log at debug level. External users can force this condition to trigger
