From eb45ce958b0b77aceb32ea251c5c4de11c27d90d Mon Sep 17 00:00:00 2001 From: lijiangbo <245730400@qq.com> Date: Mon, 26 Nov 2018 20:50:47 +0800 Subject: [PATCH] Support static data and customize time format --- .../com/dtstack/flinkx/reader/MetaColumn.java | 132 ++++++++++++++++++ .../com/dtstack/flinkx/util/DateUtil.java | 21 ++- .../com/dtstack/flinkx/util/StringUtil.java | 17 ++- .../dtstack/flinkx/db2/Db2DatabaseMeta.java | 5 + .../java/com/dtstack/flinkx/es/EsUtil.java | 2 +- .../flinkx/ftp/reader/FtpInputFormat.java | 45 +++--- .../ftp/reader/FtpInputFormatBuilder.java | 13 +- .../dtstack/flinkx/ftp/reader/FtpReader.java | 31 +--- .../com/dtstack/flinkx/hdfs/HdfsUtil.java | 17 ++- .../flinkx/hdfs/reader/HdfsInputFormat.java | 9 +- .../hdfs/reader/HdfsInputFormatBuilder.java | 19 +-- .../hdfs/reader/HdfsOrcInputFormat.java | 43 ++++-- .../hdfs/reader/HdfsParquetInputFormat.java | 81 ++++++++--- .../flinkx/hdfs/reader/HdfsReader.java | 35 +---- .../hdfs/reader/HdfsTextInputFormat.java | 52 ++++--- .../hdfs/writer/HdfsOrcOutputFormat.java | 4 +- .../hdfs/writer/HdfsTextOutputFormat.java | 4 +- .../com/dtstack/flinkx/mongodb/Column.java | 64 --------- .../dtstack/flinkx/mongodb/MongodbUtil.java | 54 +------ .../mongodb/reader/MongodbInputFormat.java | 37 ++++- .../reader/MongodbInputFormatBuilder.java | 7 +- .../flinkx/mongodb/reader/MongodbReader.java | 15 +- .../mongodb/writer/MongodbOutputFormat.java | 4 +- .../writer/MongodbOutputFormatBuilder.java | 5 +- .../flinkx/mongodb/writer/MongodbWriter.java | 11 +- .../flinkx/mysql/MySqlDatabaseMeta.java | 5 + .../flinkx/odps/reader/OdpsInputFormat.java | 49 ++++--- .../odps/reader/OdpsInputFormatBuilder.java | 13 +- .../flinkx/odps/reader/OdpsReader.java | 30 +--- .../flinkx/odps/writer/OdpsOutputFormat.java | 2 +- .../flinkx/oracle/OracleDatabaseMeta.java | 5 + .../postgresql/PostgresqlDatabaseMeta.java | 5 + .../dtstack/flinkx/rdb/DatabaseInterface.java | 2 + .../datareader/DistributedJdbcDataReader.java | 8 +- .../flinkx/rdb/datareader/JdbcDataReader.java | 14 +- .../DistributedJdbcInputFormat.java | 16 ++- .../DistributedJdbcInputFormatBuilder.java | 5 +- .../rdb/inputformat/JdbcInputFormat.java | 13 +- .../inputformat/JdbcInputFormatBuilder.java | 5 +- .../rdb/outputformat/JdbcOutputFormat.java | 4 +- .../com/dtstack/flinkx/rdb/util/DBUtil.java | 26 +++- .../sqlserver/SqlServerDatabaseMeta.java | 5 + .../stream/reader/StreamInputFormat.java | 9 +- .../reader/StreamInputFormatBuilder.java | 4 +- .../flinkx/stream/reader/StreamReader.java | 6 +- 45 files changed, 519 insertions(+), 434 deletions(-) create mode 100644 flinkx-core/src/main/java/com/dtstack/flinkx/reader/MetaColumn.java delete mode 100644 flinkx-mongodb/flinkx-mongodb-core/src/main/java/com/dtstack/flinkx/mongodb/Column.java diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/reader/MetaColumn.java b/flinkx-core/src/main/java/com/dtstack/flinkx/reader/MetaColumn.java new file mode 100644 index 0000000000..04035420a5 --- /dev/null +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/reader/MetaColumn.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flinkx.reader;
+
+import com.dtstack.flinkx.util.DateUtil;
+import org.apache.commons.lang3.time.FastDateFormat;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Metadata of a reader column: either a named/indexed source field or a
+ * static value, with an optional custom time format and splitter.
+ *
+ * @author jiangbo
+ * @date 2018/11/26
+ */
+public class MetaColumn {
+
+    private String name;
+
+    private String type;
+
+    private Integer index;
+
+    private String value;
+
+    private FastDateFormat timeFormat;
+
+    private String splitter;
+
+    public String getSplitter() {
+        return splitter;
+    }
+
+    public void setSplitter(String splitter) {
+        this.splitter = splitter;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public Integer getIndex() {
+        return index;
+    }
+
+    public void setIndex(Integer index) {
+        this.index = index;
+    }
+
+    public String getValue() {
+        return value;
+    }
+
+    public void setValue(String value) {
+        this.value = value;
+    }
+
+    public FastDateFormat getTimeFormat() {
+        return timeFormat;
+    }
+
+    public void setTimeFormat(FastDateFormat timeFormat) {
+        this.timeFormat = timeFormat;
+    }
+
+    public static List<MetaColumn> getMetaColumns(List columns){
+        List<MetaColumn> metaColumns = new ArrayList<>();
+        if(columns != null && columns.size() > 0) {
+            if (columns.get(0) instanceof Map) {
+                for (int i = 0; i < columns.size(); i++) {
+                    Map sm = (Map) columns.get(i);
+                    MetaColumn mc = new MetaColumn();
+                    mc.setName((String) sm.get("name"));
+                    // JSON numbers may deserialize as Double, so normalize before use
+                    Object index = sm.get("index");
+                    mc.setIndex(index == null ? null : Double.valueOf(String.valueOf(index)).intValue());
+                    mc.setType((String) sm.get("type"));
+                    mc.setValue((String) sm.get("value"));
+                    mc.setSplitter((String) sm.get("splitter"));
+
+                    if(sm.get("format") != null){
+                        mc.setTimeFormat(DateUtil.getDateFormatter((String) sm.get("format")));
+                    }
+
+                    metaColumns.add(mc);
+                }
+            } else if (columns.get(0) instanceof String) {
+                if(columns.size() == 1 && columns.get(0).equals("*")){
+                    MetaColumn mc = new MetaColumn();
+                    mc.setName("*");
+                    metaColumns.add(mc);
+                } else {
+                    for (Object column : columns) {
+                        MetaColumn mc = new MetaColumn();
+                        mc.setName(String.valueOf(column));
+                        metaColumns.add(mc);
+                    }
+                }
+            } else {
+                throw new IllegalArgumentException("column argument error");
+            }
+        }
+
+        return metaColumns;
+    }
+}
diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/util/DateUtil.java b/flinkx-core/src/main/java/com/dtstack/flinkx/util/DateUtil.java
index 198d827e2a..535a29a0c9 100644
--- a/flinkx-core/src/main/java/com/dtstack/flinkx/util/DateUtil.java
+++ b/flinkx-core/src/main/java/com/dtstack/flinkx/util/DateUtil.java
@@ -55,14 +55,14 @@ public class DateUtil {

     private DateUtil() {}

-    public static java.sql.Date columnToDate(Object column) {
+    public static java.sql.Date columnToDate(Object column,FastDateFormat customTimeFormat) {
         if(column == null) {
             return null;
         } else if(column instanceof String) {
             if (((String) column).length() == 0){
                 return null;
             }
-            return new java.sql.Date(stringToDate((String)column).getTime());
+            return new java.sql.Date(stringToDate((String)column,customTimeFormat).getTime());
         } else if (column instanceof
Integer) { Integer rawData = (Integer) column; return new java.sql.Date(rawData.longValue()); @@ -82,14 +82,14 @@ public static java.sql.Date columnToDate(Object column) { throw new IllegalArgumentException("Can't convert " + column.getClass().getName() + " to Date"); } - public static java.sql.Timestamp columnToTimestamp(Object column) { + public static java.sql.Timestamp columnToTimestamp(Object column,FastDateFormat customTimeFormat) { if (column == null) { return null; } else if(column instanceof String) { if (((String) column).length() == 0){ return null; } - return new java.sql.Timestamp(stringToDate((String)column).getTime()); + return new java.sql.Timestamp(stringToDate((String)column,customTimeFormat).getTime()); } else if (column instanceof Integer) { Integer rawData = (Integer) column; return new java.sql.Timestamp(rawData.longValue()); @@ -108,11 +108,18 @@ public static java.sql.Timestamp columnToTimestamp(Object column) { throw new IllegalArgumentException("Can't convert " + column.getClass().getName() + " to Date"); } - public static Date stringToDate(String strDate) { + public static Date stringToDate(String strDate,FastDateFormat customTimeFormat) { if(strDate == null || strDate.trim().length() == 0) { return null; } + if(customTimeFormat != null){ + try { + return customTimeFormat.parse(strDate); + } catch (ParseException ignored) { + } + } + try { return datetimeFormatter.parse(strDate); } catch (ParseException ignored) { @@ -148,6 +155,10 @@ public static String dateToYearString(Date date) { return yearFormatter.format(date); } + public static FastDateFormat getDateFormatter(String timeFormat){ + return FastDateFormat.getInstance(timeFormat, timeZoner); + } + static { timeZoner = TimeZone.getTimeZone(timeZone); datetimeFormatter = FastDateFormat.getInstance(datetimeFormat, timeZoner); diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/util/StringUtil.java b/flinkx-core/src/main/java/com/dtstack/flinkx/util/StringUtil.java index 0fcb2a6ea6..594f093f1c 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/util/StringUtil.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/util/StringUtil.java @@ -20,6 +20,7 @@ import com.dtstack.flinkx.common.ColumnType; import com.dtstack.flinkx.exception.WriteRecordException; +import org.apache.commons.lang3.time.FastDateFormat; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; @@ -67,9 +68,15 @@ public static String convertRegularExpr (String str) { return str; } - public static Object string2col(String str, String type) { + public static Object string2col(String str, String type, FastDateFormat customTimeFormat) { + if(str == null || str.length() == 0){ + return null; + } + + if(type == null){ + return str; + } - Preconditions.checkNotNull(type); ColumnType columnType = valueOf(type.toUpperCase()); Object ret; switch(columnType) { @@ -101,14 +108,14 @@ public static Object string2col(String str, String type) { ret = Boolean.valueOf(str.toLowerCase()); break; case DATE: - ret = DateUtil.columnToDate(str); + ret = DateUtil.columnToDate(str,customTimeFormat); break; case TIMESTAMP: case DATETIME: - ret = DateUtil.columnToTimestamp(str); + ret = DateUtil.columnToTimestamp(str,customTimeFormat); break; default: - throw new IllegalArgumentException(); + throw new IllegalArgumentException("Unsupported field type:" + type); } return ret; diff --git a/flinkx-db2/flinkx-db2-core/src/main/java/com/dtstack/flinkx/db2/Db2DatabaseMeta.java 
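For context: with the "format" attribute wired through MetaColumn into string2col, a custom pattern is tried first and the built-in datetime formats remain as a fallback. A minimal sketch of that behavior, using illustrative sample values that are not part of this patch:

    FastDateFormat fmt = DateUtil.getDateFormatter("yyyyMMdd");   // built from the column's "format" attribute
    Object d  = StringUtil.string2col("20181126", "date", fmt);   // parsed by the custom pattern -> java.sql.Date
    Object ts = StringUtil.string2col("2018-11-26 20:50:47", "timestamp", null); // default formats still apply
    Object s  = StringUtil.string2col("some text", null, null);   // type omitted -> returned as the raw string
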
b/flinkx-db2/flinkx-db2-core/src/main/java/com/dtstack/flinkx/db2/Db2DatabaseMeta.java index c9d1271cc1..3c6bdd5b3f 100644 --- a/flinkx-db2/flinkx-db2-core/src/main/java/com/dtstack/flinkx/db2/Db2DatabaseMeta.java +++ b/flinkx-db2/flinkx-db2-core/src/main/java/com/dtstack/flinkx/db2/Db2DatabaseMeta.java @@ -97,4 +97,9 @@ public String getStartQuote() { public String getEndQuote() { return ""; } + + @Override + public String quoteValue(String value, String column) { + return String.format("'%s' as %s",value,column); + } } diff --git a/flinkx-es/flinkx-es-core/src/main/java/com/dtstack/flinkx/es/EsUtil.java b/flinkx-es/flinkx-es-core/src/main/java/com/dtstack/flinkx/es/EsUtil.java index 31c69d6ea0..906b13cfe2 100644 --- a/flinkx-es/flinkx-es-core/src/main/java/com/dtstack/flinkx/es/EsUtil.java +++ b/flinkx-es/flinkx-es-core/src/main/java/com/dtstack/flinkx/es/EsUtil.java @@ -188,7 +188,7 @@ private static Object convertValueToAssignType(String columnType, String constan column = constantValue; break; case "DATE": - column = DateUtil.stringToDate(constantValue); + column = DateUtil.stringToDate(constantValue,null); break; default: throw new IllegalArgumentException("Unsupported column type: " + columnType); diff --git a/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpInputFormat.java b/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpInputFormat.java index 002253631d..559fe63d42 100644 --- a/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpInputFormat.java +++ b/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpInputFormat.java @@ -24,6 +24,7 @@ import com.dtstack.flinkx.ftp.StandardFtpHandler; import com.dtstack.flinkx.inputformat.RichInputFormat; import com.dtstack.flinkx.reader.ByteRateLimiter; +import com.dtstack.flinkx.reader.MetaColumn; import com.dtstack.flinkx.util.StringUtil; import org.apache.commons.lang.StringUtils; import org.apache.flink.api.common.io.DefaultInputSplitAssigner; @@ -65,11 +66,7 @@ public class FtpInputFormat extends RichInputFormat { protected String charsetName = "utf-8"; - protected List columnIndex; - - protected List columnValue; - - protected List columnType; + protected List metaColumns; protected transient boolean isFirstLineHeader; @@ -116,8 +113,6 @@ public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) { @Override public void openInternal(InputSplit split) throws IOException { - - FtpInputSplit inputSplit = (FtpInputSplit)split; List paths = inputSplit.getPaths(); FtpSeqInputStream is = new FtpSeqInputStream(ftpHandler, paths); @@ -144,21 +139,33 @@ public boolean reachedEnd() throws IOException { @Override public Row nextRecordInternal(Row row) throws IOException { - row = new Row(columnIndex.size()); String[] fields = line.split(delimiter); - for(int i = 0; i < columnIndex.size(); ++i) { - Integer index = columnIndex.get(i); - String val = columnValue.get(i); - if(index != null) { - String col = fields[index]; - row.setField(i, col); - } else if(val != null) { - String type = columnType.get(i); - Object col = StringUtil.string2col(val,type); - row.setField(i, col); + if (metaColumns.size() == 1 && metaColumns.get(0).getName().equals("*")){ + row = new Row(fields.length); + for (int i = 0; i < fields.length; i++) { + row.setField(i, fields[i]); + } + } else { + row = new Row(metaColumns.size()); + for (int i = 0; i < metaColumns.size(); i++) { + MetaColumn metaColumn = metaColumns.get(i); + Object value; + 
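+                // Precedence for each configured column: a static "value"
+                // wins over a positional "index" into the split fields;
+                // a column with neither resolves to null.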
if(metaColumn.getValue() != null){ + value = metaColumn.getValue(); + } else if(metaColumn.getIndex() != null){ + value = fields[metaColumn.getIndex()]; + } else { + value = null; + } + + if(value != null){ + value = StringUtil.string2col(String.valueOf(value),metaColumn.getType(),metaColumn.getTimeFormat()); + } + + row.setField(i, value); } - } + return row; } diff --git a/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpInputFormatBuilder.java b/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpInputFormatBuilder.java index 17248671dc..2af194870f 100644 --- a/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpInputFormatBuilder.java +++ b/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpInputFormatBuilder.java @@ -1,6 +1,7 @@ package com.dtstack.flinkx.ftp.reader; import com.dtstack.flinkx.inputformat.RichInputFormatBuilder; +import com.dtstack.flinkx.reader.MetaColumn; import org.apache.commons.lang.StringUtils; import org.apache.flink.hadoop.shaded.com.google.common.base.Preconditions; import java.util.List; @@ -60,22 +61,14 @@ public void setEncoding(String encoding) { } } - public void setColumnIndex(List columnIndex) { - format.columnIndex = columnIndex; + public void setMetaColumn(List metaColumns) { + format.metaColumns = metaColumns; } public void setIsFirstLineHeader(boolean isFirstLineHeader){ format.isFirstLineHeader = isFirstLineHeader; } - public void setColumnValue(List columnValue) { - format.columnValue = columnValue; - } - - public void setColumnType(List columnType) { - format.columnType = columnType; - } - @Override protected void checkFormat() { diff --git a/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpReader.java b/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpReader.java index baedb50c4e..1d6a1b40fe 100644 --- a/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpReader.java +++ b/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpReader.java @@ -22,15 +22,12 @@ import com.dtstack.flinkx.config.ReaderConfig; import com.dtstack.flinkx.ftp.FtpConfigConstants; import com.dtstack.flinkx.reader.DataReader; +import com.dtstack.flinkx.reader.MetaColumn; import com.dtstack.flinkx.util.StringUtil; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.types.Row; -import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import static com.dtstack.flinkx.ftp.FtpConfigKeys.*; import static com.dtstack.flinkx.ftp.FtpConfigConstants.*; @@ -52,10 +49,7 @@ public class FtpReader extends DataReader { private String fieldDelimiter; private String encoding; private boolean isFirstLineHeader; - - private List columnIndex; - private List columnType; - private List columnValue; + private List metaColumns; public FtpReader(DataTransferConfig config, StreamExecutionEnvironment env) { @@ -83,30 +77,13 @@ public FtpReader(DataTransferConfig config, StreamExecutionEnvironment env) { isFirstLineHeader = readerConfig.getParameter().getBooleanVal(KEY_IS_FIRST_HEADER,false); List columns = readerConfig.getParameter().getColumn(); - if(columns != null && columns.size() > 0) { - if (columns.get(0) instanceof Map) { - columnIndex = new ArrayList(); - columnType = new ArrayList<>(); - columnValue = new 
ArrayList<>(); - for (int i = 0; i < columns.size(); ++i) { - Map sm = (Map) columns.get(i); - Double temp = (Double) sm.get("index"); - columnIndex.add(temp != null ? temp.intValue() : null); - columnType.add((String) sm.get("type")); - columnValue.add((String) sm.get("value")); - } - } else if (!columns.get(0).equals("*") || columns.size() != 1) { - throw new IllegalArgumentException("column argument error"); - } - } + metaColumns = MetaColumn.getMetaColumns(columns); } @Override public DataStream readData() { FtpInputFormatBuilder builder = new FtpInputFormatBuilder(); - builder.setColumnType(columnType); - builder.setColumnIndex(columnIndex); - builder.setColumnValue(columnValue); + builder.setMetaColumn(metaColumns); builder.setConnectMode(connectPattern); builder.setDelimiter(fieldDelimiter); builder.setEncoding(encoding); diff --git a/flinkx-hdfs/flinkx-hdfs-core/src/main/java/com/dtstack/flinkx/hdfs/HdfsUtil.java b/flinkx-hdfs/flinkx-hdfs-core/src/main/java/com/dtstack/flinkx/hdfs/HdfsUtil.java index 55cf95dad4..4b35471d73 100644 --- a/flinkx-hdfs/flinkx-hdfs-core/src/main/java/com/dtstack/flinkx/hdfs/HdfsUtil.java +++ b/flinkx-hdfs/flinkx-hdfs-core/src/main/java/com/dtstack/flinkx/hdfs/HdfsUtil.java @@ -21,6 +21,7 @@ import com.dtstack.flinkx.common.ColumnType; import com.dtstack.flinkx.util.DateUtil; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.time.FastDateFormat; import org.apache.flink.util.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -82,9 +83,15 @@ public static String getDefaultFs(){ return configuration.get("fs.defaultFS"); } - public static Object string2col(String str, String type) { + public static Object string2col(String str, String type, FastDateFormat customDateFormat) { + if (str == null || str.length() == 0){ + return null; + } + + if(type == null){ + return str; + } - Preconditions.checkNotNull(type); ColumnType columnType = ColumnType.fromString(type.toUpperCase()); Object ret; switch(columnType) { @@ -116,13 +123,13 @@ public static Object string2col(String str, String type) { ret = Boolean.valueOf(str.toLowerCase()); break; case DATE: - ret = DateUtil.columnToDate(str); + ret = DateUtil.columnToDate(str,customDateFormat); break; case TIMESTAMP: - ret = DateUtil.columnToTimestamp(str); + ret = DateUtil.columnToTimestamp(str,customDateFormat); break; default: - throw new IllegalArgumentException(); + throw new IllegalArgumentException("Unsupported field type:" + type); } return ret; diff --git a/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsInputFormat.java b/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsInputFormat.java index 7514392d85..ac36fd5a0e 100644 --- a/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsInputFormat.java +++ b/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsInputFormat.java @@ -19,6 +19,7 @@ package com.dtstack.flinkx.hdfs.reader; import com.dtstack.flinkx.inputformat.RichInputFormat; +import com.dtstack.flinkx.reader.MetaColumn; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -37,13 +38,7 @@ public abstract class HdfsInputFormat extends RichInputFormat { protected Map hadoopConfig; - protected List columnIndex; - - protected List columnValue; - - protected List columnType; - - protected List columnName; + protected 
List metaColumns; protected String inputPath; diff --git a/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsInputFormatBuilder.java b/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsInputFormatBuilder.java index 58ec0c2303..c4438c0db2 100644 --- a/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsInputFormatBuilder.java +++ b/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsInputFormatBuilder.java @@ -19,6 +19,8 @@ package com.dtstack.flinkx.hdfs.reader; import com.dtstack.flinkx.inputformat.RichInputFormatBuilder; +import com.dtstack.flinkx.reader.MetaColumn; + import java.util.List; import java.util.Map; @@ -50,21 +52,8 @@ public void setHadoopConfig(Map hadoopConfig) { format.hadoopConfig = hadoopConfig; } - public void setColumnIndex(List columnIndex) { - format.columnIndex = columnIndex; - } - - public void setColumnValue(List columnValue) { - format.columnValue = columnValue; - } - - public void setColumnName(List columnName) { - format.columnName = columnName; - } - - - public void setColumnType(List columnType) { - format.columnType = columnType; + public void setMetaColumn(List metaColumn) { + format.metaColumns = metaColumn; } public void setInputPaths(String inputPaths) { diff --git a/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsOrcInputFormat.java b/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsOrcInputFormat.java index 35af6d9992..48856cbc73 100644 --- a/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsOrcInputFormat.java +++ b/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsOrcInputFormat.java @@ -19,6 +19,7 @@ package com.dtstack.flinkx.hdfs.reader; import com.dtstack.flinkx.hdfs.HdfsUtil; +import com.dtstack.flinkx.reader.MetaColumn; import com.dtstack.flinkx.util.StringUtil; import org.apache.commons.lang.StringUtils; import org.apache.flink.core.io.InputSplit; @@ -115,10 +116,9 @@ protected void configureAnythingElse() { fullColTypes[i] = temp[1]; } - for(int j = 0; j < columnName.size(); ++j) { - if(columnName.get(j) != null) { - columnIndex.set(j,name2index(columnName.get(j))); - } + for(int j = 0; j < metaColumns.size(); ++j) { + MetaColumn metaColumn = metaColumns.get(j); + metaColumn.setIndex(name2index(metaColumn.getName())); } Properties p = new Properties(); @@ -180,22 +180,37 @@ private int name2index(String columnName) { @Override public Row nextRecordInternal(Row row) throws IOException { - row = new Row(columnIndex.size()); - for(int i = 0; i < columnIndex.size(); ++i) { - Integer index = columnIndex.get(i); - String val = columnValue.get(i); - String type = columnType.get(i); - if(index != null) { - Object col = inspector.getStructFieldData(value, fields.get(index)); + if(metaColumns.size() == 1 && metaColumns.get(0).getName().equals("*")){ + row = new Row(fullColNames.length); + for (int i = 0; i < fullColNames.length; i++) { + Object col = inspector.getStructFieldData(value, fields.get(i)); if (col != null) { col = HdfsUtil.getWritableValue(col); } row.setField(i, col); - } else if(val != null) { - Object col = HdfsUtil.string2col(val,type); - row.setField(i, col); + } + } else { + row = new Row(metaColumns.size()); + for (int i = 0; i < metaColumns.size(); i++) { + MetaColumn metaColumn = metaColumns.get(i); + Object val; + if (metaColumn.getValue() != null){ + val = metaColumn.getValue(); + } else { + 
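+                    // No static value configured: fetch the ORC struct field
+                    // by the index resolved from the column name during
+                    // configuration, then unwrap the Hadoop Writable.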
val = inspector.getStructFieldData(value, fields.get(metaColumn.getIndex())); + if (val != null) { + val = HdfsUtil.getWritableValue(val); + } + } + + if(val != null && val instanceof String){ + val = HdfsUtil.string2col(String.valueOf(val),metaColumn.getType(),metaColumn.getTimeFormat()); + } + + row.setField(i,val); } } + return row; } diff --git a/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsParquetInputFormat.java b/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsParquetInputFormat.java index 591f3b3f73..0743254eec 100644 --- a/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsParquetInputFormat.java +++ b/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsParquetInputFormat.java @@ -19,6 +19,7 @@ package com.dtstack.flinkx.hdfs.reader; import com.dtstack.flinkx.hdfs.HdfsUtil; +import com.dtstack.flinkx.reader.MetaColumn; import com.google.common.collect.Lists; import org.apache.flink.core.io.InputSplit; import org.apache.flink.types.Row; @@ -28,6 +29,7 @@ import org.apache.parquet.example.data.Group; import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.schema.Type; import java.io.IOException; import java.sql.Timestamp; @@ -50,11 +52,15 @@ public class HdfsParquetInputFormat extends HdfsInputFormat { private transient List allFilePaths; + private transient List fullColNames; + + private transient List fullColTypes; + private transient List currentSplitFilePaths; - private transient int currenFileIndex = 0; + private transient int currentFileIndex = 0; - private SimpleDateFormat sdf = new SimpleDateFormat(""); + private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Override protected void configureAnythingElse() { @@ -71,7 +77,7 @@ protected void openInternal(InputSplit inputSplit) throws IOException { } private boolean nextLine() throws IOException{ - if (currentFileReader == null && currenFileIndex <= currentSplitFilePaths.size()-1){ + if (currentFileReader == null && currentFileIndex <= currentSplitFilePaths.size()-1){ nextFile(); } @@ -80,6 +86,19 @@ private boolean nextLine() throws IOException{ } currentLine = currentFileReader.read(); + if (fullColNames == null && currentLine != null){ + fullColNames = new ArrayList<>(); + fullColTypes = new ArrayList<>(); + List types = currentLine.getType().getFields(); + for (Type type : types) { + fullColNames.add(type.getName()); + fullColTypes.add(getTypeName(type.asPrimitiveType().getPrimitiveTypeName().getMethod)); + } + + for (MetaColumn metaColumn : metaColumns) { + metaColumn.setIndex(currentLine.getType().getFieldIndex(metaColumn.getName())); + } + } if (currentLine == null){ currentFileReader = null; @@ -90,31 +109,37 @@ private boolean nextLine() throws IOException{ } private void nextFile() throws IOException{ - String path = currentSplitFilePaths.get(currenFileIndex); + String path = currentSplitFilePaths.get(currentFileIndex); ParquetReader.Builder reader = ParquetReader.builder(new GroupReadSupport(), new Path(path)).withConf(conf); currentFileReader = reader.build(); - currenFileIndex++; + currentFileIndex++; } @Override protected Row nextRecordInternal(Row row) throws IOException { - row = new Row(columnIndex.size()); - Object col; - for (int i = 0; i < columnIndex.size(); i++) { - Integer index = columnIndex.get(i); - String staticVal = columnValue.get(i); - String type = columnType.get(i); - 
if(index != null){ - col = getDate(currentLine,type,index); - row.setField(i, col); - } else { - if(staticVal != null){ - row.setField(i, HdfsUtil.string2col(staticVal,type)); + if(metaColumns.size() == 1 && metaColumns.get(0).getName().equals("*")){ + row = new Row(fullColNames.size()); + for (int i = 0; i < fullColNames.size(); i++) { + Object val = getData(currentLine,fullColTypes.get(i),i); + row.setField(i, val); + } + } else { + row = new Row(metaColumns.size()); + for (int i = 0; i < metaColumns.size(); i++) { + MetaColumn metaColumn = metaColumns.get(i); + Object val; + if (metaColumn.getValue() != null){ + val = metaColumn.getValue(); } else { - col = getDate(currentLine,type,currentLine.getType().getFieldIndex(columnName.get(i))); - row.setField(i, col); + val = getData(currentLine,metaColumn.getType(),metaColumn.getIndex()); } + + if(val != null && val instanceof String){ + val = HdfsUtil.string2col(String.valueOf(val),metaColumn.getType(),metaColumn.getTimeFormat()); + } + + row.setField(i,val); } } @@ -126,7 +151,7 @@ public boolean reachedEnd() throws IOException { return !nextLine(); } - private Object getDate(Group currentLine,String type,int index){ + private Object getData(Group currentLine,String type,int index){ Object data; switch (type){ case "tinyint" : @@ -218,6 +243,22 @@ private List getAllPartitionPath(String tableLocation) throws IOExceptio } } + private String getTypeName(String method){ + String typeName; + switch (method){ + case "getInteger" : typeName = "int";break; + case "getInt96" : typeName = "bigint";break; + case "getFloat" : typeName = "float";break; + case "getDouble" : typeName = "double";break; + case "getBinary" : typeName = "binary";break; + case "getString" : typeName = "string";break; + case "getBoolean" : typeName = "int";break; + default:typeName = "string"; + } + + return typeName; + } + static class HdfsParquetSplit implements InputSplit{ private int splitNumber; diff --git a/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsReader.java b/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsReader.java index 71076a2ebd..41d6c90388 100644 --- a/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsReader.java +++ b/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsReader.java @@ -23,11 +23,11 @@ import com.dtstack.flinkx.config.ReaderConfig; import com.dtstack.flinkx.hdfs.HdfsConfigKeys; import com.dtstack.flinkx.reader.DataReader; +import com.dtstack.flinkx.reader.MetaColumn; import com.dtstack.flinkx.util.StringUtil; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.types.Row; -import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -43,10 +43,7 @@ public class HdfsReader extends DataReader { protected String fileType; protected String path; protected String fieldDelimiter; - protected List columnIndex; - protected List columnType; - protected List columnValue; - protected List columnName; + private List metaColumns; protected Map hadoopConfig; public HdfsReader(DataTransferConfig config, StreamExecutionEnvironment env) { @@ -65,38 +62,14 @@ public HdfsReader(DataTransferConfig config, StreamExecutionEnvironment env) { fieldDelimiter = StringUtil.convertRegularExpr(fieldDelimiter); } - List columns = readerConfig.getParameter().getColumn(); - if(columns != null && columns.size() > 0) { - 
if(columns.get(0) instanceof Map) { - columnIndex = new ArrayList<>(); - columnType = new ArrayList<>(); - columnValue = new ArrayList<>(); - columnName = new ArrayList<>(); - for(int i = 0; i < columns.size(); ++i) { - Map sm = (Map) columns.get(i); - Double temp = (Double)sm.get("index"); - columnIndex.add(temp != null ? temp.intValue() : null); - columnType.add((String) sm.get("type")); - columnValue.add((String) sm.get("value")); - columnName.add((String) sm.get("name")); - } - System.out.println("init column finished"); - } else if (!columns.get(0).equals("*") || columns.size() != 1) { - throw new IllegalArgumentException("column argument error"); - } - } else{ - throw new IllegalArgumentException("column argument error"); - } + metaColumns = MetaColumn.getMetaColumns(readerConfig.getParameter().getColumn()); } @Override public DataStream readData() { HdfsInputFormatBuilder builder = new HdfsInputFormatBuilder(fileType); builder.setInputPaths(path); - builder.setColumnIndex(columnIndex); - builder.setColumnName(columnName); - builder.setColumnType(columnType); - builder.setColumnValue(columnValue); + builder.setMetaColumn(metaColumns); builder.setHadoopConfig(hadoopConfig); builder.setDefaultFs(defaultFS); builder.setDelimiter(fieldDelimiter); diff --git a/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsTextInputFormat.java b/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsTextInputFormat.java index 92821b66b6..e8dfcb10a6 100644 --- a/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsTextInputFormat.java +++ b/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsTextInputFormat.java @@ -19,6 +19,7 @@ package com.dtstack.flinkx.hdfs.reader; import com.dtstack.flinkx.hdfs.HdfsUtil; +import com.dtstack.flinkx.reader.MetaColumn; import jodd.util.StringUtil; import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.flink.core.io.InputSplit; @@ -87,23 +88,33 @@ public Row nextRecordInternal(Row row) throws IOException { byte[] data = ((Text)value).getBytes(); String line = new String(data, charsetName); String[] fields = line.split(delimiter); - row = new Row(columnIndex.size()); - for(int i = 0; i < columnIndex.size(); ++i) { - Integer index = columnIndex.get(i); - String val = columnValue.get(i); - if(index != null) { - if(index >= fields.length) { - row.setField(i, null); + + if (metaColumns.size() == 1 && metaColumns.get(0).getName().equals("*")){ + row = new Row(fields.length); + for (int i = 0; i < fields.length; i++) { + row.setField(i, fields[i]); + } + } else { + row = new Row(metaColumns.size()); + for (int i = 0; i < metaColumns.size(); i++) { + MetaColumn metaColumn = metaColumns.get(i); + Object value; + if(metaColumn.getValue() != null){ + value = metaColumn.getValue(); + } else if(metaColumn.getIndex() != null){ + value = fields[metaColumn.getIndex()]; } else { - row.setField(i, HdfsUtil.string2col(fields[index],columnType.get(i))); + value = null; + } + + if(value != null){ + value = HdfsUtil.string2col(String.valueOf(value),metaColumn.getType(),metaColumn.getTimeFormat()); } - } else if(val != null) { - String type = columnType.get(i); - Object col = HdfsUtil.string2col(val,type); - row.setField(i, col); - } + row.setField(i, value); + } } + return row; } @@ -128,21 +139,6 @@ public HdfsTextInputFormatBuilder setHadoopConfig(Map hadoopConfi return this; } - public HdfsTextInputFormatBuilder setColumnIndex(List columnIndex) { - 
format.columnIndex = columnIndex; - return this; - } - - public HdfsTextInputFormatBuilder setColumnValue(List columnValue) { - format.columnValue = columnValue; - return this; - } - - public HdfsTextInputFormatBuilder setColumnType(List columnType) { - format.columnType = columnType; - return this; - } - public HdfsTextInputFormatBuilder setInputPaths(String inputPaths) { format.inputPath = inputPaths; return this; diff --git a/flinkx-hdfs/flinkx-hdfs-writer/src/main/java/com/dtstack/flinkx/hdfs/writer/HdfsOrcOutputFormat.java b/flinkx-hdfs/flinkx-hdfs-writer/src/main/java/com/dtstack/flinkx/hdfs/writer/HdfsOrcOutputFormat.java index f24c937f6c..5b98f9eacd 100644 --- a/flinkx-hdfs/flinkx-hdfs-writer/src/main/java/com/dtstack/flinkx/hdfs/writer/HdfsOrcOutputFormat.java +++ b/flinkx-hdfs/flinkx-hdfs-writer/src/main/java/com/dtstack/flinkx/hdfs/writer/HdfsOrcOutputFormat.java @@ -152,10 +152,10 @@ public void writeSingleRecordInternal(Row row) throws WriteRecordException { recordList.add(Boolean.valueOf(rowData)); break; case DATE: - recordList.add(DateUtil.columnToDate(column)); + recordList.add(DateUtil.columnToDate(column,null)); break; case TIMESTAMP: - recordList.add(DateUtil.columnToTimestamp(column)); + recordList.add(DateUtil.columnToTimestamp(column,null)); break; default: throw new IllegalArgumentException(); diff --git a/flinkx-hdfs/flinkx-hdfs-writer/src/main/java/com/dtstack/flinkx/hdfs/writer/HdfsTextOutputFormat.java b/flinkx-hdfs/flinkx-hdfs-writer/src/main/java/com/dtstack/flinkx/hdfs/writer/HdfsTextOutputFormat.java index 4c50d4e2db..06dacf183b 100644 --- a/flinkx-hdfs/flinkx-hdfs-writer/src/main/java/com/dtstack/flinkx/hdfs/writer/HdfsTextOutputFormat.java +++ b/flinkx-hdfs/flinkx-hdfs-writer/src/main/java/com/dtstack/flinkx/hdfs/writer/HdfsTextOutputFormat.java @@ -133,7 +133,7 @@ public void writeSingleRecordInternal(Row row) throws WriteRecordException { if(column instanceof Date) { sb.append(DateUtil.dateToString((Date)column)); } else { - Date d = DateUtil.columnToDate(column); + Date d = DateUtil.columnToDate(column,null); String s = DateUtil.dateToString(d); sb.append(s); } @@ -142,7 +142,7 @@ public void writeSingleRecordInternal(Row row) throws WriteRecordException { if(column instanceof Date) { sb.append(DateUtil.timestampToString((Date)column)); } else { - Date d = DateUtil.columnToTimestamp(column); + Date d = DateUtil.columnToTimestamp(column,null); String s = DateUtil.timestampToString(d); sb.append(s); } diff --git a/flinkx-mongodb/flinkx-mongodb-core/src/main/java/com/dtstack/flinkx/mongodb/Column.java b/flinkx-mongodb/flinkx-mongodb-core/src/main/java/com/dtstack/flinkx/mongodb/Column.java deleted file mode 100644 index 66135b94e4..0000000000 --- a/flinkx-mongodb/flinkx-mongodb-core/src/main/java/com/dtstack/flinkx/mongodb/Column.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dtstack.flinkx.mongodb; - -import java.io.Serializable; - -/** - * @Company: www.dtstack.com - * @author jiangbo - */ -public class Column implements Serializable { - - private String name; - - private String type; - - private String splitter; - - public Column(String name, String type, String splitter) { - this.name = name; - this.type = type; - this.splitter = splitter; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getType() { - return type; - } - - public void setType(String type) { - this.type = type; - } - - public String getSplitter() { - return splitter; - } - - public void setSplitter(String splitter) { - this.splitter = splitter; - } -} diff --git a/flinkx-mongodb/flinkx-mongodb-core/src/main/java/com/dtstack/flinkx/mongodb/MongodbUtil.java b/flinkx-mongodb/flinkx-mongodb-core/src/main/java/com/dtstack/flinkx/mongodb/MongodbUtil.java index 6ea3c4158f..9bd14f08a7 100644 --- a/flinkx-mongodb/flinkx-mongodb-core/src/main/java/com/dtstack/flinkx/mongodb/MongodbUtil.java +++ b/flinkx-mongodb/flinkx-mongodb-core/src/main/java/com/dtstack/flinkx/mongodb/MongodbUtil.java @@ -19,6 +19,7 @@ package com.dtstack.flinkx.mongodb; import com.dtstack.flinkx.exception.WriteRecordException; +import com.dtstack.flinkx.reader.MetaColumn; import com.dtstack.flinkx.util.TelnetUtil; import com.google.common.collect.Lists; import com.mongodb.*; @@ -121,27 +122,10 @@ public static void close(){ } } - public static Row convertDocTORow(Document doc,List columns){ - Row row = new Row(columns.size()); - for (int i = 0; i < columns.size(); i++) { - Column col= columns.get(i); - Object colVal = getSpecifiedTypeVal(doc,col.getName(),col.getType()); - if (col.getSplitter() != null && col.getSplitter().length() > 0){ - if(colVal instanceof List){ - colVal = StringUtils.join((List)colVal,col.getSplitter()); - } - } - - row.setField(i,colVal); - } - - return row; - } - - public static Document convertRowToDoc(Row row,List columns) throws WriteRecordException { + public static Document convertRowToDoc(Row row,List columns) throws WriteRecordException { Document doc = new Document(); for (int i = 0; i < columns.size(); i++) { - Column column = columns.get(i); + MetaColumn column = columns.get(i); Object val = convertField(row.getField(i)); if (StringUtils.isNotEmpty(column.getSplitter())){ val = Arrays.asList(String.valueOf(val).split(column.getSplitter())); @@ -161,38 +145,6 @@ private static Object convertField(Object val){ return val; } - private static Object getSpecifiedTypeVal(Document doc,String key,String type){ - if (!doc.containsKey(key)){ - return null; - } - - Object val; - switch (type.toLowerCase()){ - case "string" : - val = doc.getString(key); - break; - case "int" : - val = doc.getInteger(key); - break; - case "long" : - val = doc.getLong(key); - break; - case "double" : - val = doc.getDouble(key); - break; - case "bool" : - val = doc.getBoolean(key); - break; - case "date" : - val = doc.getDate(key); - break; - default: - val = doc.get(key); - } - - return val; - } - /** * parse server address from hostPorts string */ diff --git a/flinkx-mongodb/flinkx-mongodb-reader/src/main/java/com/dtstack/flinkx/mongodb/reader/MongodbInputFormat.java b/flinkx-mongodb/flinkx-mongodb-reader/src/main/java/com/dtstack/flinkx/mongodb/reader/MongodbInputFormat.java index 460a38f1bc..371a0ea08f 100644 --- 
a/flinkx-mongodb/flinkx-mongodb-reader/src/main/java/com/dtstack/flinkx/mongodb/reader/MongodbInputFormat.java +++ b/flinkx-mongodb/flinkx-mongodb-reader/src/main/java/com/dtstack/flinkx/mongodb/reader/MongodbInputFormat.java @@ -19,8 +19,8 @@ package com.dtstack.flinkx.mongodb.reader; import com.dtstack.flinkx.inputformat.RichInputFormat; -import com.dtstack.flinkx.mongodb.Column; import com.dtstack.flinkx.mongodb.MongodbUtil; +import com.dtstack.flinkx.reader.MetaColumn; import com.dtstack.flinkx.util.StringUtil; import com.mongodb.BasicDBObject; import com.mongodb.client.FindIterable; @@ -34,10 +34,7 @@ import org.bson.conversions.Bson; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import static com.dtstack.flinkx.mongodb.MongodbConfigKeys.*; @@ -59,7 +56,7 @@ public class MongodbInputFormat extends RichInputFormat { protected String collectionName; - protected List columns; + protected List metaColumns; protected String filterJson; @@ -98,7 +95,33 @@ protected void openInternal(InputSplit inputSplit) throws IOException { @Override public Row nextRecordInternal(Row row) throws IOException { - return MongodbUtil.convertDocTORow(cursor.next(),columns); + Document doc = cursor.next(); + if(metaColumns.size() == 1 && metaColumns.get(0).getName().equals("*")){ + row = new Row(doc.size()); + String[] names = doc.keySet().toArray(new String[0]); + for (int i = 0; i < names.length; i++) { + row.setField(i,doc.get(names[i])); + } + } else { + row = new Row(metaColumns.size()); + for (int i = 0; i < metaColumns.size(); i++) { + MetaColumn metaColumn = metaColumns.get(i); + Object value; + if (metaColumn.getValue() != null){ + value = metaColumn.getValue(); + } else { + value = doc.get(metaColumn.getName()); + } + + if(value != null && value instanceof String){ + value = StringUtil.string2col(String.valueOf(value),metaColumn.getType(),metaColumn.getTimeFormat()); + } + + row.setField(i,value); + } + } + + return row; } @Override diff --git a/flinkx-mongodb/flinkx-mongodb-reader/src/main/java/com/dtstack/flinkx/mongodb/reader/MongodbInputFormatBuilder.java b/flinkx-mongodb/flinkx-mongodb-reader/src/main/java/com/dtstack/flinkx/mongodb/reader/MongodbInputFormatBuilder.java index 71bf2beaa2..2c73cedb7b 100644 --- a/flinkx-mongodb/flinkx-mongodb-reader/src/main/java/com/dtstack/flinkx/mongodb/reader/MongodbInputFormatBuilder.java +++ b/flinkx-mongodb/flinkx-mongodb-reader/src/main/java/com/dtstack/flinkx/mongodb/reader/MongodbInputFormatBuilder.java @@ -19,10 +19,9 @@ package com.dtstack.flinkx.mongodb.reader; import com.dtstack.flinkx.inputformat.RichInputFormatBuilder; -import com.dtstack.flinkx.mongodb.Column; +import com.dtstack.flinkx.reader.MetaColumn; import java.util.List; -import java.util.Map; /** * The builder for mongodb reader plugin @@ -58,8 +57,8 @@ public void setCollection(String collection){ format.collectionName = collection; } - public void setColumns(List columns){ - format.columns = columns; + public void setMetaColumns(List metaColumns){ + format.metaColumns = metaColumns; } public void setFilter(String filter){ diff --git a/flinkx-mongodb/flinkx-mongodb-reader/src/main/java/com/dtstack/flinkx/mongodb/reader/MongodbReader.java b/flinkx-mongodb/flinkx-mongodb-reader/src/main/java/com/dtstack/flinkx/mongodb/reader/MongodbReader.java index 08b88e359f..b781d01127 100644 --- a/flinkx-mongodb/flinkx-mongodb-reader/src/main/java/com/dtstack/flinkx/mongodb/reader/MongodbReader.java 
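A note on the rewritten Mongodb read path: with column = ["*"] the row is built from every key of the document in iteration order, while named columns pass through string2col only when the fetched value is actually a String. A small sketch under an assumed document, not taken from this patch:

    // document {name: "jiangbo", born: "20181126"}
    // column ["*"]                          -> Row("jiangbo", "20181126")
    // column [{"name":"born","type":"date","format":"yyyyMMdd"}]
    //     doc.get("born") is a String, so string2col applies the custom
    //     format and the field arrives as java.sql.Date 2018-11-26
    // column [{"name":"age","type":"string"}] against {age: 28}
    //     28 is an Integer, not a String, so it is passed through unchanged
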
+++ b/flinkx-mongodb/flinkx-mongodb-reader/src/main/java/com/dtstack/flinkx/mongodb/reader/MongodbReader.java @@ -20,16 +20,13 @@ import com.dtstack.flinkx.config.DataTransferConfig; import com.dtstack.flinkx.config.ReaderConfig; -import com.dtstack.flinkx.mongodb.Column; import com.dtstack.flinkx.reader.DataReader; +import com.dtstack.flinkx.reader.MetaColumn; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.types.Row; -import scala.util.parsing.json.JSONArray; -import java.util.ArrayList; import java.util.List; -import java.util.Map; import static com.dtstack.flinkx.mongodb.MongodbConfigKeys.*; @@ -51,7 +48,7 @@ public class MongodbReader extends DataReader { protected String collection; - protected List columns = new ArrayList<>(); + private List metaColumns; protected String filter; @@ -65,11 +62,7 @@ public MongodbReader(DataTransferConfig config, StreamExecutionEnvironment env) database = readerConfig.getParameter().getStringVal(KEY_DATABASE); collection = readerConfig.getParameter().getStringVal(KEY_COLLECTION); filter = readerConfig.getParameter().getStringVal(KEY_FILTER); - - for (Object item : readerConfig.getParameter().getColumn()) { - Map colMap = (Map)item; - columns.add(new Column(colMap.get(KEY_NAME),colMap.get(KEY_TYPE),colMap.get(KEY_SPLITTER))); - } + metaColumns = MetaColumn.getMetaColumns(readerConfig.getParameter().getColumn()); } @Override @@ -82,7 +75,7 @@ public DataStream readData() { builder.setDatabase(database); builder.setCollection(collection); builder.setFilter(filter); - builder.setColumns(columns); + builder.setMetaColumns(metaColumns); builder.setMonitorUrls(monitorUrls); builder.setBytes(bytes); diff --git a/flinkx-mongodb/flinkx-mongodb-writer/src/main/java/com/dtstack/flinkx/mongodb/writer/MongodbOutputFormat.java b/flinkx-mongodb/flinkx-mongodb-writer/src/main/java/com/dtstack/flinkx/mongodb/writer/MongodbOutputFormat.java index b9fd9266a0..1c8f844826 100644 --- a/flinkx-mongodb/flinkx-mongodb-writer/src/main/java/com/dtstack/flinkx/mongodb/writer/MongodbOutputFormat.java +++ b/flinkx-mongodb/flinkx-mongodb-writer/src/main/java/com/dtstack/flinkx/mongodb/writer/MongodbOutputFormat.java @@ -19,9 +19,9 @@ package com.dtstack.flinkx.mongodb.writer; import com.dtstack.flinkx.exception.WriteRecordException; -import com.dtstack.flinkx.mongodb.Column; import com.dtstack.flinkx.mongodb.MongodbUtil; import com.dtstack.flinkx.outputformat.RichOutputFormat; +import com.dtstack.flinkx.reader.MetaColumn; import com.dtstack.flinkx.writer.WriteMode; import com.mongodb.client.MongoCollection; import org.apache.commons.lang.StringUtils; @@ -55,7 +55,7 @@ public class MongodbOutputFormat extends RichOutputFormat { protected String collectionName; - protected List columns; + protected List columns; protected String replaceKey; diff --git a/flinkx-mongodb/flinkx-mongodb-writer/src/main/java/com/dtstack/flinkx/mongodb/writer/MongodbOutputFormatBuilder.java b/flinkx-mongodb/flinkx-mongodb-writer/src/main/java/com/dtstack/flinkx/mongodb/writer/MongodbOutputFormatBuilder.java index abd6165a0c..3790536adf 100644 --- a/flinkx-mongodb/flinkx-mongodb-writer/src/main/java/com/dtstack/flinkx/mongodb/writer/MongodbOutputFormatBuilder.java +++ b/flinkx-mongodb/flinkx-mongodb-writer/src/main/java/com/dtstack/flinkx/mongodb/writer/MongodbOutputFormatBuilder.java @@ -18,11 +18,12 @@ package com.dtstack.flinkx.mongodb.writer; -import com.dtstack.flinkx.mongodb.Column; import 
com.dtstack.flinkx.outputformat.RichOutputFormatBuilder; +import com.dtstack.flinkx.reader.MetaColumn; import java.util.List; + /** * The builder for mongodb writer plugin * @@ -57,7 +58,7 @@ public void setCollection(String collection){ format.collectionName = collection; } - public void setColumns(List columns){ + public void setColumns(List columns){ format.columns = columns; } diff --git a/flinkx-mongodb/flinkx-mongodb-writer/src/main/java/com/dtstack/flinkx/mongodb/writer/MongodbWriter.java b/flinkx-mongodb/flinkx-mongodb-writer/src/main/java/com/dtstack/flinkx/mongodb/writer/MongodbWriter.java index 81ebecd5f8..0ea3841899 100644 --- a/flinkx-mongodb/flinkx-mongodb-writer/src/main/java/com/dtstack/flinkx/mongodb/writer/MongodbWriter.java +++ b/flinkx-mongodb/flinkx-mongodb-writer/src/main/java/com/dtstack/flinkx/mongodb/writer/MongodbWriter.java @@ -20,16 +20,14 @@ import com.dtstack.flinkx.config.DataTransferConfig; import com.dtstack.flinkx.config.WriterConfig; -import com.dtstack.flinkx.mongodb.Column; +import com.dtstack.flinkx.reader.MetaColumn; import com.dtstack.flinkx.writer.DataWriter; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction; import org.apache.flink.types.Row; -import java.util.ArrayList; import java.util.List; -import java.util.Map; import static com.dtstack.flinkx.mongodb.MongodbConfigKeys.*; import static com.dtstack.flinkx.mongodb.MongodbConfigKeys.KEY_COLLECTION; @@ -52,7 +50,7 @@ public class MongodbWriter extends DataWriter { protected String collection; - protected List columns = new ArrayList<>(); + protected List columns; protected String replaceKey; @@ -68,10 +66,7 @@ public MongodbWriter(DataTransferConfig config) { mode = writerConfig.getParameter().getStringVal(KEY_MODE); replaceKey = writerConfig.getParameter().getStringVal(KEY_REPLACE_KEY); - for (Object item : writerConfig.getParameter().getColumn()) { - Map colMap = (Map)item; - columns.add(new Column(colMap.get(KEY_NAME),colMap.get(KEY_TYPE),colMap.get(KEY_SPLITTER))); - } + columns = MetaColumn.getMetaColumns(writerConfig.getParameter().getColumn()); } @Override diff --git a/flinkx-mysql/flinkx-mysql-core/src/main/java/com/dtstack/flinkx/mysql/MySqlDatabaseMeta.java b/flinkx-mysql/flinkx-mysql-core/src/main/java/com/dtstack/flinkx/mysql/MySqlDatabaseMeta.java index 6129329886..48517f2f24 100644 --- a/flinkx-mysql/flinkx-mysql-core/src/main/java/com/dtstack/flinkx/mysql/MySqlDatabaseMeta.java +++ b/flinkx-mysql/flinkx-mysql-core/src/main/java/com/dtstack/flinkx/mysql/MySqlDatabaseMeta.java @@ -63,6 +63,11 @@ public String getEndQuote() { return "`"; } + @Override + public String quoteValue(String value, String column) { + return String.format("\"%s\" as %s",value,column); + } + @Override public String getReplaceStatement(List column, List fullColumn, String table, Map> updateKey) { return "REPLACE INTO " + quoteTable(table) diff --git a/flinkx-odps/flinkx-odps-reader/src/main/java/com/dtstack/flinkx/odps/reader/OdpsInputFormat.java b/flinkx-odps/flinkx-odps-reader/src/main/java/com/dtstack/flinkx/odps/reader/OdpsInputFormat.java index 000208963b..1a4135ba10 100644 --- a/flinkx-odps/flinkx-odps-reader/src/main/java/com/dtstack/flinkx/odps/reader/OdpsInputFormat.java +++ b/flinkx-odps/flinkx-odps-reader/src/main/java/com/dtstack/flinkx/odps/reader/OdpsInputFormat.java @@ -25,7 +25,7 @@ import com.aliyun.odps.tunnel.TableTunnel; import 
com.dtstack.flinkx.inputformat.RichInputFormat; import com.dtstack.flinkx.odps.OdpsUtil; -import com.dtstack.flinkx.reader.ByteRateLimiter; +import com.dtstack.flinkx.reader.MetaColumn; import com.dtstack.flinkx.util.StringUtil; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; @@ -48,11 +48,7 @@ */ public class OdpsInputFormat extends RichInputFormat { - protected List columnName; - - protected List columnValue; - - protected List columnType; + protected List metaColumns; protected String sessionId; @@ -164,25 +160,34 @@ record = recordReader.read(); } @Override - public Row nextRecordInternal(Row row) throws IOException {; - row = new Row(columnName.size()); - for(int i = 0; i < columnName.size(); ++i) { - String colName = columnName.get(i); - String val = columnValue.get(i); - String type = columnType.get(i); - if(StringUtils.isNotEmpty(colName)) { - Object field = record.get(colName); - if(field instanceof byte[]) { - field = new String((byte[]) field); + public Row nextRecordInternal(Row row) throws IOException { + if (metaColumns.size() == 1 && metaColumns.get(0).getName().equals("*")){ + row = new Row(record.getColumnCount()); + for (int i = 0; i < record.getColumnCount(); i++) { + row.setField(i,record.get(i)); + } + } else { + row = new Row(metaColumns.size()); + for (int i = 0; i < metaColumns.size(); i++) { + MetaColumn metaColumn = metaColumns.get(i); + Object val; + if(metaColumn.getValue() != null){ + val = metaColumn.getValue(); + } else { + val = record.get(metaColumn.getName()); + if(val instanceof byte[]) { + val = new String((byte[]) val); + } } - row.setField(i, field); - } else if(val != null && type != null) { - Object col = StringUtil.string2col(val,type); - row.setField(i, col); - } else { - throw new RuntimeException("Illegal column format"); + + if(val != null){ + val = StringUtil.string2col(String.valueOf(val),metaColumn.getType(),metaColumn.getTimeFormat()); + } + + row.setField(i,val); } } + return row; } diff --git a/flinkx-odps/flinkx-odps-reader/src/main/java/com/dtstack/flinkx/odps/reader/OdpsInputFormatBuilder.java b/flinkx-odps/flinkx-odps-reader/src/main/java/com/dtstack/flinkx/odps/reader/OdpsInputFormatBuilder.java index f8e351a400..2c230efda1 100644 --- a/flinkx-odps/flinkx-odps-reader/src/main/java/com/dtstack/flinkx/odps/reader/OdpsInputFormatBuilder.java +++ b/flinkx-odps/flinkx-odps-reader/src/main/java/com/dtstack/flinkx/odps/reader/OdpsInputFormatBuilder.java @@ -19,6 +19,7 @@ package com.dtstack.flinkx.odps.reader; import com.dtstack.flinkx.inputformat.RichInputFormatBuilder; +import com.dtstack.flinkx.reader.MetaColumn; import java.util.List; import java.util.Map; @@ -47,16 +48,8 @@ public void setTableName(String tableName) { format.tableName = tableName; } - public void setColumnName(List columnName) { - format.columnName = columnName; - } - - public void setColumnType(List columnType) { - format.columnType = columnType; - } - - public void setColumnValue(List columnValue) { - format.columnValue = columnValue; + public void setMetaColumn(List metaColumns){ + format.metaColumns = metaColumns; } public void setPartition(String partition) { diff --git a/flinkx-odps/flinkx-odps-reader/src/main/java/com/dtstack/flinkx/odps/reader/OdpsReader.java b/flinkx-odps/flinkx-odps-reader/src/main/java/com/dtstack/flinkx/odps/reader/OdpsReader.java index 8aab787c86..c16f7e5122 100644 --- a/flinkx-odps/flinkx-odps-reader/src/main/java/com/dtstack/flinkx/odps/reader/OdpsReader.java +++ 
b/flinkx-odps/flinkx-odps-reader/src/main/java/com/dtstack/flinkx/odps/reader/OdpsReader.java @@ -21,10 +21,10 @@ import com.dtstack.flinkx.config.DataTransferConfig; import com.dtstack.flinkx.config.ReaderConfig; import com.dtstack.flinkx.reader.DataReader; +import com.dtstack.flinkx.reader.MetaColumn; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.types.Row; -import java.util.ArrayList; import java.util.List; import java.util.Map; import static com.dtstack.flinkx.odps.OdpsConfigKeys.*; @@ -36,9 +36,7 @@ */ public class OdpsReader extends DataReader { private Map odpsConfig; - protected List columnName; - protected List columnType; - protected List columnValue; + private List metaColumns; protected String tableName; protected String partition; @@ -51,34 +49,14 @@ public OdpsReader(DataTransferConfig config, StreamExecutionEnvironment env) { tableName = readerConfig.getParameter().getStringVal(KEY_TABLE); partition = readerConfig.getParameter().getStringVal(KEY_PARTITION); - List columns = readerConfig.getParameter().getColumn(); - if(columns != null && columns.size() > 0) { - if(columns.get(0) instanceof Map) { - columnType = new ArrayList<>(); - columnValue = new ArrayList<>(); - columnName = new ArrayList<>(); - for(int i = 0; i < columns.size(); ++i) { - Map sm = (Map) columns.get(i); - columnType.add((String) sm.get(KEY_COLUMN_TYPE)); - columnValue.add((String) sm.get(KEY_COLUMN_VALUE)); - columnName.add((String) sm.get(KEY_COLUMN_NAME)); - } - System.out.println("init column finished"); - } else if (!columns.get(0).equals("*") || columns.size() != 1) { - throw new IllegalArgumentException("column argument error"); - } - } else{ - throw new IllegalArgumentException("column argument error"); - } + metaColumns = MetaColumn.getMetaColumns(readerConfig.getParameter().getColumn()); } @Override public DataStream readData() { OdpsInputFormatBuilder builder = new OdpsInputFormatBuilder(); - builder.setColumnName(columnName); - builder.setColumnType(columnType); - builder.setColumnValue(columnValue); + builder.setMetaColumn(metaColumns); builder.setOdpsConfig(odpsConfig); builder.setTableName(tableName); builder.setPartition(partition); diff --git a/flinkx-odps/flinkx-odps-writer/src/main/java/com/dtstack/flinkx/odps/writer/OdpsOutputFormat.java b/flinkx-odps/flinkx-odps-writer/src/main/java/com/dtstack/flinkx/odps/writer/OdpsOutputFormat.java index 2f0b163571..5fc3a6c5b3 100644 --- a/flinkx-odps/flinkx-odps-writer/src/main/java/com/dtstack/flinkx/odps/writer/OdpsOutputFormat.java +++ b/flinkx-odps/flinkx-odps-writer/src/main/java/com/dtstack/flinkx/odps/writer/OdpsOutputFormat.java @@ -149,7 +149,7 @@ private Record row2record(Row row, String[] columnTypes) throws WriteRecordExcep case DATE: case DATETIME: case TIMESTAMP: - record.setDatetime(i, DateUtil.columnToTimestamp(column)); + record.setDatetime(i, DateUtil.columnToTimestamp(column,null)); break; default: throw new IllegalArgumentException(); diff --git a/flinkx-oracle/flinkx-oracle-core/src/main/java/com/dtstack/flinkx/oracle/OracleDatabaseMeta.java b/flinkx-oracle/flinkx-oracle-core/src/main/java/com/dtstack/flinkx/oracle/OracleDatabaseMeta.java index f7b5f5d530..8e287ce05d 100644 --- a/flinkx-oracle/flinkx-oracle-core/src/main/java/com/dtstack/flinkx/oracle/OracleDatabaseMeta.java +++ b/flinkx-oracle/flinkx-oracle-core/src/main/java/com/dtstack/flinkx/oracle/OracleDatabaseMeta.java @@ -58,6 +58,11 @@ public String 
diff --git a/flinkx-oracle/flinkx-oracle-core/src/main/java/com/dtstack/flinkx/oracle/OracleDatabaseMeta.java b/flinkx-oracle/flinkx-oracle-core/src/main/java/com/dtstack/flinkx/oracle/OracleDatabaseMeta.java
index f7b5f5d530..8e287ce05d 100644
--- a/flinkx-oracle/flinkx-oracle-core/src/main/java/com/dtstack/flinkx/oracle/OracleDatabaseMeta.java
+++ b/flinkx-oracle/flinkx-oracle-core/src/main/java/com/dtstack/flinkx/oracle/OracleDatabaseMeta.java
@@ -58,6 +58,11 @@ public String getSQLQueryColumnFields(List<String> column, String table) {
         return "SELECT /*+FIRST_ROWS*/ " + quoteColumns(column) + " FROM " + quoteTable(table) + " WHERE ROWNUM < 1";
     }

+    @Override
+    public String quoteValue(String value, String column) {
+        return String.format("'%s' as %s",value,column);
+    }
+
     @Override
     public String getSplitFilter(String columnName) {
         return String.format("mod(%s, ${N}) = ${M}", getStartQuote() + columnName + getEndQuote());
diff --git a/flinkx-postgresql/flinkx-postgresql-core/src/main/java/com/dtstack/flinkx/postgresql/PostgresqlDatabaseMeta.java b/flinkx-postgresql/flinkx-postgresql-core/src/main/java/com/dtstack/flinkx/postgresql/PostgresqlDatabaseMeta.java
index 5669a66137..1dd360c348 100644
--- a/flinkx-postgresql/flinkx-postgresql-core/src/main/java/com/dtstack/flinkx/postgresql/PostgresqlDatabaseMeta.java
+++ b/flinkx-postgresql/flinkx-postgresql-core/src/main/java/com/dtstack/flinkx/postgresql/PostgresqlDatabaseMeta.java
@@ -100,6 +100,11 @@ public String getEndQuote() {
         return "";
     }

+    @Override
+    public String quoteValue(String value, String column) {
+        return String.format("'%s' as %s",value,column);
+    }
+
     @Override
     public EDatabaseType getDatabaseType() {
         return EDatabaseType.PostgreSQL;
diff --git a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/DatabaseInterface.java b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/DatabaseInterface.java
index 08af5d6b2d..d2ff546f20 100644
--- a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/DatabaseInterface.java
+++ b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/DatabaseInterface.java
@@ -43,6 +43,8 @@ public interface DatabaseInterface {

     String getEndQuote();

+    String quoteValue(String value,String column);
+
     String quoteColumn(String column);

     String quoteColumns(List<String> column, String table);
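Each quoteValue implementation added here renders a configured constant as a quoted SQL literal with an alias, so static columns can ride along in the same SELECT as real ones. A toy rendering of the fragment it produces (values invented):

    public class QuoteValueSketch {
        // Mirrors the quoteValue bodies added for Oracle, PostgreSQL and SQLServer.
        static String quoteValue(String value, String column) {
            return String.format("'%s' as %s", value, column);
        }

        public static void main(String[] args) {
            // Prints: 'prod' as env
            System.out.println(quoteValue("prod", "env"));
        }
    }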
diff --git a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/datareader/DistributedJdbcDataReader.java b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/datareader/DistributedJdbcDataReader.java
index 9ea2fe1b83..a3e9da992e 100644
--- a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/datareader/DistributedJdbcDataReader.java
+++ b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/datareader/DistributedJdbcDataReader.java
@@ -24,10 +24,10 @@
 import com.dtstack.flinkx.rdb.DataSource;
 import com.dtstack.flinkx.rdb.DatabaseInterface;
 import com.dtstack.flinkx.rdb.inputformat.DistributedJdbcInputFormatBuilder;
-import com.dtstack.flinkx.rdb.inputformat.DistributedJdbcInputSplit;
 import com.dtstack.flinkx.rdb.type.TypeConverterInterface;
 import com.dtstack.flinkx.rdb.util.DBUtil;
 import com.dtstack.flinkx.reader.DataReader;
+import com.dtstack.flinkx.reader.MetaColumn;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.types.Row;
@@ -51,7 +51,7 @@ public class DistributedJdbcDataReader extends DataReader {

     protected String password;

-    protected List<String> column;
+    protected List<MetaColumn> metaColumns;

     protected String where;

@@ -74,7 +74,7 @@ protected DistributedJdbcDataReader(DataTransferConfig config, StreamExecutionEn
         username = readerConfig.getParameter().getStringVal(JdbcConfigKeys.KEY_USER_NAME);
         password = readerConfig.getParameter().getStringVal(JdbcConfigKeys.KEY_PASSWORD);
         where = readerConfig.getParameter().getStringVal(JdbcConfigKeys.KEY_WHERE);
-        column = readerConfig.getParameter().getColumn();
+        metaColumns = MetaColumn.getMetaColumns(readerConfig.getParameter().getColumn());
         splitKey = readerConfig.getParameter().getStringVal(JdbcConfigKeys.KEY_SPLIK_KEY);
         connectionConfigs = readerConfig.getParameter().getConnection();
         fetchSize = readerConfig.getParameter().getIntVal(JdbcConfigKeys.KEY_FETCH_SIZE,0);
@@ -92,7 +92,7 @@ public DataStream<Row> readData() {
         builder.setMonitorUrls(monitorUrls);
         builder.setDatabaseInterface(databaseInterface);
         builder.setTypeConverter(typeConverter);
-        builder.setColumn(column);
+        builder.setMetaColumn(metaColumns);
         builder.setSourceList(buildConnections());
         builder.setNumPartitions(numPartitions);
         builder.setSplitKey(splitKey);
diff --git a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/datareader/JdbcDataReader.java b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/datareader/JdbcDataReader.java
index 189a31303f..16083d3639 100644
--- a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/datareader/JdbcDataReader.java
+++ b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/datareader/JdbcDataReader.java
@@ -26,6 +26,7 @@
 import com.dtstack.flinkx.rdb.type.TypeConverterInterface;
 import com.dtstack.flinkx.rdb.util.DBUtil;
 import com.dtstack.flinkx.reader.DataReader;
+import com.dtstack.flinkx.reader.MetaColumn;
 import com.dtstack.flinkx.util.ClassUtil;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -40,6 +41,7 @@
 import java.sql.Timestamp;
 import java.util.Date;
 import java.util.List;
+import java.util.stream.Collectors;

 /**
  * The Reader plugin for any database that can be connected via JDBC.
@@ -59,7 +61,7 @@ public class JdbcDataReader extends DataReader {

     protected String password;

-    protected List<String> column;
+    protected List<MetaColumn> metaColumns;

     protected String[] columnTypes;

@@ -93,7 +95,7 @@ public JdbcDataReader(DataTransferConfig config, StreamExecutionEnvironment env)
         password = readerConfig.getParameter().getStringVal(JdbcConfigKeys.KEY_PASSWORD);
         table = readerConfig.getParameter().getConnection().get(0).getTable().get(0);
         where = readerConfig.getParameter().getStringVal(JdbcConfigKeys.KEY_WHERE);
-        column = readerConfig.getParameter().getColumn();
+        metaColumns = MetaColumn.getMetaColumns(readerConfig.getParameter().getColumn());
         fetchSize = readerConfig.getParameter().getIntVal(JdbcConfigKeys.KEY_FETCH_SIZE,0);
         queryTimeOut = readerConfig.getParameter().getIntVal(JdbcConfigKeys.KEY_QUERY_TIME_OUT,0);
         splitKey = readerConfig.getParameter().getStringVal(JdbcConfigKeys.KEY_SPLIK_KEY);
@@ -112,7 +114,7 @@ public DataStream<Row> readData() {
         builder.setTable(table);
         builder.setDatabaseInterface(databaseInterface);
         builder.setTypeConverter(typeConverter);
-        builder.setColumn(column);
+        builder.setMetaColumn(metaColumns);
         builder.setFetchSize(fetchSize == 0 ? databaseInterface.getFetchSize() : fetchSize);
         builder.setQueryTimeOut(queryTimeOut == 0 ? databaseInterface.getQueryTimeout() : queryTimeOut);
@@ -122,7 +124,7 @@ public DataStream<Row> readData() {
             isSplitByKey = true;
         }

-        String query = DBUtil.getQuerySql(databaseInterface,table,column,splitKey,where,isSplitByKey);
+        String query = DBUtil.getQuerySql(databaseInterface,table,metaColumns,splitKey,where,isSplitByKey);
         builder.setQuery(query);

         RichInputFormat format = builder.finish();
@@ -136,8 +138,8 @@
      * @return
      */
     private RowTypeInfo getColumnTypes() {
-
-        String sql = databaseInterface.getSQLQueryColumnFields(column, table);
+        List<String> columnNames = metaColumns.stream().map(MetaColumn::getName).collect(Collectors.toList());
+        String sql = databaseInterface.getSQLQueryColumnFields(columnNames, table);

         try (Connection conn = getConnection()) {
             Statement stmt = conn.createStatement();
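getColumnTypes still needs the old list-of-names API, so the MetaColumn list is mapped down to names first. The same one-liner in isolation (column names invented):

    import com.dtstack.flinkx.reader.MetaColumn;

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class ColumnNamesSketch {
        public static void main(String[] args) {
            MetaColumn id = new MetaColumn();
            id.setName("id");
            MetaColumn name = new MetaColumn();
            name.setName("name");

            // Bridge from MetaColumn back to the name-based getSQLQueryColumnFields API.
            List<String> columnNames = Arrays.asList(id, name).stream()
                    .map(MetaColumn::getName)
                    .collect(Collectors.toList());
            System.out.println(columnNames);    // [id, name]
        }
    }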
diff --git a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/DistributedJdbcInputFormat.java b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/DistributedJdbcInputFormat.java
index f4d28a6c63..4206c180eb 100644
--- a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/DistributedJdbcInputFormat.java
+++ b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/DistributedJdbcInputFormat.java
@@ -24,7 +24,9 @@
 import com.dtstack.flinkx.rdb.DatabaseInterface;
 import com.dtstack.flinkx.rdb.type.TypeConverterInterface;
 import com.dtstack.flinkx.rdb.util.DBUtil;
+import com.dtstack.flinkx.reader.MetaColumn;
 import com.dtstack.flinkx.util.ClassUtil;
+import com.dtstack.flinkx.util.StringUtil;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.types.Row;
@@ -33,7 +35,6 @@
 import java.sql.*;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;

 /**
@@ -82,7 +83,7 @@ public class DistributedJdbcInputFormat extends RichInputFormat {

     protected String where;

-    protected List<String> column;
+    protected List<MetaColumn> metaColumns;

     protected TypeConverterInterface typeConverter;

@@ -116,7 +117,7 @@ private void openNextSource() throws SQLException{
         DataSource currentSource = sourceList.get(sourceIndex);
         currentConn = DBUtil.getConnection(currentSource.getJdbcUrl(), currentSource.getUserName(), currentSource.getPassword());
         currentConn.setAutoCommit(false);
-        String queryTemplate = DBUtil.getQuerySql(databaseInterface, currentSource.getTable(),column,splitKey,where, currentSource.isSplitByKey());
+        String queryTemplate = DBUtil.getQuerySql(databaseInterface, currentSource.getTable(),metaColumns,splitKey,where, currentSource.isSplitByKey());
         currentStatement = currentConn.createStatement(resultSetType, resultSetConcurrency);

         if (currentSource.isSplitByKey()){
@@ -142,7 +143,7 @@ private void openNextSource() throws SQLException{

         if(descColumnTypeList == null) {
             descColumnTypeList = DBUtil.analyzeTable(currentSource.getJdbcUrl(), currentSource.getUserName(),
-                    currentSource.getPassword(),databaseInterface, currentSource.getTable(),column);
+                    currentSource.getPassword(),databaseInterface, currentSource.getTable(),metaColumns);
         }

         LOG.info("open source:" + currentSource.getJdbcUrl() + ",table:" + currentSource.getTable());
@@ -158,6 +159,13 @@ private boolean readNextRecord() throws IOException{
         if (hasNext){
             currentRecord = new Row(columnCount);
             DBUtil.getRow(databaseInterface.getDatabaseType(),currentRecord,descColumnTypeList,currentResultSet,typeConverter);
+            for (int i = 0; i < columnCount; i++) {
+                Object val = currentRecord.getField(i);
+                if (val != null && val instanceof String){
+                    val = StringUtil.string2col(String.valueOf(val),metaColumns.get(i).getType(),metaColumns.get(i).getTimeFormat());
+                    currentRecord.setField(i,val);
+                }
+            }
         } else {
             if(sourceIndex + 1 < sourceList.size()){
                 closeCurrentSource();
diff --git a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/DistributedJdbcInputFormatBuilder.java b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/DistributedJdbcInputFormatBuilder.java
index 2b86467c92..00ed8f03f2 100644
--- a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/DistributedJdbcInputFormatBuilder.java
+++ b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/DistributedJdbcInputFormatBuilder.java
@@ -22,6 +22,7 @@
 import com.dtstack.flinkx.rdb.DataSource;
 import com.dtstack.flinkx.rdb.DatabaseInterface;
 import com.dtstack.flinkx.rdb.type.TypeConverterInterface;
+import com.dtstack.flinkx.reader.MetaColumn;

 import java.util.List;

@@ -59,8 +60,8 @@ public void setTypeConverter(TypeConverterInterface converter){
         format.typeConverter = converter;
     }

-    public void setColumn(List<String> column){
-        format.column = column;
+    public void setMetaColumn(List<MetaColumn> metaColumns){
+        format.metaColumns = metaColumns;
     }

     public void setSplitKey(String splitKey){
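The loop added to readNextRecord re-parses string fields through the column's configured type and time format, which is what makes a custom pattern such as yyyyMMdd usable. A sketch, assuming DateUtil.getDateFormatter accepts an arbitrary FastDateFormat pattern and that string2col parses a "date" column accordingly:

    import com.dtstack.flinkx.reader.MetaColumn;
    import com.dtstack.flinkx.util.DateUtil;
    import com.dtstack.flinkx.util.StringUtil;

    public class CustomTimeFormatSketch {
        public static void main(String[] args) {
            MetaColumn mc = new MetaColumn();
            mc.setName("birthday");
            mc.setType("date");
            // Without this, parsing would rely on the default built-in patterns.
            mc.setTimeFormat(DateUtil.getDateFormatter("yyyyMMdd"));

            Object val = StringUtil.string2col("20181126", mc.getType(), mc.getTimeFormat());
            System.out.println(val);    // expected: 2018-11-26 as a date value
        }
    }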
diff --git a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/JdbcInputFormat.java b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/JdbcInputFormat.java
index e7cbf99ee0..96b882800f 100644
--- a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/JdbcInputFormat.java
+++ b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/JdbcInputFormat.java
@@ -22,7 +22,9 @@
 import com.dtstack.flinkx.rdb.DatabaseInterface;
 import com.dtstack.flinkx.rdb.type.TypeConverterInterface;
 import com.dtstack.flinkx.rdb.util.DBUtil;
+import com.dtstack.flinkx.reader.MetaColumn;
 import com.dtstack.flinkx.util.ClassUtil;
+import com.dtstack.flinkx.util.StringUtil;
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
@@ -80,7 +82,7 @@ public class JdbcInputFormat extends RichInputFormat {

     protected TypeConverterInterface typeConverter;

-    protected List<String> column;
+    protected List<MetaColumn> metaColumns;

     protected int fetchSize;

@@ -129,7 +131,7 @@ public void openInternal(InputSplit inputSplit) throws IOException {
             columnCount = resultSet.getMetaData().getColumnCount();

             if(descColumnTypeList == null) {
-                descColumnTypeList = DBUtil.analyzeTable(dbURL, username, password,databaseInterface,table,column);
+                descColumnTypeList = DBUtil.analyzeTable(dbURL, username, password,databaseInterface,table,metaColumns);
             }
         } catch (SQLException se) {
             throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
@@ -177,6 +179,13 @@ public Row nextRecordInternal(Row row) throws IOException {
         }

         DBUtil.getRow(databaseInterface.getDatabaseType(),row,descColumnTypeList,resultSet,typeConverter);
+        for (int i = 0; i < columnCount; i++) {
+            Object val = row.getField(i);
+            if (val != null && val instanceof String){
+                val = StringUtil.string2col(String.valueOf(val),metaColumns.get(i).getType(),metaColumns.get(i).getTimeFormat());
+                row.setField(i,val);
+            }
+        }

         //update hasNext after we've read the record
         hasNext = resultSet.next();
diff --git a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/JdbcInputFormatBuilder.java b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/JdbcInputFormatBuilder.java
index 0bc776d24e..21c63618ae 100644
--- a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/JdbcInputFormatBuilder.java
+++ b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/JdbcInputFormatBuilder.java
@@ -21,6 +21,7 @@
 import com.dtstack.flinkx.inputformat.RichInputFormatBuilder;
 import com.dtstack.flinkx.rdb.DatabaseInterface;
 import com.dtstack.flinkx.rdb.type.TypeConverterInterface;
+import com.dtstack.flinkx.reader.MetaColumn;

 import java.util.List;

@@ -75,8 +76,8 @@ public void setTypeConverter(TypeConverterInterface converter){
         format.typeConverter = converter;
     }

-    public void setColumn(List<String> column){
-        format.column = column;
+    public void setMetaColumn(List<MetaColumn> metaColumns){
+        format.metaColumns = metaColumns;
     }

     public void setFetchSize(int fetchSize){
diff --git a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/outputformat/JdbcOutputFormat.java b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/outputformat/JdbcOutputFormat.java
index e66839db4f..0e6e56735c 100644
--- a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/outputformat/JdbcOutputFormat.java
+++ b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/outputformat/JdbcOutputFormat.java
@@ -207,9 +207,9 @@ private Object convertField(Row row, int index) {
         if(EDatabaseType.Oracle == databaseInterface.getDatabaseType()) {
             String type = columnType.get(index);
             if(type.equalsIgnoreCase("DATE")) {
-                field = DateUtil.columnToDate(field);
+                field = DateUtil.columnToDate(field,null);
             } else if(type.equalsIgnoreCase("TIMESTAMP")){
-                field = DateUtil.columnToTimestamp(field);
+                field = DateUtil.columnToTimestamp(field,null);
             }
         } else if(EDatabaseType.PostgreSQL == databaseInterface.getDatabaseType()){
             if(columnType != null && columnType.size() != 0) {
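Existing call sites that have no per-column format now pass null explicitly; the sketch below assumes, as these unchanged callers do, that a null format makes columnToDate fall back to DateUtil's built-in patterns:

    import com.dtstack.flinkx.util.DateUtil;

    public class DefaultFormatSketch {
        public static void main(String[] args) {
            // null custom format: fall back to the default date patterns.
            java.sql.Date d = DateUtil.columnToDate("2018-11-26", null);
            System.out.println(d);    // expected: 2018-11-26
        }
    }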
metaColumns) {
         List<String> ret = new ArrayList<>();
         Connection dbConn = null;
         Statement stmt = null;
@@ -228,8 +230,12 @@ public static List<String> analyzeTable(String dbURL,String username,String pass
                 nameTypeMap.put(rd.getColumnName(i+1),rd.getColumnTypeName(i+1));
             }

-            for (String col : column) {
-                ret.add(nameTypeMap.get(col));
+            for (MetaColumn metaColumn : metaColumns) {
+                if(metaColumn.getValue() != null){
+                    ret.add("string");
+                } else {
+                    ret.add(nameTypeMap.get(metaColumn.getName()));
+                }
             }
         } catch (SQLException e) {
             throw new RuntimeException(e);
@@ -336,10 +342,20 @@ public static void getRow(EDatabaseType dbType, Row row, List<String> descColumn
         }
     }

-    public static String getQuerySql(DatabaseInterface databaseInterface,String table,List<String> column,
+    public static String getQuerySql(DatabaseInterface databaseInterface,String table,List<MetaColumn> metaColumns,
                                      String splitKey,String where,boolean isSplitByKey) {
         StringBuilder sb = new StringBuilder();
-        sb.append("SELECT ").append(databaseInterface.quoteColumns(column)).append(" FROM ");
+
+        List<String> selectColumns = new ArrayList<>();
+        for (MetaColumn metaColumn : metaColumns) {
+            if (metaColumn.getValue() != null){
+                selectColumns.add(databaseInterface.quoteValue(metaColumn.getValue(),metaColumn.getName()));
+            } else {
+                selectColumns.add(databaseInterface.quoteColumn(metaColumn.getName()));
+            }
+        }
+
+        sb.append("SELECT ").append(StringUtils.join(selectColumns,",")).append(" FROM ");
         sb.append(databaseInterface.quoteTable(table));

         StringBuilder filter = new StringBuilder();
diff --git a/flinkx-sqlserver/flinkx-sqlserver-core/src/main/java/com/dtstack/flinkx/sqlserver/SqlServerDatabaseMeta.java b/flinkx-sqlserver/flinkx-sqlserver-core/src/main/java/com/dtstack/flinkx/sqlserver/SqlServerDatabaseMeta.java
index ebfed9dbe0..1d4995a0a5 100644
--- a/flinkx-sqlserver/flinkx-sqlserver-core/src/main/java/com/dtstack/flinkx/sqlserver/SqlServerDatabaseMeta.java
+++ b/flinkx-sqlserver/flinkx-sqlserver-core/src/main/java/com/dtstack/flinkx/sqlserver/SqlServerDatabaseMeta.java
@@ -51,6 +51,11 @@ public String getSQLQueryColumnFields(List<String> column, String table) {
         return "SELECT TOP 1 " + quoteColumns(column) + " FROM " + quoteTable(table);
     }

+    @Override
+    public String quoteValue(String value, String column) {
+        return String.format("'%s' as %s",value,column);
+    }
+
     @Override
     public String getSplitFilter(String columnName) {
         return String.format("%s %% ${N} = ${M}", getStartQuote() + columnName + getEndQuote());
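Putting quoteValue and the rebuilt getQuerySql together: a column list mixing a real column with a static value yields one SELECT. The sketch below inlines the same select-list loop with invented MySQL-style quoting stand-ins rather than a real DatabaseInterface:

    import com.dtstack.flinkx.reader.MetaColumn;
    import org.apache.commons.lang.StringUtils;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class QuerySqlSketch {
        // Invented stand-ins for DatabaseInterface.quoteColumn / quoteValue.
        static String quoteColumn(String column) {
            return "`" + column + "`";
        }

        static String quoteValue(String value, String column) {
            return String.format("'%s' as %s", value, column);
        }

        public static void main(String[] args) {
            MetaColumn id = new MetaColumn();
            id.setName("id");

            MetaColumn env = new MetaColumn();
            env.setName("env");
            env.setValue("prod");    // static column: becomes a literal

            List<String> selectColumns = new ArrayList<>();
            for (MetaColumn mc : Arrays.asList(id, env)) {
                selectColumns.add(mc.getValue() != null
                        ? quoteValue(mc.getValue(), mc.getName())
                        : quoteColumn(mc.getName()));
            }
            // Prints: SELECT `id`,'prod' as env FROM `t_user`
            System.out.println("SELECT " + StringUtils.join(selectColumns, ",") + " FROM `t_user`");
        }
    }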
diff --git a/flinkx-stream/flinkx-stream-reader/src/main/java/com/dtstack/flinkx/stream/reader/StreamInputFormat.java b/flinkx-stream/flinkx-stream-reader/src/main/java/com/dtstack/flinkx/stream/reader/StreamInputFormat.java
index 1f4402f401..545353144a 100644
--- a/flinkx-stream/flinkx-stream-reader/src/main/java/com/dtstack/flinkx/stream/reader/StreamInputFormat.java
+++ b/flinkx-stream/flinkx-stream-reader/src/main/java/com/dtstack/flinkx/stream/reader/StreamInputFormat.java
@@ -19,6 +19,8 @@
 package com.dtstack.flinkx.stream.reader;

 import com.dtstack.flinkx.inputformat.RichInputFormat;
+import com.dtstack.flinkx.reader.MetaColumn;
+import com.dtstack.flinkx.util.StringUtil;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.io.InputSplit;
@@ -26,7 +28,6 @@

 import java.io.IOException;
 import java.util.List;
-import java.util.Map;

 /**
  * @Company: www.dtstack.com
@@ -42,13 +43,15 @@ public class StreamInputFormat extends RichInputFormat {

     protected long sliceRecordCount;

-    protected List<Map<String,Object>> columns;
+    protected List<MetaColumn> columns;

     @Override
     public void openInternal(InputSplit inputSplit) throws IOException {
         staticData = new Row(columns.size());
         for (int i = 0; i < columns.size(); i++) {
-            staticData.setField(i,columns.get(i).get("value"));
+            MetaColumn col = columns.get(i);
+            Object value = StringUtil.string2col(col.getValue(),col.getType(),col.getTimeFormat());
+            staticData.setField(i,value);
         }
     }

diff --git a/flinkx-stream/flinkx-stream-reader/src/main/java/com/dtstack/flinkx/stream/reader/StreamInputFormatBuilder.java b/flinkx-stream/flinkx-stream-reader/src/main/java/com/dtstack/flinkx/stream/reader/StreamInputFormatBuilder.java
index f95e292d99..2190b5100a 100644
--- a/flinkx-stream/flinkx-stream-reader/src/main/java/com/dtstack/flinkx/stream/reader/StreamInputFormatBuilder.java
+++ b/flinkx-stream/flinkx-stream-reader/src/main/java/com/dtstack/flinkx/stream/reader/StreamInputFormatBuilder.java
@@ -19,9 +19,9 @@
 package com.dtstack.flinkx.stream.reader;

 import com.dtstack.flinkx.inputformat.RichInputFormatBuilder;
+import com.dtstack.flinkx.reader.MetaColumn;

 import java.util.List;
-import java.util.Map;

 /**
  * @Company: www.dtstack.com
@@ -39,7 +39,7 @@ public void setSliceRecordCount(long sliceRecordCount){
         format.sliceRecordCount = sliceRecordCount;
     }

-    public void setColumns(List<Map<String,Object>> columns){
+    public void setColumns(List<MetaColumn> columns){
         format.columns = columns;
     }

diff --git a/flinkx-stream/flinkx-stream-reader/src/main/java/com/dtstack/flinkx/stream/reader/StreamReader.java b/flinkx-stream/flinkx-stream-reader/src/main/java/com/dtstack/flinkx/stream/reader/StreamReader.java
index feeefe3fe8..28368e2fd1 100644
--- a/flinkx-stream/flinkx-stream-reader/src/main/java/com/dtstack/flinkx/stream/reader/StreamReader.java
+++ b/flinkx-stream/flinkx-stream-reader/src/main/java/com/dtstack/flinkx/stream/reader/StreamReader.java
@@ -21,12 +21,12 @@
 import com.dtstack.flinkx.config.DataTransferConfig;
 import com.dtstack.flinkx.config.ReaderConfig;
 import com.dtstack.flinkx.reader.DataReader;
+import com.dtstack.flinkx.reader.MetaColumn;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.types.Row;

 import java.util.List;
-import java.util.Map;

 /**
  * Read plugin for reading static data
@@ -38,7 +38,7 @@ public class StreamReader extends DataReader {

     private long sliceRecordCount;

-    private List<Map<String,Object>> columns;
+    private List<MetaColumn> columns;

     /** -1 means no limit */
     private static final long DEFAULT_SLICE_RECORD_COUNT = -1;
@@ -48,7 +48,7 @@ public StreamReader(DataTransferConfig config, StreamExecutionEnvironment env) {
         ReaderConfig readerConfig = config.getJob().getContent().get(0).getReader();

         sliceRecordCount = readerConfig.getParameter().getLongVal("sliceRecordCount",DEFAULT_SLICE_RECORD_COUNT);
-        columns = readerConfig.getParameter().getColumn();
+        columns = MetaColumn.getMetaColumns(readerConfig.getParameter().getColumn());
     }

     @Override