Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes done for adding streaming support in XLSX module. #1890

Merged
merged 1 commit into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions format-xls/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<name>XLS format plugins</name>
<packaging>jar</packaging>
<properties>
<poi.version>5.2.4</poi.version>
<poi.version>5.2.5</poi.version>
<log4j-core.version>2.20.0</log4j-core.version>
</properties>

Expand All @@ -45,7 +45,7 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<scope>compile</scope>
<version>${log4j-core.version}</version>
<version>${log4j-core.version}</version>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
Expand All @@ -64,12 +64,30 @@
<artifactId>format-common</artifactId>
<version>${project.version}</version>
</dependency>


<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.26.0</version>
</dependency>
<dependency>
<groupId>com.github.pjfanning</groupId>
<artifactId>excel-streaming-reader</artifactId>
<version>4.2.1</version>
</dependency>
<dependency>
<groupId>com.github.pjfanning</groupId>
<artifactId>poi-shared-strings</artifactId>
<version>2.8.0</version>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package io.cdap.plugin.format.xls.input;


import com.github.pjfanning.xlsx.StreamingReader;
import com.google.common.base.Preconditions;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.format.input.PathTrackingInputFormat;
Expand All @@ -25,17 +28,23 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.poi.EmptyFileException;
import org.apache.poi.poifs.filesystem.FileMagic;
import org.apache.poi.ss.usermodel.FormulaEvaluator;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.ss.usermodel.WorkbookFactory;
import org.apache.poi.util.IOUtils;

import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import javax.annotation.Nullable;


