From c2afeee2074cbe8007c3f1680271dee9428407ae Mon Sep 17 00:00:00 2001 From: vikasrathee-cs Date: Thu, 26 Sep 2024 22:24:35 +0530 Subject: [PATCH] changes done for adding streaming support in XLSX module. --- format-xls/pom.xml | 26 +++++- .../format/xls/input/XlsInputFormat.java | 80 +++++++++++++---- .../xls/input/XlsInputFormatConfig.java | 15 ++-- .../xls/input/XlsInputFormatProvider.java | 86 +++++++++++++------ .../format/xls/input/XlsRowConverter.java | 16 ++-- 5 files changed, 160 insertions(+), 63 deletions(-) diff --git a/format-xls/pom.xml b/format-xls/pom.xml index 77358cfdb..227f20fe5 100644 --- a/format-xls/pom.xml +++ b/format-xls/pom.xml @@ -26,7 +26,7 @@ XLS format plugins jar - 5.2.4 + 5.2.5 2.20.0 @@ -45,7 +45,7 @@ org.apache.logging.log4j log4j-core compile - ${log4j-core.version} + ${log4j-core.version} io.cdap.cdap @@ -64,12 +64,30 @@ format-common ${project.version} - - junit junit + + commons-io + commons-io + 2.15.0 + + + org.apache.commons + commons-compress + 1.26.0 + + + com.github.pjfanning + excel-streaming-reader + 4.2.1 + + + com.github.pjfanning + poi-shared-strings + 2.8.0 + diff --git a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormat.java b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormat.java index 0b79787f6..18ee5c4a8 100644 --- a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormat.java +++ b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormat.java @@ -16,6 +16,9 @@ package io.cdap.plugin.format.xls.input; + +import com.github.pjfanning.xlsx.StreamingReader; +import com.google.common.base.Preconditions; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.plugin.format.input.PathTrackingInputFormat; @@ -25,17 +28,23 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.poi.EmptyFileException; +import org.apache.poi.poifs.filesystem.FileMagic; import org.apache.poi.ss.usermodel.FormulaEvaluator; import org.apache.poi.ss.usermodel.Row; import org.apache.poi.ss.usermodel.Sheet; import org.apache.poi.ss.usermodel.Workbook; import org.apache.poi.ss.usermodel.WorkbookFactory; +import org.apache.poi.util.IOUtils; import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; import javax.annotation.Nullable; @@ -51,11 +60,12 @@ public class XlsInputFormat extends PathTrackingInputFormat { public static final String SHEET_VALUE = "sheetValue"; public static final String NAME_SKIP_HEADER = "skipHeader"; public static final String TERMINATE_IF_EMPTY_ROW = "terminateIfEmptyRow"; + protected static final int EXCEL_BYTE_ARRAY_MAX_OVERRIDE_DEFAULT = Integer.MAX_VALUE / 2; @Override protected RecordReader createRecordReader( - FileSplit split, TaskAttemptContext context, @Nullable String pathField, - @Nullable Schema schema) throws IOException { + FileSplit split, TaskAttemptContext context, @Nullable String pathField, + @Nullable Schema schema) throws IOException { Configuration jobConf = context.getConfiguration(); boolean skipFirstRow = jobConf.getBoolean(NAME_SKIP_HEADER, false); boolean terminateIfEmptyRow = jobConf.getBoolean(TERMINATE_IF_EMPTY_ROW, false); @@ -65,6 +75,10 @@ protected RecordReader createRecordReade return new XlsRecordReader(sheet, sheetValue, outputSchema, terminateIfEmptyRow, skipFirstRow); } + public boolean isSplitable(JobContext context, Path file) { + return false; + } + /** * Reads Excel sheet, where each row is a {@link StructuredRecord} and each cell is a field in the record. */ @@ -74,11 +88,7 @@ public static class XlsRecordReader extends RecordReader rows; + // Specifies the row index. + private long rowIdx; + /** * Constructor for XlsRecordReader. @@ -113,10 +128,35 @@ public void initialize(InputSplit split, TaskAttemptContext context) throws IOEx Path file = fileSplit.getPath(); FileSystem fs = file.getFileSystem(jobConf); fileIn = fs.open(file); + Sheet workSheet; + Workbook workbook; + boolean isStreaming = false; + try { + // Use Magic Bytes to detect the file type + InputStream is = FileMagic.prepareToCheckMagic(fileIn); + byte[] emptyFileCheck = new byte[1]; + is.mark(emptyFileCheck.length); + if (is.read(emptyFileCheck) < emptyFileCheck.length) { + throw new EmptyFileException(); + } + is.reset(); + + final FileMagic fm = FileMagic.valueOf(is); + switch (fm) { + case OOXML: + workbook = StreamingReader.builder().rowCacheSize(10).open(is); + isStreaming = true; + break; + case OLE2: + IOUtils.setByteArrayMaxOverride(EXCEL_BYTE_ARRAY_MAX_OVERRIDE_DEFAULT); + workbook = WorkbookFactory.create(is); + formulaEvaluator = workbook.getCreationHelper().createFormulaEvaluator(); + formulaEvaluator.setIgnoreMissingWorkbooks(true); + break; + default: + throw new IOException("Can't open workbook - unsupported file type: " + fm); + } - try (Workbook workbook = WorkbookFactory.create(fileIn)) { - formulaEvaluator = workbook.getCreationHelper().createFormulaEvaluator(); - formulaEvaluator.setIgnoreMissingWorkbooks(true); // Check if user wants to access with name or number if (sheet.equals(XlsInputFormatConfig.SHEET_NUMBER)) { workSheet = workbook.getSheetAt(Integer.parseInt(sheetValue)); @@ -127,37 +167,43 @@ public void initialize(InputSplit split, TaskAttemptContext context) throws IOEx } catch (Exception e) { throw new IOException("Exception while reading excel sheet. " + e.getMessage(), e); } - + // As we cannot get the number of rows in a sheet while streaming. + // -1 is used as rowCount to indicate that all rows should be read. + rowCount = isStreaming ? -1 : workSheet.getPhysicalNumberOfRows(); lastRowNum = workSheet.getLastRowNum(); + rows = workSheet.iterator(); isRowNull = false; - rowIndex = skipFirstRow ? 1 : 0; + rowIdx = 0; valueBuilder = StructuredRecord.builder(outputSchema); + if (skipFirstRow) { + Preconditions.checkArgument(rows.hasNext(), "No rows found on sheet %s", sheetValue); + rowIdx = 1; + rows.next(); + } } @Override public boolean nextKeyValue() { // If any is true, then we stop processing. - if (rowIndex > lastRowNum || lastRowNum == -1 || (isRowNull && terminateIfEmptyRow)) { + if (!rows.hasNext() || rowCount == 0 || (isRowNull && terminateIfEmptyRow)) { return false; } // Get the next row. - Row row = workSheet.getRow(rowIndex); + Row row = rows.next(); valueBuilder = rowConverter.convert(row, outputSchema); if (row == null || valueBuilder == null) { isRowNull = true; // set valueBuilder to a new builder with all fields set to null valueBuilder = StructuredRecord.builder(outputSchema); } - // if all fields are null, then the row is null - rowIndex++; - + rowIdx++; // Stop processing if the row is null and terminateIfEmptyRow is true. return !isRowNull || !terminateIfEmptyRow; } @Override public float getProgress() { - return (float) rowIndex / lastRowNum; + return (float) rowIdx / lastRowNum; } @Override diff --git a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatConfig.java b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatConfig.java index 27ec7c343..14ab66c5c 100644 --- a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatConfig.java +++ b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatConfig.java @@ -17,14 +17,10 @@ package io.cdap.plugin.format.xls.input; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Strings; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.annotation.Name; -import io.cdap.cdap.api.data.schema.Schema; -import io.cdap.cdap.api.dataset.lib.KeyValue; import io.cdap.cdap.api.plugin.PluginPropertyField; -import io.cdap.plugin.common.KeyValueListParser; import io.cdap.plugin.format.input.PathTrackingConfig; import java.util.Collections; @@ -37,7 +33,6 @@ */ public class XlsInputFormatConfig extends PathTrackingConfig { public static final String SHEET_NUMBER = "Sheet Number"; - private static final String NAME_OVERRIDE = "override"; private static final String NAME_SHEET = "sheet"; public static final String NAME_SHEET_VALUE = "sheetValue"; private static final String NAME_SKIP_HEADER = "skipHeader"; @@ -53,18 +48,18 @@ public class XlsInputFormatConfig extends PathTrackingConfig { "Can be either sheet name or sheet no; for example: 'Sheet1' or '0' in case user selects 'Sheet Name' or " + "'Sheet Number' as 'sheet' input respectively. Sheet number starts with 0. Default is 'Sheet Number' 0."; public static final String DESC_TERMINATE_ROW = "Specify whether to stop reading after " + - "encountering the first empty row. Defaults to false."; + "encountering the first empty row. Defaults to false."; public static final Map XLS_FIELDS; static { Map fields = new HashMap<>(FIELDS); fields.put(NAME_SKIP_HEADER, - new PluginPropertyField(NAME_SKIP_HEADER, DESC_SKIP_HEADER, "boolean", false, true)); + new PluginPropertyField(NAME_SKIP_HEADER, DESC_SKIP_HEADER, "boolean", false, true)); // Add fields specific for excel format handling. fields.put(NAME_SHEET, new PluginPropertyField(NAME_SHEET, DESC_SHEET, "string", false, true)); fields.put(NAME_SHEET_VALUE, new PluginPropertyField(NAME_SHEET_VALUE, DESC_SHEET_VALUE, "string", false, true)); fields.put(NAME_TERMINATE_IF_EMPTY_ROW, new PluginPropertyField( - NAME_TERMINATE_IF_EMPTY_ROW, DESC_TERMINATE_ROW, "boolean", false, true)); + NAME_TERMINATE_IF_EMPTY_ROW, DESC_TERMINATE_ROW, "boolean", false, true)); XLS_FIELDS = Collections.unmodifiableMap(fields); } @@ -102,7 +97,7 @@ public XlsInputFormatConfig(@Nullable String schema, @Nullable String sheet, @Nu super(); this.schema = schema; this.sheet = sheet; - this.sheetValue = sheetValue; + this.sheetValue = sheetValue; this.skipHeader = skipHeader; this.terminateIfEmptyRow = terminateIfEmptyRow; } @@ -173,7 +168,7 @@ public Builder setTerminateIfEmptyRow(Boolean terminateIfEmptyRow) { } public XlsInputFormatConfig build() { - return new XlsInputFormatConfig(schema, sheet, sheetValue, skipHeader, terminateIfEmptyRow); + return new XlsInputFormatConfig(schema, sheet, sheetValue, skipHeader, terminateIfEmptyRow); } } diff --git a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatProvider.java b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatProvider.java index bedcb48a3..e0266614e 100644 --- a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatProvider.java +++ b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatProvider.java @@ -16,6 +16,7 @@ package io.cdap.plugin.format.xls.input; +import com.github.pjfanning.xlsx.StreamingReader; import com.google.common.base.Strings; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; @@ -30,17 +31,21 @@ import io.cdap.plugin.format.input.PathTrackingConfig; import io.cdap.plugin.format.input.PathTrackingInputFormatProvider; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.poi.EmptyFileException; +import org.apache.poi.poifs.filesystem.FileMagic; import org.apache.poi.ss.usermodel.Cell; import org.apache.poi.ss.usermodel.DataFormatter; -import org.apache.poi.ss.usermodel.FormulaEvaluator; import org.apache.poi.ss.usermodel.Row; import org.apache.poi.ss.usermodel.Sheet; import org.apache.poi.ss.usermodel.Workbook; import org.apache.poi.ss.usermodel.WorkbookFactory; import org.apache.poi.ss.util.CellReference; +import org.apache.poi.util.IOUtils; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Map; import javax.annotation.Nullable; @@ -56,13 +61,13 @@ public class XlsInputFormatProvider extends PathTrackingInputFormatProvider properties) { public Schema detectSchema(FormatContext context, InputFiles inputFiles) throws IOException { String blankHeader = "BLANK"; FailureCollector failureCollector = context.getFailureCollector(); - FormulaEvaluator formulaEvaluator; + DataFormatter formatter = new DataFormatter(); for (InputFile inputFile : inputFiles) { - DataFormatter formatter = new DataFormatter(); - try (Workbook workbook = WorkbookFactory.create(inputFile.open())) { - formulaEvaluator = workbook.getCreationHelper().createFormulaEvaluator(); - formulaEvaluator.setIgnoreMissingWorkbooks(true); + try (Workbook workbook = getWorkbook(inputFile.open())) { Sheet workSheet; // Check if user wants to access with name or number if (conf.getSheet() != null && conf.getSheet().equals(XlsInputFormatConfig.SHEET_NUMBER)) { @@ -124,7 +126,7 @@ public Schema detectSchema(FormatContext context, InputFiles inputFiles) throws } else { if (Strings.isNullOrEmpty(conf.getSheetValue())) { failureCollector.addFailure("Sheet name must be specified.", null) - .withConfigProperty(XlsInputFormatConfig.NAME_SHEET_VALUE); + .withConfigProperty(XlsInputFormatConfig.NAME_SHEET_VALUE); return null; } workSheet = workbook.getSheet(conf.getSheetValue()); @@ -133,7 +135,7 @@ public Schema detectSchema(FormatContext context, InputFiles inputFiles) throws // If provided sheet does not exist, throw an exception if (workSheet == null) { failureCollector.addFailure("Sheet " + conf.getSheetValue() + " does not exist in the workbook.", - "Specify a valid sheet."); + "Specify a valid sheet."); return null; } @@ -145,8 +147,9 @@ public Schema detectSchema(FormatContext context, InputFiles inputFiles) throws int lastCellNumMax = 0; List columnNames = new ArrayList<>(); XlsInputFormatSchemaDetector schemaDetector = new XlsInputFormatSchemaDetector(); - for (int rowIndex = rowStart; rowIndex <= rowEnd; rowIndex++) { - Row row = workSheet.getRow(rowIndex); + Iterator rows = workSheet.iterator(); + for (int rowIndex = rowStart; rowIndex <= rowEnd && rows.hasNext(); rowIndex++) { + Row row = rows.next(); if (row == null) { continue; } @@ -156,7 +159,7 @@ public Schema detectSchema(FormatContext context, InputFiles inputFiles) throws if (rowIndex == 0 && conf.getSkipHeader()) { for (int cellIndex = 0; cellIndex < lastCellNumMax; cellIndex++) { Cell cell = row.getCell(cellIndex, Row.MissingCellPolicy.RETURN_BLANK_AS_NULL); - columnNames.add(cell == null ? blankHeader : formatter.formatCellValue(cell, formulaEvaluator)); + columnNames.add(cell == null ? blankHeader : formatter.formatCellValue(cell)); } // Skip Header continue; @@ -185,7 +188,7 @@ public Schema detectSchema(FormatContext context, InputFiles inputFiles) throws } Schema schema = Schema.recordOf("xls", schemaDetector.getFields( - XlsInputFormatUtils.getSafeColumnNames(columnNames))); + XlsInputFormatUtils.getSafeColumnNames(columnNames))); return PathTrackingInputFormatProvider.addPathField(context.getFailureCollector(), schema, conf.getPathField()); } } @@ -200,12 +203,45 @@ private Integer getSheetAsNumber(FailureCollector failureCollector) { return sheetValue; } failureCollector.addFailure("Sheet number must be a positive number.", null) - .withConfigProperty(XlsInputFormatConfig.NAME_SHEET_VALUE); + .withConfigProperty(XlsInputFormatConfig.NAME_SHEET_VALUE); } catch (NumberFormatException e) { failureCollector.addFailure("Sheet number must be a number.", null) - .withConfigProperty(XlsInputFormatConfig.NAME_SHEET_VALUE); + .withConfigProperty(XlsInputFormatConfig.NAME_SHEET_VALUE); } } - return null; + return 0; + } + + private Workbook getWorkbook(InputStream fileIn) { + Workbook workbook; + try { + // Use Magic Bytes to detect the file type + InputStream is = FileMagic.prepareToCheckMagic(fileIn); + byte[] emptyFileCheck = new byte[1]; + is.mark(emptyFileCheck.length); + if (is.read(emptyFileCheck) < emptyFileCheck.length) { + throw new EmptyFileException(); + } + is.reset(); + + final FileMagic fm = FileMagic.valueOf(is); + switch (fm) { + case OOXML: + workbook = StreamingReader.builder().rowCacheSize(10).open(is); + break; + case OLE2: + // workaround for large files + IOUtils.setByteArrayMaxOverride(XlsInputFormat.EXCEL_BYTE_ARRAY_MAX_OVERRIDE_DEFAULT); + workbook = WorkbookFactory.create(is); + break; + default: + throw new IOException("Can't open workbook - unsupported file type: " + fm); + } + return workbook; + } catch (Exception e) { + throw new IllegalArgumentException("Exception while reading excel sheet. " + e.getMessage(), e); + } + } + } diff --git a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsRowConverter.java b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsRowConverter.java index 251d6b36f..dc8362af5 100644 --- a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsRowConverter.java +++ b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsRowConverter.java @@ -35,7 +35,7 @@ public class XlsRowConverter { private final FormulaEvaluator evaluator; private static final DataFormatter dataFormatter = new DataFormatter(); - XlsRowConverter(FormulaEvaluator evaluator) { + XlsRowConverter(@Nullable FormulaEvaluator evaluator) { this.evaluator = evaluator; } @@ -59,7 +59,7 @@ public StructuredRecord.Builder convert(Row row, Schema outputSchema) { } Schema.Field field = fields.get(cellIndex); Schema.Type type = field.getSchema().isNullable() ? - field.getSchema().getNonNullable().getType() : field.getSchema().getType(); + field.getSchema().getNonNullable().getType() : field.getSchema().getType(); Object cellValue; switch (type) { case STRING: @@ -74,7 +74,7 @@ public StructuredRecord.Builder convert(Row row, Schema outputSchema) { default: // As we only support string, double and boolean, this should never happen. throw new IllegalStateException( - String.format("Field '%s' is of unsupported type '%s'. Supported types are: %s", + String.format("Field '%s' is of unsupported type '%s'. Supported types are: %s", field.getName(), type, "string, double, boolean")); } if (cellValue == null) { @@ -95,7 +95,9 @@ private CellType getCellType(Cell cell) { try { cellType = cell.getCachedFormulaResultType(); } catch (Exception e) { - cellType = evaluator.evaluateFormulaCell(cell); + if (evaluator != null) { + cellType = evaluator.evaluateFormulaCell(cell); + } } } return cellType; @@ -119,7 +121,7 @@ private String getCellAsString(Cell cell) { return null; default: throw new IllegalStateException( - String.format("Failed to format (%s) due to unsupported cell type (%s)", cell, cellType)); + String.format("Failed to format (%s) due to unsupported cell type (%s)", cell, cellType)); } } @@ -139,7 +141,7 @@ private boolean getCellAsBoolean(Cell cell) { return false; default: throw new IllegalStateException( - String.format("Failed to format (%s) due to unsupported cell type (%s)", cell, cellType)); + String.format("Failed to format (%s) due to unsupported cell type (%s)", cell, cellType)); } } @@ -158,7 +160,7 @@ private Double getCellAsDouble(Cell cell) { return 0.0; default: throw new IllegalStateException( - String.format("Failed to format (%s) due to unsupported cell type (%s)", cell, cellType)); + String.format("Failed to format (%s) due to unsupported cell type (%s)", cell, cellType)); } }