diff --git a/benthos-collector.Dockerfile b/benthos-collector.Dockerfile index 1ab8f241c..23b56c437 100644 --- a/benthos-collector.Dockerfile +++ b/benthos-collector.Dockerfile @@ -30,9 +30,7 @@ WORKDIR /etc/benthos COPY cloudevents.spec.json /etc/benthos/ -# COPY examples/http-server/input.yaml /etc/benthos/examples/http-server/input.yaml -# COPY examples/http-server/output.yaml /etc/benthos/examples/http-server/output.yaml -# COPY examples/kubernetes-pod-exec-time/config.yaml /etc/benthos/examples/kubernetes-pod-exec-time/config.yaml +COPY collector/benthos/presets /etc/benthos/presets COPY --from=builder /usr/local/bin/benthos /usr/local/bin/ diff --git a/collector/benthos/presets/http-server/input.yaml b/collector/benthos/presets/http-server/input.yaml new file mode 100644 index 000000000..9186d7568 --- /dev/null +++ b/collector/benthos/presets/http-server/input.yaml @@ -0,0 +1,71 @@ +input: + http_server: + address: 0.0.0.0:4196 + path: /api/v1/events + sync_response: + status: '${! meta("http_response_status").or("204") }' + +pipeline: + processors: + - switch: + - check: meta("Content-Type").lowercase() == "application/cloudevents-batch+json" + processors: + - unarchive: + format: json_array + - check: meta("Content-Type").lowercase() == "application/cloudevents+json" + processors: + - noop: {} + - check: "" + processors: + - log: + level: ERROR + message: 'Unexpected Content-Type: ${!meta("Content-Type")}' + - mapping: | + meta http_response_status = "400" + + root = { + "type": "about:blank", + "title": "Bad Request", + "status": 400, + "detail":"request body has an error: header Content-Type has unexpected value \"%s\"".format(meta("Content-Type")), + } + - sync_response: {} + - mapping: "root = deleted()" + - json_schema: + schema_path: "file://./cloudevents.spec.json" + - catch: + - log: + level: ERROR + message: "Schema validation failed due to: ${!error()}" + - mapping: | + meta http_response_status = "400" + + root = { + "type": "about:blank", + "title": "Bad Request", + "status": 400, + "detail":"request body has an error: %s".format(error()), + } + - sync_response: {} + - mapping: "root = deleted()" + +output: + switch: + cases: + - check: "" + continue: true + output: + broker: + pattern: fan_out + outputs: + - sync_response: {} + processors: + - mapping: root = null + # https://github.com/benthosdev/benthos/discussions/2324 + # https://github.com/benthosdev/benthos/issues/1946 + - inproc: openmeter + + - check: '"${DEBUG:false}" == "true"' + output: + stdout: + codec: lines diff --git a/collector/benthos/presets/http-server/output.yaml b/collector/benthos/presets/http-server/output.yaml new file mode 100644 index 000000000..7c6b12c4d --- /dev/null +++ b/collector/benthos/presets/http-server/output.yaml @@ -0,0 +1,13 @@ +input: + inproc: openmeter + +buffer: + memory: {} + +output: + openmeter: + url: "${OPENMETER_URL:https://openmeter.cloud}" + token: "${OPENMETER_TOKEN:}" + batching: + count: ${BATCH_SIZE:20} + period: ${BATCH_PERIOD:} diff --git a/collector/benthos/presets/kubernetes-pod-exec-time/config.yaml b/collector/benthos/presets/kubernetes-pod-exec-time/config.yaml new file mode 100644 index 000000000..b189c5fb4 --- /dev/null +++ b/collector/benthos/presets/kubernetes-pod-exec-time/config.yaml @@ -0,0 +1,52 @@ +input: + schedule: + input: + kubernetes_resources: + resource: + version: v1 + name: pods + namespaces: + - ${SCRAPE_NAMESPACE:default} + interval: "${SCRAPE_INTERVAL:15s}" + +pipeline: + processors: + - mapping: | + root = { + "id": uuid_v4(), + "specversion": "1.0", + "type": "kube-pod-exec-time", + "source": "kubernetes-api", + "time": meta("schedule_time"), + "subject": this.metadata.annotations."openmeter.io/subject".or(this.metadata.name), + "data": this.metadata.annotations.filter(item -> item.key.has_prefix("data.openmeter.io/")).map_each_key(key -> key.trim_prefix("data.openmeter.io/")).assign({ + "pod_name": this.metadata.name, + "pod_namespace": this.metadata.namespace, + "duration_seconds": (meta("schedule_interval").parse_duration() / 1000 / 1000 / 1000).round().int64(), + }), + } + - json_schema: + schema_path: "file://./cloudevents.spec.json" + - catch: + - log: + level: ERROR + message: "Schema validation failed due to: ${!error()}" + - mapping: "root = deleted()" + +output: + switch: + cases: + - check: "" + continue: true + output: + openmeter: + url: "${OPENMETER_URL:https://openmeter.cloud}" + token: "${OPENMETER_TOKEN:}" + batching: + count: ${BATCH_SIZE:20} + period: ${BATCH_PERIOD:} + + - check: '"${DEBUG:false}" == "true"' + output: + stdout: + codec: lines diff --git a/deploy/charts/benthos-collector/templates/_helpers.tpl b/deploy/charts/benthos-collector/templates/_helpers.tpl index 0b7642ee2..5e596dda5 100644 --- a/deploy/charts/benthos-collector/templates/_helpers.tpl +++ b/deploy/charts/benthos-collector/templates/_helpers.tpl @@ -84,9 +84,9 @@ Create args for the deployment ["benthos", "-c", "{{ .Values.configFile }}"] {{- else if .Values.preset }} {{- if eq .Values.preset "http-server" -}} -["benthos", "streams", "--no-api", "/etc/benthos/examples/http-server/input.yaml", "/etc/benthos/examples/http-server/output.yaml"] +["benthos", "streams", "--no-api", "/etc/benthos/presets/http-server/input.yaml", "/etc/benthos/presets/http-server/output.yaml"] {{- else if eq .Values.preset "kubernetes-pod-exec-time" -}} -["benthos", "-c", "/etc/benthos/examples/kubernetes-pod-exec-time/config.yaml"] +["benthos", "-c", "/etc/benthos/presets/kubernetes-pod-exec-time/config.yaml"] {{- else }} {{- fail (printf "Invalid example '%s" .Values.preset) }} {{- end }}