From dc20fdcba1e7cddaba3be7588bb762809019b3e6 Mon Sep 17 00:00:00 2001 From: Kathryn May <44557882+kathancox@users.noreply.github.com> Date: Tue, 10 Dec 2024 13:00:47 -0500 Subject: [PATCH] Add CompressionLevel and make v2 Kafka sink default (#19169) --- .../compression-level-kafka-config.md | 1 + .../advanced-changefeed-configuration.md | 2 +- src/current/v24.3/changefeed-sinks.md | 108 ++++++++++++------ .../v24.3/create-and-configure-changefeeds.md | 1 + src/current/v24.3/known-limitations.md | 1 + 5 files changed, 80 insertions(+), 33 deletions(-) create mode 100644 src/current/_includes/v24.3/known-limitations/compression-level-kafka-config.md diff --git a/src/current/_includes/v24.3/known-limitations/compression-level-kafka-config.md b/src/current/_includes/v24.3/known-limitations/compression-level-kafka-config.md new file mode 100644 index 00000000000..0635319c5af --- /dev/null +++ b/src/current/_includes/v24.3/known-limitations/compression-level-kafka-config.md @@ -0,0 +1 @@ +Changefeeds created in v24.3 of CockroachDB that emit to [Kafka]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka), or changefeeds created in earlier versions with the `changefeed.new_kafka_sink.enabled` cluster setting enabled, do not support negative compression level values for `GZIP` compression in the [`kafka_sink_config = {... "CompressionLevel" = ...}`]({% link {{ page.version.version }}/changefeed-sinks.md %}#compressionlevel) option field. [#136492](https://github.com/cockroachdb/cockroach/issues/136492) \ No newline at end of file diff --git a/src/current/v24.3/advanced-changefeed-configuration.md b/src/current/v24.3/advanced-changefeed-configuration.md index ff2322336a2..0a64c56e0f6 100644 --- a/src/current/v24.3/advanced-changefeed-configuration.md +++ b/src/current/v24.3/advanced-changefeed-configuration.md @@ -180,7 +180,7 @@ If you are setting the `resolved` option when you are aiming for high throughput ### Batching and buffering messages - Batch messages to your sink: - - For a [Kafka sink]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka), refer to the [`Flush`]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka-flush) parameter for the `kafka_sink_config` option. + - For a [Kafka sink]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka), refer to the [`Flush`]({% link {{ page.version.version }}/changefeed-sinks.md %}#flush) parameter for the `kafka_sink_config` option. - For a [cloud storage sink]({% link {{ page.version.version }}/changefeed-sinks.md %}#cloud-storage-sink), use the [`file_size`]({% link {{ page.version.version }}/create-changefeed.md %}#file-size) parameter to flush a file when it exceeds the specified size. - For a [webhook sink]({% link {{ page.version.version }}/changefeed-sinks.md %}#webhook-sink), refer to the [`Flush`]({% link {{ page.version.version }}/changefeed-sinks.md %}#webhook-sink-configuration) parameter for the `webhook_sink_config` option. - Set the [`changefeed.memory.per_changefeed_limit`]({% link {{ page.version.version }}/cluster-settings.md %}) cluster setting to a higher limit to give more memory for buffering changefeed data. This setting influences how often the changefeed will flush buffered messages. This is useful during heavy traffic. 
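For example, a minimal sketch that combines these two levers for a Kafka sink (the table name, broker address, and values below are illustrative assumptions, not recommendations):

{% include_cached copy-clipboard.html %}
~~~ sql
-- Allow each changefeed to buffer more data during traffic spikes.
SET CLUSTER SETTING changefeed.memory.per_changefeed_limit = '1GiB';

-- Batch messages to the Kafka sink: flush after 1000 messages or 1 second, whichever comes first.
CREATE CHANGEFEED FOR TABLE movr.rides
  INTO 'kafka://localhost:9092'
  WITH kafka_sink_config = '{"Flush": {"Messages": 1000, "Frequency": "1s"}}';
~~~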
diff --git a/src/current/v24.3/changefeed-sinks.md b/src/current/v24.3/changefeed-sinks.md index 82d0dd06308..76d0497d9e3 100644 --- a/src/current/v24.3/changefeed-sinks.md +++ b/src/current/v24.3/changefeed-sinks.md @@ -40,17 +40,6 @@ To set a different sink URI to an existing changefeed, use the [`sink` option]({ ## Kafka -{{site.data.alerts.callout_info}} -CockroachDB uses a different version of the Kafka sink that is implemented with the [franz-go](https://github.com/twmb/franz-go) Kafka client library. If you are using a [testing release]({% link releases/index.md %}#patch-releases) of v24.2 or v24.2.0, we recommend that you enable this updated version of the Kafka sink to avoid a potential bug in the previous version of the CockroachDB Kafka sink; for more details, refer to the [technical advisory 122372]({% link advisories/a122372.md %}). You can enable this Kafka sink with the cluster setting [`changefeed.new_kafka_sink.enabled`]({% link v24.2/show-cluster-setting.md %}). - -{% include_cached copy-clipboard.html %} -~~~ sql -SET CLUSTER SETTING changefeed.new_kafka_sink.enabled = true; -~~~ - -If you are running v24.2.1 and later, the `changefeed.new_kafka_sink.enabled` cluster setting is enabled by default. -{{site.data.alerts.end}} - ### Kafka sink connection Example of a Kafka sink URI using `SCRAM-SHA-256` authentication: @@ -117,17 +106,41 @@ Kafka has the following topic limitations: ### Kafka sink configuration - The `kafka_sink_config` option allows configuration of a changefeed's message delivery, Kafka server version, and batching parameters. +You can configure flushing, acknowledgments, compression, and concurrency behavior of changefeeds running to a Kafka sink with the following: + +- Set the [`changefeed.sink_io_workers` cluster setting]({% link {{ page.version.version }}/cluster-settings.md %}#setting-changefeed-sink-io-workers) to configure the number of concurrent workers used by changefeeds in the cluster when sending requests to a Kafka sink. When you set `changefeed.sink_io_workers`, it will not affect running changefeeds; [pause the changefeed]({% link {{ page.version.version }}/pause-job.md %}), set `changefeed.sink_io_workers`, and then [resume the changefeed]({% link {{ page.version.version }}/resume-job.md %}). `changefeed.sink_io_workers` will also affect changefeeds running to [Google Cloud Pub/Sub](#google-cloud-pub-sub) sinks and [webhook sinks](#webhook-sink). + + {{site.data.alerts.callout_info}} + `changefeed.sink_io_workers` only applies to Kafka sinks created in v24.2.1+, or if the `changefeed.new_kafka_sink.enabled` cluster setting has been enabled in CockroachDB clusters running v23.2.10+ and v24.1.4+. + {{site.data.alerts.end}} + +- The `kafka_sink_config` option allows configuration of a changefeed's message delivery, Kafka server version, and batching parameters. {{site.data.alerts.callout_danger}} Each of the following settings has a significant impact on a changefeed's behavior, such as latency. For example, it is possible to configure batching parameters to be very high, which would negatively impact changefeed latency. As a result, it would take a long time to see messages coming through to the sink. Also, large batches may be rejected by the Kafka server unless it's separately configured to accept a high [`max.message.bytes`](https://kafka.apache.org/documentation/#brokerconfigs_message.max.bytes).
{{site.data.alerts.end}} ~~~ -kafka_sink_config='{"Flush": {"MaxMessages": 1, "Frequency": "1s"}, "ClientID": "kafka_client_ID", "Version": "0.8.2.0", "RequiredAcks": "ONE", "Compression": "GZIP" }' +kafka_sink_config='{"Flush": {"MaxMessages": 1, "Frequency": "1s"}, "ClientID": "kafka_client_ID", "Version": "0.8.2.0", "RequiredAcks": "ONE", "Compression": "GZIP", "CompressionLevel": 3}' ~~~ -`"Flush"."MaxMessages"` and `"Flush"."Frequency"` are configurable batching parameters depending on latency and throughput needs. For example, if `"MaxMessages"` is set to 1000 and `"Frequency"` to 1 second, it will flush to Kafka either after 1 second or after 1000 messages are batched, whichever comes first. It's important to consider that if there are not many messages, then a `"1s"` frequency will add 1 second latency. However, if there is a larger influx of messages these will be flushed quicker. +Using the default values or not setting fields in `kafka_sink_config` will mean that changefeed messages emit immediately. + +The configurable fields are as follows: + +Field | Type | Description | Default +-------------------+---------------------+------------------+------------------- +`"ClientID"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Applies a Kafka client ID per changefeed. Configure [quotas](https://kafka.apache.org/documentation/#quotas) within your Kafka configuration that apply to a unique client ID. The `ClientID` field can only contain the characters `A-Za-z0-9._-`. For more details, refer to [`ClientID`](#clientid). | "" +`"Compression"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Sets a compression protocol that the changefeed should use when emitting events. The possible values are: `"NONE"`, `"GZIP"`, `"SNAPPY"`, `"LZ4"`, `"ZSTD"`. | `"NONE"` +New in v24.3:`"CompressionLevel"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | Sets the level of compression. This determines the level of compression ratio versus compression speed, i.e., how much the data size is reduced (better compression) and how quickly the compression process is completed. For the compression protocol ranges, refer to [`CompressionLevel`](#compressionlevel).

**Note:** If you have the `changefeed.new_kafka_sink.enabled` cluster setting disabled, `CompressionLevel` will not affect `LZ4` compression. `SNAPPY` does not support `CompressionLevel`. | Refer to [`CompressionLevel`](#compressionlevel) +`"Flush"."Bytes"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | When the total byte size of all the messages in the batch reaches this amount, it should be flushed. | `0` +`"Flush"."Frequency"` | [Duration string](https://pkg.go.dev/time#ParseDuration) | When this amount of time has passed since the **first** received message in the batch without it flushing, it should be flushed. For more details, refer to [`Flush`](#flush). | `"0s"` +`"Flush"."MaxMessages"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | Sets the maximum number of messages the producer can send in a single broker request. Any messages beyond the configured limit will be blocked. Increasing this value allows all messages to be sent in a batch. For more details, refer to [`Flush`](#flush). | `1000` +`"Flush"."Messages"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | Configures the number of messages the changefeed should batch before flushing. | `0` +`"RequiredAcks"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Specifies what a successful write to Kafka is. CockroachDB [guarantees at least once delivery of messages]({% link {{ page.version.version }}/changefeed-messages.md %}#ordering-and-delivery-guarantees) — this value defines the **delivery**. The possible values are: `ONE`, `NONE`, `ALL`. For details on each value, refer to [`RequiredAcks`](#requiredacks). | `"ONE"` +`"Version"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Sets the appropriate Kafka cluster version, which can be used to connect to [Kafka versions < v1.0](https://docs.confluent.io/platform/current/installation/versions-interoperability.html) (`kafka_sink_config='{"Version": "0.8.2.0"}'`). | `"1.0.0.0"` + +#### `ClientID` Implement a Kafka resource usage limit per changefeed by setting a client ID and Kafka quota. You can set the quota for the client ID in your Kafka server's configuration: @@ -136,24 +149,55 @@ Implement a Kafka resource usage limit per changefeed by setting a client ID and bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type clients --entity-name client-changefeed-1 ~~~ -Refer to the [Kafka documentation](https://kafka.apache.org/documentation/#quotas) for details on setting quotas to client IDs. - When you create a changefeed, include the `"ClientID"` field with the unique client ID (e.g., `kafka_client_ID_1`) you have configured in your Kafka server configuration. This will subject the changefeed to the Kafka quota applied to that client ID. We recommend tracking the [`changefeed.kafka_throttling_hist_nanos` metric]({% link {{ page.version.version }}/metrics.md %}) to monitor the time spent throttling due to changefeed messages exceeding Kafka quotas. -Using the default values or not setting fields in `kafka_sink_config` will mean that changefeed messages emit immediately. - -The configurable fields are as follows: - -Field | Type | Description | Default --------------------+---------------------+------------------+------------------- -`"ClientID"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Applies a Kafka client ID per changefeed. 
Configure [quotas](https://kafka.apache.org/documentation/#quotas) within your Kafka configuration that apply to a unique client ID. The `ClientID` field can only contain the characters `A-Za-z0-9._-`. | "" -`"Flush"."MaxMessages"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | Sets the maximum number of messages the producer can send in a single broker request. Any messages beyond the configured limit will be blocked. Increasing this value allows all messages to be sent in a batch. | `1000` -`"Flush"."Messages"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | Configures the number of messages the changefeed should batch before flushing. | `0` -`"Flush"."Bytes"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | When the total byte size of all the messages in the batch reaches this amount, it should be flushed. | `0` -`"Flush"."Frequency"` | [Duration string](https://pkg.go.dev/time#ParseDuration) | When this amount of time has passed since the **first** received message in the batch without it flushing, it should be flushed. | `"0s"` -`"Version"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Sets the appropriate Kafka cluster version, which can be used to connect to [Kafka versions < v1.0](https://docs.confluent.io/platform/current/installation/versions-interoperability.html) (`kafka_sink_config='{"Version": "0.8.2.0"}'`). | `"1.0.0.0"` -`"RequiredAcks"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Specifies what a successful write to Kafka is. CockroachDB [guarantees at least once delivery of messages]({% link {{ page.version.version }}/changefeed-messages.md %}#ordering-and-delivery-guarantees) — this value defines the **delivery**. The possible values are:

`"ONE"`: a write to Kafka is successful once the leader node has committed and acknowledged the write. Note that this has the potential risk of dropped messages; if the leader node acknowledges before replicating to a quorum of other Kafka nodes, but then fails.

`"NONE"`: no Kafka brokers are required to acknowledge that they have committed the message. This will decrease latency and increase throughput, but comes at the cost of lower consistency.

`"ALL"`: a quorum must be reached (that is, most Kafka brokers have committed the message) before the leader can acknowledge. This is the highest consistency level. {% include {{ page.version.version }}/cdc/kafka-acks.md %} | `"ONE"` -`"Compression"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Sets a compression protocol that the changefeed should use when emitting events. The possible values are: `"NONE"`, `"GZIP"`, `"SNAPPY"`, `"LZ4"`, `"ZSTD"`. | `"NONE"` +For details on setting quotas to client IDs, refer to the [Kafka documentation](https://kafka.apache.org/documentation/#quotas). + +#### `CompressionLevel` + +{% include_cached new-in.html version="v24.3" %} The `CompressionLevel` field allows you to implement a level of compression for your set `Compression` protocol. `CompressionLevel` determines the level of the compression ratio versus the compression speed. That is, how much the data is reduced for _better_ compression and how quickly the compression is completed for _faster_ compression. The compression protocols support the following values: + +- `GZIP`: + - `0`: No compression + - `1` to `9`: From fastest compression to best compression + {% comment %} + These values are not available yet per KL #136492 + - `-1`: Default compression + - `-2`: [Huffman-only compression](https://en.wikipedia.org/wiki/Huffman_coding) + - `-3`: Stateless compression + {% endcomment %} + The default compression level for `GZIP` is `-1`; however, the `CompressionLevel` field does **not** support manually set negative values. For more details, refer to [Known Limitations]({% link {{ page.version.version }}/create-and-configure-changefeeds.md %}#known-limitations). +- `ZSTD`: + - `1`: Fastest compression + - `2`: Default compression + - `3`: Better compression + - `4`: Best compression +- `LZ4`: The supported values from fastest compression to best compression: + - `0`: Fastest compression (Default) + - `512` + - `1024` + - `2048` + - `4096` + - `8192` + - `16384` + - `32768` + - `65536` + - `131072`: Best compression + + If you have the `changefeed.new_kafka_sink.enabled` cluster setting disabled, `CompressionLevel` will not affect `LZ4` compression. +- `SNAPPY` does not support the `CompressionLevel` field. + +#### `Flush` + +`"Flush"."MaxMessages"` and `"Flush"."Frequency"` are configurable batching parameters depending on latency and throughput needs. For example, if `"MaxMessages"` is set to 1000 and `"Frequency"` to 1 second, it will flush to Kafka either after 1 second or after 1000 messages are batched, whichever comes first. It's important to consider that if there are not many messages, then a `"1s"` frequency will add 1 second latency. However, if there is a larger influx of messages these will be flushed quicker. + +#### `RequiredAcks` + +The `RequiredAcks` field defines what a successful write to Kafka is. The possible values are: + +- `"ONE"`: A write to Kafka is successful once the leader node has committed and acknowledged the write. Note that this has the potential risk of dropped messages; if the leader node acknowledges before replicating to a quorum of other Kafka nodes, but then fails. +- `"NONE"`: No Kafka brokers are required to acknowledge that they have committed the message. This will decrease latency and increase throughput, but comes at the cost of lower consistency. +- `"ALL"`: A quorum must be reached (that is, most Kafka brokers have committed the message) before the leader can acknowledge. This is the highest consistency level. 
{% include {{ page.version.version }}/cdc/kafka-acks.md %} ### Kafka sink messages @@ -359,7 +403,7 @@ For a list of compatible parameters and options, refer to [Parameters]({% link { You can configure flushing, retry, and concurrency behavior of changefeeds running to a Pub/Sub sink with the following: -- Set the [`changefeed.sink_io_workers` cluster setting]({% link {{ page.version.version }}/cluster-settings.md %}#setting-changefeed-sink-io-workers) to configure the number of concurrent workers used by changefeeds in the cluster when sending requests to a Pub/Sub sink. When you set `changefeed.sink_io_workers`, it will not affect running changefeeds; [pause the changefeed]({% link {{ page.version.version }}/pause-job.md %}), set `changefeed.sink_io_workers`, and then [resume the changefeed]({% link {{ page.version.version }}/resume-job.md %}). Note that this cluster setting will also affect changefeeds running to [webhook sinks](#webhook-sink). +- Set the [`changefeed.sink_io_workers` cluster setting]({% link {{ page.version.version }}/cluster-settings.md %}#setting-changefeed-sink-io-workers) to configure the number of concurrent workers used by changefeeds in the cluster when sending requests to a Pub/Sub sink. When you set `changefeed.sink_io_workers`, it will not affect running changefeeds; [pause the changefeed]({% link {{ page.version.version }}/pause-job.md %}), set `changefeed.sink_io_workers`, and then [resume the changefeed]({% link {{ page.version.version }}/resume-job.md %}). Note that this cluster setting will also affect changefeeds running to [webhook sinks](#webhook-sink) and [Kafka](#kafka). - Set the `pubsub_sink_config` option to configure the changefeed flushing and retry behavior to your webhook sink. For details on the `pubsub_sink_config` option's configurable fields, refer to the following table and examples. Field | Type | Description | Default @@ -562,7 +606,7 @@ The following are considerations when using the webhook sink: You can configure flushing, retry, and concurrency behavior of changefeeds running to a webhook sink with the following: -- Set the [`changefeed.sink_io_workers` cluster setting]({% link {{ page.version.version }}/cluster-settings.md %}#setting-changefeed-sink-io-workers) to configure the number of concurrent workers used by changefeeds in the cluster when sending requests to a webhook sink. When you set `changefeed.sink_io_workers`, it will not affect running changefeeds; [pause the changefeed]({% link {{ page.version.version }}/pause-job.md %}), set `changefeed.sink_io_workers`, and then [resume the changefeed]({% link {{ page.version.version }}/resume-job.md %}). Note that this cluster setting will also affect changefeeds running to [Google Cloud Pub/Sub sinks](#google-cloud-pub-sub). +- Set the [`changefeed.sink_io_workers` cluster setting]({% link {{ page.version.version }}/cluster-settings.md %}#setting-changefeed-sink-io-workers) to configure the number of concurrent workers used by changefeeds in the cluster when sending requests to a webhook sink. When you set `changefeed.sink_io_workers`, it will not affect running changefeeds; [pause the changefeed]({% link {{ page.version.version }}/pause-job.md %}), set `changefeed.sink_io_workers`, and then [resume the changefeed]({% link {{ page.version.version }}/resume-job.md %}). Note that this cluster setting will also affect changefeeds running to [Google Cloud Pub/Sub sinks](#google-cloud-pub-sub) and [Kafka](#kafka). 
- Set the `webhook_sink_config` option to configure the changefeed flushing and retry behavior to your webhook sink. For details on the `webhook_sink_config` option's configurable fields, refer to the following table and examples. Field | Type | Description | Default diff --git a/src/current/v24.3/create-and-configure-changefeeds.md b/src/current/v24.3/create-and-configure-changefeeds.md index ea2e6cba14b..494c1653c58 100644 --- a/src/current/v24.3/create-and-configure-changefeeds.md +++ b/src/current/v24.3/create-and-configure-changefeeds.md @@ -168,6 +168,7 @@ For more information, see [`EXPERIMENTAL CHANGEFEED FOR`]({% link {{ page.versio - {% include {{ page.version.version }}/known-limitations/alter-changefeed-cdc-queries.md %} - {% include {{ page.version.version }}/known-limitations/cdc-queries-column-families.md %} - {% include {{ page.version.version }}/known-limitations/changefeed-column-family-message.md %} +- {% include {{ page.version.version }}/known-limitations/compression-level-kafka-config.md %} ## See also diff --git a/src/current/v24.3/known-limitations.md b/src/current/v24.3/known-limitations.md index 9a7ea807fce..792ba38c91f 100644 --- a/src/current/v24.3/known-limitations.md +++ b/src/current/v24.3/known-limitations.md @@ -493,6 +493,7 @@ Change data capture (CDC) provides efficient, distributed, row-level changefeeds {% include {{ page.version.version }}/known-limitations/cdc-queries.md %} - {% include {{ page.version.version }}/known-limitations/cdc-queries-column-families.md %} - {% include {{ page.version.version }}/known-limitations/changefeed-column-family-message.md %} +- {% include {{ page.version.version }}/known-limitations/compression-level-kafka-config.md %} #### `ALTER CHANGEFEED` limitations