diff --git a/docs/content/connector-sink.md b/docs/content/connector-sink.md index 6ac099d3..ae31523b 100644 --- a/docs/content/connector-sink.md +++ b/docs/content/connector-sink.md @@ -86,34 +86,35 @@ In your Maven project's `pom.xml` file, add the Flink connector as a dependency ## Options -| **Option** | **Required** | **Default value** | **Description** | -|-----------------------------------|--------------|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| connector | Yes | NONE | The value must be "starrocks". | -| jdbc-url | Yes | NONE | The address that is used to connect to the MySQL server of the FE. You can specify multiple addresses, which must be separated by a comma (,). Format: `jdbc:mysql://:,:,:`. | -| load-url | Yes | NONE | The address that is used to connect to the HTTP server of the FE. You can specify multiple addresses, which must be separated by a semicolon (;). Format: `:;:`. | -| database-name | Yes | NONE | The name of the StarRocks database into which you want to load data. | -| table-name | Yes | NONE | The name of the table that you want to use to load data into StarRocks. | -| username | Yes | NONE | The username of the account that you want to use to load data into StarRocks. The account needs [SELECT and INSERT privileges](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/account-management/GRANT). | -| password | Yes | NONE | The password of the preceding account. | -| sink.semantic | No | at-least-once | The semantic guaranteed by sink. Valid values: **at-least-once** and **exactly-once**. | +| **Option** | **Required** | **Default value** | **Description** | +|-----------------------------------|--------------|-------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| connector | Yes | NONE | The value must be "starrocks". | +| jdbc-url | Yes | NONE | The address that is used to connect to the MySQL server of the FE. You can specify multiple addresses, which must be separated by a comma (,). Format: `jdbc:mysql://:,:,:`. | +| load-url | Yes | NONE | The address that is used to connect to the HTTP server of the FE. You can specify multiple addresses, which must be separated by a semicolon (;). Format: `:;:`. | +| database-name | Yes | NONE | The name of the StarRocks database into which you want to load data. | +| table-name | Yes | NONE | The name of the table that you want to use to load data into StarRocks. | +| username | Yes | NONE | The username of the account that you want to use to load data into StarRocks. The account needs [SELECT and INSERT privileges](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/account-management/GRANT). | +| password | Yes | NONE | The password of the preceding account. | +| sink.semantic | No | at-least-once | The semantic guaranteed by sink. Valid values: **at-least-once** and **exactly-once**. | | sink.version | No | AUTO | The interface used to load data. This parameter is supported from Flink connector version 1.2.4 onwards.
  • `V1`: Use [Stream Load](https://docs.starrocks.io/en-us/latest/loading/StreamLoad) interface to load data. Connectors before 1.2.4 only support this mode.
  • `V2`: Use [Transaction Stream Load](https://docs.starrocks.io/en-us/latest/loading/Stream_Load_transaction_interface) interface to load data. It requires StarRocks to be at least version 2.4. Recommends `V2` because it optimizes the memory usage and provides a more stable exactly-once implementation.
  • `AUTO`: If the version of StarRocks supports transaction Stream Load, will choose `V2` automatically, otherwise choose `V1`
| -| sink.label-prefix | No | NONE | The label prefix used by Stream Load. Recommend to configure it if you are using exactly-once with connector 1.2.8 and later. See [exactly-once usage notes](#exactly-once). | -| sink.buffer-flush.max-bytes | No | 94371840(90M) | The maximum size of data that can be accumulated in memory before being sent to StarRocks at a time. The maximum value ranges from 64 MB to 10 GB. Setting this parameter to a larger value can improve loading performance but may increase loading latency. This parameter only takes effect when `sink.semantic` is set to `at-least-once`. If `sink.semantic` is set to `exactly-once`, the data in memory is flushed when a Flink checkpoint is triggered. In this circumstance, this parameter does not take effect. | -| sink.buffer-flush.max-rows | No | 500000 | The maximum number of rows that can be accumulated in memory before being sent to StarRocks at a time. This parameter is available only when `sink.version` is `V1` and `sink.semantic` is `at-least-once`. Valid values: 64000 to 5000000. | -| sink.buffer-flush.interval-ms | No | 300000 | The interval at which data is flushed. This parameter is available only when `sink.semantic` is `at-least-once`. Valid values: 1000 to 3600000. Unit: ms. | -| sink.max-retries | No | 3 | The number of times that the system retries to perform the Stream Load job. This parameter is available only when you set `sink.version` to `V1`. Valid values: 0 to 10. | +| sink.label-prefix | No | NONE | The label prefix used by Stream Load. Recommend to configure it if you are using exactly-once with connector 1.2.8 and later. See [exactly-once usage notes](#exactly-once). | +| sink.buffer-flush.max-bytes | No | 94371840(90M) | The maximum size of data that can be accumulated in memory before being sent to StarRocks at a time. The maximum value ranges from 64 MB to 10 GB. Setting this parameter to a larger value can improve loading performance but may increase loading latency. This parameter only takes effect when `sink.semantic` is set to `at-least-once`. If `sink.semantic` is set to `exactly-once`, the data in memory is flushed when a Flink checkpoint is triggered. In this circumstance, this parameter does not take effect. | +| sink.buffer-flush.max-rows | No | 500000 | The maximum number of rows that can be accumulated in memory before being sent to StarRocks at a time. This parameter is available only when `sink.version` is `V1` and `sink.semantic` is `at-least-once`. Valid values: 64000 to 5000000. | +| sink.buffer-flush.interval-ms | No | 300000 | The interval at which data is flushed. This parameter is available only when `sink.semantic` is `at-least-once`. Valid values: 1000 to 3600000. Unit: ms. | +| sink.max-retries | No | 3 | The number of times that the system retries to perform the Stream Load job. This parameter is available only when you set `sink.version` to `V1`. Valid values: 0 to 10. | | sink.connect.timeout-ms | No | 30000 | The timeout for establishing HTTP connection. Valid values: 100 to 60000. Unit: ms. Before 1.2.9, the default value is 1000. | -| sink.socket.timeout-ms | No | -1 | Supported since 1.2.10. The time duration for which the HTTP client waits for data. Unit: ms. The default value `-1` means there is no timeout. | -| sink.wait-for-continue.timeout-ms | No | 10000 | Supported since 1.2.7. The timeout for waiting response of HTTP 100-continue from the FE. Valid values: `3000` to `600000`. Unit: ms | -| sink.ignore.update-before | No | true | Supported since version 1.2.8. Whether to ignore `UPDATE_BEFORE` records from Flink when loading data to Primary Key tables. If this parameter is set to false, the record is treated as a delete operation to StarRocks table. | -| sink.parallelism | No | NONE | The parallelism of loading. Only available for Flink SQL. If this parameter is not specified, Flink planner decides the parallelism. **In the scenario of multi-parallelism, users need to guarantee data is written in the correct order.** | -| sink.properties.* | No | NONE | The parameters that control Stream Load behavior. For example, the parameter `sink.properties.format` specifies the format used for Stream Load, such as CSV or JSON. For a list of supported parameters and their descriptions, see [STREAM LOAD](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/data-manipulation/STREAM%20LOAD). | -| sink.properties.format | No | csv | The format used for Stream Load. The Flink connector transforms each batch of data to the format before sending them to StarRocks. Valid values: `csv` and `json`. | -| sink.properties.column_separator | No | \t | The column separator for CSV-formatted data. | -| sink.properties.row_delimiter | No | \n | The row delimiter for CSV-formatted data. | -| sink.properties.max_filter_ratio | No | 0 | The maximum error tolerance of the Stream Load. It's the maximum percentage of data records that can be filtered out due to inadequate data quality. Valid values: `0` to `1`. Default value: `0`. See [Stream Load](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/data-manipulation/STREAM%20LOAD) for details. | -| sink.properties.strict_mode | No | false | Specifies whether to enable the strict mode for Stream Load. It affects the loading behavior when there are unqualified rows, such as inconsistent column values. Valid values: `true` and `false`. Default value: `false`. See [Stream Load](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/data-manipulation/STREAM%20LOAD) for details. | -| sink.properties.compression | No | NONE | Supported since 1.2.10. The compression algorithm used for Stream Load. Currently, compression is only supported for the JSON format. Valid values: `lz4_frame`. Compression for json format is supported only in StarRocks v3.2.7 and later. | +| sink.socket.timeout-ms | No | -1 | Supported since 1.2.10. The time duration for which the HTTP client waits for data. Unit: ms. The default value `-1` means there is no timeout. | +| sink.wait-for-continue.timeout-ms | No | 10000 | Supported since 1.2.7. The timeout for waiting response of HTTP 100-continue from the FE. Valid values: `3000` to `600000`. Unit: ms | +| sink.transaction-commit.timeout-ms| No | 30000 | Supported since 1.2.11. The timeout for committing transaction to FE. Valid values: `500` to `60000`. Unit: ms | +| sink.ignore.update-before | No | true | Supported since version 1.2.8. Whether to ignore `UPDATE_BEFORE` records from Flink when loading data to Primary Key tables. If this parameter is set to false, the record is treated as a delete operation to StarRocks table. | +| sink.parallelism | No | NONE | The parallelism of loading. Only available for Flink SQL. If this parameter is not specified, Flink planner decides the parallelism. **In the scenario of multi-parallelism, users need to guarantee data is written in the correct order.** | +| sink.properties.* | No | NONE | The parameters that control Stream Load behavior. For example, the parameter `sink.properties.format` specifies the format used for Stream Load, such as CSV or JSON. For a list of supported parameters and their descriptions, see [STREAM LOAD](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/data-manipulation/STREAM%20LOAD). | +| sink.properties.format | No | csv | The format used for Stream Load. The Flink connector transforms each batch of data to the format before sending them to StarRocks. Valid values: `csv` and `json`. | +| sink.properties.column_separator | No | \t | The column separator for CSV-formatted data. | +| sink.properties.row_delimiter | No | \n | The row delimiter for CSV-formatted data. | +| sink.properties.max_filter_ratio | No | 0 | The maximum error tolerance of the Stream Load. It's the maximum percentage of data records that can be filtered out due to inadequate data quality. Valid values: `0` to `1`. Default value: `0`. See [Stream Load](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/data-manipulation/STREAM%20LOAD) for details. | +| sink.properties.strict_mode | No | false | Specifies whether to enable the strict mode for Stream Load. It affects the loading behavior when there are unqualified rows, such as inconsistent column values. Valid values: `true` and `false`. Default value: `false`. See [Stream Load](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/data-manipulation/STREAM%20LOAD) for details. | +| sink.properties.compression | No | NONE | Supported since 1.2.10. The compression algorithm used for Stream Load. Currently, compression is only supported for the JSON format. Valid values: `lz4_frame`. Compression for json format is supported only in StarRocks v3.2.7 and later. | ## Data type mapping between Flink and StarRocks diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java index 251391df..160f13d5 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java @@ -79,6 +79,7 @@ public Set> optionalOptions() { optionalOptions.add(StarRocksSinkOptions.SINK_CONNECT_TIMEOUT); optionalOptions.add(StarRocksSinkOptions.SINK_SOCKET_TIMEOUT); optionalOptions.add(StarRocksSinkOptions.SINK_WAIT_FOR_CONTINUE_TIMEOUT); + optionalOptions.add(StarRocksSinkOptions.SINK_TRANSACTION_COMMIT_TIMEOUT); optionalOptions.add(StarRocksSinkOptions.SINK_IO_THREAD_COUNT); optionalOptions.add(StarRocksSinkOptions.SINK_CHUNK_LIMIT); optionalOptions.add(StarRocksSinkOptions.SINK_SCAN_FREQUENCY); diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java index 749f4176..ce434c41 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java @@ -95,6 +95,8 @@ public enum StreamLoadFormat { "connector side, you can set this option to an acceptable value."); public static final ConfigOption SINK_WAIT_FOR_CONTINUE_TIMEOUT = ConfigOptions.key("sink.wait-for-continue.timeout-ms") .intType().defaultValue(30000).withDescription("Timeout in millisecond to wait for 100-continue response for http client."); + public static final ConfigOption SINK_TRANSACTION_COMMIT_TIMEOUT = ConfigOptions.key("sink.transaction-commit.timeout-ms") + .intType().defaultValue(30000).withDescription("Timeout in millisecond for commit transaction to FE."); public static final ConfigOption SINK_IO_THREAD_COUNT = ConfigOptions.key("sink.io.thread-count") .intType().defaultValue(2).withDescription("Stream load thread count"); @@ -288,6 +290,14 @@ public int getWaitForContinueTimeout() { return Math.min(waitForContinueTimeoutMs, 600000); } + public int getTransactionCommitTimeout() { + int waitForContinueTimeoutMs = tableOptions.get(SINK_TRANSACTION_COMMIT_TIMEOUT); + if (waitForContinueTimeoutMs < 500) { + return 500; + } + return Math.min(waitForContinueTimeoutMs, 60000); + } + public int getIoThreadCount() { return tableOptions.get(SINK_IO_THREAD_COUNT); } @@ -563,6 +573,7 @@ public StreamLoadProperties getProperties(@Nullable StarRocksSinkTable table) { .connectTimeout(getConnectTimeout()) .waitForContinueTimeoutMs(getWaitForContinueTimeout()) .socketTimeout(getSocketTimeout()) + .transactionTimeout(getTransactionCommitTimeout()) .ioThreadCount(getIoThreadCount()) .scanningFrequency(getScanFrequency()) .labelPrefix(getLabelPrefix()) diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/TransactionStreamLoader.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/TransactionStreamLoader.java index c927f715..1a2aedfc 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/TransactionStreamLoader.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/TransactionStreamLoader.java @@ -247,6 +247,7 @@ public boolean commit(StreamLoadSnapshot.Transaction transaction) { httpPost.addHeader("label", transaction.getLabel()); httpPost.addHeader("db", transaction.getDatabase()); httpPost.addHeader("table", transaction.getTable()); + httpPost.addHeader("timeout", String.valueOf(properties.getTransactionTimeout())); httpPost.setConfig(RequestConfig.custom() .setSocketTimeout(properties.getSocketTimeout()) diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadProperties.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadProperties.java index 5fa1edff..1793aad1 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadProperties.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadProperties.java @@ -71,6 +71,7 @@ public class StreamLoadProperties implements Serializable { */ private final int connectTimeout; private final int socketTimeout; + private final int transactionTimeout; private final int waitForContinueTimeoutMs; private final int ioThreadCount; @@ -111,6 +112,7 @@ private StreamLoadProperties(Builder builder) { this.connectTimeout = builder.connectTimeout; this.socketTimeout = builder.socketTimeout; + this.transactionTimeout = builder.transactionTimeout; this.waitForContinueTimeoutMs = builder.waitForContinueTimeoutMs; this.ioThreadCount = builder.ioThreadCount; @@ -203,7 +205,9 @@ public int getWaitForContinueTimeoutMs() { public int getSocketTimeout() { return socketTimeout; } - + public int getTransactionTimeout() { + return transactionTimeout; + } public int getIoThreadCount() { return ioThreadCount; } @@ -261,6 +265,7 @@ public static class Builder { private int connectTimeout = 60000; // Default value -1 is the same as that in RequestConfig.Builder#socketTimeout private int socketTimeout = -1; + private int transactionTimeout = 30000; private int waitForContinueTimeoutMs = DEFAULT_WAIT_FOR_CONTINUE; private int ioThreadCount = Runtime.getRuntime().availableProcessors(); @@ -377,6 +382,11 @@ public Builder socketTimeout(int socketTimeout) { return this; } + public Builder transactionTimeout(int transactionTimeout) { + this.transactionTimeout = transactionTimeout; + return this; + } + public Builder ioThreadCount(int ioThreadCount) { if (ioThreadCount <= 0) { throw new IllegalArgumentException("ioThreadCount `" + ioThreadCount + "` set failed, must greater to 0");