From 688702ac631e76c60092c629e274822c294cadb1 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 12 Apr 2024 12:35:24 -0700 Subject: [PATCH 1/3] perf(cache): Use AddRateLimited for batch enqueue --- flytestdlib/cache/auto_refresh.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytestdlib/cache/auto_refresh.go b/flytestdlib/cache/auto_refresh.go index f2257813a7..58490669f4 100644 --- a/flytestdlib/cache/auto_refresh.go +++ b/flytestdlib/cache/auto_refresh.go @@ -252,7 +252,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { for _, batch := range batches { b := batch - w.workqueue.Add(&b) + w.workqueue.AddRateLimited(&b) } return nil From 75cd64d1079921355e7791db1719ce40bf0a6e47 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 12 Apr 2024 14:37:49 -0700 Subject: [PATCH 2/3] Fix tests Signed-off-by: Kevin Su --- flytestdlib/cache/auto_refresh_test.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/flytestdlib/cache/auto_refresh_test.go b/flytestdlib/cache/auto_refresh_test.go index 4535e8c465..7507a863ce 100644 --- a/flytestdlib/cache/auto_refresh_test.go +++ b/flytestdlib/cache/auto_refresh_test.go @@ -19,11 +19,12 @@ import ( const fakeCacheItemValueLimit = 10 type fakeCacheItem struct { - val int + val int + isTerminal bool } func (f fakeCacheItem) IsTerminal() bool { - return false + return f.isTerminal } type terminalCacheItem struct { @@ -34,7 +35,7 @@ func (t terminalCacheItem) IsTerminal() bool { return true } -func syncFakeItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error) { +func syncFakeItem(ctx context.Context, batch Batch) ([]ItemSyncResponse, error) { items := make([]ItemSyncResponse, 0, len(batch)) for _, obj := range batch { item := obj.GetItem().(fakeCacheItem) @@ -42,11 +43,15 @@ func syncFakeItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error) { // After the item has gone through ten update cycles, leave it unchanged continue } - + isTerminal := false + if item.val == fakeCacheItemValueLimit-1 { + isTerminal = true + } items = append(items, ItemSyncResponse{ ID: obj.GetID(), Item: fakeCacheItem{ - val: item.val + 1, + val: item.val + 1, + isTerminal: isTerminal, }, Action: Update, }) @@ -60,7 +65,7 @@ func syncTerminalItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error } func TestCacheFour(t *testing.T) { - testResyncPeriod := time.Millisecond + testResyncPeriod := 10 * time.Millisecond rateLimiter := workqueue.DefaultControllerRateLimiter() t.Run("normal operation", func(t *testing.T) { From 399656fd08c5d016058dfe924d6fd020494bbed9 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 12 Apr 2024 14:40:13 -0700 Subject: [PATCH 3/3] lint Signed-off-by: Kevin Su --- flytestdlib/cache/auto_refresh_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytestdlib/cache/auto_refresh_test.go b/flytestdlib/cache/auto_refresh_test.go index 7507a863ce..7707b593ff 100644 --- a/flytestdlib/cache/auto_refresh_test.go +++ b/flytestdlib/cache/auto_refresh_test.go @@ -35,7 +35,7 @@ func (t terminalCacheItem) IsTerminal() bool { return true } -func syncFakeItem(ctx context.Context, batch Batch) ([]ItemSyncResponse, error) { +func syncFakeItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error) { items := make([]ItemSyncResponse, 0, len(batch)) for _, obj := range batch { item := obj.GetItem().(fakeCacheItem)