From 1f1bb7a4ffb5684bf1905f17ac55fdef319917ed Mon Sep 17 00:00:00 2001 From: Alan Clucas Date: Thu, 11 Jul 2024 11:56:20 +0100 Subject: [PATCH] chore: remove PriorityMutex implementation This is just a code refactor, removing unnecessary extra code. As part of #11859 and #5022 I noticed that PriorityMutex is not useful and can just be replaced by PrioritySemaphore. The only code path that is optimised vs PrioritySempahore is resize which will [never be called](https://github.com/argoproj/argo-workflows/blob/c59ff53a37a7e7ac41f73b23e621f601eab9d604/workflow/sync/sync_manager.go#L340). Otherwise we're performing double sync.Mutex locking for every call because all the implementation of PriorityMutex just calls the relevant method from PrioritySemaphore with it's own lock held, which is wasteful. Signed-off-by: Alan Clucas --- workflow/sync/mutex.go | 75 ++---------------------------- workflow/sync/sync_manager_test.go | 27 ++++------- 2 files changed, 13 insertions(+), 89 deletions(-) diff --git a/workflow/sync/mutex.go b/workflow/sync/mutex.go index 7a4749825a48..ab90d305088d 100644 --- a/workflow/sync/mutex.go +++ b/workflow/sync/mutex.go @@ -1,75 +1,6 @@ package sync -import ( - "sync" - "time" -) - -type PriorityMutex struct { - name string - mutex *PrioritySemaphore - lock *sync.Mutex -} - -func (m *PriorityMutex) getCurrentPending() []string { - return m.mutex.getCurrentPending() -} - -var _ Semaphore = &PriorityMutex{} - -// NewMutex creates new mutex lock object -// name of the mutex -// callbackFunc is a release notification function. -func NewMutex(name string, nextWorkflow NextWorkflow) *PriorityMutex { - return &PriorityMutex{ - name: name, - lock: &sync.Mutex{}, - mutex: NewSemaphore(name, 1, nextWorkflow, "mutex"), - } -} - -func (m *PriorityMutex) getName() string { - return m.name -} - -func (m *PriorityMutex) getLimit() int { - return m.mutex.limit -} - -func (m *PriorityMutex) getCurrentHolders() []string { - return m.mutex.getCurrentHolders() -} - -func (m *PriorityMutex) resize(n int) bool { - return false -} - -func (m *PriorityMutex) release(key string) bool { - m.lock.Lock() - defer m.lock.Unlock() - return m.mutex.release(key) -} - -func (m *PriorityMutex) acquire(holderKey string) bool { - m.lock.Lock() - defer m.lock.Unlock() - return m.mutex.acquire(holderKey) -} - -func (m *PriorityMutex) addToQueue(holderKey string, priority int32, creationTime time.Time) { - m.lock.Lock() - defer m.lock.Unlock() - m.mutex.addToQueue(holderKey, priority, creationTime) -} - -func (m *PriorityMutex) removeFromQueue(holderKey string) { - m.lock.Lock() - defer m.lock.Unlock() - m.mutex.removeFromQueue(holderKey) -} - -func (m *PriorityMutex) tryAcquire(holderKey string) (bool, string) { - m.lock.Lock() - defer m.lock.Unlock() - return m.mutex.tryAcquire(holderKey) +// NewMutex creates a size 1 semaphore +func NewMutex(name string, nextWorkflow NextWorkflow) *PrioritySemaphore { + return NewSemaphore(name, 1, nextWorkflow, "mutex") } diff --git a/workflow/sync/sync_manager_test.go b/workflow/sync/sync_manager_test.go index 591b8509055a..69194467c65c 100644 --- a/workflow/sync/sync_manager_test.go +++ b/workflow/sync/sync_manager_test.go @@ -44,7 +44,6 @@ metadata: selfLink: /apis/argoproj.io/v1alpha1/namespaces/default/workflows/hello-world-prtl9 uid: 790f5c47-211f-4a3b-8949-514ae916633b spec: - entrypoint: whalesay synchronization: semaphore: @@ -52,8 +51,7 @@ spec: key: workflow name: my-config templates: - - - container: + - container: args: - hello world command: @@ -118,17 +116,14 @@ metadata: name: semaphore-tmpl-level-xjvln namespace: default spec: - entrypoint: semaphore-tmpl-level-example templates: - - - inputs: {} + - inputs: {} metadata: {} name: semaphore-tmpl-level-example outputs: {} steps: - - - - name: generate + - - name: generate template: gen-number-list - - arguments: parameters: @@ -137,8 +132,7 @@ spec: name: sleep template: sleep-n-sec withParam: '{{steps.generate.outputs.result}}' - - - inputs: {} + - inputs: {} metadata: {} name: gen-number-list outputs: {} @@ -152,8 +146,7 @@ spec: import json import sys json.dump([i for i in range(1, 3)], sys.stdout) - - - container: + - container: args: - echo sleeping for {{inputs.parameters.seconds}} seconds; sleep 10; echo done command: @@ -654,13 +647,13 @@ func TestMutexWfLevel(t *testing.T) { assert.False(t, status) assert.True(t, wfUpdate) - mutex := concurrenyMgr.syncLockMap["default/Mutex/my-mutex"].(*PriorityMutex) + mutex := concurrenyMgr.syncLockMap["default/Mutex/my-mutex"].(*PrioritySemaphore) assert.NotNil(t, mutex) - assert.Len(t, mutex.mutex.pending.items, 2) + assert.Len(t, mutex.pending.items, 2) concurrenyMgr.ReleaseAll(wf1) - assert.Len(t, mutex.mutex.pending.items, 1) + assert.Len(t, mutex.pending.items, 1) concurrenyMgr.ReleaseAll(wf2) - assert.Len(t, mutex.mutex.pending.items, 0) + assert.Len(t, mutex.pending.items, 0) }) } @@ -692,7 +685,7 @@ func TestCheckWorkflowExistence(t *testing.T) { _, _, _, _ = concurrenyMgr.TryAcquire(wfMutex1, "", wfMutex.Spec.Synchronization) _, _, _, _ = concurrenyMgr.TryAcquire(wfSema, "", wfSema.Spec.Synchronization) _, _, _, _ = concurrenyMgr.TryAcquire(wfSema1, "", wfSema.Spec.Synchronization) - mutex := concurrenyMgr.syncLockMap["default/Mutex/my-mutex"].(*PriorityMutex) + mutex := concurrenyMgr.syncLockMap["default/Mutex/my-mutex"].(*PrioritySemaphore) semaphore := concurrenyMgr.syncLockMap["default/ConfigMap/my-config/workflow"] assert.Len(mutex.getCurrentHolders(), 1)