diff --git a/core-plugins/pom.xml b/core-plugins/pom.xml index 70644a501..c4185bbfd 100644 --- a/core-plugins/pom.xml +++ b/core-plugins/pom.xml @@ -173,15 +173,35 @@ ${hadoop.version} test + + commons-io + commons-io + 2.15.0 + + + org.apache.commons + commons-compress + 1.26.0 + org.apache.poi poi - 3.12 + 5.2.4 org.apache.poi poi-ooxml - 3.11 + 5.2.4 + + + com.github.pjfanning + excel-streaming-reader + 4.2.1 + + + com.github.pjfanning + poi-shared-strings + 2.8.0 test diff --git a/core-plugins/src/main/java/io/cdap/plugin/batch/source/ExcelInputFormat.java b/core-plugins/src/main/java/io/cdap/plugin/batch/source/ExcelInputFormat.java index c8accfb18..d5f0ef5d1 100644 --- a/core-plugins/src/main/java/io/cdap/plugin/batch/source/ExcelInputFormat.java +++ b/core-plugins/src/main/java/io/cdap/plugin/batch/source/ExcelInputFormat.java @@ -16,6 +16,7 @@ package io.cdap.plugin.batch.source; +import com.github.pjfanning.xlsx.StreamingReader; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import org.apache.hadoop.conf.Configuration; @@ -26,12 +27,15 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.poi.hssf.usermodel.HSSFDateUtil; +import org.apache.poi.EmptyFileException; +import org.apache.poi.poifs.filesystem.FileMagic; import org.apache.poi.ss.usermodel.Cell; +import org.apache.poi.ss.usermodel.DateUtil; import org.apache.poi.ss.usermodel.Row; import org.apache.poi.ss.usermodel.Sheet; import org.apache.poi.ss.usermodel.Workbook; @@ -39,6 +43,7 @@ import org.apache.poi.ss.util.CellReference; import java.io.IOException; +import java.io.InputStream; import java.util.Iterator; @@ -67,6 +72,11 @@ public RecordReader createRecordReader(InputSplit split, Tas return new ExcelRecordReader(); } + @Override + public boolean isSplitable(JobContext context, Path file) { + return false; + } + public static void setConfigurations(Job job, String filePattern, String sheet, boolean reprocess, String sheetValue, String columnList, boolean skipFirstRow, String terminateIfEmptyRow, String rowLimit, String ifErrorRecord, @@ -145,9 +155,31 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context) thro String sheet = job.get(SHEET); String sheetValue = job.get(SHEET_VALUE); - Sheet workSheet; // sheet can be used as common for XSSF and HSSF workbook + Sheet workSheet; + Workbook workbook; + boolean isStreaming = false; try { - Workbook workbook = WorkbookFactory.create(fileIn); + // Use Magic Bytes to detect the file type + InputStream is = FileMagic.prepareToCheckMagic(fileIn); + byte[] emptyFileCheck = new byte[1]; + is.mark(emptyFileCheck.length); + if (is.read(emptyFileCheck) < emptyFileCheck.length) { + throw new EmptyFileException(); + } + is.reset(); + + final FileMagic fm = FileMagic.valueOf(is); + switch (fm) { + case OOXML: + workbook = StreamingReader.builder().rowCacheSize(10).open(is); + isStreaming = true; + break; + case OLE2: + workbook = WorkbookFactory.create(is); + break; + default: + throw new IOException("Can't open workbook - unsupported file type: " + fm); + } if (sheet.equalsIgnoreCase(SHEET_NAME)) { workSheet = workbook.getSheet(sheetValue); } else { @@ -157,7 +189,9 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context) thro throw new IllegalArgumentException("Exception while reading excel sheet. " + e.getMessage(), e); } - rowCount = job.getInt(ROWS_LIMIT, workSheet.getPhysicalNumberOfRows()); + // As we cannot get the number of rows in a sheet while streaming. + // -1 is used as rowCount to indicate that all rows should be read. + rowCount = job.getInt(ROWS_LIMIT, isStreaming ? -1 : workSheet.getPhysicalNumberOfRows()); rows = workSheet.iterator(); lastRowNum = workSheet.getLastRowNum(); rowIdx = 0; @@ -171,7 +205,7 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context) thro } @Override - public boolean nextKeyValue() throws IOException, InterruptedException { + public boolean nextKeyValue() { if (!rows.hasNext() || rowCount == 0) { return false; } @@ -200,18 +234,18 @@ public boolean nextKeyValue() throws IOException, InterruptedException { Cell cell = cellIterator.next(); String colName = CellReference.convertNumToColString(cell.getColumnIndex()); switch (cell.getCellType()) { - case Cell.CELL_TYPE_STRING: + case STRING: sb.append(colName) .append(COLUMN_SEPERATOR).append(cell.getStringCellValue()).append(CELL_SEPERATOR); break; - case Cell.CELL_TYPE_BOOLEAN: + case BOOLEAN: sb.append(colName) .append(COLUMN_SEPERATOR).append(cell.getBooleanCellValue()).append(CELL_SEPERATOR); break; - case Cell.CELL_TYPE_NUMERIC: - if (HSSFDateUtil.isCellDateFormatted(cell)) { + case NUMERIC: + if (DateUtil.isCellDateFormatted(cell)) { sb.append(colName).append(COLUMN_SEPERATOR).append(cell.getDateCellValue()).append(CELL_SEPERATOR); } else { sb.append(colName)