From 22e6913b5267afcfc05bac8df233603165fc2447 Mon Sep 17 00:00:00 2001 From: redscarf Date: Mon, 25 Sep 2023 20:27:38 +0800 Subject: [PATCH] feat: Support lookup function Signed-off-by: redscarf --- docs/content/connector-source.md | 35 ++- pom.xml | 3 + .../StarRocksJdbcConnectionIProvider.java | 15 +- .../StarRocksJdbcConnectionProvider.java | 25 +- .../converter/AbstractJdbcRowConverter.java | 272 +++++++++++++++++ .../flink/converter/JdbcRowConverter.java | 51 ++++ .../flink/dialect/AbstractDialect.java | 146 +++++++++ .../connector/flink/dialect/JdbcDialect.java | 69 +++++ .../flink/dialect/MySQLRowConverter.java | 38 +++ .../connector/flink/dialect/MySqlDialect.java | 93 ++++++ .../FieldNamedPreparedStatement.java | 279 ++++++++++++++++++ .../FieldNamedPreparedStatementImpl.java | 225 ++++++++++++++ .../StarRocksDynamicLookupFunction.java | 225 ++++++++------ .../source/StarRocksDynamicTableSource.java | 92 +++--- .../StarRocksDynamicTableSourceFactory.java | 48 ++- .../table/source/StarRocksSourceOptions.java | 22 +- .../flink/tools/ConnectionUtils.java | 65 ---- .../connector/flink/util/JdbcTypeUtil.java | 123 ++++++++ .../StarRocksDynamicTableSourceTest.java | 12 +- 19 files changed, 1614 insertions(+), 224 deletions(-) create mode 100644 src/main/java/com/starrocks/connector/flink/converter/AbstractJdbcRowConverter.java create mode 100644 src/main/java/com/starrocks/connector/flink/converter/JdbcRowConverter.java create mode 100644 src/main/java/com/starrocks/connector/flink/dialect/AbstractDialect.java create mode 100644 src/main/java/com/starrocks/connector/flink/dialect/JdbcDialect.java create mode 100644 src/main/java/com/starrocks/connector/flink/dialect/MySQLRowConverter.java create mode 100644 src/main/java/com/starrocks/connector/flink/dialect/MySqlDialect.java create mode 100644 src/main/java/com/starrocks/connector/flink/statement/FieldNamedPreparedStatement.java create mode 100644 src/main/java/com/starrocks/connector/flink/statement/FieldNamedPreparedStatementImpl.java delete mode 100644 src/main/java/com/starrocks/connector/flink/tools/ConnectionUtils.java create mode 100644 src/main/java/com/starrocks/connector/flink/util/JdbcTypeUtil.java diff --git a/docs/content/connector-source.md b/docs/content/connector-source.md index 2bcc1693..f7f446a7 100644 --- a/docs/content/connector-source.md +++ b/docs/content/connector-source.md @@ -92,20 +92,29 @@ Follow these steps to deploy the Flink connector: ### Common parameters -| Parameter | Required | Data type | Description | -| --------------------------- | -------- | --------- | ------------------------------------------------------------ | -| connector | Yes | STRING | The type of connector that you want to use to read data. Set the value to `starrocks`. | -| scan-url | Yes | STRING | The address that is used to connect the FE from the web server. Format: `:`. The default port is `8030`. You can specify multiple addresses, which must be separated with a comma (,). Example: `192.168.xxx.xxx:8030,192.168.xxx.xxx:8030`. | -| jdbc-url | Yes | STRING | The address that is used to connect the MySQL client of the FE. Format: `jdbc:mysql://:`. The default port number is `9030`. | -| username | Yes | STRING | The username of your StarRocks cluster account. The account must have read permissions on the StarRocks table you want to read. See [User privileges](../administration/User_privilege.md). | -| password | Yes | STRING | The password of your StarRocks cluster account. 
| -| database-name | Yes | STRING | The name of the StarRocks database to which the StarRocks table you want to read belongs. | -| table-name | Yes | STRING | The name of the StarRocks table you want to read. | -| scan.connect.timeout-ms | No | STRING | The maximum amount of time after which the connection from the Flink connector to your StarRocks cluster times out. Unit: milliseconds. Default value: `1000`. If the amount of time taken to establish the connection exceeds this limit, the read task fails. | +| Parameter | Required | Data type | Description | +| --------------------------- | -------- |-----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| connector | Yes | STRING | The type of connector that you want to use to read data. Set the value to `starrocks`. | +| scan-url | Yes | STRING | The address that is used to connect the FE from the web server. Format: `:`. The default port is `8030`. You can specify multiple addresses, which must be separated with a comma (,). Example: `192.168.xxx.xxx:8030,192.168.xxx.xxx:8030`. | +| jdbc-url | Yes | STRING | The address that is used to connect the MySQL client of the FE. Format: `jdbc:mysql://:`. The default port number is `9030`. | +| username | Yes | STRING | The username of your StarRocks cluster account. The account must have read permissions on the StarRocks table you want to read. See [User privileges](../administration/User_privilege.md). | +| password | Yes | STRING | The password of your StarRocks cluster account. | +| database-name | Yes | STRING | The name of the StarRocks database to which the StarRocks table you want to read belongs. | +| table-name | Yes | STRING | The name of the StarRocks table you want to read. | +| scan.connect.timeout-ms | No | STRING | The maximum amount of time after which the connection from the Flink connector to your StarRocks cluster times out. Unit: milliseconds. Default value: `1000`. If the amount of time taken to establish the connection exceeds this limit, the read task fails. | | scan.params.keep-alive-min | No | STRING | The maximum amount of time during which the read task keeps alive. The keep-alive time is checked on a regular basis by using a polling mechanism. Unit: minutes. Default value: `10`. We recommend that you set this parameter to a value that is greater than or equal to `5`. | -| scan.params.query-timeout-s | No | STRING | The maximum amount of time after which the read task times out. The timeout duration is checked during task execution. Unit: seconds. Default value: `600`. If no read result is returned after the time duration elapses, the read task stops. | -| scan.params.mem-limit-byte | No | STRING | The maximum amount of memory allowed per query on each BE. Unit: bytes. Default value: `1073741824`, equal to 1 GB. | -| scan.max-retries | No | STRING | The maximum number of times that the read task can be retried upon failures. Default value: `1`. If the number of times that the read task is retried exceeds this limit, the read task returns errors. | +| scan.params.query-timeout-s | No | STRING | The maximum amount of time after which the read task times out. The timeout duration is checked during task execution. Unit: seconds. Default value: `600`. If no read result is returned after the time duration elapses, the read task stops. 
| +| scan.params.mem-limit-byte | No | STRING | The maximum amount of memory allowed per query on each BE. Unit: bytes. Default value: `1073741824`, equal to 1 GB. | +| scan.max-retries | No | STRING | The maximum number of times that the read task can be retried upon failures. Default value: `1`. If the number of times that the read task is retried exceeds this limit, the read task returns errors. | +| lookup.cache | No | ENUM | The cache strategy for the lookup table. Currently supports NONE (no caching) and PARTIAL (caching entries on lookup operation in external database). | +| lookup.max-retries | No | INTEGER | The max retry times if lookup database failed. Default value is 3 | +| lookup.partial-cache.expire-after-access | No | Duration | The max time to live for each rows in lookup cache after accessing the entry in the cache."lookup.cache" must be set to "PARTIAL" to use this option. | +| lookup.partial-cache.expire-after-write | No | Duration | The max time to live for each rows in lookup cache after writing into the cache. "lookup.cache" must be set to "PARTIAL" to use this option. | +| lookup.partial-cache.max-rows | No | LONG | The maximum number of rows to store in the cache | +| lookup.partial-cache.cache-missing-key | No | BOOLEAN | Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table | + + + The following parameters apply only to reading data by using DataStream API. diff --git a/pom.xml b/pom.xml index 6c0120df..f95a0beb 100644 --- a/pom.xml +++ b/pom.xml @@ -527,6 +527,9 @@ limitations under the License. + + ${maven.compiler.source} + org.sonatype.plugins diff --git a/src/main/java/com/starrocks/connector/flink/connection/StarRocksJdbcConnectionIProvider.java b/src/main/java/com/starrocks/connector/flink/connection/StarRocksJdbcConnectionIProvider.java index 848d7089..4df02d6a 100644 --- a/src/main/java/com/starrocks/connector/flink/connection/StarRocksJdbcConnectionIProvider.java +++ b/src/main/java/com/starrocks/connector/flink/connection/StarRocksJdbcConnectionIProvider.java @@ -15,7 +15,9 @@ package com.starrocks.connector.flink.connection; import org.apache.flink.annotation.Internal; + import java.sql.Connection; +import java.sql.SQLException; /** * connection provider. @@ -27,6 +29,17 @@ public interface StarRocksJdbcConnectionIProvider { Connection reestablishConnection() throws Exception; + boolean isConnectionValid() throws SQLException; + + /** + * Get existing connection or establish an new one if there is none. 
+ * + * @return existing connection or newly established connection + * @throws SQLException sql exception + * @throws ClassNotFoundException driver class not found + */ + Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException; + void close(); - + } diff --git a/src/main/java/com/starrocks/connector/flink/connection/StarRocksJdbcConnectionProvider.java b/src/main/java/com/starrocks/connector/flink/connection/StarRocksJdbcConnectionProvider.java index 6b3843b1..7f2a8693 100644 --- a/src/main/java/com/starrocks/connector/flink/connection/StarRocksJdbcConnectionProvider.java +++ b/src/main/java/com/starrocks/connector/flink/connection/StarRocksJdbcConnectionProvider.java @@ -63,7 +63,30 @@ public Connection getConnection() throws SQLException, ClassNotFoundException { @Override public Connection reestablishConnection() throws SQLException, ClassNotFoundException { close(); - connection = getConnection(); + return getOrEstablishConnection(); + } + + public boolean isConnectionValid() throws SQLException { + return connection != null && connection.isValid(60); + } + + @Override + public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException { + if (isConnectionValid() && !connection.isClosed() ) { + return connection; + } + try { + Class.forName(jdbcOptions.getCjDriverName()); + } catch (ClassNotFoundException ex) { + LOG.warn("can not found class {}, try class {}", jdbcOptions.getCjDriverName(), jdbcOptions.getDriverName()); + Class.forName(jdbcOptions.getDriverName()); + } + if (jdbcOptions.getUsername().isPresent()) { + connection = DriverManager.getConnection(jdbcOptions.getDbURL(), jdbcOptions.getUsername().get(), + jdbcOptions.getPassword().orElse(null)); + } else { + connection = DriverManager.getConnection(jdbcOptions.getDbURL()); + } return connection; } diff --git a/src/main/java/com/starrocks/connector/flink/converter/AbstractJdbcRowConverter.java b/src/main/java/com/starrocks/connector/flink/converter/AbstractJdbcRowConverter.java new file mode 100644 index 00000000..2cf2ef5a --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/converter/AbstractJdbcRowConverter.java @@ -0,0 +1,272 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.starrocks.connector.flink.converter; + +import com.starrocks.connector.flink.statement.FieldNamedPreparedStatement; +import com.starrocks.connector.flink.util.JdbcTypeUtil; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.LogicalType; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.ResultSet; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Base class for all converters that convert between JDBC object and Flink internal object. + */ +public abstract class AbstractJdbcRowConverter implements JdbcRowConverter { + + protected final RowType rowType; + protected final JdbcDeserializationConverter[] toInternalConverters; + protected final JdbcSerializationConverter[] toExternalConverters; + protected final LogicalType[] fieldTypes; + + public abstract String converterName(); + + public AbstractJdbcRowConverter(RowType rowType) { + this.rowType = checkNotNull(rowType); + this.fieldTypes = + rowType.getFields().stream() + .map(RowType.RowField::getType) + .toArray(LogicalType[]::new); + this.toInternalConverters = new JdbcDeserializationConverter[rowType.getFieldCount()]; + this.toExternalConverters = new JdbcSerializationConverter[rowType.getFieldCount()]; + for (int i = 0; i < rowType.getFieldCount(); i++) { + toInternalConverters[i] = createNullableInternalConverter(rowType.getTypeAt(i)); + toExternalConverters[i] = createNullableExternalConverter(fieldTypes[i]); + } + } + + @Override + public RowData toInternal(ResultSet resultSet) throws SQLException { + GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount()); + for (int pos = 0; pos < rowType.getFieldCount(); pos++) { + Object field = resultSet.getObject(pos + 1); + genericRowData.setField(pos, toInternalConverters[pos].deserialize(field)); + } + return genericRowData; + } + + @Override + public FieldNamedPreparedStatement toExternal( + RowData rowData, FieldNamedPreparedStatement statement) throws SQLException { + for (int index = 0; index < rowData.getArity(); index++) { + toExternalConverters[index].serialize(rowData, index, statement); + } + return statement; + } + + /** Runtime converter to convert JDBC field to {@link RowData} type object. */ + @FunctionalInterface + public interface JdbcDeserializationConverter extends Serializable { + /** + * Convert a jdbc field object of {@link ResultSet} to the internal data structure object. + * + * @param jdbcField A single field of a {@link ResultSet} + * @return Object + * @throws SQLException maybe + */ + Object deserialize(Object jdbcField) throws SQLException; + } + + /** + * Runtime converter to convert {@link RowData} field to java object and fill into the {@link + * PreparedStatement}. 
+ */ + @FunctionalInterface + public interface JdbcSerializationConverter extends Serializable { + void serialize(RowData rowData, int index, FieldNamedPreparedStatement statement) + throws SQLException; + } + + /** + * Create a nullable runtime {@link JdbcDeserializationConverter} from given {@link + * LogicalType}. + * @param type row type + * @return an converter for deserialize + */ + protected JdbcDeserializationConverter createNullableInternalConverter(LogicalType type) { + return wrapIntoNullableInternalConverter(createInternalConverter(type)); + } + + /** + * + * @param jdbcDeserializationConverter converter for deserialization + * @return wrapped converter + */ + protected JdbcDeserializationConverter wrapIntoNullableInternalConverter( + JdbcDeserializationConverter jdbcDeserializationConverter) { + return val -> { + if (val == null) { + return null; + } else { + return jdbcDeserializationConverter.deserialize(val); + } + }; + } + + protected JdbcDeserializationConverter createInternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case NULL: + return val -> null; + case BOOLEAN: + case FLOAT: + case DOUBLE: + case INTERVAL_YEAR_MONTH: + case INTERVAL_DAY_TIME: + case BINARY: + case VARBINARY: + case BIGINT: + case INTEGER: + return val -> val; + case TINYINT: + return val -> ((Integer) val).byteValue(); + case SMALLINT: + // Converter for small type that casts value to int and then return short value, + // since + // JDBC 1.0 use int type for small values. + return val -> val instanceof Integer ? ((Integer) val).shortValue() : val; + case DECIMAL: + final int precision = ((DecimalType) type).getPrecision(); + final int scale = ((DecimalType) type).getScale(); + // using decimal(20, 0) to support db type bigint unsigned, user should define + // decimal(20, 0) in SQL, + // but other precision like decimal(30, 0) can work too from lenient consideration. + return val -> + val instanceof BigInteger + ? DecimalData.fromBigDecimal( + new BigDecimal((BigInteger) val, 0), precision, scale) + : DecimalData.fromBigDecimal((BigDecimal) val, precision, scale); + case DATE: + return val -> (int) (((Date) val).toLocalDate().toEpochDay()); + case TIME_WITHOUT_TIME_ZONE: + return val -> (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L); + case TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + return val -> + val instanceof LocalDateTime + ? 
TimestampData.fromLocalDateTime((LocalDateTime) val) + : TimestampData.fromTimestamp((Timestamp) val); + case CHAR: + case VARCHAR: + return val -> StringData.fromString((String) val); + case ARRAY: + case ROW: + case MAP: + case MULTISET: + case RAW: + default: + throw new UnsupportedOperationException("Unsupported type:" + type); + } + } + + protected JdbcSerializationConverter createNullableExternalConverter(LogicalType type) { + return wrapIntoNullableExternalConverter(createExternalConverter(type), type); + } + + protected JdbcSerializationConverter wrapIntoNullableExternalConverter( + JdbcSerializationConverter jdbcSerializationConverter, LogicalType type) { + final int sqlType = JdbcTypeUtil.logicalTypeToSqlType(type.getTypeRoot()); + return (val, index, statement) -> { + if (val == null + || val.isNullAt(index) + || LogicalTypeRoot.NULL.equals(type.getTypeRoot())) { + statement.setNull(index, sqlType); + } else { + jdbcSerializationConverter.serialize(val, index, statement); + } + }; + } + + protected JdbcSerializationConverter createExternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case BOOLEAN: + return (val, index, statement) -> + statement.setBoolean(index, val.getBoolean(index)); + case TINYINT: + return (val, index, statement) -> statement.setByte(index, val.getByte(index)); + case SMALLINT: + return (val, index, statement) -> statement.setShort(index, val.getShort(index)); + case INTEGER: + case INTERVAL_YEAR_MONTH: + return (val, index, statement) -> statement.setInt(index, val.getInt(index)); + case BIGINT: + case INTERVAL_DAY_TIME: + return (val, index, statement) -> statement.setLong(index, val.getLong(index)); + case FLOAT: + return (val, index, statement) -> statement.setFloat(index, val.getFloat(index)); + case DOUBLE: + return (val, index, statement) -> statement.setDouble(index, val.getDouble(index)); + case CHAR: + case VARCHAR: + // value is BinaryString + return (val, index, statement) -> + statement.setString(index, val.getString(index).toString()); + case BINARY: + case VARBINARY: + return (val, index, statement) -> statement.setBytes(index, val.getBinary(index)); + case DATE: + return (val, index, statement) -> + statement.setDate( + index, Date.valueOf(LocalDate.ofEpochDay(val.getInt(index)))); + case TIME_WITHOUT_TIME_ZONE: + return (val, index, statement) -> + statement.setTime( + index, + Time.valueOf( + LocalTime.ofNanoOfDay(val.getInt(index) * 1_000_000L))); + case TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + final int timestampPrecision = ((TimestampType) type).getPrecision(); + return (val, index, statement) -> + statement.setTimestamp( + index, val.getTimestamp(index, timestampPrecision).toTimestamp()); + case DECIMAL: + final int decimalPrecision = ((DecimalType) type).getPrecision(); + final int decimalScale = ((DecimalType) type).getScale(); + return (val, index, statement) -> + statement.setBigDecimal( + index, + val.getDecimal(index, decimalPrecision, decimalScale) + .toBigDecimal()); + case ARRAY: + case MAP: + case MULTISET: + case ROW: + case RAW: + default: + throw new UnsupportedOperationException("Unsupported type:" + type); + } + } +} diff --git a/src/main/java/com/starrocks/connector/flink/converter/JdbcRowConverter.java b/src/main/java/com/starrocks/connector/flink/converter/JdbcRowConverter.java new file mode 100644 index 00000000..e53129b0 --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/converter/JdbcRowConverter.java @@ -0,0 +1,51 @@ +/* + * Licensed under the Apache License, 
Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.converter; + +import java.io.Serializable; +import java.sql.ResultSet; +import java.sql.SQLException; +import com.starrocks.connector.flink.statement.FieldNamedPreparedStatement; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.data.RowData; + +/** + * Converter that is responsible to convert between JDBC object and Flink SQL internal data + * structure {@link RowData}. + */ +@PublicEvolving +public interface JdbcRowConverter extends Serializable { + + /** + * Convert data retrieved from {@link ResultSet} to internal {@link RowData}. + * + * @param resultSet ResultSet from JDBC + * @return resultSet to row + * @throws SQLException sql exception + */ + RowData toInternal(ResultSet resultSet) throws SQLException; + + /** + * Convert data retrieved from Flink internal RowData to JDBC Object. + * + * @param rowData The given internal {@link RowData}. + * @param statement The statement to be filled. + * @return The filled statement. + * @throws SQLException if parameterIndex does not correspond to a parameter marker in the SQL statement; + * if a database access error occurs or this method is called on a closed PreparedStatement + */ + FieldNamedPreparedStatement toExternal(RowData rowData, FieldNamedPreparedStatement statement) + throws SQLException; +} diff --git a/src/main/java/com/starrocks/connector/flink/dialect/AbstractDialect.java b/src/main/java/com/starrocks/connector/flink/dialect/AbstractDialect.java new file mode 100644 index 00000000..c78f7ea8 --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/dialect/AbstractDialect.java @@ -0,0 +1,146 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.dialect; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.util.Preconditions; +import java.util.Optional; +import java.util.Set; + +/** + * Base class for {@link JdbcDialect JdbcDialects} that implements basic data type validation and + * the construction of basic {@code INSERT}, {@code UPDATE}, {@code DELETE}, and {@code SELECT} + * statements. + * + *

Implementors should be careful to check the default SQL statements are performant for their + * specific dialect and override them if necessary. + */ +@PublicEvolving +public abstract class AbstractDialect implements JdbcDialect { + + @Override + public void validate(RowType rowType) throws ValidationException { + for (RowType.RowField field : rowType.getFields()) { + // TODO: We can't convert VARBINARY(n) data type to + // PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in + // LegacyTypeInfoDataTypeConverter when n is smaller + // than Integer.MAX_VALUE + if (!supportedTypes().contains(field.getType().getTypeRoot()) + || (field.getType() instanceof VarBinaryType + && Integer.MAX_VALUE + != ((VarBinaryType) field.getType()).getLength())) { + throw new ValidationException( + String.format( + "The %s dialect doesn't support type: %s.", + dialectName(), field.getType())); + } + + if (field.getType() instanceof DecimalType) { + Range range = + decimalPrecisionRange() + .orElseThrow( + () -> + new IllegalStateException( + String.format( + "JdbcDialect %s supports DECIMAL type but no precision range has been set. " + + "Ensure AbstractDialect#decimalPrecisionRange() is overriden to return a valid Range", + dialectName()))); + int precision = ((DecimalType) field.getType()).getPrecision(); + if (precision > range.max || precision < range.min) { + throw new ValidationException( + String.format( + "The precision of field '%s' is out of the DECIMAL " + + "precision range [%d, %d] supported by %s dialect.", + field.getName(), range.min, range.max, dialectName())); + } + } + + if (field.getType() instanceof TimestampType) { + Range range = + timestampPrecisionRange() + .orElseThrow( + () -> + new IllegalStateException( + String.format( + "JdbcDialect %s supports TIMESTAMP type but no precision range has been set." + + "Ensure AbstractDialect#timestampPrecisionRange() is overriden to return a valid Range", + dialectName()))); + int precision = ((TimestampType) field.getType()).getPrecision(); + if (precision > range.max || precision < range.min) { + throw new ValidationException( + String.format( + "The precision of field '%s' is out of the TIMESTAMP " + + "precision range [%d, %d] supported by %s dialect.", + field.getName(), range.min, range.max, dialectName())); + } + } + } + } + + + + + + /** + * @return The inclusive range [min,max] of supported precisions for {@link TimestampType} + * columns. None if timestamp type is not supported. + */ + public Optional timestampPrecisionRange() { + return Optional.empty(); + } + + /** + * @return The inclusive range [min,max] of supported precisions for {@link DecimalType} + * columns. None if decimal type is not supported. + */ + public Optional decimalPrecisionRange() { + return Optional.empty(); + } + + /** + * Defines the set of supported types for the dialect. If the dialect supports {@code DECIMAL} + * or {@code TIMESTAMP} types, be sure to override {@link #decimalPrecisionRange()} and {@link + * #timestampPrecisionRange()} respectively. + * + * @return a set of logical type roots. 
+ */ + public abstract Set supportedTypes(); + + @PublicEvolving + public static class Range { + private final int min; + + private final int max; + + public static Range of(int min, int max) { + Preconditions.checkArgument( + min <= max, + String.format( + "The range min value in range %d must be <= max value %d", min, max)); + return new Range(min, max); + } + + private Range(int min, int max) { + this.min = min; + this.max = max; + } + } +} diff --git a/src/main/java/com/starrocks/connector/flink/dialect/JdbcDialect.java b/src/main/java/com/starrocks/connector/flink/dialect/JdbcDialect.java new file mode 100644 index 00000000..c32eac7f --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/dialect/JdbcDialect.java @@ -0,0 +1,69 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.dialect; + +import java.io.Serializable; +import com.starrocks.connector.flink.converter.JdbcRowConverter; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.types.logical.RowType; + +/** + * Represents a dialect of SQL implemented by a particular JDBC system. Dialects should be immutable + * and stateless. + * + */ +@PublicEvolving +public interface JdbcDialect extends Serializable { + + /** + * Get the name of jdbc dialect. + * + * @return the dialect name. + */ + String dialectName(); + + /** + * Get converter that convert jdbc object and Flink internal object each other. + * + * @param rowType the given row type + * @return a row converter for the database + */ + JdbcRowConverter getRowConverter(RowType rowType); + + + /** + * Check if this dialect instance support a specific data type in table schema. + * + * @param rowType the physical table datatype of a row in the database table. + * @exception ValidationException in case of the table schema contains unsupported type. + */ + void validate(RowType rowType) throws ValidationException; + + + /** + * Quotes the identifier. + * + *

Used to put quotes around the identifier if the column name is a reserved keyword or + * contains characters requiring quotes (e.g., space). + * @param identifier identifier + * + * @return the quoted identifier. + */ + String quoteIdentifier(String identifier); + + + +} diff --git a/src/main/java/com/starrocks/connector/flink/dialect/MySQLRowConverter.java b/src/main/java/com/starrocks/connector/flink/dialect/MySQLRowConverter.java new file mode 100644 index 00000000..5948f474 --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/dialect/MySQLRowConverter.java @@ -0,0 +1,38 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.dialect; + +import com.starrocks.connector.flink.converter.AbstractJdbcRowConverter; +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.types.logical.RowType; + +/** + * Runtime converter that responsible to convert between JDBC object and Flink internal object for + * MySQL. + */ +@Internal +public class MySQLRowConverter extends AbstractJdbcRowConverter { + + private static final long serialVersionUID = 1L; + + @Override + public String converterName() { + return "MySQL"; + } + + public MySQLRowConverter(RowType rowType) { + super(rowType); + } +} diff --git a/src/main/java/com/starrocks/connector/flink/dialect/MySqlDialect.java b/src/main/java/com/starrocks/connector/flink/dialect/MySqlDialect.java new file mode 100644 index 00000000..5bb3dc80 --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/dialect/MySqlDialect.java @@ -0,0 +1,93 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.dialect; + +import java.util.EnumSet; +import java.util.Optional; +import java.util.Set; +import com.starrocks.connector.flink.converter.JdbcRowConverter; +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.RowType; + +/** JDBC dialect for MySQL. 
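+ * The StarRocks FE is reached over its MySQL-protocol port (the {@code jdbc-url} option), so the
+ * lookup function reuses this dialect to build its row converters.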
*/ +@Internal +public class MySqlDialect extends AbstractDialect { + + private static final long serialVersionUID = 1L; + + // Define MAX/MIN precision of TIMESTAMP type according to Mysql docs: + // https://dev.mysql.com/doc/refman/8.0/en/fractional-seconds.html + private static final int MAX_TIMESTAMP_PRECISION = 6; + private static final int MIN_TIMESTAMP_PRECISION = 0; + + // Define MAX/MIN precision of DECIMAL type according to Mysql docs: + // https://dev.mysql.com/doc/refman/8.0/en/fixed-point-types.html + private static final int MAX_DECIMAL_PRECISION = 65; + private static final int MIN_DECIMAL_PRECISION = 1; + + @Override + public JdbcRowConverter getRowConverter(RowType rowType) { + return new MySQLRowConverter(rowType); + } + + @Override + public String quoteIdentifier(String identifier) { + return "`" + identifier + "`"; + } + + + + @Override + public String dialectName() { + return "MySQL"; + } + + @Override + public Optional decimalPrecisionRange() { + return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION)); + } + + @Override + public Optional timestampPrecisionRange() { + return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION)); + } + + @Override + public Set supportedTypes() { + // The data types used in Mysql are list at: + // https://dev.mysql.com/doc/refman/8.0/en/data-types.html + + // TODO: We can't convert BINARY data type to + // PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in + // LegacyTypeInfoDataTypeConverter. + return EnumSet.of( + LogicalTypeRoot.CHAR, + LogicalTypeRoot.VARCHAR, + LogicalTypeRoot.BOOLEAN, + LogicalTypeRoot.VARBINARY, + LogicalTypeRoot.DECIMAL, + LogicalTypeRoot.TINYINT, + LogicalTypeRoot.SMALLINT, + LogicalTypeRoot.INTEGER, + LogicalTypeRoot.BIGINT, + LogicalTypeRoot.FLOAT, + LogicalTypeRoot.DOUBLE, + LogicalTypeRoot.DATE, + LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, + LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE); + } + +} diff --git a/src/main/java/com/starrocks/connector/flink/statement/FieldNamedPreparedStatement.java b/src/main/java/com/starrocks/connector/flink/statement/FieldNamedPreparedStatement.java new file mode 100644 index 00000000..81ff4f0d --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/statement/FieldNamedPreparedStatement.java @@ -0,0 +1,279 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.statement; + +import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import org.apache.flink.annotation.PublicEvolving; + +/** + * This is a wrapper around {@link PreparedStatement} and allows the users to set parameters by name + * instead of by index. This allows users to use the same variable parameter multiple times in a + * statement. + * + *

<p>Code such as this:
+ *
+ * <pre>{@code
+ *   Connection con = getConnection();
+ *   String query = "select * from my_table where first_name=? or last_name=?";
+ *   PreparedStatement st = con.prepareStatement(query);
+ *   st.setString(1, "bob");
+ *   st.setString(2, "bob");
+ *   ResultSet rs = st.executeQuery();
+ * }</pre>
+ *
+ * <p>Can be replaced with:
+ *
+ * <pre>{@code
+ *   Connection con = getConnection();
+ *   String query = "select * from my_table where first_name=:name or last_name=:name";
+ *   FieldNamedPreparedStatement st = FieldNamedPreparedStatement.prepareStatement(con, query, new String[]{"name"});
+ *   st.setString(0, "bob");
+ *   ResultSet rs = st.executeQuery();
+ * }</pre>
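+ *
+ * <p>As the example shows, the setters address parameters by the position of the field in the
+ * {@code fieldNames} array (index 0 for the single field {@code name} above), and one setter call
+ * fills every place where that named parameter occurs in the SQL text.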
+ */ +@PublicEvolving +public interface FieldNamedPreparedStatement extends AutoCloseable { + + /** + * Creates a NamedPreparedStatement object for sending parameterized SQL statements + * to the database. + * + * @param connection the connection used to connect to database. + * @param sql an SQL statement that may contain one or more ':fieldName' as parameter + * placeholders + * @param fieldNames the field names in schema order used as the parameter names + * @return filled prepared statement + * @throws SQLException – if a database access error occurs or this method is called on a closed connection + */ + static FieldNamedPreparedStatement prepareStatement( + Connection connection, String sql, String[] fieldNames) throws SQLException { + return FieldNamedPreparedStatementImpl.prepareStatement(connection, sql, fieldNames); + } + + /** + * Executes the SQL query in this NamedPreparedStatement object and returns the + * ResultSet object generated by the query. + * + * @see PreparedStatement#executeQuery() + * @return a ResultSet object that contains the data produced by the query; never null + * @throws SQLException – if a database access error occurs; this method is called on a closed + * PreparedStatement or the SQL statement does not return a ResultSet object + */ + ResultSet executeQuery() throws SQLException; + + + /** + * Sets the designated parameter to SQL NULL. + * + *

Note: You must specify the parameter's SQL type. + * + * @see PreparedStatement#setNull(int, int) + * @param fieldIndex – the first parameter is 1, the second is 2, ... + * @param sqlType – the SQL type code defined in java.sql.Types + * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement; + */ + void setNull(int fieldIndex, int sqlType) throws SQLException; + + /** + * Sets the designated parameter to the given Java boolean value. The driver + * converts this to an SQL BIT or BOOLEAN value when it sends it to + * the database. + * + * @see PreparedStatement#setBoolean(int, boolean) + * @param fieldIndex – the first parameter is 1, the second is 2, ... + * @param x – the parameter value + * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement; + * if a database access error occurs or this method is called on a closed PreparedStatement + */ + void setBoolean(int fieldIndex, boolean x) throws SQLException; + + /** + * Sets the designated parameter to the given Java byte value. The driver converts + * this to an SQL TINYINT value when it sends it to the database. + * + * @see PreparedStatement#setByte(int, byte) + * @param fieldIndex – the first parameter is 1, the second is 2, ... + * @param x – the parameter value + * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement; + * if a database access error occurs or this method is called on a closed PreparedStatement + */ + void setByte(int fieldIndex, byte x) throws SQLException; + + /** + * Sets the designated parameter to the given Java short value. The driver converts + * this to an SQL SMALLINT value when it sends it to the database. + * + * @see PreparedStatement#setShort(int, short) + * @param fieldIndex – the first parameter is 1, the second is 2, ... + * @param x – the parameter value + * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement; + * if a database access error occurs or this method is called on a closed PreparedStatement + */ + void setShort(int fieldIndex, short x) throws SQLException; + + /** + * Sets the designated parameter to the given Java int value. The driver converts + * this to an SQL INTEGER value when it sends it to the database. + * + * @see PreparedStatement#setInt(int, int) + * @param fieldIndex – the first parameter is 1, the second is 2, ... + * @param x – the parameter value + * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement; + * if a database access error occurs or this method is called on a closed PreparedStatement + */ + void setInt(int fieldIndex, int x) throws SQLException; + + /** + * Sets the designated parameter to the given Java long value. The driver converts + * this to an SQL BIGINT value when it sends it to the database. + * + * @see PreparedStatement#setLong(int, long) + * @param fieldIndex – the first parameter is 1, the second is 2, ... + * @param x – the parameter value + * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement; + * if a database access error occurs or this method is called on a closed PreparedStatement + */ + void setLong(int fieldIndex, long x) throws SQLException; + + /** + * Sets the designated parameter to the given Java float value. The driver converts + * this to an SQL REAL value when it sends it to the database. 
+ * + * @see PreparedStatement#setFloat(int, float) + * @param fieldIndex – the first parameter is 1, the second is 2, ... + * @param x – the parameter value + * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement; + * if a database access error occurs or this method is called on a closed PreparedStatement + */ + void setFloat(int fieldIndex, float x) throws SQLException; + + /** + * Sets the designated parameter to the given Java double value. The driver + * converts this to an SQL DOUBLE value when it sends it to the database. + * + * @see PreparedStatement#setDouble(int, double) + * @param fieldIndex – the first parameter is 1, the second is 2, ... + * @param x – the parameter value + * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement; + * if a database access error occurs or this method is called on a closed PreparedStatement + */ + void setDouble(int fieldIndex, double x) throws SQLException; + + /** + * Sets the designated parameter to the given java.math.BigDecimal value. The + * driver converts this to an SQL NUMERIC value when it sends it to the database. + * + * @see PreparedStatement#setBigDecimal(int, BigDecimal) + * @param fieldIndex – the first parameter is 1, the second is 2, ... + * @param x – the parameter value + * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement; + * if a database access error occurs or this method is called on a closed PreparedStatement + */ + void setBigDecimal(int fieldIndex, BigDecimal x) throws SQLException; + + /** + * Sets the designated parameter to the given Java String value. The driver + * converts this to an SQL VARCHAR or LONGVARCHAR value (depending on + * the argument's size relative to the driver's limits on VARCHAR values) when it + * sends it to the database. + * + * @see PreparedStatement#setString(int, String) + * @param fieldIndex – the first parameter is 1, the second is 2, ... + * @param x – the parameter value + * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement; + * if a database access error occurs or this method is called on a closed PreparedStatement + */ + void setString(int fieldIndex, String x) throws SQLException; + + /** + * Sets the designated parameter to the given Java array of bytes. The driver converts this to + * an SQL VARBINARY or LONGVARBINARY (depending on the argument's size + * relative to the driver's limits on VARBINARY values) when it sends it to the + * database. + * + * @see PreparedStatement#setBytes(int, byte[]) + * @param fieldIndex – the first parameter is 1, the second is 2, ... + * @param x – the parameter value + * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement; + * if a database access error occurs or this method is called on a closed PreparedStatement + */ + void setBytes(int fieldIndex, byte[] x) throws SQLException; + + /** + * Sets the designated parameter to the given java.sql.Date value using the default + * time zone of the virtual machine that is running the application. The driver converts this to + * an SQL DATE value when it sends it to the database. + * + * @see PreparedStatement#setDate(int, Date) + * @param fieldIndex – the first parameter is 1, the second is 2, ... 
+ * @param x – the parameter value + * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement; + * if a database access error occurs or this method is called on a closed PreparedStatement + */ + void setDate(int fieldIndex, Date x) throws SQLException; + + /** + * Sets the designated parameter to the given java.sql.Time value. The driver + * converts this to an SQL TIME value when it sends it to the database. + * + * @see PreparedStatement#setTime(int, Time) + * @param fieldIndex – the first parameter is 1, the second is 2, ... + * @param x – the parameter value + * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement; + * if a database access error occurs or this method is called on a closed PreparedStatement + */ + void setTime(int fieldIndex, Time x) throws SQLException; + + /** + * Sets the designated parameter to the given java.sql.Timestamp value. The driver + * converts this to an SQL TIMESTAMP value when it sends it to the database. + * + * @see PreparedStatement#setTimestamp(int, Timestamp) + * @param fieldIndex – the first parameter is 1, the second is 2, ... + * @param x – the parameter value + * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement; + * if a database access error occurs or this method is called on a closed PreparedStatement + */ + void setTimestamp(int fieldIndex, Timestamp x) throws SQLException; + + /** + * Sets the value of the designated parameter using the given object. + * + * @see PreparedStatement#setObject(int, Object) + * @param fieldIndex – the first parameter is 1, the second is 2, ... + * @param x – the parameter value + * @throws SQLException – if parameterIndex does not correspond to a parameter marker in the SQL statement; + * if a database access error occurs or this method is called on a closed PreparedStatement + */ + void setObject(int fieldIndex, Object x) throws SQLException; + + /** + * Releases this Statement object's database and JDBC resources immediately instead + * of waiting for this to happen when it is automatically closed. It is generally good practice + * to release resources as soon as you are finished with them to avoid tying up database + * resources. + * + * @see PreparedStatement#close() + * @throws SQLException – if a database access error occurs + */ + void close() throws SQLException; +} diff --git a/src/main/java/com/starrocks/connector/flink/statement/FieldNamedPreparedStatementImpl.java b/src/main/java/com/starrocks/connector/flink/statement/FieldNamedPreparedStatementImpl.java new file mode 100644 index 00000000..32407663 --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/statement/FieldNamedPreparedStatementImpl.java @@ -0,0 +1,225 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.starrocks.connector.flink.statement; + +import java.math.BigDecimal; + +import java.sql.Connection; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.ResultSet; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Simple implementation of {@link FieldNamedPreparedStatement}. + */ +public class FieldNamedPreparedStatementImpl implements FieldNamedPreparedStatement { + + private final PreparedStatement statement; + private final int[][] indexMapping; + + private FieldNamedPreparedStatementImpl(PreparedStatement statement, int[][] indexMapping) { + this.statement = statement; + this.indexMapping = indexMapping; + } + + + @Override + public ResultSet executeQuery() throws SQLException { + return statement.executeQuery(); + } + + @Override + public void setNull(int fieldIndex, int sqlType) throws SQLException { + for (int index : indexMapping[fieldIndex]) { + statement.setNull(index, sqlType); + } + } + + @Override + public void setBoolean(int fieldIndex, boolean x) throws SQLException { + for (int index : indexMapping[fieldIndex]) { + statement.setBoolean(index, x); + } + } + + @Override + public void setByte(int fieldIndex, byte x) throws SQLException { + for (int index : indexMapping[fieldIndex]) { + statement.setByte(index, x); + } + } + + @Override + public void setShort(int fieldIndex, short x) throws SQLException { + for (int index : indexMapping[fieldIndex]) { + statement.setShort(index, x); + } + } + + @Override + public void setInt(int fieldIndex, int x) throws SQLException { + for (int index : indexMapping[fieldIndex]) { + statement.setInt(index, x); + } + } + + @Override + public void setLong(int fieldIndex, long x) throws SQLException { + for (int index : indexMapping[fieldIndex]) { + statement.setLong(index, x); + } + } + + @Override + public void setFloat(int fieldIndex, float x) throws SQLException { + for (int index : indexMapping[fieldIndex]) { + statement.setFloat(index, x); + } + } + + @Override + public void setDouble(int fieldIndex, double x) throws SQLException { + for (int index : indexMapping[fieldIndex]) { + statement.setDouble(index, x); + } + } + + @Override + public void setBigDecimal(int fieldIndex, BigDecimal x) throws SQLException { + for (int index : indexMapping[fieldIndex]) { + statement.setBigDecimal(index, x); + } + } + + @Override + public void setString(int fieldIndex, String x) throws SQLException { + for (int index : indexMapping[fieldIndex]) { + statement.setString(index, x); + } + } + + @Override + public void setBytes(int fieldIndex, byte[] x) throws SQLException { + for (int index : indexMapping[fieldIndex]) { + statement.setBytes(index, x); + } + } + + @Override + public void setDate(int fieldIndex, Date x) throws SQLException { + for (int index : indexMapping[fieldIndex]) { + statement.setDate(index, x); + } + } + + @Override + public void setTime(int fieldIndex, Time x) throws SQLException { + for (int index : indexMapping[fieldIndex]) { + statement.setTime(index, x); + } + } + + @Override + public void setTimestamp(int fieldIndex, Timestamp x) throws SQLException { + for (int index : indexMapping[fieldIndex]) { + statement.setTimestamp(index, x); + } + } + + @Override + public void setObject(int fieldIndex, Object x) throws 
SQLException { + for (int index : indexMapping[fieldIndex]) { + statement.setObject(index, x); + } + } + + @Override + public void close() throws SQLException { + statement.close(); + } + + // ---------------------------------------------------------------------------------------- + + public static FieldNamedPreparedStatement prepareStatement( + Connection connection, String sql, String[] fieldNames) throws SQLException { + checkNotNull(connection, "connection must not be null."); + checkNotNull(sql, "sql must not be null."); + checkNotNull(fieldNames, "fieldNames must not be null."); + + if (sql.contains("?")) { + throw new IllegalArgumentException("SQL statement must not contain ? character."); + } + + HashMap> parameterMap = new HashMap<>(); + String parsedSQL = parseNamedStatement(sql, parameterMap); + // currently, the statements must contain all the field parameters + checkArgument(parameterMap.size() == fieldNames.length); + int[][] indexMapping = new int[fieldNames.length][]; + for (int i = 0; i < fieldNames.length; i++) { + String fieldName = fieldNames[i]; + checkArgument( + parameterMap.containsKey(fieldName), + fieldName + " doesn't exist in the parameters of SQL statement: " + sql); + indexMapping[i] = parameterMap.get(fieldName).stream().mapToInt(v -> v).toArray(); + } + + return new FieldNamedPreparedStatementImpl( + connection.prepareStatement(parsedSQL), indexMapping); + } + + /** + * Parses a sql with named parameters. The parameter-index mappings are put into the map, and + * the parsed sql is returned. + * + * @param sql sql to parse + * @param paramMap map to hold parameter-index mappings + * @return the parsed sql + */ + public static String parseNamedStatement(String sql, Map> paramMap) { + StringBuilder parsedSql = new StringBuilder(); + int fieldIndex = 1; // SQL statement parameter index starts from 1 + int length = sql.length(); + for (int i = 0; i < length; i++) { + char c = sql.charAt(i); + if (':' == c) { + int j = i + 1; + while (j < length && Character.isJavaIdentifierPart(sql.charAt(j))) { + j++; + } + String parameterName = sql.substring(i + 1, j); + checkArgument( + !parameterName.isEmpty(), + "Named parameters in SQL statement must not be empty."); + paramMap.computeIfAbsent(parameterName, n -> new ArrayList<>()).add(fieldIndex); + fieldIndex++; + i = j - 1; + parsedSql.append('?'); + } else { + parsedSql.append(c); + } + } + return parsedSql.toString(); + } +} diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicLookupFunction.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicLookupFunction.java index 719cd35e..6e13a935 100644 --- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicLookupFunction.java +++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicLookupFunction.java @@ -14,122 +14,169 @@ package com.starrocks.connector.flink.table.source; -import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo; -import com.starrocks.connector.flink.table.source.struct.QueryBeXTablets; -import com.starrocks.connector.flink.table.source.struct.QueryInfo; -import com.starrocks.connector.flink.table.source.struct.SelectColumn; +import static java.lang.String.format; +import static org.apache.flink.util.Preconditions.checkArgument; + +import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionOptions; +import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionProvider; +import 
+import com.starrocks.connector.flink.dialect.MySqlDialect;
+import com.starrocks.connector.flink.statement.FieldNamedPreparedStatement;
 import com.starrocks.connector.flink.tools.EnvUtils;
 
-import org.apache.flink.table.data.GenericRowData;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.FunctionContext;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.functions.LookupFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
+public class StarRocksDynamicLookupFunction extends LookupFunction {
-public class StarRocksDynamicLookupFunction extends TableFunction<RowData> {
-
     private static final Logger LOG = LoggerFactory.getLogger(StarRocksDynamicLookupFunction.class);
-
-    private final ColumnRichInfo[] filterRichInfos;
-    private final StarRocksSourceOptions sourceOptions;
-    private QueryInfo queryInfo;
-    private final SelectColumn[] selectColumns;
-    private final List<ColumnRichInfo> columnRichInfos;
-
-    private final long cacheMaxSize;
-    private final long cacheExpireMs;
+    private final int maxRetryTimes;
 
-    // cache for lookup data
-    private Map<Row, List<RowData>> cacheMap;
+    private final String[] keyNames;
+
+    private final String query;
+
+    private final transient StarRocksJdbcConnectionProvider connectionProvider;
+
+    private transient FieldNamedPreparedStatement statement;
 
-    private transient long nextLoadTime;
+    private final JdbcRowConverter lookupKeyRowConverter;
 
-    public StarRocksDynamicLookupFunction(StarRocksSourceOptions sourceOptions,
-                                          ColumnRichInfo[] filterRichInfos,
-                                          List<ColumnRichInfo> columnRichInfos,
-                                          SelectColumn[] selectColumns
-    ) {
-        this.sourceOptions = sourceOptions;
-        this.filterRichInfos = filterRichInfos;
-        this.columnRichInfos = columnRichInfos;
-        this.selectColumns = selectColumns;
+    private final JdbcRowConverter jdbcRowConverter;
+
+    public StarRocksDynamicLookupFunction(StarRocksSourceOptions sourceOptions,
+                                          String[] fieldNames,
+                                          DataType[] fieldTypes,
+                                          String[] keyNames,
+                                          RowType rowType
+    ) {
-        this.cacheMaxSize = sourceOptions.getLookupCacheMaxRows();
-        this.cacheExpireMs = sourceOptions.getLookupCacheTTL();
         this.maxRetryTimes = sourceOptions.getLookupMaxRetries();
-
-        this.cacheMap = new HashMap<>();
-        this.nextLoadTime = -1L;
+
+        this.keyNames = keyNames;
+        MySqlDialect mySqlDialect = new MySqlDialect();
+        this.query = lookupSql(sourceOptions.getTableName(), fieldNames, keyNames);
+        connectionProvider = new StarRocksJdbcConnectionProvider(
+                new StarRocksJdbcConnectionOptions(
+                        sourceOptions.getJdbcUrl(), sourceOptions.getUsername(),
+                        sourceOptions.getPassword()));
+        this.jdbcRowConverter = mySqlDialect.getRowConverter(rowType);
+        List<String> nameList = Arrays.asList(fieldNames);
+        DataType[] keyTypes =
+                Arrays.stream(keyNames)
+                        .map(
+                                s -> {
+                                    checkArgument(
+                                            nameList.contains(s),
+                                            "keyName %s can't find in fieldNames %s.",
+                                            s,
+                                            nameList);
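+                                    // Resolve the lookup key's DataType by the position of its field name in the physical schema.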
+                                    return fieldTypes[nameList.indexOf(s)];
+                                })
+                        .toArray(DataType[]::new);
+
+        this.lookupKeyRowConverter = mySqlDialect.getRowConverter(RowType.of(
+                Arrays.stream(keyTypes)
+                        .map(DataType::getLogicalType)
+                        .toArray(LogicalType[]::new)));
+    }
-
+
+    private String lookupSql(String tableName, String[] selectFields, String[] conditionFields) {
+        String selectExpressions =
+                Arrays.stream(selectFields)
+                        .map(this::quoteIdentifier)
+                        .collect(Collectors.joining(", "));
+        String fieldExpressions =
+                Arrays.stream(conditionFields)
+                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+                        .collect(Collectors.joining(" AND "));
+        return "SELECT "
+                + selectExpressions
+                + " FROM "
+                + quoteIdentifier(tableName)
+                + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
+    }
+
+    public String quoteIdentifier(String identifier) {
+        return "`" + identifier + "`";
+    }
+
     @Override
     public void open(FunctionContext context) throws Exception {
         super.open(context);
+        establishConnectionAndStatement();
         LOG.info("Open lookup function. {}", EnvUtils.getGitInformation());
     }
 
-    public void eval(Object... keys) {
-        reloadData();
-        Row keyRow = Row.of(keys);
-        List<RowData> curList = cacheMap.get(keyRow);
-        if (curList != null) {
-            curList.parallelStream().forEach(this::collect);
-        }
-    }
+    @Override
+    public Collection<RowData> lookup(RowData keyRow) {
+        for (int retry = 0; retry <= maxRetryTimes; retry++) {
+            try {
+                statement = lookupKeyRowConverter.toExternal(keyRow, statement);
+                try (ResultSet resultSet = statement.executeQuery()) {
+                    ArrayList<RowData> rows = new ArrayList<>();
+                    while (resultSet.next()) {
+                        RowData row = jdbcRowConverter.toInternal(resultSet);
+                        rows.add(row);
+                    }
+                    rows.trimToSize();
+                    return rows;
+                }
+            } catch (SQLException e) {
+                LOG.error(String.format("StarRocks executeBatch error, retry times = %d", retry), e);
+                if (retry >= maxRetryTimes) {
+                    throw new RuntimeException("Execution of StarRocks statement failed.", e);
+                }
 
-    private void reloadData() {
-        if (nextLoadTime > System.currentTimeMillis()) {
-            return;
-        }
-        if (nextLoadTime > 0) {
-            LOG.info("Lookup join cache has expired after {} (ms), reloading", this.cacheExpireMs);
-        } else {
-            LOG.info("Populating lookup join cache");
-        }
-        cacheMap.clear();
-
-        StringBuilder sqlSb = new StringBuilder("select * from ");
-        sqlSb.append("`").append(sourceOptions.getDatabaseName()).append("`");
-        sqlSb.append(".");
-        sqlSb.append("`" + sourceOptions.getTableName() + "`");
-        LOG.info("LookUpFunction SQL [{}]", sqlSb.toString());
-        this.queryInfo = StarRocksSourceCommonFunc.getQueryInfo(this.sourceOptions, sqlSb.toString());
-        List<List<QueryBeXTablets>> lists = StarRocksSourceCommonFunc.splitQueryBeXTablets(1, queryInfo);
-        cacheMap = lists.get(0).parallelStream().flatMap(beXTablets -> {
-            StarRocksSourceBeReader beReader = new StarRocksSourceBeReader(
-                    beXTablets.getBeNode(),
-                    columnRichInfos,
-                    selectColumns,
-                    sourceOptions);
-            beReader.openScanner(beXTablets.getTabletIds(), queryInfo.getQueryPlan().getOpaqued_query_plan(), sourceOptions);
-            beReader.startToRead();
-            List<RowData> tmpDataList = new ArrayList<>();
-            while (beReader.hasNext()) {
-                RowData row = beReader.getNext();
-                tmpDataList.add(row);
-            }
-            return tmpDataList.stream();
-        }).collect(Collectors.groupingBy(row -> {
-            GenericRowData gRowData = (GenericRowData) row;
-            Object[] keyObj = new Object[filterRichInfos.length];
-            for (int i = 0; i < filterRichInfos.length; i++) {
-                keyObj[i] = gRowData.getField(filterRichInfos[i].getColumnIndexInSchema());
+                try {
+                    if (!connectionProvider.isConnectionValid()) {
+                        statement.close();
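+                        // The cached connection is no longer valid: close it and rebuild both the
+                        // connection and the prepared statement before the next retry.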
+ connectionProvider.close(); + establishConnectionAndStatement(); + } + } catch (SQLException | ClassNotFoundException exception) { + LOG.error( + "StarRocks connection is not valid, and reestablish connection failed", + exception); + throw new RuntimeException("Reestablish StarRocks connection failed", exception); + } + + try { + Thread.sleep(1000L * retry); + } catch (InterruptedException e1) { + throw new RuntimeException(e1); + } } - return Row.of(keyObj); - })); - nextLoadTime = System.currentTimeMillis() + this.cacheExpireMs; + } + return Collections.emptyList(); } + @Override public void close() throws Exception { + connectionProvider.close(); super.close(); } + + private void establishConnectionAndStatement() throws SQLException, ClassNotFoundException { + Connection dbConn = connectionProvider.getConnection(); + statement = FieldNamedPreparedStatement.prepareStatement(dbConn, query, keyNames); + } } diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java index 0764ed5d..0bd06cb6 100644 --- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java +++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java @@ -14,7 +14,6 @@ package com.starrocks.connector.flink.table.source; -import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo; import com.starrocks.connector.flink.table.source.struct.PushDownHolder; import com.starrocks.connector.flink.table.source.struct.SelectColumn; @@ -25,29 +24,47 @@ import org.apache.flink.table.connector.source.LookupTableSource; import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.SourceFunctionProvider; -import org.apache.flink.table.connector.source.TableFunctionProvider; import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; +import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider; +import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider; +import org.apache.flink.table.connector.source.lookup.cache.LookupCache; import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.Optional; -public class StarRocksDynamicTableSource implements ScanTableSource, LookupTableSource, SupportsLimitPushDown, SupportsFilterPushDown, SupportsProjectionPushDown { +public class StarRocksDynamicTableSource implements ScanTableSource, LookupTableSource, + SupportsLimitPushDown, SupportsFilterPushDown, SupportsProjectionPushDown { private final TableSchema flinkSchema; private final StarRocksSourceOptions options; private final PushDownHolder pushDownHolder; - public StarRocksDynamicTableSource(StarRocksSourceOptions options, TableSchema schema, PushDownHolder pushDownHolder) { + @Nullable + private final LookupCache cache; + + private final DataType physicalRowDataType; + + public StarRocksDynamicTableSource(StarRocksSourceOptions options, 
+ TableSchema schema, + PushDownHolder pushDownHolder, + @Nullable LookupCache cache, + DataType physicalRowDataType + ) { this.options = options; this.flinkSchema = schema; this.pushDownHolder = pushDownHolder; + this.cache = cache; + this.physicalRowDataType = physicalRowDataType; } @Override @@ -58,39 +75,45 @@ public ChangelogMode getChangelogMode() { @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { StarRocksDynamicSourceFunction sourceFunction = new StarRocksDynamicSourceFunction( - options, flinkSchema, - this.pushDownHolder.getFilter(), - this.pushDownHolder.getLimit(), - this.pushDownHolder.getSelectColumns(), - this.pushDownHolder.getColumns(), + options, flinkSchema, + this.pushDownHolder.getFilter(), + this.pushDownHolder.getLimit(), + this.pushDownHolder.getSelectColumns(), + this.pushDownHolder.getColumns(), this.pushDownHolder.getQueryType()); return SourceFunctionProvider.of(sourceFunction, true); } @Override public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { - int[] projectedFields = Arrays.stream(context.getKeys()).mapToInt(value -> value[0]).toArray(); - ColumnRichInfo[] filerRichInfo = new ColumnRichInfo[projectedFields.length]; - for (int i = 0; i < projectedFields.length; i ++) { - ColumnRichInfo columnRichInfo = new ColumnRichInfo( - this.flinkSchema.getFieldName(projectedFields[i]).get(), - projectedFields[i], - this.flinkSchema.getFieldDataType(projectedFields[i]).get() - ); - filerRichInfo[i] = columnRichInfo; + // StarRocks only support non-nested look up keys + String[] keyNames = new String[context.getKeys().length]; + for (int i = 0; i < keyNames.length; i++) { + int[] innerKeyArr = context.getKeys()[i]; + Preconditions.checkArgument( + innerKeyArr.length == 1, "StarRocks only support non-nested look up keys"); + keyNames[i] = DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]); + } + final RowType rowType = (RowType) physicalRowDataType.getLogicalType(); + + StarRocksDynamicLookupFunction lookupFunction = + new StarRocksDynamicLookupFunction( + options, + DataType.getFieldNames(physicalRowDataType).toArray(new String[0]), + DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]), + keyNames, + rowType); + if (cache != null) { + return PartialCachingLookupProvider.of(lookupFunction, cache); + } else { + return LookupFunctionProvider.of(lookupFunction); } - - Map columnMap = StarRocksSourceCommonFunc.genColumnMap(flinkSchema); - List ColumnRichInfos = StarRocksSourceCommonFunc.genColumnRichInfo(columnMap); - SelectColumn[] selectColumns = StarRocksSourceCommonFunc.genSelectedColumns(columnMap, this.options, ColumnRichInfos); - - StarRocksDynamicLookupFunction tableFunction = new StarRocksDynamicLookupFunction(this.options, filerRichInfo, ColumnRichInfos, selectColumns); - return TableFunctionProvider.of(tableFunction); } @Override public DynamicTableSource copy() { - return new StarRocksDynamicTableSource(this.options, this.flinkSchema, this.pushDownHolder); + return new StarRocksDynamicTableSource(this.options, this.flinkSchema, this.pushDownHolder, + cache, physicalRowDataType); } @Override @@ -106,15 +129,16 @@ public boolean supportsNestedProjection() { @Override public void applyProjection(int[][] projectedFields) { // if columns = "*", this func will not be called, so 'selectColumns' will be null - int[] curProjectedFields = Arrays.stream(projectedFields).mapToInt(value -> value[0]).toArray(); - if (curProjectedFields.length == 0 ) { + int[] curProjectedFields = 
Arrays.stream(projectedFields).mapToInt(value -> value[0]) + .toArray(); + if (curProjectedFields.length == 0) { this.pushDownHolder.setQueryType(StarRocksSourceQueryType.QueryCount); return; } this.pushDownHolder.setQueryType(StarRocksSourceQueryType.QuerySomeColumns); ArrayList columnList = new ArrayList<>(); - ArrayList selectColumns = new ArrayList(); + ArrayList selectColumns = new ArrayList<>(); for (int index : curProjectedFields) { String columnName = flinkSchema.getFieldName(index).get(); columnList.add(columnName); @@ -122,7 +146,8 @@ public void applyProjection(int[][] projectedFields) { } String columns = String.join(", ", columnList); this.pushDownHolder.setColumns(columns); - this.pushDownHolder.setSelectColumns(selectColumns.toArray(new SelectColumn[selectColumns.size()])); + this.pushDownHolder.setSelectColumns( + selectColumns.toArray(new SelectColumn[selectColumns.size()])); } @Override @@ -133,14 +158,15 @@ public Result applyFilters(List filtersExpressions) { StarRocksExpressionExtractor extractor = new StarRocksExpressionExtractor(); for (ResolvedExpression expression : filtersExpressions) { - if (expression.getOutputDataType().equals(DataTypes.BOOLEAN()) && expression.getChildren().size() == 0) { + if (expression.getOutputDataType().equals(DataTypes.BOOLEAN()) + && expression.getChildren().size() == 0) { filters.add(expression.accept(extractor) + " = true"); ac.add(expression); continue; } String str = expression.accept(extractor); + remain.add(expression); if (str == null) { - remain.add(expression); continue; } filters.add(str); diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceFactory.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceFactory.java index e340b2cd..bd32ca0a 100644 --- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceFactory.java +++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceFactory.java @@ -19,13 +19,21 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.lookup.LookupOptions; +import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache; +import org.apache.flink.table.connector.source.lookup.cache.LookupCache; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.utils.TableSchemaUtils; +import javax.annotation.Nullable; +import java.time.Duration; import java.util.HashSet; import java.util.Set; +import static com.starrocks.connector.flink.table.source.StarRocksSourceOptions.LOOKUP_CACHE_MAX_ROWS; +import static com.starrocks.connector.flink.table.source.StarRocksSourceOptions.LOOKUP_CACHE_TTL_MS; + public final class StarRocksDynamicTableSourceFactory implements DynamicTableSourceFactory { @@ -41,10 +49,34 @@ public DynamicTableSource createDynamicTableSource(Context context, boolean need } ReadableConfig options = helper.getOptions(); // validate some special properties - StarRocksSourceOptions sourceOptions = new StarRocksSourceOptions(options, context.getCatalogTable().getOptions()); - TableSchema flinkSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); + StarRocksSourceOptions sourceOptions = new StarRocksSourceOptions(options, + context.getCatalogTable().getOptions()); + 
TableSchema flinkSchema = TableSchemaUtils.getPhysicalSchema( + context.getCatalogTable().getSchema()); PushDownHolder pushDownHolder = new PushDownHolder(); - return new StarRocksDynamicTableSource(sourceOptions, flinkSchema, pushDownHolder); + return new StarRocksDynamicTableSource(sourceOptions, flinkSchema, pushDownHolder, + getLookupCache(options), context.getPhysicalRowDataType()); + } + + @Nullable + private LookupCache getLookupCache(ReadableConfig tableOptions) { + LookupCache cache = null; + // Legacy cache options + if (tableOptions.get(LOOKUP_CACHE_MAX_ROWS) > 0 + && tableOptions.get(LOOKUP_CACHE_TTL_MS).compareTo(0L) > 0) { + cache = + DefaultLookupCache.newBuilder() + .maximumSize(tableOptions.get(LOOKUP_CACHE_MAX_ROWS)) + .expireAfterWrite(Duration.ofMillis(tableOptions.get(LOOKUP_CACHE_TTL_MS))) + .cacheMissingKey(tableOptions.get(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY)) + .build(); + } + if (tableOptions + .get(LookupOptions.CACHE_TYPE) + .equals(LookupOptions.LookupCacheType.PARTIAL)) { + cache = DefaultLookupCache.fromConfig(tableOptions); + } + return cache; } @Override @@ -76,9 +108,13 @@ public Set> optionalOptions() { options.add(StarRocksSourceOptions.SCAN_MEM_LIMIT); options.add(StarRocksSourceOptions.SCAN_MAX_RETRIES); options.add(StarRocksSourceOptions.SCAN_BE_HOST_MAPPING_LIST); - options.add(StarRocksSourceOptions.LOOKUP_CACHE_TTL_MS); - options.add(StarRocksSourceOptions.LOOKUP_CACHE_MAX_ROWS); - options.add(StarRocksSourceOptions.LOOKUP_MAX_RETRIES); + options.add(LookupOptions.CACHE_TYPE); + options.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS); + options.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE); + options.add(LookupOptions.PARTIAL_CACHE_MAX_ROWS); + options.add(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY); + options.add(LookupOptions.MAX_RETRIES); + return options; } } diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceOptions.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceOptions.java index cd739d76..d94c9931 100644 --- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceOptions.java +++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceOptions.java @@ -53,24 +53,24 @@ public class StarRocksSourceOptions implements Serializable { public static final ConfigOption TABLE_NAME = ConfigOptions.key("table-name") .stringType().noDefaultValue().withDescription("Table name"); - - + + // optional Options public static final ConfigOption SCAN_CONNECT_TIMEOUT = ConfigOptions.key("scan.connect.timeout-ms") .intType().defaultValue(1000).withDescription("Connect timeout"); - + public static final ConfigOption SCAN_BATCH_ROWS = ConfigOptions.key("scan.params.batch-rows") .intType().defaultValue(1000).withDescription("Batch rows"); public static final ConfigOption SCAN_PROPERTIES = ConfigOptions.key("scan.params.properties") .stringType().noDefaultValue().withDescription("Reserved params for use"); - + public static final ConfigOption SCAN_LIMIT = ConfigOptions.key("scan.params.limit") .intType().defaultValue(1).withDescription("The query limit, if specified."); public static final ConfigOption SCAN_KEEP_ALIVE_MIN = ConfigOptions.key("scan.params.keep-alive-min") .intType().defaultValue(10).withDescription("Max keep alive time min"); - + public static final ConfigOption SCAN_QUERTY_TIMEOUT_S = ConfigOptions.key("scan.params.query-timeout-s") .intType().defaultValue(600).withDescription("Query timeout for a single query"); @@ -88,21 
+88,23 @@ public class StarRocksSourceOptions implements Serializable { public static final ConfigOption SCAN_BE_HOST_MAPPING_LIST = ConfigOptions.key("scan.be-host-mapping-list") .stringType().defaultValue("").withDescription("List of be host mapping"); - + // lookup Options + @Deprecated public static final ConfigOption LOOKUP_CACHE_MAX_ROWS = ConfigOptions.key("lookup.cache.max-rows") .longType().defaultValue(-1L).withDescription( "the max number of rows of lookup cache, over this value, the oldest rows will " + "be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any of them is " + "specified. Cache is not enabled as default."); + @Deprecated public static final ConfigOption LOOKUP_CACHE_TTL_MS = ConfigOptions.key("lookup.cache.ttl-ms") .longType().defaultValue(5000L).withDescription("the cache time to live."); + @Deprecated public static final ConfigOption LOOKUP_MAX_RETRIES = ConfigOptions.key("lookup.max-retries") .intType().defaultValue(1).withDescription("the max retry times if lookup database failed."); - public static final String SOURCE_PROPERTIES_PREFIX = "scan.params."; public StarRocksSourceOptions(ReadableConfig options, Map optionsMap) { @@ -150,7 +152,7 @@ public String getScanUrl() { public String getJdbcUrl() { return tableOptions.get(JDBC_URL); } - + public String getUsername() { return tableOptions.get(USERNAME); } @@ -169,8 +171,8 @@ public String getTableName() { // optional Options - public int getConnectTimeoutMs() { - return tableOptions.get(SCAN_CONNECT_TIMEOUT).intValue(); + public int getConnectTimeoutMs() { + return tableOptions.get(SCAN_CONNECT_TIMEOUT).intValue(); } public int getBatchRows() { diff --git a/src/main/java/com/starrocks/connector/flink/tools/ConnectionUtils.java b/src/main/java/com/starrocks/connector/flink/tools/ConnectionUtils.java deleted file mode 100644 index e1f9be8d..00000000 --- a/src/main/java/com/starrocks/connector/flink/tools/ConnectionUtils.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.starrocks.connector.flink.tools; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.HttpURLConnection; -import java.net.URL; -import java.util.List; -import javax.annotation.Nullable; - -/** Utilities for HTTP connection. */ -public class ConnectionUtils { - - private static final Logger LOG = LoggerFactory.getLogger(ConnectionUtils.class); - - /** Select an available host from the list. Each host is like 'ip:port'. 
*/ - @Nullable - public static String selectAvailableHttpHost(List hostList, int connectionTimeout) { - for (String host : hostList) { - if (host == null) { - continue; - } - if (!host.startsWith("http")) { - host = "http://" + host; - } - if (testHttpConnection(host, connectionTimeout)) { - return host; - } - } - - return null; - } - - public static boolean testHttpConnection(String urlStr, int connectionTimeout) { - try { - URL url = new URL(urlStr); - HttpURLConnection co = (HttpURLConnection) url.openConnection(); - co.setConnectTimeout(connectionTimeout); - co.connect(); - co.disconnect(); - return true; - } catch (Exception e) { - LOG.warn("Failed to connect to {}", urlStr, e); - return false; - } - } -} diff --git a/src/main/java/com/starrocks/connector/flink/util/JdbcTypeUtil.java b/src/main/java/com/starrocks/connector/flink/util/JdbcTypeUtil.java new file mode 100644 index 00000000..19e512a4 --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/util/JdbcTypeUtil.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.starrocks.connector.flink.util; + +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BIG_DEC_TYPE_INFO; +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BOOLEAN_TYPE_INFO; +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BYTE_TYPE_INFO; +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO; +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.FLOAT_TYPE_INFO; +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO; +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO; +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.SHORT_TYPE_INFO; +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO; +import static org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.ARRAY; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.BINARY; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.BOOLEAN; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.CHAR; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DATE; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.FLOAT; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.SMALLINT; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.TINYINT; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARBINARY; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR; + +import java.sql.Types; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +/** + * Utils for jdbc type. 
+ */
+@Internal
+public class JdbcTypeUtil {
+
+    private static final Map<TypeInformation<?>, Integer> TYPE_MAPPING;
+
+    static {
+        HashMap<TypeInformation<?>, Integer> m = new HashMap<>();
+        m.put(STRING_TYPE_INFO, Types.VARCHAR);
+        m.put(BOOLEAN_TYPE_INFO, Types.BOOLEAN);
+        m.put(BYTE_TYPE_INFO, Types.TINYINT);
+        m.put(SHORT_TYPE_INFO, Types.SMALLINT);
+        m.put(INT_TYPE_INFO, Types.INTEGER);
+        m.put(LONG_TYPE_INFO, Types.BIGINT);
+        m.put(FLOAT_TYPE_INFO, Types.REAL);
+        m.put(DOUBLE_TYPE_INFO, Types.DOUBLE);
+        m.put(SqlTimeTypeInfo.DATE, Types.DATE);
+        m.put(SqlTimeTypeInfo.TIME, Types.TIME);
+        m.put(SqlTimeTypeInfo.TIMESTAMP, Types.TIMESTAMP);
+        m.put(LocalTimeTypeInfo.LOCAL_DATE, Types.DATE);
+        m.put(LocalTimeTypeInfo.LOCAL_TIME, Types.TIME);
+        m.put(LocalTimeTypeInfo.LOCAL_DATE_TIME, Types.TIMESTAMP);
+        m.put(BIG_DEC_TYPE_INFO, Types.DECIMAL);
+        m.put(BYTE_PRIMITIVE_ARRAY_TYPE_INFO, Types.BINARY);
+        TYPE_MAPPING = Collections.unmodifiableMap(m);
+    }
+
+    private static final Map<LogicalTypeRoot, Integer> LOGICAL_TYPE_MAPPING =
+            Collections.unmodifiableMap(
+                    new HashMap<LogicalTypeRoot, Integer>() {
+                        {
+                            put(VARCHAR, Types.VARCHAR);
+                            put(CHAR, Types.CHAR);
+                            put(VARBINARY, Types.VARBINARY);
+                            put(BOOLEAN, Types.BOOLEAN);
+                            put(BINARY, Types.BINARY);
+                            put(TINYINT, Types.TINYINT);
+                            put(SMALLINT, Types.SMALLINT);
+                            put(INTEGER, Types.INTEGER);
+                            put(BIGINT, Types.BIGINT);
+                            put(FLOAT, Types.REAL);
+                            put(DOUBLE, Types.DOUBLE);
+                            put(DATE, Types.DATE);
+                            put(TIMESTAMP_WITHOUT_TIME_ZONE, Types.TIMESTAMP);
+                            put(TIMESTAMP_WITH_TIME_ZONE, Types.TIMESTAMP_WITH_TIMEZONE);
+                            put(TIME_WITHOUT_TIME_ZONE, Types.TIME);
+                            put(DECIMAL, Types.DECIMAL);
+                            put(ARRAY, Types.ARRAY);
+                        }
+                    });
+
+    private JdbcTypeUtil() {
+    }
+
+    public static int logicalTypeToSqlType(LogicalTypeRoot typeRoot) {
+        if (LOGICAL_TYPE_MAPPING.containsKey(typeRoot)) {
+            return LOGICAL_TYPE_MAPPING.get(typeRoot);
+        } else {
+            throw new IllegalArgumentException("Unsupported typeRoot: " + typeRoot);
+        }
+    }
+}
diff --git a/src/test/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceTest.java b/src/test/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceTest.java
index edfd49b7..77967e1b 100644
--- a/src/test/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceTest.java
+++ b/src/test/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceTest.java
@@ -44,7 +44,7 @@ public class StarRocksDynamicTableSourceTest extends StarRocksSourceBaseTest {
     @Before
     public void init() {
         pushDownHolder = new PushDownHolder();
-        dynamicTableSource = new StarRocksDynamicTableSource(OPTIONS, TABLE_SCHEMA, pushDownHolder);
+        dynamicTableSource = new StarRocksDynamicTableSource(OPTIONS, TABLE_SCHEMA, pushDownHolder, null, null);
     }
 
     @Test
@@ -55,7 +55,7 @@ public void testApplyProjection() {
         for (int i = 0; i < SELECT_COLUMNS.length; i ++) {
             assertEquals(SELECT_COLUMNS[i].getColumnIndexInFlinkTable(), pushDownHolder.getSelectColumns()[i].getColumnIndexInFlinkTable());
             assertEquals(SELECT_COLUMNS[i].getColumnName(), pushDownHolder.getSelectColumns()[i].getColumnName());
-        } 
+        }
         assertEquals(StarRocksSourceQueryType.QuerySomeColumns, pushDownHolder.getQueryType());
 
         dynamicTableSource.applyProjection(PROJECTION_ARRAY_NULL);
@@ -121,7 +121,7 @@ public void testFilter() {
                 BuiltInFunctionDefinitions.EQUALS,
                 Arrays.asList(c1Ref, valueLiteral(1)),
                 DataTypes.BOOLEAN());
-        
+
        dynamicTableSource.applyFilters(Arrays.asList(c1Exp,
                new CallExpression(
                        BuiltInFunctionDefinitions.NOT_EQUALS,
@@ -162,7 +162,7 @@ public void testFilter() {
                        BuiltInFunctionDefinitions.LIKE,
Arrays.asList(c1Ref, valueLiteral(1)), DataTypes.BOOLEAN()); - try { + try { dynamicTableSource.applyFilters(Collections.singletonList(c6Exp)); } catch (Exception e) { e.printStackTrace(); @@ -174,7 +174,7 @@ public void testFilter() { BuiltInFunctionDefinitions.IN, Arrays.asList(c1Ref, valueLiteral(1)), DataTypes.BOOLEAN()); - try { + try { dynamicTableSource.applyFilters(Collections.singletonList(c7Exp)); } catch (Exception e) { e.printStackTrace(); @@ -186,7 +186,7 @@ public void testFilter() { BuiltInFunctionDefinitions.BETWEEN, Arrays.asList(c1Ref, valueLiteral(1)), DataTypes.BOOLEAN()); - try { + try { dynamicTableSource.applyFilters(Collections.singletonList(c8Exp)); } catch (Exception e) { e.printStackTrace();