Skip to content

Commit

Permalink
feat!: multiple mutexes and semaphores
Browse files Browse the repository at this point in the history
Fixes #5022
Fixes #11859

This is mostly backwards compatbile. It is minorly breaking only in as
much as semaphore + mutex at the same time will now require both,
whereas before it ignored the mutex.

As a workflows users I would like to be able to use more than one sync
option in a workflow or template.

Added `mutexes` and `semaphores` to the `synchronization` block in
workflow types as lists. Legacy `mutex` and `semaphore` kept in and
continue to work (will be added to the mutexes/semaphores list) but
marked for deprecation.

Logic changed to acquire a full list of all synchronization
items (`syncItem`) when `TryAcquire` and `Release` from sync manager.

`TryAcquire` from sync_manager` calls a new `checkAcquire` method,
whilst holding the sync_manager internal golang `sync.Mutex` to check
if they are all acquirable before acquiring them. This honors
priority.

`TryAcquire` also returns the failed lock name in the case of failure
to acquire to reduce the number of exported `package sync` methods and
complexity outside of `sync`.

The interface to the workflow status block has not changed.

Reduced the exported types and methods from the sync module as far as
possible (lower case instead of upper case).

Removed the internal golang sync mutex from `PrioritySemaphore` as all
methods are called from `sync_manager.go` when under it's own
lock. The alternative was unnecessarily complicating the calls inside
semaphore.go. This is faster, but less clean in case someone wants to
reuse the PrioritySemaphore type.

Renamed the manager in `sync_manager.go` where used: `cm`->`sm` and
`concurrencyManager`->`syncManager`.

New and updated unit tests.
New and updated e2e smoke tests.

Signed-off-by: Alan Clucas <[email protected]>
  • Loading branch information
Joibel committed Jul 17, 2024
1 parent e662199 commit 6f34440
Show file tree
Hide file tree
Showing 55 changed files with 3,428 additions and 1,188 deletions.
1 change: 1 addition & 0 deletions .spelling
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ memoizing
metadata
minikube
mutex
mutexes
namespace
namespaces
natively
Expand Down
18 changes: 16 additions & 2 deletions api/jsonschema/schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 16 additions & 2 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions docs/executor_swagger.md
Original file line number Diff line number Diff line change
Expand Up @@ -4416,7 +4416,9 @@ Could also be a Duration, e.g.: "2m", "6h" | |
| Name | Type | Go type | Required | Default | Description | Example |
|------|------|---------|:--------:| ------- |-------------|---------|
| mutex | [Mutex](#mutex)| `Mutex` | | | | |
| mutexes | [][Mutex](#mutex)| `[]*Mutex` | | | Mutexes holds the list of Mutex lock details | |
| semaphore | [SemaphoreRef](#semaphore-ref)| `SemaphoreRef` | | | | |
| semaphores | [][SemaphoreRef](#semaphore-ref)| `[]*SemaphoreRef` | | | Semaphores holds the list of Semaphores configuration | |



Expand Down
22 changes: 4 additions & 18 deletions docs/fields.md
Original file line number Diff line number Diff line change
Expand Up @@ -1632,8 +1632,10 @@ Synchronization holds synchronization lock configuration
### Fields
| Field Name | Field Type | Description |
|:----------:|:----------:|---------------|
|`mutex`|[`Mutex`](#mutex)|Mutex holds the Mutex lock details|
|`semaphore`|[`SemaphoreRef`](#semaphoreref)|Semaphore holds the Semaphore configuration|
|`mutex`|[`Mutex`](#mutex)|Mutex holds the Mutex lock details - deprecated, use mutexes instead|
|`mutexes`|`Array<`[`Mutex`](#mutex)`>`|Mutexes holds the list of Mutex lock details|
|`semaphore`|[`SemaphoreRef`](#semaphoreref)|Semaphore holds the Semaphore configuration - deprecated, use semaphores instead|
|`semaphores`|`Array<`[`SemaphoreRef`](#semaphoreref)`>`|Semaphores holds the list of Semaphores configuration|

## Template

Expand Down Expand Up @@ -2349,14 +2351,6 @@ Backoff is a backoff strategy to use within retryStrategy

Mutex holds Mutex configuration

<details markdown>
<summary>Examples with this field (click to open)</summary>

- [`synchronization-mutex-tmpl-level.yaml`](https://github.com/argoproj/argo-workflows/blob/main/examples/synchronization-mutex-tmpl-level.yaml)

- [`synchronization-mutex-wf-level.yaml`](https://github.com/argoproj/argo-workflows/blob/main/examples/synchronization-mutex-wf-level.yaml)
</details>

### Fields
| Field Name | Field Type | Description |
|:----------:|:----------:|---------------|
Expand Down Expand Up @@ -3231,14 +3225,6 @@ NodeSynchronizationStatus stores the status of a node

MutexStatus contains which objects hold mutex locks, and which objects this workflow is waiting on to release locks.

<details markdown>
<summary>Examples with this field (click to open)</summary>

- [`synchronization-mutex-tmpl-level.yaml`](https://github.com/argoproj/argo-workflows/blob/main/examples/synchronization-mutex-tmpl-level.yaml)

- [`synchronization-mutex-wf-level.yaml`](https://github.com/argoproj/argo-workflows/blob/main/examples/synchronization-mutex-wf-level.yaml)
</details>

### Fields
| Field Name | Field Type | Description |
|:----------:|:----------:|---------------|
Expand Down
38 changes: 38 additions & 0 deletions docs/parallelism.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Limiting parallelism

You can restrict the number of workflows being executed at any time using a number of mechanisms

## Controller level

You can limit the total number of workflows that can execute at any one time in the [workflow controller ConfigMap](./workflow-controller-configmap.yaml).

```yaml
data:
parallelism: "10"
```
You can also limit the number of workflows that can execute in a single namespace.
```yaml
data:
namespaceParallelism: "4"
```
Workflows that are executing but restricted from running more nodes due to the other mechanisms on this page will still count towards the parallelism limits.
## Workflow level
You can restrict parallelism within a workflow using `parallelism` within a workflow or template.
This only restricts total concurrent executions of steps or tasks within the same workflow.

Examples

1 [`parallelism-limit.yaml`](https://github.com/argoproj/argo-workflows/blob/main/examples/parallelism-limit.yaml) restricts the parallelism of a [loop](./walk-through/loops.md)
1 [`parallelism-nested.yaml`](https://github.com/argoproj/argo-workflows/blob/main/examples/parallelism-nested.yaml) restricts the parallelism of a nested loop
1 [`parallelism-nested-dag.yaml`](https://github.com/argoproj/argo-workflows/blob/main/examples/parallelism-nested-dag.yaml) restricts the number of dag tasks that can be run at any one time
1 [`parallelism-nested-workflow.yaml`](https://github.com/argoproj/argo-workflows/blob/main/examples/parallelism-nested-workflow.yaml) shows how parallelism is inherited by children
1 [`parallelism-template-limit.yaml`](https://github.com/argoproj/argo-workflows/blob/main/examples/parallelism-template-limit.yaml) shows how parallelism of looped templates is also restricted

## Synchronization

You can use [mutexes and semaphores](./synchronization.md) to control the parallel execution of sections of a workflow.
115 changes: 84 additions & 31 deletions docs/synchronization.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
# Synchronization

> v2.10 and after
> v3.6 for multiple
## Introduction

Synchronization enables users to limit the parallel execution of certain workflows or
templates within a workflow without having to restrict others.
You can use synchronization to limit the parallel execution of workflows or templates.
You can use mutexes to restrict workflows or templates to only having a single concurrent section.
You can use semaphores to restrict workflows or templates to a configured number of parallel runs.

Users can create multiple synchronization configurations in the `ConfigMap` that can be referred to
from a workflow or template within a workflow. Alternatively, users can
configure a mutex to prevent concurrent execution of templates or
workflows using the same mutex.
You can create multiple synchronization configurations in the `ConfigMap` that can be referred to from a workflow or template.

For example:

Expand All @@ -26,9 +25,10 @@ data:
### Workflow-level Synchronization
Workflow-level synchronization limits parallel execution of the workflow if workflows have the same synchronization reference.
In this example, Workflow refers to `workflow` synchronization key which is configured as limit 1,
so only one workflow instance will be executed at given time even multiple workflows created.
You can limit parallel execution of a workflow by using Workflow-level synchronization.
If multiple workflows have the same synchronization reference they will be limited by that synchronization reference.
In this example, Workflow refers to `workflow` synchronization key which is configured as limit `"1"`, so only one workflow instance will be executed at given time even if multiple workflows are created.

Using a semaphore configured by a `ConfigMap`:

Expand All @@ -40,10 +40,10 @@ metadata:
spec:
entrypoint: whalesay
synchronization:
semaphore:
configMapKeyRef:
name: my-config
key: workflow
semaphores:
- configMapKeyRef:
name: my-config
key: workflow
templates:
- name: whalesay
container:
Expand All @@ -52,7 +52,7 @@ spec:
args: ["hello world"]
```

