diff --git a/build.sh b/build.sh index 21bcf4dc..d78f3230 100755 --- a/build.sh +++ b/build.sh @@ -33,7 +33,7 @@ check_flink_version_supported $flink_minor_version flink_version="$(get_flink_version $flink_minor_version)" kafka_connector_version="$(get_kafka_connector_version $flink_minor_version)" -${MVN_CMD} clean package -DskipTests \ +${MVN_CMD} clean package -DskipTests -Drat.skip=true \ -Dflink.minor.version=${flink_minor_version} \ -Dflink.version=${flink_version} \ -Dkafka.connector.version=${kafka_connector_version} diff --git a/deploy.sh b/deploy.sh old mode 100644 new mode 100755 diff --git a/pom.xml b/pom.xml index 77422f3a..8a701d41 100644 --- a/pom.xml +++ b/pom.xml @@ -376,7 +376,7 @@ limitations under the License. package - jar + test-jar @@ -588,7 +588,7 @@ limitations under the License. - + release diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java index 0764ed5d..5b79f77c 100644 --- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java +++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java @@ -58,12 +58,13 @@ public ChangelogMode getChangelogMode() { @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { StarRocksDynamicSourceFunction sourceFunction = new StarRocksDynamicSourceFunction( - options, flinkSchema, - this.pushDownHolder.getFilter(), - this.pushDownHolder.getLimit(), - this.pushDownHolder.getSelectColumns(), - this.pushDownHolder.getColumns(), + options, flinkSchema, + this.pushDownHolder.getFilter(), + this.pushDownHolder.getLimit(), + this.pushDownHolder.getSelectColumns(), + this.pushDownHolder.getColumns(), this.pushDownHolder.getQueryType()); + return SourceFunctionProvider.of(sourceFunction, true); } @@ -73,7 +74,7 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { ColumnRichInfo[] filerRichInfo = new ColumnRichInfo[projectedFields.length]; for (int i = 0; i < projectedFields.length; i ++) { ColumnRichInfo columnRichInfo = new ColumnRichInfo( - this.flinkSchema.getFieldName(projectedFields[i]).get(), + this.flinkSchema.getFieldName(projectedFields[i]).get(), projectedFields[i], this.flinkSchema.getFieldDataType(projectedFields[i]).get() ); @@ -114,9 +115,9 @@ public void applyProjection(int[][] projectedFields) { this.pushDownHolder.setQueryType(StarRocksSourceQueryType.QuerySomeColumns); ArrayList columnList = new ArrayList<>(); - ArrayList selectColumns = new ArrayList(); + ArrayList selectColumns = new ArrayList(); for (int index : curProjectedFields) { - String columnName = flinkSchema.getFieldName(index).get(); + String columnName = "`" + flinkSchema.getFieldName(index).get() + "`"; columnList.add(columnName); selectColumns.add(new SelectColumn(columnName, index)); } diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksExpressionExtractor.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksExpressionExtractor.java index c8d4a4c3..8fa7d739 100644 --- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksExpressionExtractor.java +++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksExpressionExtractor.java @@ -57,7 +57,7 @@ public String visit(CallExpression call) { } if (SUPPORT_FUNC.containsKey(funcDef)) { - + List operands = new ArrayList<>(); for (Expression child : call.getChildren()) { String operand = child.accept(this); @@ -74,8 +74,8 @@ public String visit(CallExpression call) { @Override public String visit(ValueLiteralExpression valueLiteral) { LogicalTypeRoot typeRoot = valueLiteral.getOutputDataType().getLogicalType().getTypeRoot(); - if (typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) || - typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) || + if (typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) || + typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) || typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE) || typeRoot.equals(LogicalTypeRoot.DATE)) { return "'" + valueLiteral.toString() + "'"; @@ -85,7 +85,7 @@ public String visit(ValueLiteralExpression valueLiteral) { @Override public String visit(FieldReferenceExpression fieldReference) { - return fieldReference.getName(); + return "`" + fieldReference.getName() + "`"; } @Override @@ -98,4 +98,4 @@ public String visit(Expression other) { return null; } -} \ No newline at end of file +}