diff --git a/docs/docs/schema/defaults.json b/docs/docs/schema/defaults.json index 33fb8e187..676ff2c53 100644 --- a/docs/docs/schema/defaults.json +++ b/docs/docs/schema/defaults.json @@ -2309,6 +2309,19 @@ "title": "Readinessprobe", "type": "object" }, + "replicaCount": { + "anyOf": [ + { + "type": "integer" + }, + { + "type": "null" + } + ], + "default": null, + "description": "The number of Kafka Streams replicas.", + "title": "Replicacount" + }, "resources": { "allOf": [ { diff --git a/docs/docs/schema/pipeline.json b/docs/docs/schema/pipeline.json index ec795608f..6666255c7 100644 --- a/docs/docs/schema/pipeline.json +++ b/docs/docs/schema/pipeline.json @@ -1945,6 +1945,19 @@ "title": "Readinessprobe", "type": "object" }, + "replicaCount": { + "anyOf": [ + { + "type": "integer" + }, + { + "type": "null" + } + ], + "default": null, + "description": "The number of Kafka Streams replicas.", + "title": "Replicacount" + }, "resources": { "allOf": [ { diff --git a/docs/docs/user/references/cli-commands.md b/docs/docs/user/references/cli-commands.md index 93f696006..5b83c67a6 100644 --- a/docs/docs/user/references/cli-commands.md +++ b/docs/docs/user/references/cli-commands.md @@ -21,9 +21,10 @@ $ kpops [OPTIONS] COMMAND [ARGS]... * `generate`: Generate enriched pipeline representation * `init`: Initialize a new KPOps project. * `manifest`: Render final resource representation -* `patch`: Render final resource representation +* `pause`: Pauses the pipeline * `reset`: Reset pipeline steps * `schema`: Generate JSON schema. +* `sync`: Render final resource representation ## `kpops clean` @@ -170,14 +171,14 @@ $ kpops manifest [OPTIONS] PIPELINE_PATHS... * `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose] * `--help`: Show this message and exit. -## `kpops patch` +## `kpops pause` In addition to generate, render final resource representation for each pipeline step, e.g. Kubernetes manifests. **Usage**: ```console -$ kpops patch [OPTIONS] PIPELINE_PATHS... +$ kpops pause [OPTIONS] PIPELINE_PATHS... ``` **Arguments**: @@ -251,3 +252,25 @@ $ kpops schema [OPTIONS] SCOPE:{pipeline|defaults|config} **Options**: * `--help`: Show this message and exit. + +## `kpops sync` + +In addition to generate, render final resource representation for each pipeline step, e.g. Kubernetes manifests. + +**Usage**: + +```console +$ kpops sync [OPTIONS] PIPELINE_PATHS... +``` + +**Arguments**: + +* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required] + +**Options**: + +* `--dotenv FILE`: Path to dotenv file. Multiple files can be provided. The files will be loaded in order, with each file overriding the previous one. [env var: KPOPS_DOTENV_PATH] +* `--config DIRECTORY`: Path to the dir containing config.yaml files [env var: KPOPS_CONFIG_PATH; default: .] +* `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT] +* `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose] +* `--help`: Show this message and exit. diff --git a/kpops/api/__init__.py b/kpops/api/__init__.py index 640dc6957..71a3bdc47 100644 --- a/kpops/api/__init__.py +++ b/kpops/api/__init__.py @@ -386,7 +386,7 @@ def init( init_project(path, config_include_opt) -def patch( +def sync( pipeline_path: Path, dotenv: list[Path] | None = None, config: Path = Path(), @@ -408,6 +408,28 @@ def patch( return resources +def pause( + pipeline_path: Path, + dotenv: list[Path] | None = None, + config: Path = Path(), + environment: str | None = None, + verbose: bool = True, +) -> list[Resource]: + pipeline = generate( + pipeline_path=pipeline_path, + dotenv=dotenv, + config=config, + environment=environment, + verbose=verbose, + ) + resources: list[Resource] = [] + + for component in pipeline.components: + resource = component.manifest_pause() + resources.append(resource) + return resources + + def _create_pipeline( pipeline_path: Path, kpops_config: KpopsConfig, diff --git a/kpops/cli/main.py b/kpops/cli/main.py index 269eeef65..809a31342 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -342,7 +342,7 @@ def clean( short_help="Render final resource representation", help="In addition to generate, render final resource representation for each pipeline step, e.g. Kubernetes manifests.", ) -def patch( +def sync( pipeline_paths: list[Path] = PIPELINE_PATHS_ARG, dotenv: list[Path] | None = DOTENV_PATH_OPTION, config: Path = CONFIG_PATH_OPTION, @@ -350,7 +350,31 @@ def patch( verbose: bool = VERBOSE_OPTION, ): for pipeline_file_path in collect_pipeline_paths(pipeline_paths): - resources = kpops.patch( + resources = kpops.sync( + pipeline_path=pipeline_file_path, + dotenv=dotenv, + config=config, + environment=environment, + verbose=verbose, + ) + for resource in resources: + for rendered_manifest in resource: + print_yaml(rendered_manifest) + + +@app.command( + short_help="Pauses the pipeline", + help="In addition to generate, render final resource representation for each pipeline step, e.g. Kubernetes manifests.", +) +def pause( + pipeline_paths: list[Path] = PIPELINE_PATHS_ARG, + dotenv: list[Path] | None = DOTENV_PATH_OPTION, + config: Path = CONFIG_PATH_OPTION, + environment: str | None = ENVIRONMENT, + verbose: bool = VERBOSE_OPTION, +): + for pipeline_file_path in collect_pipeline_paths(pipeline_paths): + resources = kpops.pause( pipeline_path=pipeline_file_path, dotenv=dotenv, config=config, diff --git a/kpops/components/base_components/pipeline_component.py b/kpops/components/base_components/pipeline_component.py index aeea18130..2f3e8b65b 100644 --- a/kpops/components/base_components/pipeline_component.py +++ b/kpops/components/base_components/pipeline_component.py @@ -250,6 +250,10 @@ def manifest_clean(self) -> Resource: """Render final component resources, e.g. Kubernetes manifests.""" return [] + def manifest_pause(self) -> Resource: + """Render final component resources, e.g. Kubernetes manifests.""" + return [] + async def deploy(self, dry_run: bool) -> None: """Deploy component, e.g. to Kubernetes cluster. diff --git a/kpops/components/streams_bootstrap/streams/model.py b/kpops/components/streams_bootstrap/streams/model.py index 88cbbbab9..5e2fe78c1 100644 --- a/kpops/components/streams_bootstrap/streams/model.py +++ b/kpops/components/streams_bootstrap/streams/model.py @@ -304,6 +304,7 @@ class StreamsAppValues(StreamsBootstrapValues): The attributes correspond to keys and values that are used as values for the streams bootstrap helm chart. + :param replica_count: The number of Kafka Streams replicas. :param kafka: streams-bootstrap kafka section :param autoscaling: Kubernetes event-driven autoscaling config, defaults to None :param stateful_set: Whether to use a StatefulSet instead of a Deployment to deploy the streams app. @@ -313,6 +314,11 @@ class StreamsAppValues(StreamsBootstrapValues): :param termination_grace_period_seconds: Delay for graceful application shutdown in seconds: https://pracucci.com/graceful-shutdown-of-kubernetes-pods.html """ + replica_count: int | None = Field( + default=None, + description=describe_attr("replica_count", __doc__), + ) + kafka: StreamsConfig = Field( description=describe_attr("kafka", __doc__), ) diff --git a/kpops/components/streams_bootstrap/streams/streams_app.py b/kpops/components/streams_bootstrap/streams/streams_app.py index 4ea2417a6..7ccdee18d 100644 --- a/kpops/components/streams_bootstrap/streams/streams_app.py +++ b/kpops/components/streams_bootstrap/streams/streams_app.py @@ -200,3 +200,11 @@ def manifest_clean(self) -> Resource: self.namespace, values, ) + + def manifest_pause(self) -> Resource: + autoscaling = self.values.autoscaling + if autoscaling and autoscaling.enabled: + autoscaling.max_replicas = 0 + else: + self.values.replica_count = 0 + return self.manifest_deploy() diff --git a/tests/pipeline/snapshots/test_manifest/test_pause/manifest.yaml b/tests/pipeline/snapshots/test_manifest/test_pause/manifest.yaml new file mode 100644 index 000000000..38deb77d9 --- /dev/null +++ b/tests/pipeline/snapshots/test_manifest/test_pause/manifest.yaml @@ -0,0 +1,221 @@ +--- +apiVersion: v1 +data: + jmx-kafka-streams-app-prometheus.yml: "jmxUrl: service:jmx:rmi:///jndi/rmi://localhost:5555/jmxrmi\n\ + lowercaseOutputName: true\nlowercaseOutputLabelNames: true\nssl: false\nrules:\n\ + \ - pattern: \".*\"\n" +kind: ConfigMap +metadata: + labels: + app: resources-streams-bootstrap-argo-my-streams-app + chart: streams-app-3.0.1 + heritage: Helm + release: resources-streams-bootstrap-argo-my-streams-app + name: resources-streams-bootstrap-argo-my-streams-app-jmx-configmap + +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + annotations: + argocd.argoproj.io/sync-wave: '1' + consumerGroup: my-streams-app-id + labels: + app: resources-streams-bootstrap-argo-my-streams-app + chart: streams-app-3.0.1 + release: resources-streams-bootstrap-argo-my-streams-app + name: resources-streams-bootstrap-argo-my-streams-app +spec: + replicas: 0 + selector: + matchLabels: + app: resources-streams-bootstrap-argo-my-streams-app + release: resources-streams-bootstrap-argo-my-streams-app + template: + metadata: + annotations: + prometheus.io/port: '5556' + prometheus.io/scrape: 'true' + labels: + app: resources-streams-bootstrap-argo-my-streams-app + release: resources-streams-bootstrap-argo-my-streams-app + spec: + containers: + - env: + - name: ENV_PREFIX + value: APP_ + - name: KAFKA_LARGE_MESSAGE_ID_GENERATOR + value: com.bakdata.kafka.MurmurHashIdGenerator + - name: KAFKA_JMX_PORT + value: '5555' + - name: APP_VOLATILE_GROUP_INSTANCE_ID + value: 'true' + - name: APP_BOOTSTRAP_SERVERS + value: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + - name: APP_SCHEMA_REGISTRY_URL + value: http://localhost:8081/ + - name: APP_INPUT_TOPICS + value: producer-1-output-topic + - name: APP_INPUT_PATTERN + value: my-input-pattern + - name: APP_OUTPUT_TOPIC + value: streams-app-1-output-topic + - name: APP_ERROR_TOPIC + value: streams-app-1-error-topic + - name: APP_LABELED_OUTPUT_TOPICS + value: my-output-topic-label=streams-app-1-labeled-output-topic, + - name: APP_LABELED_INPUT_TOPICS + value: producer-1-labeled-topic-output=producer-1-labeled-topic-output, + - name: APP_LABELED_INPUT_PATTERNS + value: my-input-topic-labeled-pattern=my-labeled-input-pattern, + - name: APP_APPLICATION_ID + value: my-streams-app-id + - name: APP_CONVERT_XML + value: 'true' + - name: JAVA_TOOL_OPTIONS + value: '-Dcom.sun.management.jmxremote.port=5555 -Dcom.sun.management.jmxremote.authenticate=false + -Dcom.sun.management.jmxremote.ssl=false -XX:MaxRAMPercentage=75.0 ' + image: my-registry/my-streams-app-image:1.0.0 + imagePullPolicy: Always + name: resources-streams-bootstrap-argo-my-streams-app + ports: + - containerPort: 5555 + name: jmx + resources: + limits: + cpu: 500m + memory: 2G + requests: + cpu: 200m + memory: 300Mi + - command: + - java + - -XX:+UnlockExperimentalVMOptions + - -XX:+UseCGroupMemoryLimitForHeap + - -XX:MaxRAMFraction=1 + - -XshowSettings:vm + - -jar + - jmx_prometheus_httpserver.jar + - '5556' + - /etc/jmx-streams-app/jmx-kafka-streams-app-prometheus.yml + image: solsson/kafka-prometheus-jmx-exporter@sha256:6f82e2b0464f50da8104acd7363fb9b995001ddff77d248379f8788e78946143 + name: prometheus-jmx-exporter + ports: + - containerPort: 5556 + resources: + limits: + cpu: 300m + memory: 2G + requests: + cpu: 100m + memory: 500Mi + volumeMounts: + - mountPath: /etc/jmx-streams-app + name: jmx-config + terminationGracePeriodSeconds: 300 + volumes: + - configMap: + name: resources-streams-bootstrap-argo-my-streams-app-jmx-configmap + name: jmx-config + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + name: streams-app-1-output-topic +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + name: streams-app-1-error-topic +spec: + config: + cleanup.policy: compact,delete + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + name: streams-app-1-labeled-output-topic +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: batch/v1 +kind: Job +metadata: + annotations: + argocd.argoproj.io/hook: PostDelete + labels: + app: resources-streams-bootstrap-argo-my-streams-app + chart: streams-app-cleanup-job-3.0.2 + release: resources-streams-bootstrap-argo-my-streams-app-clean + name: resources-streams-bootstrap-argo-my-streams-app +spec: + backoffLimit: 6 + template: + metadata: + labels: + app: resources-streams-bootstrap-argo-my-streams-app + release: resources-streams-bootstrap-argo-my-streams-app-clean + spec: + containers: + - args: + - reset + env: + - name: ENV_PREFIX + value: APP_ + - name: KAFKA_LARGE_MESSAGE_ID_GENERATOR + value: com.bakdata.kafka.MurmurHashIdGenerator + - name: APP_BOOTSTRAP_SERVERS + value: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + - name: APP_SCHEMA_REGISTRY_URL + value: http://localhost:8081/ + - name: APP_INPUT_TOPICS + value: producer-1-output-topic + - name: APP_INPUT_PATTERN + value: my-input-pattern + - name: APP_OUTPUT_TOPIC + value: streams-app-1-output-topic + - name: APP_ERROR_TOPIC + value: streams-app-1-error-topic + - name: APP_LABELED_OUTPUT_TOPICS + value: my-output-topic-label=streams-app-1-labeled-output-topic, + - name: APP_LABELED_INPUT_TOPICS + value: producer-1-labeled-topic-output=producer-1-labeled-topic-output, + - name: APP_LABELED_INPUT_PATTERNS + value: my-input-topic-labeled-pattern=my-labeled-input-pattern, + - name: APP_APPLICATION_ID + value: my-streams-app-id + - name: APP_CONVERT_XML + value: 'true' + - name: JAVA_TOOL_OPTIONS + value: '-XX:MaxRAMPercentage=75.0 ' + image: my-registry/my-streams-app-image:1.0.0 + imagePullPolicy: Always + name: resources-streams-bootstrap-argo-my-streams-app + resources: + limits: + cpu: 500m + memory: 2G + requests: + cpu: 200m + memory: 300Mi + restartPolicy: OnFailure + ttlSecondsAfterFinished: 30 + diff --git a/tests/pipeline/snapshots/test_manifest/test_patch/manifest.yaml b/tests/pipeline/snapshots/test_manifest/test_sync/manifest.yaml similarity index 100% rename from tests/pipeline/snapshots/test_manifest/test_patch/manifest.yaml rename to tests/pipeline/snapshots/test_manifest/test_sync/manifest.yaml diff --git a/tests/pipeline/test_manifest.py b/tests/pipeline/test_manifest.py index e003c36e8..e287bffa3 100644 --- a/tests/pipeline/test_manifest.py +++ b/tests/pipeline/test_manifest.py @@ -193,11 +193,25 @@ def test_streams_bootstrap_manifest_clean(self, snapshot: Snapshot): assert result.exit_code == 0, result.stdout snapshot.assert_match(result.stdout, MANIFEST_YAML) - def test_patch(self, snapshot: Snapshot): + def test_sync(self, snapshot: Snapshot): result = runner.invoke( app, [ - "patch", + "sync", + str(RESOURCE_PATH / "streams-bootstrap-argo" / PIPELINE_YAML), + "--config", + str(RESOURCE_PATH / "streams-bootstrap-argo"), + ], + catch_exceptions=False, + ) + assert result.exit_code == 0, result.stdout + snapshot.assert_match(result.stdout, MANIFEST_YAML) + + def test_pause(self, snapshot: Snapshot): + result = runner.invoke( + app, + [ + "pause", str(RESOURCE_PATH / "streams-bootstrap-argo" / PIPELINE_YAML), "--config", str(RESOURCE_PATH / "streams-bootstrap-argo"),