Using a mutex:
Using a mutex achieves the same thing as a count `"1"` semaphore:

```yaml
apiVersion: argoproj.io/v1alpha1
Expand All @@ -62,8 +62,8 @@ metadata:
spec:
entrypoint: whalesay
synchronization:
mutex:
name: workflow
mutexes:
- name: workflow
templates:
- name: whalesay
container:
Expand All @@ -74,9 +74,11 @@ spec:

### Template-level Synchronization

Template-level synchronization limits parallel execution of the template across workflows, if templates have the same synchronization reference.
In this example, `acquire-lock` template has synchronization reference of `template` key which is configured as limit 2,
so two instances of templates will be executed at a given time: even multiple steps/tasks within workflow or different workflows referring to the same template.
You can limit parallel execution of a template by using Template-level synchronization.
If templates have the same synchronization reference they will be limited by that synchronization reference, across all workflows.

In this example, `acquire-lock` template has synchronization reference of `template` key which is configured as limit `"2"` so a maximum of two instances of the `acquire-lock` template will be executed at a given time.
This applies even multiple steps or tasks within a workflow or different workflows refer to the same template.

Using a semaphore configured by a `ConfigMap`:

Expand All @@ -100,17 +102,17 @@ spec:
- name: acquire-lock
synchronization:
semaphore:
configMapKeyRef:
name: my-config
key: template
semaphores:
- configMapKeyRef:
name: my-config
key: template
container:
image: alpine:latest
command: [sh, -c]
args: ["sleep 10; echo acquired lock"]
```

