Skip to content

Commit

Permalink
Support static data and customize time format
Browse files Browse the repository at this point in the history
  • Loading branch information
lijiangbo committed Nov 26, 2018
1 parent 8251187 commit eb45ce9
Show file tree
Hide file tree
Showing 45 changed files with 519 additions and 434 deletions.
132 changes: 132 additions & 0 deletions flinkx-core/src/main/java/com/dtstack/flinkx/reader/MetaColumn.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtstack.flinkx.reader;

import com.dtstack.flinkx.util.DateUtil;
import org.apache.commons.lang3.time.FastDateFormat;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
*
* @author jiangbo
* @date 2018/11/26
*/
public class MetaColumn {

private String name;

private String type;

private Integer index;

private String value;

private FastDateFormat timeFormat;

private String splitter;

public String getSplitter() {
return splitter;
}

public void setSplitter(String splitter) {
this.splitter = splitter;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getType() {
return type;
}

public void setType(String type) {
this.type = type;
}

public Integer getIndex() {
return index;
}

public void setIndex(Integer index) {
this.index = index;
}

public String getValue() {
return value;
}

public void setValue(String value) {
this.value = value;
}

public FastDateFormat getTimeFormat() {
return timeFormat;
}

public void setTimeFormat(FastDateFormat timeFormat) {
this.timeFormat = timeFormat;
}

public static List<MetaColumn> getMetaColumns(List columns){
List<MetaColumn> metaColumns = new ArrayList<>();
if(columns != null && columns.size() > 0) {
if (columns.get(0) instanceof Map) {
for (int i = 0; i < columns.size(); i++) {
Map sm = (Map) columns.get(i);
MetaColumn mc = new MetaColumn();
mc.setIndex((Integer) sm.get("index"));
mc.setType((String) sm.get("type"));
mc.setValue((String) sm.get("value"));
mc.setSplitter((String) sm.get("splitter"));

if(sm.get("format") != null){
mc.setTimeFormat(DateUtil.getDateFormatter((String) sm.get("format")));
}

metaColumns.add(mc);
}
} else if (columns.get(0) instanceof String) {
if(columns.size() == 1 && columns.get(0).equals("*")){
MetaColumn mc = new MetaColumn();
mc.setName("*");
metaColumns.add(mc);
} else {
for (Object column : columns) {
MetaColumn mc = new MetaColumn();
mc.setName(String.valueOf(column));
metaColumns.add(mc);
}
}
} else {
throw new IllegalArgumentException("column argument error");
}
}

return metaColumns;
}
}
21 changes: 16 additions & 5 deletions flinkx-core/src/main/java/com/dtstack/flinkx/util/DateUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ public class DateUtil {
private DateUtil() {}


public static java.sql.Date columnToDate(Object column) {
public static java.sql.Date columnToDate(Object column,FastDateFormat customTimeFormat) {
if(column == null) {
return null;
} else if(column instanceof String) {
if (((String) column).length() == 0){
return null;
}
return new java.sql.Date(stringToDate((String)column).getTime());
return new java.sql.Date(stringToDate((String)column,customTimeFormat).getTime());
} else if (column instanceof Integer) {
Integer rawData = (Integer) column;
return new java.sql.Date(rawData.longValue());
Expand All @@ -82,14 +82,14 @@ public static java.sql.Date columnToDate(Object column) {
throw new IllegalArgumentException("Can't convert " + column.getClass().getName() + " to Date");
}

public static java.sql.Timestamp columnToTimestamp(Object column) {
public static java.sql.Timestamp columnToTimestamp(Object column,FastDateFormat customTimeFormat) {
if (column == null) {
return null;
} else if(column instanceof String) {
if (((String) column).length() == 0){
return null;
}
return new java.sql.Timestamp(stringToDate((String)column).getTime());
return new java.sql.Timestamp(stringToDate((String)column,customTimeFormat).getTime());
} else if (column instanceof Integer) {
Integer rawData = (Integer) column;
return new java.sql.Timestamp(rawData.longValue());
Expand All @@ -108,11 +108,18 @@ public static java.sql.Timestamp columnToTimestamp(Object column) {
throw new IllegalArgumentException("Can't convert " + column.getClass().getName() + " to Date");
}

public static Date stringToDate(String strDate) {
public static Date stringToDate(String strDate,FastDateFormat customTimeFormat) {
if(strDate == null || strDate.trim().length() == 0) {
return null;
}

if(customTimeFormat != null){
try {
return customTimeFormat.parse(strDate);
} catch (ParseException ignored) {
}
}

try {
return datetimeFormatter.parse(strDate);
} catch (ParseException ignored) {
Expand Down Expand Up @@ -148,6 +155,10 @@ public static String dateToYearString(Date date) {
return yearFormatter.format(date);
}

public static FastDateFormat getDateFormatter(String timeFormat){
return FastDateFormat.getInstance(timeFormat, timeZoner);
}

static {
timeZoner = TimeZone.getTimeZone(timeZone);
datetimeFormatter = FastDateFormat.getInstance(datetimeFormat, timeZoner);
Expand Down
17 changes: 12 additions & 5 deletions flinkx-core/src/main/java/com/dtstack/flinkx/util/StringUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.dtstack.flinkx.common.ColumnType;
import com.dtstack.flinkx.exception.WriteRecordException;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -67,9 +68,15 @@ public static String convertRegularExpr (String str) {
return str;
}

public static Object string2col(String str, String type) {
public static Object string2col(String str, String type, FastDateFormat customTimeFormat) {
if(str == null || str.length() == 0){
return null;
}

if(type == null){
return str;
}

Preconditions.checkNotNull(type);
ColumnType columnType = valueOf(type.toUpperCase());
Object ret;
switch(columnType) {
Expand Down Expand Up @@ -101,14 +108,14 @@ public static Object string2col(String str, String type) {
ret = Boolean.valueOf(str.toLowerCase());
break;
case DATE:
ret = DateUtil.columnToDate(str);
ret = DateUtil.columnToDate(str,customTimeFormat);
break;
case TIMESTAMP:
case DATETIME:
ret = DateUtil.columnToTimestamp(str);
ret = DateUtil.columnToTimestamp(str,customTimeFormat);
break;
default:
throw new IllegalArgumentException();
throw new IllegalArgumentException("Unsupported field type:" + type);
}

return ret;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,9 @@ public String getStartQuote() {
public String getEndQuote() {
return "";
}

@Override
public String quoteValue(String value, String column) {
return String.format("'%s' as %s",value,column);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ private static Object convertValueToAssignType(String columnType, String constan
column = constantValue;
break;
case "DATE":
column = DateUtil.stringToDate(constantValue);
column = DateUtil.stringToDate(constantValue,null);
break;
default:
throw new IllegalArgumentException("Unsupported column type: " + columnType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.dtstack.flinkx.ftp.StandardFtpHandler;
import com.dtstack.flinkx.inputformat.RichInputFormat;
import com.dtstack.flinkx.reader.ByteRateLimiter;
import com.dtstack.flinkx.reader.MetaColumn;
import com.dtstack.flinkx.util.StringUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
Expand Down Expand Up @@ -65,11 +66,7 @@ public class FtpInputFormat extends RichInputFormat {

protected String charsetName = "utf-8";

protected List<Integer> columnIndex;

protected List<String> columnValue;

protected List<String> columnType;
protected List<MetaColumn> metaColumns;

protected transient boolean isFirstLineHeader;

Expand Down Expand Up @@ -116,8 +113,6 @@ public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {

@Override
public void openInternal(InputSplit split) throws IOException {


FtpInputSplit inputSplit = (FtpInputSplit)split;
List<String> paths = inputSplit.getPaths();
FtpSeqInputStream is = new FtpSeqInputStream(ftpHandler, paths);
Expand All @@ -144,21 +139,33 @@ public boolean reachedEnd() throws IOException {

@Override
public Row nextRecordInternal(Row row) throws IOException {
row = new Row(columnIndex.size());
String[] fields = line.split(delimiter);
for(int i = 0; i < columnIndex.size(); ++i) {
Integer index = columnIndex.get(i);
String val = columnValue.get(i);
if(index != null) {
String col = fields[index];
row.setField(i, col);
} else if(val != null) {
String type = columnType.get(i);
Object col = StringUtil.string2col(val,type);
row.setField(i, col);
if (metaColumns.size() == 1 && metaColumns.get(0).getName().equals("*")){
row = new Row(fields.length);
for (int i = 0; i < fields.length; i++) {
row.setField(i, fields[i]);
}
} else {
row = new Row(metaColumns.size());
for (int i = 0; i < metaColumns.size(); i++) {
MetaColumn metaColumn = metaColumns.get(i);
Object value;
if(metaColumn.getValue() != null){
value = metaColumn.getValue();
} else if(metaColumn.getIndex() != null){
value = fields[metaColumn.getIndex()];
} else {
value = null;
}

if(value != null){
value = StringUtil.string2col(String.valueOf(value),metaColumn.getType(),metaColumn.getTimeFormat());
}

row.setField(i, value);
}

}

return row;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.dtstack.flinkx.ftp.reader;

import com.dtstack.flinkx.inputformat.RichInputFormatBuilder;
import com.dtstack.flinkx.reader.MetaColumn;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.hadoop.shaded.com.google.common.base.Preconditions;
import java.util.List;
Expand Down Expand Up @@ -60,22 +61,14 @@ public void setEncoding(String encoding) {
}
}

public void setColumnIndex(List<Integer> columnIndex) {
format.columnIndex = columnIndex;
public void setMetaColumn(List<MetaColumn> metaColumns) {
format.metaColumns = metaColumns;
}

public void setIsFirstLineHeader(boolean isFirstLineHeader){
format.isFirstLineHeader = isFirstLineHeader;
}

public void setColumnValue(List<String> columnValue) {
format.columnValue = columnValue;
}

public void setColumnType(List<String> columnType) {
format.columnType = columnType;
}

@Override
protected void checkFormat() {

Expand Down
Loading

0 comments on commit eb45ce9

Please sign in to comment.