diff --git a/format-xls/pom.xml b/format-xls/pom.xml
index 77358cfdb..227f20fe5 100644
--- a/format-xls/pom.xml
+++ b/format-xls/pom.xml
@@ -26,7 +26,7 @@
XLS format plugins
jar
- 5.2.4
+ 5.2.5
2.20.0
@@ -45,7 +45,7 @@
org.apache.logging.log4j
log4j-core
compile
- ${log4j-core.version}
+ ${log4j-core.version}
io.cdap.cdap
@@ -64,12 +64,30 @@
format-common
${project.version}
-
-
junit
junit
+
+ commons-io
+ commons-io
+ 2.15.0
+
+
+ org.apache.commons
+ commons-compress
+ 1.26.0
+
+
+ com.github.pjfanning
+ excel-streaming-reader
+ 4.2.1
+
+
+ com.github.pjfanning
+ poi-shared-strings
+ 2.8.0
+
diff --git a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormat.java b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormat.java
index 0b79787f6..18ee5c4a8 100644
--- a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormat.java
+++ b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormat.java
@@ -16,6 +16,9 @@
package io.cdap.plugin.format.xls.input;
+
+import com.github.pjfanning.xlsx.StreamingReader;
+import com.google.common.base.Preconditions;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.format.input.PathTrackingInputFormat;
@@ -25,17 +28,23 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.poi.EmptyFileException;
+import org.apache.poi.poifs.filesystem.FileMagic;
import org.apache.poi.ss.usermodel.FormulaEvaluator;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.ss.usermodel.WorkbookFactory;
+import org.apache.poi.util.IOUtils;
import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
import javax.annotation.Nullable;
@@ -51,11 +60,12 @@ public class XlsInputFormat extends PathTrackingInputFormat {
public static final String SHEET_VALUE = "sheetValue";
public static final String NAME_SKIP_HEADER = "skipHeader";
public static final String TERMINATE_IF_EMPTY_ROW = "terminateIfEmptyRow";
+ protected static final int EXCEL_BYTE_ARRAY_MAX_OVERRIDE_DEFAULT = Integer.MAX_VALUE / 2;
@Override
protected RecordReader createRecordReader(
- FileSplit split, TaskAttemptContext context, @Nullable String pathField,
- @Nullable Schema schema) throws IOException {
+ FileSplit split, TaskAttemptContext context, @Nullable String pathField,
+ @Nullable Schema schema) throws IOException {
Configuration jobConf = context.getConfiguration();
boolean skipFirstRow = jobConf.getBoolean(NAME_SKIP_HEADER, false);
boolean terminateIfEmptyRow = jobConf.getBoolean(TERMINATE_IF_EMPTY_ROW, false);
@@ -65,6 +75,10 @@ protected RecordReader createRecordReade
return new XlsRecordReader(sheet, sheetValue, outputSchema, terminateIfEmptyRow, skipFirstRow);
}
+ public boolean isSplitable(JobContext context, Path file) {
+ return false;
+ }
+
/**
* Reads Excel sheet, where each row is a {@link StructuredRecord} and each cell is a field in the record.
*/
@@ -74,11 +88,7 @@ public static class XlsRecordReader extends RecordReader rows;
+ // Specifies the row index.
+ private long rowIdx;
+
/**
* Constructor for XlsRecordReader.
@@ -113,10 +128,35 @@ public void initialize(InputSplit split, TaskAttemptContext context) throws IOEx
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(jobConf);
fileIn = fs.open(file);
+ Sheet workSheet;
+ Workbook workbook;
+ boolean isStreaming = false;
+ try {
+ // Use Magic Bytes to detect the file type
+ InputStream is = FileMagic.prepareToCheckMagic(fileIn);
+ byte[] emptyFileCheck = new byte[1];
+ is.mark(emptyFileCheck.length);
+ if (is.read(emptyFileCheck) < emptyFileCheck.length) {
+ throw new EmptyFileException();
+ }
+ is.reset();
+
+ final FileMagic fm = FileMagic.valueOf(is);
+ switch (fm) {
+ case OOXML:
+ workbook = StreamingReader.builder().rowCacheSize(10).open(is);
+ isStreaming = true;
+ break;
+ case OLE2:
+ IOUtils.setByteArrayMaxOverride(EXCEL_BYTE_ARRAY_MAX_OVERRIDE_DEFAULT);
+ workbook = WorkbookFactory.create(is);
+ formulaEvaluator = workbook.getCreationHelper().createFormulaEvaluator();
+ formulaEvaluator.setIgnoreMissingWorkbooks(true);
+ break;
+ default:
+ throw new IOException("Can't open workbook - unsupported file type: " + fm);
+ }
- try (Workbook workbook = WorkbookFactory.create(fileIn)) {
- formulaEvaluator = workbook.getCreationHelper().createFormulaEvaluator();
- formulaEvaluator.setIgnoreMissingWorkbooks(true);
// Check if user wants to access with name or number
if (sheet.equals(XlsInputFormatConfig.SHEET_NUMBER)) {
workSheet = workbook.getSheetAt(Integer.parseInt(sheetValue));
@@ -127,37 +167,43 @@ public void initialize(InputSplit split, TaskAttemptContext context) throws IOEx
} catch (Exception e) {
throw new IOException("Exception while reading excel sheet. " + e.getMessage(), e);
}
-
+ // As we cannot get the number of rows in a sheet while streaming.
+ // -1 is used as rowCount to indicate that all rows should be read.
+ rowCount = isStreaming ? -1 : workSheet.getPhysicalNumberOfRows();
lastRowNum = workSheet.getLastRowNum();
+ rows = workSheet.iterator();
isRowNull = false;
- rowIndex = skipFirstRow ? 1 : 0;
+ rowIdx = 0;
valueBuilder = StructuredRecord.builder(outputSchema);
+ if (skipFirstRow) {
+ Preconditions.checkArgument(rows.hasNext(), "No rows found on sheet %s", sheetValue);
+ rowIdx = 1;
+ rows.next();
+ }
}
@Override
public boolean nextKeyValue() {
// If any is true, then we stop processing.
- if (rowIndex > lastRowNum || lastRowNum == -1 || (isRowNull && terminateIfEmptyRow)) {
+ if (!rows.hasNext() || rowCount == 0 || (isRowNull && terminateIfEmptyRow)) {
return false;
}
// Get the next row.
- Row row = workSheet.getRow(rowIndex);
+ Row row = rows.next();
valueBuilder = rowConverter.convert(row, outputSchema);
if (row == null || valueBuilder == null) {
isRowNull = true;
// set valueBuilder to a new builder with all fields set to null
valueBuilder = StructuredRecord.builder(outputSchema);
}
- // if all fields are null, then the row is null
- rowIndex++;
-
+ rowIdx++;
// Stop processing if the row is null and terminateIfEmptyRow is true.
return !isRowNull || !terminateIfEmptyRow;
}
@Override
public float getProgress() {
- return (float) rowIndex / lastRowNum;
+ return (float) rowIdx / lastRowNum;
}
@Override
diff --git a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatConfig.java b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatConfig.java
index 27ec7c343..14ab66c5c 100644
--- a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatConfig.java
+++ b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatConfig.java
@@ -17,14 +17,10 @@
package io.cdap.plugin.format.xls.input;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
-import io.cdap.cdap.api.data.schema.Schema;
-import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.api.plugin.PluginPropertyField;
-import io.cdap.plugin.common.KeyValueListParser;
import io.cdap.plugin.format.input.PathTrackingConfig;
import java.util.Collections;
@@ -37,7 +33,6 @@
*/
public class XlsInputFormatConfig extends PathTrackingConfig {
public static final String SHEET_NUMBER = "Sheet Number";
- private static final String NAME_OVERRIDE = "override";
private static final String NAME_SHEET = "sheet";
public static final String NAME_SHEET_VALUE = "sheetValue";
private static final String NAME_SKIP_HEADER = "skipHeader";
@@ -53,18 +48,18 @@ public class XlsInputFormatConfig extends PathTrackingConfig {
"Can be either sheet name or sheet no; for example: 'Sheet1' or '0' in case user selects 'Sheet Name' or " +
"'Sheet Number' as 'sheet' input respectively. Sheet number starts with 0. Default is 'Sheet Number' 0.";
public static final String DESC_TERMINATE_ROW = "Specify whether to stop reading after " +
- "encountering the first empty row. Defaults to false.";
+ "encountering the first empty row. Defaults to false.";
public static final Map XLS_FIELDS;
static {
Map fields = new HashMap<>(FIELDS);
fields.put(NAME_SKIP_HEADER,
- new PluginPropertyField(NAME_SKIP_HEADER, DESC_SKIP_HEADER, "boolean", false, true));
+ new PluginPropertyField(NAME_SKIP_HEADER, DESC_SKIP_HEADER, "boolean", false, true));
// Add fields specific for excel format handling.
fields.put(NAME_SHEET, new PluginPropertyField(NAME_SHEET, DESC_SHEET, "string", false, true));
fields.put(NAME_SHEET_VALUE, new PluginPropertyField(NAME_SHEET_VALUE, DESC_SHEET_VALUE, "string", false, true));
fields.put(NAME_TERMINATE_IF_EMPTY_ROW, new PluginPropertyField(
- NAME_TERMINATE_IF_EMPTY_ROW, DESC_TERMINATE_ROW, "boolean", false, true));
+ NAME_TERMINATE_IF_EMPTY_ROW, DESC_TERMINATE_ROW, "boolean", false, true));
XLS_FIELDS = Collections.unmodifiableMap(fields);
}
@@ -102,7 +97,7 @@ public XlsInputFormatConfig(@Nullable String schema, @Nullable String sheet, @Nu
super();
this.schema = schema;
this.sheet = sheet;
- this.sheetValue = sheetValue;
+ this.sheetValue = sheetValue;
this.skipHeader = skipHeader;
this.terminateIfEmptyRow = terminateIfEmptyRow;
}
@@ -173,7 +168,7 @@ public Builder setTerminateIfEmptyRow(Boolean terminateIfEmptyRow) {
}
public XlsInputFormatConfig build() {
- return new XlsInputFormatConfig(schema, sheet, sheetValue, skipHeader, terminateIfEmptyRow);
+ return new XlsInputFormatConfig(schema, sheet, sheetValue, skipHeader, terminateIfEmptyRow);
}
}
diff --git a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatProvider.java b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatProvider.java
index bedcb48a3..e0266614e 100644
--- a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatProvider.java
+++ b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatProvider.java
@@ -16,6 +16,7 @@
package io.cdap.plugin.format.xls.input;
+import com.github.pjfanning.xlsx.StreamingReader;
import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
@@ -30,17 +31,21 @@
import io.cdap.plugin.format.input.PathTrackingConfig;
import io.cdap.plugin.format.input.PathTrackingInputFormatProvider;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.poi.EmptyFileException;
+import org.apache.poi.poifs.filesystem.FileMagic;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.DataFormatter;
-import org.apache.poi.ss.usermodel.FormulaEvaluator;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.ss.usermodel.WorkbookFactory;
import org.apache.poi.ss.util.CellReference;
+import org.apache.poi.util.IOUtils;
import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
@@ -56,13 +61,13 @@ public class XlsInputFormatProvider extends PathTrackingInputFormatProvider properties) {
public Schema detectSchema(FormatContext context, InputFiles inputFiles) throws IOException {
String blankHeader = "BLANK";
FailureCollector failureCollector = context.getFailureCollector();
- FormulaEvaluator formulaEvaluator;
+ DataFormatter formatter = new DataFormatter();
for (InputFile inputFile : inputFiles) {
- DataFormatter formatter = new DataFormatter();
- try (Workbook workbook = WorkbookFactory.create(inputFile.open())) {
- formulaEvaluator = workbook.getCreationHelper().createFormulaEvaluator();
- formulaEvaluator.setIgnoreMissingWorkbooks(true);
+ try (Workbook workbook = getWorkbook(inputFile.open())) {
Sheet workSheet;
// Check if user wants to access with name or number
if (conf.getSheet() != null && conf.getSheet().equals(XlsInputFormatConfig.SHEET_NUMBER)) {
@@ -124,7 +126,7 @@ public Schema detectSchema(FormatContext context, InputFiles inputFiles) throws
} else {
if (Strings.isNullOrEmpty(conf.getSheetValue())) {
failureCollector.addFailure("Sheet name must be specified.", null)
- .withConfigProperty(XlsInputFormatConfig.NAME_SHEET_VALUE);
+ .withConfigProperty(XlsInputFormatConfig.NAME_SHEET_VALUE);
return null;
}
workSheet = workbook.getSheet(conf.getSheetValue());
@@ -133,7 +135,7 @@ public Schema detectSchema(FormatContext context, InputFiles inputFiles) throws
// If provided sheet does not exist, throw an exception
if (workSheet == null) {
failureCollector.addFailure("Sheet " + conf.getSheetValue() + " does not exist in the workbook.",
- "Specify a valid sheet.");
+ "Specify a valid sheet.");
return null;
}
@@ -145,8 +147,9 @@ public Schema detectSchema(FormatContext context, InputFiles inputFiles) throws
int lastCellNumMax = 0;
List columnNames = new ArrayList<>();
XlsInputFormatSchemaDetector schemaDetector = new XlsInputFormatSchemaDetector();
- for (int rowIndex = rowStart; rowIndex <= rowEnd; rowIndex++) {
- Row row = workSheet.getRow(rowIndex);
+ Iterator rows = workSheet.iterator();
+ for (int rowIndex = rowStart; rowIndex <= rowEnd && rows.hasNext(); rowIndex++) {
+ Row row = rows.next();
if (row == null) {
continue;
}
@@ -156,7 +159,7 @@ public Schema detectSchema(FormatContext context, InputFiles inputFiles) throws
if (rowIndex == 0 && conf.getSkipHeader()) {
for (int cellIndex = 0; cellIndex < lastCellNumMax; cellIndex++) {
Cell cell = row.getCell(cellIndex, Row.MissingCellPolicy.RETURN_BLANK_AS_NULL);
- columnNames.add(cell == null ? blankHeader : formatter.formatCellValue(cell, formulaEvaluator));
+ columnNames.add(cell == null ? blankHeader : formatter.formatCellValue(cell));
}
// Skip Header
continue;
@@ -185,7 +188,7 @@ public Schema detectSchema(FormatContext context, InputFiles inputFiles) throws
}
Schema schema = Schema.recordOf("xls", schemaDetector.getFields(
- XlsInputFormatUtils.getSafeColumnNames(columnNames)));
+ XlsInputFormatUtils.getSafeColumnNames(columnNames)));
return PathTrackingInputFormatProvider.addPathField(context.getFailureCollector(), schema, conf.getPathField());
}
}
@@ -200,12 +203,45 @@ private Integer getSheetAsNumber(FailureCollector failureCollector) {
return sheetValue;
}
failureCollector.addFailure("Sheet number must be a positive number.", null)
- .withConfigProperty(XlsInputFormatConfig.NAME_SHEET_VALUE);
+ .withConfigProperty(XlsInputFormatConfig.NAME_SHEET_VALUE);
} catch (NumberFormatException e) {
failureCollector.addFailure("Sheet number must be a number.", null)
- .withConfigProperty(XlsInputFormatConfig.NAME_SHEET_VALUE);
+ .withConfigProperty(XlsInputFormatConfig.NAME_SHEET_VALUE);
}
}
- return null;
+ return 0;
+ }
+
+ private Workbook getWorkbook(InputStream fileIn) {
+ Workbook workbook;
+ try {
+ // Use Magic Bytes to detect the file type
+ InputStream is = FileMagic.prepareToCheckMagic(fileIn);
+ byte[] emptyFileCheck = new byte[1];
+ is.mark(emptyFileCheck.length);
+ if (is.read(emptyFileCheck) < emptyFileCheck.length) {
+ throw new EmptyFileException();
+ }
+ is.reset();
+
+ final FileMagic fm = FileMagic.valueOf(is);
+ switch (fm) {
+ case OOXML:
+ workbook = StreamingReader.builder().rowCacheSize(10).open(is);
+ break;
+ case OLE2:
+ // workaround for large files
+ IOUtils.setByteArrayMaxOverride(XlsInputFormat.EXCEL_BYTE_ARRAY_MAX_OVERRIDE_DEFAULT);
+ workbook = WorkbookFactory.create(is);
+ break;
+ default:
+ throw new IOException("Can't open workbook - unsupported file type: " + fm);
+ }
+ return workbook;
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Exception while reading excel sheet. " + e.getMessage(), e);
+ }
+
}
+
}
diff --git a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsRowConverter.java b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsRowConverter.java
index 251d6b36f..dc8362af5 100644
--- a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsRowConverter.java
+++ b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsRowConverter.java
@@ -35,7 +35,7 @@ public class XlsRowConverter {
private final FormulaEvaluator evaluator;
private static final DataFormatter dataFormatter = new DataFormatter();
- XlsRowConverter(FormulaEvaluator evaluator) {
+ XlsRowConverter(@Nullable FormulaEvaluator evaluator) {
this.evaluator = evaluator;
}
@@ -59,7 +59,7 @@ public StructuredRecord.Builder convert(Row row, Schema outputSchema) {
}
Schema.Field field = fields.get(cellIndex);
Schema.Type type = field.getSchema().isNullable() ?
- field.getSchema().getNonNullable().getType() : field.getSchema().getType();
+ field.getSchema().getNonNullable().getType() : field.getSchema().getType();
Object cellValue;
switch (type) {
case STRING:
@@ -74,7 +74,7 @@ public StructuredRecord.Builder convert(Row row, Schema outputSchema) {
default:
// As we only support string, double and boolean, this should never happen.
throw new IllegalStateException(
- String.format("Field '%s' is of unsupported type '%s'. Supported types are: %s",
+ String.format("Field '%s' is of unsupported type '%s'. Supported types are: %s",
field.getName(), type, "string, double, boolean"));
}
if (cellValue == null) {
@@ -95,7 +95,9 @@ private CellType getCellType(Cell cell) {
try {
cellType = cell.getCachedFormulaResultType();
} catch (Exception e) {
- cellType = evaluator.evaluateFormulaCell(cell);
+ if (evaluator != null) {
+ cellType = evaluator.evaluateFormulaCell(cell);
+ }
}
}
return cellType;
@@ -119,7 +121,7 @@ private String getCellAsString(Cell cell) {
return null;
default:
throw new IllegalStateException(
- String.format("Failed to format (%s) due to unsupported cell type (%s)", cell, cellType));
+ String.format("Failed to format (%s) due to unsupported cell type (%s)", cell, cellType));
}
}
@@ -139,7 +141,7 @@ private boolean getCellAsBoolean(Cell cell) {
return false;
default:
throw new IllegalStateException(
- String.format("Failed to format (%s) due to unsupported cell type (%s)", cell, cellType));
+ String.format("Failed to format (%s) due to unsupported cell type (%s)", cell, cellType));
}
}
@@ -158,7 +160,7 @@ private Double getCellAsDouble(Cell cell) {
return 0.0;
default:
throw new IllegalStateException(
- String.format("Failed to format (%s) due to unsupported cell type (%s)", cell, cellType));
+ String.format("Failed to format (%s) due to unsupported cell type (%s)", cell, cellType));
}
}