From e20697dc9eb61c13fdf88d2f9151d44873df10c7 Mon Sep 17 00:00:00 2001 From: Abhishek Gupta Date: Wed, 22 Jan 2025 07:33:04 +0000 Subject: [PATCH] Removing timer based cleanup logic --- .../gcsx/multi_range_downloader_wrapper.go | 45 +------------------ .../multi_range_downloader_wrapper_test.go | 12 +---- internal/gcsx/random_reader_stretchr_test.go | 9 ++-- 3 files changed, 8 insertions(+), 58 deletions(-) diff --git a/internal/gcsx/multi_range_downloader_wrapper.go b/internal/gcsx/multi_range_downloader_wrapper.go index 13187d27e3..0ecaaf362b 100644 --- a/internal/gcsx/multi_range_downloader_wrapper.go +++ b/internal/gcsx/multi_range_downloader_wrapper.go @@ -23,24 +23,14 @@ import ( "github.com/google/uuid" "github.com/googlecloudplatform/gcsfuse/v2/common" - "github.com/googlecloudplatform/gcsfuse/v2/internal/clock" "github.com/googlecloudplatform/gcsfuse/v2/internal/logger" "github.com/googlecloudplatform/gcsfuse/v2/internal/monitor" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs" "golang.org/x/net/context" ) -// Timeout value which determines when the MultiRangeDownloader will be closed after -// it's refcount reaches 0. -const multiRangeDownloaderTimeout = 60 * time.Second - func NewMultiRangeDownloaderWrapper(bucket gcs.Bucket, object *gcs.MinObject) MultiRangeDownloaderWrapper { - return NewMultiRangeDownloaderWrapperWithClock(bucket, object, clock.RealClock{}) -} - -func NewMultiRangeDownloaderWrapperWithClock(bucket gcs.Bucket, object *gcs.MinObject, clock clock.Clock) MultiRangeDownloaderWrapper { return MultiRangeDownloaderWrapper{ - clock: clock, bucket: bucket, object: object, } @@ -63,10 +53,6 @@ type MultiRangeDownloaderWrapper struct { refCount int // Mutex is used to synchronize access over refCount. mu sync.Mutex - // Holds the cancel function, which can be called to cancel the cleanup function. - cancelCleanup context.CancelFunc - // Used for waiting for timeout (helps us in mocking the functionality). - clock clock.Clock } // Returns current refcount. @@ -84,10 +70,6 @@ func (mrdWrapper *MultiRangeDownloaderWrapper) IncrementRefCount() { defer mrdWrapper.mu.Unlock() mrdWrapper.refCount++ - if mrdWrapper.cancelCleanup != nil { - mrdWrapper.cancelCleanup() - mrdWrapper.cancelCleanup = nil - } } // Decrement the refcount. In case refcount reaches 0, cleanup the MRD. @@ -105,35 +87,12 @@ func (mrdWrapper *MultiRangeDownloaderWrapper) DecrementRefCount() (err error) { mrdWrapper.refCount-- if mrdWrapper.refCount == 0 { - mrdWrapper.cleanupMultiRangeDownloader() + mrdWrapper.Wrapped.Close() + mrdWrapper.Wrapped = nil } return } -// Spawns a cancellable go routine to close the MRD after the timeout. -// Always call after taking MultiRangeDownloaderWrapper's mutex lock. -func (mrdWrapper *MultiRangeDownloaderWrapper) cleanupMultiRangeDownloader() { - closeMRD := func(ctx context.Context) { - select { - case <-mrdWrapper.clock.After(multiRangeDownloaderTimeout): - mrdWrapper.mu.Lock() - defer mrdWrapper.mu.Unlock() - - if mrdWrapper.refCount == 0 && mrdWrapper.Wrapped != nil { - mrdWrapper.Wrapped.Close() - mrdWrapper.Wrapped = nil - mrdWrapper.cancelCleanup = nil - } - case <-ctx.Done(): - return - } - } - - ctx, cancel := context.WithCancel(context.Background()) - mrdWrapper.cancelCleanup = cancel - go closeMRD(ctx) -} - // Ensures that MultiRangeDownloader exists, creating it if it does not exist. func (mrdWrapper *MultiRangeDownloaderWrapper) ensureMultiRangeDownloader() (err error) { if mrdWrapper.Wrapped == nil { diff --git a/internal/gcsx/multi_range_downloader_wrapper_test.go b/internal/gcsx/multi_range_downloader_wrapper_test.go index 7102008a0f..44223509fe 100644 --- a/internal/gcsx/multi_range_downloader_wrapper_test.go +++ b/internal/gcsx/multi_range_downloader_wrapper_test.go @@ -21,7 +21,6 @@ import ( "time" "github.com/googlecloudplatform/gcsfuse/v2/common" - "github.com/googlecloudplatform/gcsfuse/v2/internal/clock" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/fake" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs" @@ -54,7 +53,7 @@ func (t *mrdWrapperTest) SetupTest() { // Create the bucket. t.mockBucket = new(storage.TestifyMockBucket) t.mrdTimeout = time.Millisecond - t.mrdWrapper = NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{WaitTime: t.mrdTimeout}) + t.mrdWrapper = NewMultiRangeDownloaderWrapper(t.mockBucket, t.object) t.mrdWrapper.Wrapped = fake.NewFakeMultiRangeDownloaderWithSleep(t.object, t.objectData, time.Microsecond) t.mrdWrapper.refCount = 0 } @@ -80,14 +79,11 @@ func (t *mrdWrapperTest) Test_IncrementRefCount_CancelCleanup() { err := t.mrdWrapper.DecrementRefCount() assert.Nil(t.T(), err) - assert.NotNil(t.T(), t.mrdWrapper.cancelCleanup) - assert.NotNil(t.T(), t.mrdWrapper.Wrapped) + assert.Nil(t.T(), t.mrdWrapper.Wrapped) t.mrdWrapper.IncrementRefCount() assert.Equal(t.T(), finalRefCount, t.mrdWrapper.refCount) - assert.Nil(t.T(), t.mrdWrapper.cancelCleanup) - assert.NotNil(t.T(), t.mrdWrapper.Wrapped) } func (t *mrdWrapperTest) Test_DecrementRefCount_ParallelUpdates() { @@ -115,10 +111,6 @@ func (t *mrdWrapperTest) Test_DecrementRefCount_ParallelUpdates() { wg.Wait() assert.Equal(t.T(), finalRefCount, t.mrdWrapper.GetRefCount()) - assert.NotNil(t.T(), t.mrdWrapper.Wrapped) - assert.NotNil(t.T(), t.mrdWrapper.cancelCleanup) - // Waiting for the cleanup to be done. - time.Sleep(t.mrdTimeout + time.Millisecond) assert.Nil(t.T(), t.mrdWrapper.Wrapped) } diff --git a/internal/gcsx/random_reader_stretchr_test.go b/internal/gcsx/random_reader_stretchr_test.go index dbe298c4ca..27c58e2535 100644 --- a/internal/gcsx/random_reader_stretchr_test.go +++ b/internal/gcsx/random_reader_stretchr_test.go @@ -31,7 +31,6 @@ import ( "github.com/googlecloudplatform/gcsfuse/v2/internal/cache/file/downloader" "github.com/googlecloudplatform/gcsfuse/v2/internal/cache/lru" "github.com/googlecloudplatform/gcsfuse/v2/internal/cache/util" - "github.com/googlecloudplatform/gcsfuse/v2/internal/clock" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/fake" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs" @@ -647,7 +646,7 @@ func (t *RandomReaderStretchrTest) Test_ReadAt_MRDRead() { t.rr.wrapped.seeks = minSeeksForRandom + 1 t.object.Size = uint64(tc.dataSize) testContent := testutil.GenerateRandomBytes(int(t.object.Size)) - fakeMRDWrapper := NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{}) + fakeMRDWrapper := NewMultiRangeDownloaderWrapper(t.mockBucket, t.object) t.rr.wrapped.mrdWrapper = &fakeMRDWrapper t.mockBucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fake.NewFakeMultiRangeDownloaderWithSleep(t.object, testContent, time.Microsecond)).Times(1) t.mockBucket.On("BucketType", mock.Anything).Return(gcs.BucketType{Zonal: true}).Times(1) @@ -688,7 +687,7 @@ func (t *RandomReaderStretchrTest) Test_ReadFromMultiRangeReader_ReadFull() { t.rr.wrapped.isMRDInUse = false t.object.Size = uint64(tc.dataSize) testContent := testutil.GenerateRandomBytes(int(t.object.Size)) - fakeMRDWrapper := NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{}) + fakeMRDWrapper := NewMultiRangeDownloaderWrapper(t.mockBucket, t.object) t.rr.wrapped.mrdWrapper = &fakeMRDWrapper t.mockBucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fake.NewFakeMultiRangeDownloaderWithSleep(t.object, testContent, time.Microsecond)).Times(1) t.mockBucket.On("BucketType", mock.Anything).Return(gcs.BucketType{Zonal: true}).Times(1) @@ -722,7 +721,7 @@ func (t *RandomReaderStretchrTest) Test_ReadFromMultiRangeReader_ReadChunk() { t.rr.wrapped.reader = nil t.object.Size = uint64(tc.dataSize) testContent := testutil.GenerateRandomBytes(int(t.object.Size)) - fakeMRDWrapper := NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{}) + fakeMRDWrapper := NewMultiRangeDownloaderWrapper(t.mockBucket, t.object) t.rr.wrapped.mrdWrapper = &fakeMRDWrapper t.mockBucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fake.NewFakeMultiRangeDownloaderWithSleep(t.object, testContent, time.Microsecond)).Times(1) t.mockBucket.On("BucketType", mock.Anything).Return(gcs.BucketType{Zonal: true}).Times(1) @@ -761,7 +760,7 @@ func (t *RandomReaderStretchrTest) Test_ReadFromMultiRangeReader_ValidateTimeout t.rr.wrapped.isMRDInUse = false t.object.Size = uint64(tc.dataSize) testContent := testutil.GenerateRandomBytes(int(t.object.Size)) - fakeMRDWrapper := NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{}) + fakeMRDWrapper := NewMultiRangeDownloaderWrapper(t.mockBucket, t.object) t.rr.wrapped.mrdWrapper = &fakeMRDWrapper t.mockBucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fake.NewFakeMultiRangeDownloaderWithSleep(t.object, testContent, tc.sleepTime)).Times(1) t.mockBucket.On("BucketType", mock.Anything).Return(gcs.BucketType{Zonal: true}).Times(1)