Skip to content

Commit

Permalink
chore: remove PriorityMutex implementation
Browse files Browse the repository at this point in the history
This is just a code refactor, removing unnecessary extra code.

As part of #11859 and #5022 I noticed that PriorityMutex is not useful
and can just be replaced by PrioritySemaphore.

The only code path that is optimised vs PrioritySempahore is resize
which will [never be
called](https://github.com/argoproj/argo-workflows/blob/c59ff53a37a7e7ac41f73b23e621f601eab9d604/workflow/sync/sync_manager.go#L340).

Otherwise we're performing double sync.Mutex locking for every call
because all the implementation of PriorityMutex just calls the
relevant method from PrioritySemaphore with it's own lock held, which
is wasteful.

Signed-off-by: Alan Clucas <[email protected]>
  • Loading branch information
Joibel committed Jul 17, 2024
1 parent d7495b8 commit 1f1bb7a
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 89 deletions.
75 changes: 3 additions & 72 deletions workflow/sync/mutex.go
Original file line number Diff line number Diff line change
@@ -1,75 +1,6 @@
package sync

import (
"sync"
"time"
)

type PriorityMutex struct {
name string
mutex *PrioritySemaphore
lock *sync.Mutex
}

func (m *PriorityMutex) getCurrentPending() []string {
return m.mutex.getCurrentPending()
}

var _ Semaphore = &PriorityMutex{}

// NewMutex creates new mutex lock object
// name of the mutex
// callbackFunc is a release notification function.
func NewMutex(name string, nextWorkflow NextWorkflow) *PriorityMutex {
return &PriorityMutex{
name: name,
lock: &sync.Mutex{},
mutex: NewSemaphore(name, 1, nextWorkflow, "mutex"),
}
}

func (m *PriorityMutex) getName() string {
return m.name
}

func (m *PriorityMutex) getLimit() int {
return m.mutex.limit
}

func (m *PriorityMutex) getCurrentHolders() []string {
return m.mutex.getCurrentHolders()
}

func (m *PriorityMutex) resize(n int) bool {
return false
}

func (m *PriorityMutex) release(key string) bool {
m.lock.Lock()
defer m.lock.Unlock()
return m.mutex.release(key)
}

func (m *PriorityMutex) acquire(holderKey string) bool {
m.lock.Lock()
defer m.lock.Unlock()
return m.mutex.acquire(holderKey)
}

func (m *PriorityMutex) addToQueue(holderKey string, priority int32, creationTime time.Time) {
m.lock.Lock()
defer m.lock.Unlock()
m.mutex.addToQueue(holderKey, priority, creationTime)
}

func (m *PriorityMutex) removeFromQueue(holderKey string) {
m.lock.Lock()
defer m.lock.Unlock()
m.mutex.removeFromQueue(holderKey)
}

func (m *PriorityMutex) tryAcquire(holderKey string) (bool, string) {
m.lock.Lock()
defer m.lock.Unlock()
return m.mutex.tryAcquire(holderKey)
// NewMutex creates a size 1 semaphore
func NewMutex(name string, nextWorkflow NextWorkflow) *PrioritySemaphore {
return NewSemaphore(name, 1, nextWorkflow, "mutex")
}
27 changes: 10 additions & 17 deletions workflow/sync/sync_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,14 @@ metadata:
selfLink: /apis/argoproj.io/v1alpha1/namespaces/default/workflows/hello-world-prtl9
uid: 790f5c47-211f-4a3b-8949-514ae916633b
spec:
entrypoint: whalesay
synchronization:
semaphore:
configMapKeyRef:
key: workflow
name: my-config
templates:
-
container:
- container:
args:
- hello world
command:
Expand Down Expand Up @@ -118,17 +116,14 @@ metadata:
name: semaphore-tmpl-level-xjvln
namespace: default
spec:
entrypoint: semaphore-tmpl-level-example
templates:
-
inputs: {}
- inputs: {}
metadata: {}
name: semaphore-tmpl-level-example
outputs: {}
steps:
- -
name: generate
- - name: generate
template: gen-number-list
- - arguments:
parameters:
Expand All @@ -137,8 +132,7 @@ spec:
name: sleep
template: sleep-n-sec
withParam: '{{steps.generate.outputs.result}}'
-
inputs: {}
- inputs: {}
metadata: {}
name: gen-number-list
outputs: {}
Expand All @@ -152,8 +146,7 @@ spec:
import json
import sys
json.dump([i for i in range(1, 3)], sys.stdout)
-
container:
- container:
args:
- echo sleeping for {{inputs.parameters.seconds}} seconds; sleep 10; echo done
command:
Expand Down Expand Up @@ -654,13 +647,13 @@ func TestMutexWfLevel(t *testing.T) {
assert.False(t, status)
assert.True(t, wfUpdate)

mutex := concurrenyMgr.syncLockMap["default/Mutex/my-mutex"].(*PriorityMutex)
mutex := concurrenyMgr.syncLockMap["default/Mutex/my-mutex"].(*PrioritySemaphore)
assert.NotNil(t, mutex)
assert.Len(t, mutex.mutex.pending.items, 2)
assert.Len(t, mutex.pending.items, 2)
concurrenyMgr.ReleaseAll(wf1)
assert.Len(t, mutex.mutex.pending.items, 1)
assert.Len(t, mutex.pending.items, 1)
concurrenyMgr.ReleaseAll(wf2)
assert.Len(t, mutex.mutex.pending.items, 0)
assert.Len(t, mutex.pending.items, 0)
})
}

Expand Down Expand Up @@ -692,7 +685,7 @@ func TestCheckWorkflowExistence(t *testing.T) {
_, _, _, _ = concurrenyMgr.TryAcquire(wfMutex1, "", wfMutex.Spec.Synchronization)
_, _, _, _ = concurrenyMgr.TryAcquire(wfSema, "", wfSema.Spec.Synchronization)
_, _, _, _ = concurrenyMgr.TryAcquire(wfSema1, "", wfSema.Spec.Synchronization)
mutex := concurrenyMgr.syncLockMap["default/Mutex/my-mutex"].(*PriorityMutex)
mutex := concurrenyMgr.syncLockMap["default/Mutex/my-mutex"].(*PrioritySemaphore)
semaphore := concurrenyMgr.syncLockMap["default/ConfigMap/my-config/workflow"]

assert.Len(mutex.getCurrentHolders(), 1)
Expand Down

0 comments on commit 1f1bb7a

Please sign in to comment.