Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-8622 v1.8.0 argo controller to support DLQ feature #7

Merged
merged 3 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,6 @@ debug.test
site/
/go-diagrams/
argo-events
# ignore temp vi files
*.swo
*.swp
44 changes: 44 additions & 0 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
image: golang:1.20

.install_docker:
before_script:
- |
wget -O docker-cli.deb https://download.docker.com/linux/debian/dists/bullseye/pool/stable/amd64/docker-ce-cli_20.10.18~3-0~debian-bullseye_amd64.deb && \
dpkg -i docker-cli.deb && \
rm docker-cli.deb

variables:
MAJOR_VERSION: "0"
MINOR_VERSION: "1"
ORGANIZATION_NAME: "artificial-intelligence"
TEAM_NAME: "ai-platform"
ARGO_EVENTS_VERSION: "v1.8.0"
.build_templatized_docker_image:
before_script:
- !reference [.install_docker, before_script]
# Inject org & team names to conform to AIP naming conventions that make it possible to set
# permissions and retention policies at each level.
- IMAGE_REPOSITORY_PREFIX="${DOCKER_REPO_URL}/${ORGANIZATION_NAME}/${TEAM_NAME}"
# Ensure image tag is unique for each commit so we invalidate caching on machines pulling images.
- IMAGE_TAG="${ARGO_EVENTS_VERSION}-${MAJOR_VERSION}.${MINOR_VERSION}.${CI_PIPELINE_IID}"
# Actually set up the naming for the image we're currently building.
- IMAGE_REPOSITORY="${IMAGE_REPOSITORY_PREFIX}/${ARTIFACT_NAME}"
- IMAGE_REPOSITORY_TAG="${IMAGE_REPOSITORY}:${IMAGE_TAG}"
script:
- make build
- |
DOCKER_BUILDKIT=1 docker build \
--build-arg BUILDKIT_INLINE_CACHE=1 \
--build-arg "IMAGE_REPOSITORY_TAG=${IMAGE_REPOSITORY_TAG}" \
-t ${IMAGE_REPOSITORY_TAG} \
${CONTEXT}
- echo ${DOCKER_API_KEY} | docker login -u ${DOCKER_USERNAME} --password-stdin ${DOCKER_REPO_URL}
- docker push ${IMAGE_REPOSITORY_TAG}
stages:
- build:images
build:events:
extends: .build_templatized_docker_image
variables:
ARTIFACT_NAME: "argo-events"
CONTEXT: "./"
stage: build:images
10 changes: 5 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,17 @@ K := $(foreach exec,$(EXECUTABLES), $(if $(shell which $(exec)),some string,$(er

# build
.PHONY: build
build: dist/$(BINARY_NAME)-linux-amd64.gz dist/$(BINARY_NAME)-linux-arm64.gz dist/$(BINARY_NAME)-linux-arm.gz dist/$(BINARY_NAME)-linux-ppc64le.gz dist/$(BINARY_NAME)-linux-s390x.gz
build: dist/$(BINARY_NAME)-linux-amd64.gz #dist/$(BINARY_NAME)-linux-arm64.gz dist/$(BINARY_NAME)-linux-arm.gz dist/$(BINARY_NAME)-linux-ppc64le.gz dist/$(BINARY_NAME)-linux-s390x.gz

dist/$(BINARY_NAME)-%.gz: dist/$(BINARY_NAME)-%
@[ -e dist/$(BINARY_NAME)-$*.gz ] || gzip -k dist/$(BINARY_NAME)-$*

dist/$(BINARY_NAME): GOARGS = GOOS= GOARCH=
dist/$(BINARY_NAME)-linux-amd64: GOARGS = GOOS=linux GOARCH=amd64
dist/$(BINARY_NAME)-linux-arm64: GOARGS = GOOS=linux GOARCH=arm64
dist/$(BINARY_NAME)-linux-arm: GOARGS = GOOS=linux GOARCH=arm
dist/$(BINARY_NAME)-linux-ppc64le: GOARGS = GOOS=linux GOARCH=ppc64le
dist/$(BINARY_NAME)-linux-s390x: GOARGS = GOOS=linux GOARCH=s390x
# dist/$(BINARY_NAME)-linux-arm64: GOARGS = GOOS=linux GOARCH=arm64
# dist/$(BINARY_NAME)-linux-arm: GOARGS = GOOS=linux GOARCH=arm
# dist/$(BINARY_NAME)-linux-ppc64le: GOARGS = GOOS=linux GOARCH=ppc64le
# dist/$(BINARY_NAME)-linux-s390x: GOARGS = GOOS=linux GOARCH=s390x

dist/$(BINARY_NAME):
go build -v -ldflags '${LDFLAGS}' -o ${DIST_DIR}/$(BINARY_NAME) ./cmd
Expand Down
4 changes: 4 additions & 0 deletions api/jsonschema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -4304,6 +4304,10 @@
"description": "AtLeastOnce determines the trigger execution semantics. Defaults to false. Trigger execution will use at-most-once semantics. If set to true, Trigger execution will switch to at-least-once semantics.",
"type": "boolean"
},
"dlqTrigger": {
"$ref": "#/definitions/io.argoproj.sensor.v1alpha1.Trigger",
"description": "If the trigger fails, it will retry up to the configured number of retries. If the maximum retries are reached and the trigger is set to execute atLeastOnce, the dead letter queue (DLQ) trigger will be invoked if specified. Invoking the dead letter queue trigger helps prevent data loss."
},
"parameters": {
"description": "Parameters is the list of parameters applied to the trigger template definition",
"items": {
Expand Down
4 changes: 4 additions & 0 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 20 additions & 1 deletion api/sensor.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 18 additions & 1 deletion api/sensor.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 36 additions & 7 deletions controllers/sensor/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,23 +64,52 @@ func validateTriggers(triggers []v1alpha1.Trigger) error {
trigNames := make(map[string]bool)

for _, trigger := range triggers {
if err := validateTriggerTemplate(trigger.Template); err != nil {
if err := validateTrigger(trigger); err != nil {
return err
}
if _, ok := trigNames[trigger.Template.Name]; ok {
return fmt.Errorf("duplicate trigger name: %s", trigger.Template.Name)
}
trigNames[trigger.Template.Name] = true
if err := validateTriggerPolicy(&trigger); err != nil {
return err
}
if err := validateTriggerTemplateParameters(&trigger); err != nil {
return err
}
}
return nil
}

func validateTrigger(trigger v1alpha1.Trigger) error {
if err := validateTriggerTemplate(trigger.Template); err != nil {
return err
}
if err := validateTriggerPolicy(&trigger); err != nil {
return err
}
if err := validateTriggerTemplateParameters(&trigger); err != nil {
return err
}
if err := validateDlqTrigger(&trigger); err != nil {
return err
}

return nil
}

// validateDlqTrigger validates trigger.atLeastOnce==true and the trigger.dlqTrigger
func validateDlqTrigger(trigger *v1alpha1.Trigger) error {
if trigger == nil {
return fmt.Errorf("trigger can't be nil")
}
if trigger.DlqTrigger == nil {
return nil
}
if !trigger.AtLeastOnce {
return fmt.Errorf("to use dlqTrigger, trigger.atLeastOnce must be set to true")
}
if !trigger.DlqTrigger.AtLeastOnce {
return fmt.Errorf("atLeastOnce must be set to true within the dlqTrigger")
}

return validateTrigger(*trigger.DlqTrigger)
}

// validateTriggerTemplate validates trigger template
func validateTriggerTemplate(template *v1alpha1.TriggerTemplate) error {
if template == nil {
Expand Down
80 changes: 80 additions & 0 deletions controllers/sensor/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,86 @@ func TestValidTriggers(t *testing.T) {
assert.Equal(t, true, strings.Contains(err.Error(), "trigger template can't be nil"))
})

t.Run("vanilla dlqTrigger", func(t *testing.T) {
triggers := []v1alpha1.Trigger{
{
Template: &v1alpha1.TriggerTemplate{
Name: "fake-trigger",
K8s: &v1alpha1.StandardK8STrigger{
Operation: "create",
Source: &v1alpha1.ArtifactLocation{},
},
},
AtLeastOnce: true,
DlqTrigger: &v1alpha1.Trigger{
AtLeastOnce: true,
Template: &v1alpha1.TriggerTemplate{
Name: "dlq-fake-trigger",
K8s: &v1alpha1.StandardK8STrigger{
Operation: "create",
Source: &v1alpha1.ArtifactLocation{},
},
},
},
},
}
err := validateTriggers(triggers)
assert.Nil(t, err)
})

t.Run("!dlqTrigger.atLeastOnce", func(t *testing.T) {
triggers := []v1alpha1.Trigger{
{
Template: &v1alpha1.TriggerTemplate{
Name: "fake-trigger",
K8s: &v1alpha1.StandardK8STrigger{
Operation: "create",
Source: &v1alpha1.ArtifactLocation{},
},
},
AtLeastOnce: true,
DlqTrigger: &v1alpha1.Trigger{
Template: &v1alpha1.TriggerTemplate{
Name: "dlq-fake-trigger",
K8s: &v1alpha1.StandardK8STrigger{
Operation: "create",
Source: &v1alpha1.ArtifactLocation{},
},
},
},
},
}
err := validateTriggers(triggers)
assert.NotNil(t, err)
assert.Equal(t, true, strings.Contains(err.Error(), "atLeastOnce must be set to true within the dlqTrigger"))
})

t.Run("dlqTrigger !.atLeastOnce", func(t *testing.T) {
triggers := []v1alpha1.Trigger{
{
Template: &v1alpha1.TriggerTemplate{
Name: "fake-trigger",
K8s: &v1alpha1.StandardK8STrigger{
Operation: "create",
Source: &v1alpha1.ArtifactLocation{},
},
},
DlqTrigger: &v1alpha1.Trigger{
Template: &v1alpha1.TriggerTemplate{
Name: "dlq-fake-trigger",
K8s: &v1alpha1.StandardK8STrigger{
Operation: "create",
Source: &v1alpha1.ArtifactLocation{},
},
},
},
},
}
err := validateTriggers(triggers)
assert.NotNil(t, err)
assert.Equal(t, true, strings.Contains(err.Error(), "to use dlqTrigger, trigger.atLeastOnce must be set to true"))
})

t.Run("invalid conditions reset - cron", func(t *testing.T) {
triggers := []v1alpha1.Trigger{
{
Expand Down
40 changes: 40 additions & 0 deletions docs/sensors/more-about-sensors-and-triggers.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,43 @@ spec:
# Optional
revisionHistoryLimit: 3
```

## Dead Letter Queue Trigger

To help avoid data loss and dropping a message on failure after all the retries are
exhausted, optionally, a `dlqTrigger` may be configured as following to invoke
any of the [10+ triggers](https://argoproj.github.io/argo-events/concepts/trigger/):

```yaml
spec:
triggers:
- template:
name: http-trigger
http:
url: https://xxxxx.com/
method: GET
# must be true for dlqTrigger
atLeastOnce: true
retryStrategy:
steps: 3
dlqTrigger:
template:
name: dlq-http-trigger
http:
url: https://xxxxx.com/
method: PUT
# must be true for dlqTrigger
atLeastOnce: true
# retries the dlqTrigger 5 times
retryStrategy:
steps: 5
```

If the trigger fails, it will retry up to the configured number of retries based
on `retryStrategy`. If the maximum retries are reached and the trigger, the
`dlqTrigger` will be invoked if specified. In order to use the `dlqTrigger`,
the `atLeastOnce` must be set to true within the trigger and the `dlqTrigger` for
the Sensor to know about the failure and invoke the `dlqTrigger`.

**note:** `dlqTrigger` is only available for the top level trigger and not
*recursively within the `dlqTrigger` template.
Loading