Expand All @@ -51,11 +60,12 @@ public class XlsInputFormat extends PathTrackingInputFormat {
public static final String SHEET_VALUE = "sheetValue";
public static final String NAME_SKIP_HEADER = "skipHeader";
public static final String TERMINATE_IF_EMPTY_ROW = "terminateIfEmptyRow";
protected static final int EXCEL_BYTE_ARRAY_MAX_OVERRIDE_DEFAULT = Integer.MAX_VALUE / 2;

@Override
protected RecordReader<NullWritable, StructuredRecord.Builder> createRecordReader(
FileSplit split, TaskAttemptContext context, @Nullable String pathField,
@Nullable Schema schema) throws IOException {
FileSplit split, TaskAttemptContext context, @Nullable String pathField,
@Nullable Schema schema) throws IOException {
Configuration jobConf = context.getConfiguration();
boolean skipFirstRow = jobConf.getBoolean(NAME_SKIP_HEADER, false);
boolean terminateIfEmptyRow = jobConf.getBoolean(TERMINATE_IF_EMPTY_ROW, false);
Expand All @@ -65,6 +75,10 @@ protected RecordReader<NullWritable, StructuredRecord.Builder> createRecordReade
return new XlsRecordReader(sheet, sheetValue, outputSchema, terminateIfEmptyRow, skipFirstRow);
}

/**
 * Reports every input file as non-splittable, regardless of the job context or the path given.
 *
 * NOTE(review): presumably this overrides the input-format hook of the parent class so that each
 * workbook is handed to a single record reader; if so, an {@code @Override} annotation would make
 * that explicit - confirm against the parent class.
 *
 * @param context the job context (not consulted)
 * @param file the path of the input file (not consulted)
 * @return always {@code false}
 */
public boolean isSplitable(JobContext context, Path file) {
return false;
}

/**
* Reads Excel sheet, where each row is a {@link StructuredRecord} and each cell is a field in the record.
*/
Expand All @@ -74,11 +88,7 @@ public static class XlsRecordReader extends RecordReader<NullWritable, Structure
FormulaEvaluator formulaEvaluator;
// Builder for building structured record
private StructuredRecord.Builder valueBuilder;
private Sheet workSheet;
// InputStream handler for Excel files.
private FSDataInputStream fileIn;
// Specifies the row index.
private int rowIndex;
// Specifies last row num.
private int lastRowNum;
private boolean isRowNull;
Expand All @@ -87,6 +97,11 @@ public static class XlsRecordReader extends RecordReader<NullWritable, Structure
private final Schema outputSchema;
private final boolean terminateIfEmptyRow;
private final boolean skipFirstRow;
private int rowCount;
private Iterator<Row> rows;
// Specifies the row index.
private long rowIdx;


/**
* Constructor for XlsRecordReader.
Expand All @@ -113,10 +128,35 @@ public void initialize(InputSplit split, TaskAttemptContext context) throws IOEx
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(jobConf);
fileIn = fs.open(file);
Sheet workSheet;
Workbook workbook;
boolean isStreaming = false;
try {
// Use Magic Bytes to detect the file type
InputStream is = FileMagic.prepareToCheckMagic(fileIn);
byte[] emptyFileCheck = new byte[1];
is.mark(emptyFileCheck.length);
if (is.read(emptyFileCheck) < emptyFileCheck.length) {
throw new EmptyFileException();
}
is.reset();

final FileMagic fm = FileMagic.valueOf(is);
switch (fm) {
case OOXML:
workbook = StreamingReader.builder().rowCacheSize(10).open(is);
isStreaming = true;
break;
case OLE2:
IOUtils.setByteArrayMaxOverride(EXCEL_BYTE_ARRAY_MAX_OVERRIDE_DEFAULT);
workbook = WorkbookFactory.create(is);
formulaEvaluator = workbook.getCreationHelper().createFormulaEvaluator();
formulaEvaluator.setIgnoreMissingWorkbooks(true);
break;
default:
throw new IOException("Can't open workbook - unsupported file type: " + fm);
}

try (Workbook workbook = WorkbookFactory.create(fileIn)) {
formulaEvaluator = workbook.getCreationHelper().createFormulaEvaluator();
formulaEvaluator.setIgnoreMissingWorkbooks(true);
// Check if user wants to access with name or number
if (sheet.equals(XlsInputFormatConfig.SHEET_NUMBER)) {
workSheet = workbook.getSheetAt(Integer.parseInt(sheetValue));
Expand All @@ -127,37 +167,43 @@ public void initialize(InputSplit split, TaskAttemptContext context) throws IOEx
} catch (Exception e) {
throw new IOException("Exception while reading excel sheet. " + e.getMessage(), e);
}

// As we cannot get the number of rows in a sheet while streaming.
// -1 is used as rowCount to indicate that all rows should be read.
rowCount = isStreaming ? -1 : workSheet.getPhysicalNumberOfRows();
lastRowNum = workSheet.getLastRowNum();
rows = workSheet.iterator();
isRowNull = false;
rowIndex = skipFirstRow ? 1 : 0;
rowIdx = 0;
valueBuilder = StructuredRecord.builder(outputSchema);
if (skipFirstRow) {
Preconditions.checkArgument(rows.hasNext(), "No rows found on sheet %s", sheetValue);
rowIdx = 1;
rows.next();
}
}

@Override
public boolean nextKeyValue() {
// If any is true, then we stop processing.
if (rowIndex > lastRowNum || lastRowNum == -1 || (isRowNull && terminateIfEmptyRow)) {
if (!rows.hasNext() || rowCount == 0 || (isRowNull && terminateIfEmptyRow)) {
return false;
}
// Get the next row.
Row row = workSheet.getRow(rowIndex);
Row row = rows.next();
valueBuilder = rowConverter.convert(row, outputSchema);
if (row == null || valueBuilder == null) {
isRowNull = true;
// set valueBuilder to a new builder with all fields set to null
valueBuilder = StructuredRecord.builder(outputSchema);
}
// if all fields are null, then the row is null
rowIndex++;

rowIdx++;
// Stop processing if the row is null and terminateIfEmptyRow is true.
return !isRowNull || !terminateIfEmptyRow;
}

/**
 * Reports read progress as the number of rows consumed so far relative to the sheet's last row
 * number.
 *
 * @return a value between {@code 0.0} and {@code 1.0} inclusive
 */
@Override
public float getProgress() {
  if (lastRowNum <= 0) {
    // Dividing by a last row number of 0 or -1 would produce NaN, Infinity or a negative value;
    // report "done" once at least one row has been consumed, otherwise "not started".
    return rowIdx > 0 ? 1.0f : 0.0f;
  }
  // rowIdx counts consumed rows while lastRowNum is a row number, so the ratio can pass 1.0.
  return Math.min(1.0f, (float) rowIdx / lastRowNum);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,10 @@
package io.cdap.plugin.format.xls.input;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.api.plugin.PluginPropertyField;
import io.cdap.plugin.common.KeyValueListParser;
import io.cdap.plugin.format.input.PathTrackingConfig;

import java.util.Collections;
Expand All @@ -37,7 +33,6 @@
*/
public class XlsInputFormatConfig extends PathTrackingConfig {
public static final String SHEET_NUMBER = "Sheet Number";
private static final String NAME_OVERRIDE = "override";
private static final String NAME_SHEET = "sheet";
public static final String NAME_SHEET_VALUE = "sheetValue";
private static final String NAME_SKIP_HEADER = "skipHeader";
Expand All @@ -53,18 +48,18 @@ public class XlsInputFormatConfig extends PathTrackingConfig {
"Can be either sheet name or sheet no; for example: 'Sheet1' or '0' in case user selects 'Sheet Name' or " +
"'Sheet Number' as 'sheet' input respectively. Sheet number starts with 0. Default is 'Sheet Number' 0.";
public static final String DESC_TERMINATE_ROW = "Specify whether to stop reading after " +
"encountering the first empty row. Defaults to false.";
"encountering the first empty row. Defaults to false.";
public static final Map<String, PluginPropertyField> XLS_FIELDS;

// Builds XLS_FIELDS: the inherited FIELDS plus the Excel-specific plugin properties, published
// as an unmodifiable map.
static {
  Map<String, PluginPropertyField> fields = new HashMap<>(FIELDS);
  fields.put(NAME_SKIP_HEADER,
             new PluginPropertyField(NAME_SKIP_HEADER, DESC_SKIP_HEADER, "boolean", false, true));
  // Add fields specific for excel format handling.
  fields.put(NAME_SHEET, new PluginPropertyField(NAME_SHEET, DESC_SHEET, "string", false, true));
  fields.put(NAME_SHEET_VALUE, new PluginPropertyField(NAME_SHEET_VALUE, DESC_SHEET_VALUE, "string", false, true));
  fields.put(NAME_TERMINATE_IF_EMPTY_ROW, new PluginPropertyField(
    NAME_TERMINATE_IF_EMPTY_ROW, DESC_TERMINATE_ROW, "boolean", false, true));
  XLS_FIELDS = Collections.unmodifiableMap(fields);
}

Expand Down Expand Up @@ -102,7 +97,7 @@ public XlsInputFormatConfig(@Nullable String schema, @Nullable String sheet, @Nu
super();
this.schema = schema;
this.sheet = sheet;
this.sheetValue = sheetValue;
this.sheetValue = sheetValue;
this.skipHeader = skipHeader;
this.terminateIfEmptyRow = terminateIfEmptyRow;
}
Expand Down Expand Up @@ -173,7 +168,7 @@ public Builder setTerminateIfEmptyRow(Boolean terminateIfEmptyRow) {
}

/**
 * Creates the config from the values currently held by this builder.
 *
 * @return a new {@link XlsInputFormatConfig} built from the schema, sheet, sheet value,
 *     skip-header and terminate-if-empty-row settings
 */
public XlsInputFormatConfig build() {
  return new XlsInputFormatConfig(schema, sheet, sheetValue, skipHeader, terminateIfEmptyRow);
}
}

Expand Down
Loading
Loading