Skip to content

Commit

Permalink
Merge pull request #1853 from cloudsufi/cherry-pick-excel-streaming
Browse files Browse the repository at this point in the history
[🍒][PLUGIN-1771] Add Streaming support for excel source
  • Loading branch information
psainics authored Apr 12, 2024
2 parents eb91515 + 09cfff0 commit e30e037
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 11 deletions.
24 changes: 22 additions & 2 deletions core-plugins/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,35 @@
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.26.0</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>3.12</version>
<version>5.2.4</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>3.11</version>
<version>5.2.4</version>
</dependency>
<dependency>
<groupId>com.github.pjfanning</groupId>
<artifactId>excel-streaming-reader</artifactId>
<version>4.2.1</version>
</dependency>
<dependency>
<groupId>com.github.pjfanning</groupId>
<artifactId>poi-shared-strings</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<scope>test</scope>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.plugin.batch.source;

import com.github.pjfanning.xlsx.StreamingReader;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -26,19 +27,23 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.poi.hssf.usermodel.HSSFDateUtil;
import org.apache.poi.EmptyFileException;
import org.apache.poi.poifs.filesystem.FileMagic;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.DateUtil;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.ss.usermodel.WorkbookFactory;
import org.apache.poi.ss.util.CellReference;

import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;


Expand Down Expand Up @@ -67,6 +72,11 @@ public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, Tas
return new ExcelRecordReader();
}

/**
 * Reports every input file as non-splittable.
 *
 * @param context the job context (not consulted)
 * @param file the path of the input file (not consulted)
 * @return always {@code false}
 */
@Override
public boolean isSplitable(JobContext context, Path file) {
// NOTE(review): presumably a workbook cannot be parsed starting from an arbitrary
// byte offset, so each file has to be handed to a single record reader - confirm.
return false;
}

public static void setConfigurations(Job job, String filePattern, String sheet, boolean reprocess,
String sheetValue, String columnList, boolean skipFirstRow,
String terminateIfEmptyRow, String rowLimit, String ifErrorRecord,
Expand Down Expand Up @@ -145,9 +155,31 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context) thro
String sheet = job.get(SHEET);
String sheetValue = job.get(SHEET_VALUE);

Sheet workSheet; // sheet can be used as common for XSSF and HSSF workbook
Sheet workSheet;
Workbook workbook;
boolean isStreaming = false;
try {
Workbook workbook = WorkbookFactory.create(fileIn);
// Use Magic Bytes to detect the file type
InputStream is = FileMagic.prepareToCheckMagic(fileIn);
byte[] emptyFileCheck = new byte[1];
is.mark(emptyFileCheck.length);
if (is.read(emptyFileCheck) < emptyFileCheck.length) {
throw new EmptyFileException();
}
is.reset();

final FileMagic fm = FileMagic.valueOf(is);
switch (fm) {
case OOXML:
workbook = StreamingReader.builder().rowCacheSize(10).open(is);
isStreaming = true;
break;
case OLE2:
workbook = WorkbookFactory.create(is);
break;
default:
throw new IOException("Can't open workbook - unsupported file type: " + fm);
}
if (sheet.equalsIgnoreCase(SHEET_NAME)) {
workSheet = workbook.getSheet(sheetValue);
} else {
Expand All @@ -157,7 +189,9 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context) thro
throw new IllegalArgumentException("Exception while reading excel sheet. " + e.getMessage(), e);
}

rowCount = job.getInt(ROWS_LIMIT, workSheet.getPhysicalNumberOfRows());
// The number of rows in a sheet is not available while streaming, so
// -1 is used as rowCount to indicate that all rows should be read.
rowCount = job.getInt(ROWS_LIMIT, isStreaming ? -1 : workSheet.getPhysicalNumberOfRows());
rows = workSheet.iterator();
lastRowNum = workSheet.getLastRowNum();
rowIdx = 0;
Expand All @@ -171,7 +205,7 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context) thro
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
public boolean nextKeyValue() {
if (!rows.hasNext() || rowCount == 0) {
return false;
}
Expand Down Expand Up @@ -200,18 +234,18 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
Cell cell = cellIterator.next();
String colName = CellReference.convertNumToColString(cell.getColumnIndex());
switch (cell.getCellType()) {
case Cell.CELL_TYPE_STRING:
case STRING:
sb.append(colName)
.append(COLUMN_SEPERATOR).append(cell.getStringCellValue()).append(CELL_SEPERATOR);
break;

case Cell.CELL_TYPE_BOOLEAN:
case BOOLEAN:
sb.append(colName)
.append(COLUMN_SEPERATOR).append(cell.getBooleanCellValue()).append(CELL_SEPERATOR);
break;

case Cell.CELL_TYPE_NUMERIC:
if (HSSFDateUtil.isCellDateFormatted(cell)) {
case NUMERIC:
if (DateUtil.isCellDateFormatted(cell)) {
sb.append(colName).append(COLUMN_SEPERATOR).append(cell.getDateCellValue()).append(CELL_SEPERATOR);
} else {
sb.append(colName)
Expand Down

0 comments on commit e30e037

Please sign in to comment.