diff --git a/core-plugins/pom.xml b/core-plugins/pom.xml index 28b400790..3837a86ba 100644 --- a/core-plugins/pom.xml +++ b/core-plugins/pom.xml @@ -173,15 +173,35 @@ ${hadoop.version} test + + commons-io + commons-io + 2.15.0 + + + org.apache.commons + commons-compress + 1.26.0 + org.apache.poi poi - 3.12 + 5.2.5 org.apache.poi poi-ooxml - 3.11 + 5.2.5 + + + com.github.pjfanning + excel-streaming-reader + 4.2.1 + + + com.github.pjfanning + poi-shared-strings + 2.8.0 test diff --git a/core-plugins/src/main/java/io/cdap/plugin/batch/source/ExcelInputFormat.java b/core-plugins/src/main/java/io/cdap/plugin/batch/source/ExcelInputFormat.java index c8accfb18..a7e46a30f 100644 --- a/core-plugins/src/main/java/io/cdap/plugin/batch/source/ExcelInputFormat.java +++ b/core-plugins/src/main/java/io/cdap/plugin/batch/source/ExcelInputFormat.java @@ -16,6 +16,7 @@ package io.cdap.plugin.batch.source; +import com.github.pjfanning.xlsx.StreamingReader; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import org.apache.hadoop.conf.Configuration; @@ -26,19 +27,24 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.poi.hssf.usermodel.HSSFDateUtil; +import org.apache.poi.EmptyFileException; +import org.apache.poi.poifs.filesystem.FileMagic; import org.apache.poi.ss.usermodel.Cell; +import org.apache.poi.ss.usermodel.DateUtil; import org.apache.poi.ss.usermodel.Row; import org.apache.poi.ss.usermodel.Sheet; import org.apache.poi.ss.usermodel.Workbook; import org.apache.poi.ss.usermodel.WorkbookFactory; import org.apache.poi.ss.util.CellReference; +import org.apache.poi.util.IOUtils; import java.io.IOException; +import java.io.InputStream; import java.util.Iterator; @@ -61,16 +67,23 @@ public class ExcelInputFormat extends TextInputFormat { public static final String FILE_PATTERN = "filePattern"; public static final String SHEET = "sheet"; public static final String SHEET_VALUE = "sheetValue"; + public static final String EXCEL_BYTE_ARRAY_MAX_OVERRIDE = "excel.byteArrayMaxOverride"; + public static final int EXCEL_BYTE_ARRAY_MAX_OVERRIDE_DEFAULT = Integer.MAX_VALUE / 2; @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) { return new ExcelRecordReader(); } + @Override + public boolean isSplitable(JobContext context, Path file) { + return false; + } + public static void setConfigurations(Job job, String filePattern, String sheet, boolean reprocess, String sheetValue, String columnList, boolean skipFirstRow, String terminateIfEmptyRow, String rowLimit, String ifErrorRecord, - String processedFiles) { + String processedFiles, int byteArrayMaxOverride) { Configuration configuration = job.getConfiguration(); configuration.set(FILE_PATTERN, filePattern); @@ -90,6 +103,7 @@ public static void setConfigurations(Job job, String filePattern, String sheet, configuration.set(IF_ERROR_RECORD, ifErrorRecord); configuration.set(PROCESSED_FILES, processedFiles); + configuration.set(EXCEL_BYTE_ARRAY_MAX_OVERRIDE, String.valueOf(byteArrayMaxOverride)); } @@ -145,9 +159,34 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context) thro String sheet = job.get(SHEET); String sheetValue = job.get(SHEET_VALUE); - Sheet workSheet; // sheet can be used as common for XSSF and HSSF workbook + Sheet workSheet; + Workbook workbook; + boolean isStreaming = false; try { - Workbook workbook = WorkbookFactory.create(fileIn); + // Use Magic Bytes to detect the file type + InputStream is = FileMagic.prepareToCheckMagic(fileIn); + byte[] emptyFileCheck = new byte[1]; + is.mark(emptyFileCheck.length); + if (is.read(emptyFileCheck) < emptyFileCheck.length) { + throw new EmptyFileException(); + } + is.reset(); + + final FileMagic fm = FileMagic.valueOf(is); + switch (fm) { + case OOXML: + workbook = StreamingReader.builder().rowCacheSize(10).open(is); + isStreaming = true; + break; + case OLE2: + // workaround for large files + IOUtils.setByteArrayMaxOverride(job.getInt(EXCEL_BYTE_ARRAY_MAX_OVERRIDE, + ExcelInputFormat.EXCEL_BYTE_ARRAY_MAX_OVERRIDE_DEFAULT)); + workbook = WorkbookFactory.create(is); + break; + default: + throw new IOException("Can't open workbook - unsupported file type: " + fm); + } if (sheet.equalsIgnoreCase(SHEET_NAME)) { workSheet = workbook.getSheet(sheetValue); } else { @@ -157,7 +196,9 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context) thro throw new IllegalArgumentException("Exception while reading excel sheet. " + e.getMessage(), e); } - rowCount = job.getInt(ROWS_LIMIT, workSheet.getPhysicalNumberOfRows()); + // As we cannot get the number of rows in a sheet while streaming. + // -1 is used as rowCount to indicate that all rows should be read. + rowCount = job.getInt(ROWS_LIMIT, isStreaming ? -1 : workSheet.getPhysicalNumberOfRows()); rows = workSheet.iterator(); lastRowNum = workSheet.getLastRowNum(); rowIdx = 0; @@ -171,7 +212,7 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context) thro } @Override - public boolean nextKeyValue() throws IOException, InterruptedException { + public boolean nextKeyValue() { if (!rows.hasNext() || rowCount == 0) { return false; } @@ -200,18 +241,18 @@ public boolean nextKeyValue() throws IOException, InterruptedException { Cell cell = cellIterator.next(); String colName = CellReference.convertNumToColString(cell.getColumnIndex()); switch (cell.getCellType()) { - case Cell.CELL_TYPE_STRING: + case STRING: sb.append(colName) .append(COLUMN_SEPERATOR).append(cell.getStringCellValue()).append(CELL_SEPERATOR); break; - case Cell.CELL_TYPE_BOOLEAN: + case BOOLEAN: sb.append(colName) .append(COLUMN_SEPERATOR).append(cell.getBooleanCellValue()).append(CELL_SEPERATOR); break; - case Cell.CELL_TYPE_NUMERIC: - if (HSSFDateUtil.isCellDateFormatted(cell)) { + case NUMERIC: + if (DateUtil.isCellDateFormatted(cell)) { sb.append(colName).append(COLUMN_SEPERATOR).append(cell.getDateCellValue()).append(CELL_SEPERATOR); } else { sb.append(colName) diff --git a/core-plugins/src/main/java/io/cdap/plugin/batch/source/ExcelInputReader.java b/core-plugins/src/main/java/io/cdap/plugin/batch/source/ExcelInputReader.java index e6e549bf0..38157d85e 100644 --- a/core-plugins/src/main/java/io/cdap/plugin/batch/source/ExcelInputReader.java +++ b/core-plugins/src/main/java/io/cdap/plugin/batch/source/ExcelInputReader.java @@ -318,11 +318,16 @@ public void prepareRun(BatchSourceContext batchSourceContext) throws Exception { processFiles = GSON.toJson(getAllProcessedFiles(batchSourceContext), ARRAYLIST_PREPROCESSED_FILES); } + Map arguments = new HashMap<>(batchSourceContext.getArguments().asMap()); + int byteArrayMaxOverride = arguments.containsKey(ExcelInputFormat.EXCEL_BYTE_ARRAY_MAX_OVERRIDE) ? + Integer.parseInt(arguments.get(ExcelInputFormat.EXCEL_BYTE_ARRAY_MAX_OVERRIDE)) : + ExcelInputFormat.EXCEL_BYTE_ARRAY_MAX_OVERRIDE_DEFAULT; + ExcelInputFormat.setConfigurations(job, excelInputreaderConfig.filePattern, excelInputreaderConfig.sheet, excelInputreaderConfig.reprocess, excelInputreaderConfig.sheetValue, excelInputreaderConfig.columnList, excelInputreaderConfig.skipFirstRow, excelInputreaderConfig.terminateIfEmptyRow, excelInputreaderConfig.rowsLimit, - excelInputreaderConfig.ifErrorRecord, processFiles); + excelInputreaderConfig.ifErrorRecord, processFiles, byteArrayMaxOverride); // Sets the input path(s). ExcelInputFormat.addInputPaths(job, excelInputreaderConfig.filePath);