Skip to content

Commit

Permalink
Merge pull request #555 from openmeterio/benthos-presets
Browse files Browse the repository at this point in the history
feat: add benthos collector presets
  • Loading branch information
sagikazarmark authored Jan 23, 2024
2 parents d4e5725 + a801bfd commit 25835bf
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 5 deletions.
4 changes: 1 addition & 3 deletions benthos-collector.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ WORKDIR /etc/benthos

COPY cloudevents.spec.json /etc/benthos/

# COPY examples/http-server/input.yaml /etc/benthos/examples/http-server/input.yaml
# COPY examples/http-server/output.yaml /etc/benthos/examples/http-server/output.yaml
# COPY examples/kubernetes-pod-exec-time/config.yaml /etc/benthos/examples/kubernetes-pod-exec-time/config.yaml
COPY collector/benthos/presets /etc/benthos/presets

COPY --from=builder /usr/local/bin/benthos /usr/local/bin/

Expand Down
71 changes: 71 additions & 0 deletions collector/benthos/presets/http-server/input.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
input:
http_server:
address: 0.0.0.0:4196
path: /api/v1/events
sync_response:
status: '${! meta("http_response_status").or("204") }'

pipeline:
processors:
- switch:
- check: meta("Content-Type").lowercase() == "application/cloudevents-batch+json"
processors:
- unarchive:
format: json_array
- check: meta("Content-Type").lowercase() == "application/cloudevents+json"
processors:
- noop: {}
- check: ""
processors:
- log:
level: ERROR
message: 'Unexpected Content-Type: ${!meta("Content-Type")}'
- mapping: |
meta http_response_status = "400"
root = {
"type": "about:blank",
"title": "Bad Request",
"status": 400,
"detail":"request body has an error: header Content-Type has unexpected value \"%s\"".format(meta("Content-Type")),
}
- sync_response: {}
- mapping: "root = deleted()"
- json_schema:
schema_path: "file://./cloudevents.spec.json"
- catch:
- log:
level: ERROR
message: "Schema validation failed due to: ${!error()}"
- mapping: |
meta http_response_status = "400"
root = {
"type": "about:blank",
"title": "Bad Request",
"status": 400,
"detail":"request body has an error: %s".format(error()),
}
- sync_response: {}
- mapping: "root = deleted()"

output:
switch:
cases:
- check: ""
continue: true
output:
broker:
pattern: fan_out
outputs:
- sync_response: {}
processors:
- mapping: root = null
# https://github.com/benthosdev/benthos/discussions/2324
# https://github.com/benthosdev/benthos/issues/1946
- inproc: openmeter

- check: '"${DEBUG:false}" == "true"'
output:
stdout:
codec: lines
13 changes: 13 additions & 0 deletions collector/benthos/presets/http-server/output.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
input:
inproc: openmeter

buffer:
memory: {}

output:
openmeter:
url: "${OPENMETER_URL:https://openmeter.cloud}"
token: "${OPENMETER_TOKEN:}"
batching:
count: ${BATCH_SIZE:20}
period: ${BATCH_PERIOD:}
52 changes: 52 additions & 0 deletions collector/benthos/presets/kubernetes-pod-exec-time/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
input:
schedule:
input:
kubernetes_resources:
resource:
version: v1
name: pods
namespaces:
- ${SCRAPE_NAMESPACE:default}
interval: "${SCRAPE_INTERVAL:15s}"

pipeline:
processors:
- mapping: |
root = {
"id": uuid_v4(),
"specversion": "1.0",
"type": "kube-pod-exec-time",
"source": "kubernetes-api",
"time": meta("schedule_time"),
"subject": this.metadata.annotations."openmeter.io/subject".or(this.metadata.name),
"data": this.metadata.annotations.filter(item -> item.key.has_prefix("data.openmeter.io/")).map_each_key(key -> key.trim_prefix("data.openmeter.io/")).assign({
"pod_name": this.metadata.name,
"pod_namespace": this.metadata.namespace,
"duration_seconds": (meta("schedule_interval").parse_duration() / 1000 / 1000 / 1000).round().int64(),
}),
}
- json_schema:
schema_path: "file://./cloudevents.spec.json"
- catch:
- log:
level: ERROR
message: "Schema validation failed due to: ${!error()}"
- mapping: "root = deleted()"

output:
switch:
cases:
- check: ""
continue: true
output:
openmeter:
url: "${OPENMETER_URL:https://openmeter.cloud}"
token: "${OPENMETER_TOKEN:}"
batching:
count: ${BATCH_SIZE:20}
period: ${BATCH_PERIOD:}

- check: '"${DEBUG:false}" == "true"'
output:
stdout:
codec: lines
4 changes: 2 additions & 2 deletions deploy/charts/benthos-collector/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ Create args for the deployment
["benthos", "-c", "{{ .Values.configFile }}"]
{{- else if .Values.preset }}
{{- if eq .Values.preset "http-server" -}}
["benthos", "streams", "--no-api", "/etc/benthos/examples/http-server/input.yaml", "/etc/benthos/examples/http-server/output.yaml"]
["benthos", "streams", "--no-api", "/etc/benthos/presets/http-server/input.yaml", "/etc/benthos/presets/http-server/output.yaml"]
{{- else if eq .Values.preset "kubernetes-pod-exec-time" -}}
["benthos", "-c", "/etc/benthos/examples/kubernetes-pod-exec-time/config.yaml"]
["benthos", "-c", "/etc/benthos/presets/kubernetes-pod-exec-time/config.yaml"]
{{- else }}
{{- fail (printf "Invalid example '%s" .Values.preset) }}
{{- end }}
Expand Down

0 comments on commit 25835bf

Please sign in to comment.