Skip to content

Commit

Permalink
Added support to decompress based on the file extension. (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
jcustenborder authored Apr 21, 2018
1 parent cb67558 commit cac32a7
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@
<artifactId>opencsv</artifactId>
<version>3.10</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.16.1</version>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
Expand Down Expand Up @@ -233,6 +234,14 @@ File findNextInputFile() {
return result;
}

/**
 * Lookup table from a file-name extension (without the leading dot) to the
 * Apache Commons Compress compressor name handed to
 * {@code CompressorStreamFactory#createCompressorInputStream(String, InputStream)}.
 * Files whose extension is not a key in this map are read as-is.
 *
 * <p>NOTE(review): {@code lz4} and {@code snappy} are wired to the block/raw
 * variants rather than the framed ones; confirm that matches the files this
 * connector is expected to ingest.
 *
 * <p>NOTE(review): the lookup appears to be case-sensitive, so an upper-case
 * extension such as {@code .Z} or {@code .GZ} would presumably not match -
 * verify against the caller.
 */
static final Map<String, String> SUPPORTED_COMPRESSION_TYPES =
    ImmutableMap.<String, String>builder()
        .put("bz2", CompressorStreamFactory.BZIP2)
        .put("gz", CompressorStreamFactory.GZIP)
        .put("snappy", CompressorStreamFactory.SNAPPY_RAW)
        .put("lz4", CompressorStreamFactory.LZ4_BLOCK)
        .put("z", CompressorStreamFactory.Z)
        .build();

public List<SourceRecord> read() {
try {
if (!hasRecords) {
Expand Down Expand Up @@ -261,7 +270,19 @@ public List<SourceRecord> read() {
Number number = (Number) offset.get("offset");
lastOffset = number.longValue();
}
this.inputStream = new FileInputStream(this.inputFile);

final String extension = Files.getFileExtension(inputFile.getName());
log.trace("read() - fileName = '{}' extension = '{}'", inputFile, extension);
final InputStream inputStream = new FileInputStream(this.inputFile);

if (SUPPORTED_COMPRESSION_TYPES.containsKey(extension)) {
final String compressor = SUPPORTED_COMPRESSION_TYPES.get(extension);
log.info("Decompressing {} as {}", inputFile, compressor);
final CompressorStreamFactory compressorStreamFactory = new CompressorStreamFactory();
this.inputStream = compressorStreamFactory.createCompressorInputStream(compressor, inputStream);
} else {
this.inputStream = inputStream;
}
configure(this.inputStream, this.metadata, lastOffset);
} catch (Exception ex) {
throw new ConnectException(ex);
Expand Down

0 comments on commit cac32a7

Please sign in to comment.