Using a mutex:
Using a mutex will limit to a single execution of the template at any one time:

```yaml
apiVersion: argoproj.io/v1alpha1
Expand All @@ -132,8 +134,8 @@ spec:
- name: acquire-lock
synchronization:
mutex:
name: template
mutexes:
- name: template
container:
image: alpine:latest
command: [sh, -c]
Expand All @@ -142,13 +144,64 @@ spec:

Examples:

1. [Workflow level semaphore](https://github.com/argoproj/argo-workflows/blob/main/examples/synchronization-wf-level.yaml)
1. [Workflow level semaphore](https://github.com/argoproj/argo-workflows/blob/main/examples/synchronization-wf-level.yamlxb)
1. [Workflow level mutex](https://github.com/argoproj/argo-workflows/blob/main/examples/synchronization-mutex-wf-level.yaml)
1. [Step level semaphore](https://github.com/argoproj/argo-workflows/blob/main/examples/synchronization-tmpl-level.yaml)
1. [Step level mutex](https://github.com/argoproj/argo-workflows/blob/main/examples/synchronization-mutex-tmpl-level.yaml)

### Other Parallelism support
### Multiple synchronization

You can specify multiple mutexes and semaphores to lock in a single workflow or template.

```yaml
synchronization:
mutexes:
- name: alpha
- name: beta
semaphores:
- configMapKeyRef:
key: foo
name: my-config
- configMapKeyRef:
key: bar
name: my-config
```

The workflow will block until all of these mutexes and semaphores are available.

### Priority

Workflows can have a `priority` set in their specification.
Workflows with a higher priority value will be queued to take a semaphore or mutex before a lower priority workflow, even if they have been waiting for less time.

!!! Warning
A high priority workflow waiting on multiple mutexes or semaphore will make all other workflows which want to acquire those mutexes wait for it to acquire and release all the mutexes or semaphores it is waiting on.
This applies even if the lower priority workflows only wish to acquire a subset of those mutexes or semaphores.

### Legacy

In workflows prior to 3.6 you can only specify one mutex or semaphore to lock in any one workflow or template using either a mutex:

```yaml
synchronization:
mutex:
...
```

or a semaphore:

```yaml
synchronizaion:
semamphore:
...
```

Specifying both would not work in <3.6, only the semaphore would be used.

The single `mutex` and `semaphore` syntax still works in version 3.6 but is considered deprecated.
Both mutex and semaphore will be taken in version 3.6 with this syntax.
This syntax can be mixed with `mutexes` and `semaphores`, all mutexes and semaphores will be used.

## Parallelism

In addition to this synchronization, the workflow controller supports a parallelism setting that applies to all workflows
in the system (it is not granular to a class of workflows, or tasks withing them). Furthermore, there is a parallelism setting
at the workflow and template level, but this only restricts total concurrent executions of tasks within the same workflow.
See also [how you can restrict parallelism](./parallelism.md) in other ways.
10 changes: 5 additions & 5 deletions examples/synchronization-mutex-tmpl-level.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,18 @@ spec:

- name: acquire-lock
synchronization:
mutex:
name: welcome
mutexes:
- name: welcome
container:
image: alpine:latest
command: [sh, -c]
args: ["sleep 20; echo acquired lock"]

- name: acquire-lock-1
synchronization:
mutex:
name: test
mutexes:
- name: test
container:
image: alpine:latest
command: [sh, -c]
args: ["sleep 50; echo acquired lock"]
args: ["sleep 50; echo acquired lock"]
4 changes: 2 additions & 2 deletions examples/synchronization-mutex-wf-level.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ metadata:
spec:
entrypoint: whalesay
synchronization:
mutex:
name: test
mutexes:
- name: test
templates:
- name: whalesay
container:
Expand Down
8 changes: 4 additions & 4 deletions examples/synchronization-tmpl-level.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ spec:

- name: acquire-lock
synchronization:
semaphore:
configMapKeyRef:
name: my-config
key: template
semaphores:
- configMapKeyRef:
name: my-config
key: template
container:
image: alpine:latest
command: [sh, -c]
Expand Down
Loading

0 comments on commit 6f34440

Please sign in to comment.