Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removing timer based cleanup logic #2927

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 2 additions & 43 deletions internal/gcsx/multi_range_downloader_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,14 @@ import (

"github.com/google/uuid"
"github.com/googlecloudplatform/gcsfuse/v2/common"
"github.com/googlecloudplatform/gcsfuse/v2/internal/clock"
"github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
"github.com/googlecloudplatform/gcsfuse/v2/internal/monitor"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"golang.org/x/net/context"
)

// Timeout value which determines when the MultiRangeDownloader will be closed after
// it's refcount reaches 0.
const multiRangeDownloaderTimeout = 60 * time.Second

func NewMultiRangeDownloaderWrapper(bucket gcs.Bucket, object *gcs.MinObject) MultiRangeDownloaderWrapper {
return NewMultiRangeDownloaderWrapperWithClock(bucket, object, clock.RealClock{})
}

func NewMultiRangeDownloaderWrapperWithClock(bucket gcs.Bucket, object *gcs.MinObject, clock clock.Clock) MultiRangeDownloaderWrapper {
return MultiRangeDownloaderWrapper{
clock: clock,
bucket: bucket,
object: object,
}
Expand All @@ -63,10 +53,6 @@ type MultiRangeDownloaderWrapper struct {
refCount int
// Mutex is used to synchronize access over refCount.
mu sync.Mutex
// Holds the cancel function, which can be called to cancel the cleanup function.
cancelCleanup context.CancelFunc
// Used for waiting for timeout (helps us in mocking the functionality).
clock clock.Clock
}

// Returns current refcount.
Expand All @@ -84,10 +70,6 @@ func (mrdWrapper *MultiRangeDownloaderWrapper) IncrementRefCount() {
defer mrdWrapper.mu.Unlock()

mrdWrapper.refCount++
if mrdWrapper.cancelCleanup != nil {
mrdWrapper.cancelCleanup()
mrdWrapper.cancelCleanup = nil
}
}

// Decrement the refcount. In case refcount reaches 0, cleanup the MRD.
Expand All @@ -105,35 +87,12 @@ func (mrdWrapper *MultiRangeDownloaderWrapper) DecrementRefCount() (err error) {

mrdWrapper.refCount--
if mrdWrapper.refCount == 0 {
mrdWrapper.cleanupMultiRangeDownloader()
mrdWrapper.Wrapped.Close()
mrdWrapper.Wrapped = nil
}
return
}

// Spawns a cancellable go routine to close the MRD after the timeout.
// Always call after taking MultiRangeDownloaderWrapper's mutex lock.
func (mrdWrapper *MultiRangeDownloaderWrapper) cleanupMultiRangeDownloader() {
closeMRD := func(ctx context.Context) {
select {
case <-mrdWrapper.clock.After(multiRangeDownloaderTimeout):
mrdWrapper.mu.Lock()
defer mrdWrapper.mu.Unlock()

if mrdWrapper.refCount == 0 && mrdWrapper.Wrapped != nil {
mrdWrapper.Wrapped.Close()
mrdWrapper.Wrapped = nil
mrdWrapper.cancelCleanup = nil
}
case <-ctx.Done():
return
}
}

ctx, cancel := context.WithCancel(context.Background())
mrdWrapper.cancelCleanup = cancel
go closeMRD(ctx)
}

// Ensures that MultiRangeDownloader exists, creating it if it does not exist.
func (mrdWrapper *MultiRangeDownloaderWrapper) ensureMultiRangeDownloader() (err error) {
if mrdWrapper.Wrapped == nil {
Expand Down
12 changes: 2 additions & 10 deletions internal/gcsx/multi_range_downloader_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"time"

"github.com/googlecloudplatform/gcsfuse/v2/common"
"github.com/googlecloudplatform/gcsfuse/v2/internal/clock"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/fake"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
Expand Down Expand Up @@ -54,7 +53,7 @@ func (t *mrdWrapperTest) SetupTest() {
// Create the bucket.
t.mockBucket = new(storage.TestifyMockBucket)
t.mrdTimeout = time.Millisecond
t.mrdWrapper = NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{WaitTime: t.mrdTimeout})
t.mrdWrapper = NewMultiRangeDownloaderWrapper(t.mockBucket, t.object)
t.mrdWrapper.Wrapped = fake.NewFakeMultiRangeDownloaderWithSleep(t.object, t.objectData, time.Microsecond)
t.mrdWrapper.refCount = 0
}
Expand All @@ -80,14 +79,11 @@ func (t *mrdWrapperTest) Test_IncrementRefCount_CancelCleanup() {
err := t.mrdWrapper.DecrementRefCount()

assert.Nil(t.T(), err)
assert.NotNil(t.T(), t.mrdWrapper.cancelCleanup)
assert.NotNil(t.T(), t.mrdWrapper.Wrapped)
assert.Nil(t.T(), t.mrdWrapper.Wrapped)

t.mrdWrapper.IncrementRefCount()

assert.Equal(t.T(), finalRefCount, t.mrdWrapper.refCount)
assert.Nil(t.T(), t.mrdWrapper.cancelCleanup)
assert.NotNil(t.T(), t.mrdWrapper.Wrapped)
}

func (t *mrdWrapperTest) Test_DecrementRefCount_ParallelUpdates() {
Expand Down Expand Up @@ -115,10 +111,6 @@ func (t *mrdWrapperTest) Test_DecrementRefCount_ParallelUpdates() {
wg.Wait()

assert.Equal(t.T(), finalRefCount, t.mrdWrapper.GetRefCount())
assert.NotNil(t.T(), t.mrdWrapper.Wrapped)
assert.NotNil(t.T(), t.mrdWrapper.cancelCleanup)
// Waiting for the cleanup to be done.
time.Sleep(t.mrdTimeout + time.Millisecond)
assert.Nil(t.T(), t.mrdWrapper.Wrapped)
}

Expand Down
9 changes: 4 additions & 5 deletions internal/gcsx/random_reader_stretchr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/googlecloudplatform/gcsfuse/v2/internal/cache/file/downloader"
"github.com/googlecloudplatform/gcsfuse/v2/internal/cache/lru"
"github.com/googlecloudplatform/gcsfuse/v2/internal/cache/util"
"github.com/googlecloudplatform/gcsfuse/v2/internal/clock"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/fake"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
Expand Down Expand Up @@ -647,7 +646,7 @@ func (t *RandomReaderStretchrTest) Test_ReadAt_MRDRead() {
t.rr.wrapped.seeks = minSeeksForRandom + 1
t.object.Size = uint64(tc.dataSize)
testContent := testutil.GenerateRandomBytes(int(t.object.Size))
fakeMRDWrapper := NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{})
fakeMRDWrapper := NewMultiRangeDownloaderWrapper(t.mockBucket, t.object)
t.rr.wrapped.mrdWrapper = &fakeMRDWrapper
t.mockBucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fake.NewFakeMultiRangeDownloaderWithSleep(t.object, testContent, time.Microsecond)).Times(1)
t.mockBucket.On("BucketType", mock.Anything).Return(gcs.BucketType{Zonal: true}).Times(1)
Expand Down Expand Up @@ -688,7 +687,7 @@ func (t *RandomReaderStretchrTest) Test_ReadFromMultiRangeReader_ReadFull() {
t.rr.wrapped.isMRDInUse = false
t.object.Size = uint64(tc.dataSize)
testContent := testutil.GenerateRandomBytes(int(t.object.Size))
fakeMRDWrapper := NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{})
fakeMRDWrapper := NewMultiRangeDownloaderWrapper(t.mockBucket, t.object)
t.rr.wrapped.mrdWrapper = &fakeMRDWrapper
t.mockBucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fake.NewFakeMultiRangeDownloaderWithSleep(t.object, testContent, time.Microsecond)).Times(1)
t.mockBucket.On("BucketType", mock.Anything).Return(gcs.BucketType{Zonal: true}).Times(1)
Expand Down Expand Up @@ -722,7 +721,7 @@ func (t *RandomReaderStretchrTest) Test_ReadFromMultiRangeReader_ReadChunk() {
t.rr.wrapped.reader = nil
t.object.Size = uint64(tc.dataSize)
testContent := testutil.GenerateRandomBytes(int(t.object.Size))
fakeMRDWrapper := NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{})
fakeMRDWrapper := NewMultiRangeDownloaderWrapper(t.mockBucket, t.object)
t.rr.wrapped.mrdWrapper = &fakeMRDWrapper
t.mockBucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fake.NewFakeMultiRangeDownloaderWithSleep(t.object, testContent, time.Microsecond)).Times(1)
t.mockBucket.On("BucketType", mock.Anything).Return(gcs.BucketType{Zonal: true}).Times(1)
Expand Down Expand Up @@ -761,7 +760,7 @@ func (t *RandomReaderStretchrTest) Test_ReadFromMultiRangeReader_ValidateTimeout
t.rr.wrapped.isMRDInUse = false
t.object.Size = uint64(tc.dataSize)
testContent := testutil.GenerateRandomBytes(int(t.object.Size))
fakeMRDWrapper := NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{})
fakeMRDWrapper := NewMultiRangeDownloaderWrapper(t.mockBucket, t.object)
t.rr.wrapped.mrdWrapper = &fakeMRDWrapper
t.mockBucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fake.NewFakeMultiRangeDownloaderWithSleep(t.object, testContent, tc.sleepTime)).Times(1)
t.mockBucket.On("BucketType", mock.Anything).Return(gcs.BucketType{Zonal: true}).Times(1)
Expand Down
Loading