This repository has been archived by the owner on Jan 29, 2022. It is now read-only.

Add compression support to BSONFileInputFormat. #82

Open · wants to merge 1 commit into master
@@ -30,6 +30,12 @@
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionCodec;

import org.bson.*;

import java.io.DataInputStream;
@@ -65,22 +71,41 @@ public class BSONFileRecordReader extends RecordReader<NullWritable, BSONObject>
private Object key;
private BSONObject value;
byte[] headerBuf = new byte[4];
private FSDataInputStream in;
private InputStream in;
private int numDocsRead = 0;
private boolean finished = false;

private BSONCallback callback;
private BSONDecoder decoder;

private CompressionCodecFactory compressionCodecs = null;
private CompressionCodec codec;
private Decompressor decompressor;
private Seekable filePosition;
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
this.fileSplit = (FileSplit) inputSplit;
this.conf = context.getConfiguration();
log.info("reading split " + this.fileSplit.toString());
Path file = fileSplit.getPath();
compressionCodecs = new CompressionCodecFactory(this.conf);
codec = compressionCodecs.getCodec(file);
FileSystem fs = file.getFileSystem(conf);
in = fs.open(file, 16*1024*1024);
in.seek(fileSplit.getStart());
FSDataInputStream fileIn = fs.open(file, 16*1024*1024);
if (codec == null) {
log.info("reading split " + this.fileSplit.toString());
fileIn.seek(fileSplit.getStart());
in = fileIn;
} else {
if (fileSplit.getStart() > 0) {
throw new IOException("File is not seekable but start of split is non-zero");
}
decompressor = CodecPool.getDecompressor(codec);
in = codec.createInputStream(fileIn, decompressor);
log.info("reading compressed split " + this.fileSplit.toString());
// The split start is ignored here: the decompressed stream is not seekable,
// so splits over compressed files must always start at offset 0 (enforced above).
}
filePosition = fileIn;

if (MongoConfigUtil.getLazyBSON(this.conf)) {
callback = new LazyBSONCallback();
@@ -94,7 +119,8 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
try{
if(in.getPos() >= this.fileSplit.getStart() + this.fileSplit.getLength()){
if (filePosition.getPos() >= this.fileSplit.getStart() + this.fileSplit.getLength()
&& (codec == null || in.available() == 0)) {
try{
this.close();
}catch(Exception e){
@@ -108,7 +134,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
value = (BSONObject) callback.get();
numDocsRead++;
if(numDocsRead % 10000 == 0){
log.debug("read " + numDocsRead + " docs from " + this.fileSplit.toString() + " at " + in.getPos());
log.debug("read " + numDocsRead + " docs from " + this.fileSplit.toString() + " at " + filePosition.getPos());
}
return true;
}catch(Exception e){
@@ -138,7 +164,7 @@ public float getProgress() throws IOException, InterruptedException {
if(this.finished)
return 1f;
if(in != null)
return new Float(in.getPos() - this.fileSplit.getStart()) / this.fileSplit.getLength();
return new Float(filePosition.getPos() - this.fileSplit.getStart()) / this.fileSplit.getLength();
return 0f;
}

@@ -148,6 +174,9 @@ public void close() throws IOException {
if(this.in != null){
in.close();
}
if (codec != null){
((FSDataInputStream)this.filePosition).close();
}
}

}
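
For orientation, the patch follows the standard Hadoop idiom for transparently reading compressed input: ask CompressionCodecFactory for a codec based on the file suffix, wrap the raw stream with a pooled Decompressor when one is found, and otherwise read (and seek) the file directly. A minimal, self-contained sketch of that idiom; the class name and HDFS path are hypothetical and not part of the patch:

import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Decompressor;

public class CompressedOpenSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Hypothetical input; the .gz suffix is what makes the factory return GzipCodec.
        Path file = new Path("hdfs:///dumps/collection.bson.gz");
        FileSystem fs = file.getFileSystem(conf);

        // Maps a file suffix (.gz, .bz2, ...) to a codec, or null when the file is uncompressed.
        CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(file);

        FSDataInputStream raw = fs.open(file);
        Decompressor decompressor = null;
        InputStream in;
        if (codec == null) {
            in = raw;                                          // plain BSON: seekable, splittable
        } else {
            decompressor = CodecPool.getDecompressor(codec);   // borrow a pooled decompressor
            in = codec.createInputStream(raw, decompressor);   // decompressed, forward-only stream
        }
        try {
            byte[] header = new byte[4];
            int read = in.read(header);                        // BSON length prefix of the first document
            System.out.println("read " + read + " header bytes");
        } finally {
            in.close();
            if (decompressor != null) {
                CodecPool.returnDecompressor(decompressor);    // hand the decompressor back to the pool
            }
        }
    }
}

Note that the compressed branch yields a stream that can only be read from the beginning, which is why the record reader above rejects any split whose start is non-zero.
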
@@ -30,6 +30,11 @@
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.bson.*;

import java.io.DataInputStream;
@@ -49,22 +54,44 @@ public class BSONFileRecordReader implements RecordReader<NullWritable, BSONWritable>
private Object key;
private BSONWritable value;
byte[] headerBuf = new byte[4];
private FSDataInputStream in;
private InputStream in;
private int numDocsRead = 0;
private boolean finished = false;

private BSONCallback callback;
private BSONDecoder decoder;

public BSONFileRecordReader(){ }

private CompressionCodecFactory compressionCodecs = null;
private CompressionCodec codec;
private Decompressor decompressor;
private Seekable filePosition;

public void initialize(InputSplit inputSplit, Configuration conf) throws IOException {
this.fileSplit = (FileSplit) inputSplit;
this.conf = conf;
Path file = fileSplit.getPath();
compressionCodecs = new CompressionCodecFactory(this.conf);
codec = compressionCodecs.getCodec(file);
FileSystem fs = file.getFileSystem(conf);
in = fs.open(file, 16*1024*1024);
in.seek(fileSplit.getStart());
FSDataInputStream fileIn = fs.open(file, 16*1024*1024);
if (codec == null) {
log.info("reading split " + this.fileSplit.toString());
fileIn.seek(fileSplit.getStart());
in = fileIn;
} else {
if (fileSplit.getStart() > 0) {
throw new IOException("File is not seekable but start of split is non-zero");
}
decompressor = CodecPool.getDecompressor(codec);
in = codec.createInputStream(fileIn, decompressor);
log.info("reading compressed split " + this.fileSplit.toString());
// The split start is ignored here: the decompressed stream is not seekable,
// so splits over compressed files must always start at offset 0 (enforced above).
}
filePosition = fileIn;

if (MongoConfigUtil.getLazyBSON(conf)) {
callback = new LazyBSONCallback();
decoder = new LazyBSONDecoder();
@@ -77,7 +104,8 @@ public void initialize(InputSplit inputSplit, Configuration conf) throws IOException {
@Override
public boolean next(NullWritable key, BSONWritable value) throws IOException {
try{
if(in.getPos() >= this.fileSplit.getStart() + this.fileSplit.getLength()){
if (filePosition.getPos() >= this.fileSplit.getStart() + this.fileSplit.getLength()
&& (codec == null || in.available() == 0)) {
try{
this.close();
}catch(Exception e){
@@ -93,7 +121,7 @@ public boolean next(NullWritable key, BSONWritable value) throws IOException {

numDocsRead++;
if(numDocsRead % 5000 == 0){
log.debug("read " + numDocsRead + " docs from " + this.fileSplit.toString() + " at " + in.getPos());
log.debug("read " + numDocsRead + " docs from " + this.fileSplit.toString() + " at " + filePosition.getPos());
}
return true;
}catch(Exception e){
@@ -111,15 +139,15 @@ public float getProgress() throws IOException {
if(this.finished)
return 1f;
if(in != null)
return new Float(in.getPos() - this.fileSplit.getStart()) / this.fileSplit.getLength();
return new Float(filePosition.getPos() - this.fileSplit.getStart()) / this.fileSplit.getLength();
return 0f;
}

public long getPos() throws IOException {
if(this.finished)
return this.fileSplit.getStart() + this.fileSplit.getLength();
if(in != null )
return in.getPos();
if (in != null)
return filePosition.getPos();
return this.fileSplit.getStart();
}

@@ -140,6 +168,9 @@ public void close() throws IOException {
if(this.in != null){
in.close();
}
if (codec != null) {
((FSDataInputStream)this.filePosition).close();
}
}

}
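
With the record reader able to decompress its input, a job can point the BSON input format at a gzipped dump directly. A rough usage sketch, assuming Hadoop 2.x and that the format lives at com.mongodb.hadoop.BSONFileInputFormat as elsewhere in mongo-hadoop; the mapper, paths, and job name are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.bson.BSONObject;
import com.mongodb.hadoop.BSONFileInputFormat;

public class CompressedBsonJobSketch {

    // Placeholder map-only task: emit the _id of every BSON document.
    public static class IdMapper extends Mapper<NullWritable, BSONObject, Text, NullWritable> {
        @Override
        protected void map(NullWritable key, BSONObject doc, Context context)
                throws java.io.IOException, InterruptedException {
            context.write(new Text(String.valueOf(doc.get("_id"))), NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "read compressed bson");
        job.setJarByClass(CompressedBsonJobSketch.class);

        job.setInputFormatClass(BSONFileInputFormat.class);
        // Gzipped dump: the record reader picks the codec from the .gz suffix and
        // the splitter hands back a single split covering the whole file.
        FileInputFormat.addInputPath(job, new Path("hdfs:///dumps/collection.bson.gz"));

        job.setMapperClass(IdMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, new Path("hdfs:///out/bson-ids"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
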
core/src/main/java/com/mongodb/hadoop/splitter/BSONSplitter.java (15 changes: 13 additions and 2 deletions)
@@ -26,7 +26,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.BlockLocation;
@@ -45,6 +44,8 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.io.compress.CompressionCodecFactory;


import org.bson.BSONObject;
import org.bson.BasicBSONCallback;
@@ -153,8 +154,18 @@ public void readSplitsForFile(FileStatus file) throws IOException{
ArrayList<FileSplit> splits = new ArrayList<FileSplit>();
FileSystem fs = path.getFileSystem(getConf());
long length = file.getLen();
if(!getConf().getBoolean("bson.split.read_splits", true)){
boolean dosplits = true;
if (!getConf().getBoolean("bson.split.read_splits", true)) {
log.info("Reading splits is disabled - constructing single split for " + file);
dosplits = false;
} else {
CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(getConf());
if (compressionCodecs.getCodec(file.getPath()) != null) {
log.info("File is compressed - constructing single split for " + file);
dosplits = false;
}
}
if (!dosplits) {
FileSplit onesplit = createFileSplit(file, fs, 0, length);
splits.add(onesplit);
this.splitsList = splits;
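
The splitter collapses compressed files to a single split because most codecs produce output that can only be decoded from the start of the stream. For comparison, stock FileInputFormat subclasses such as TextInputFormat use a slightly more general test; a sketch assuming Hadoop 2.x, where SplittableCompressionCodec marks codecs (for example bzip2) whose output can still be split:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.JobContext;

public final class SplittabilitySketch {

    private SplittabilitySketch() { }

    // True when the file may be cut into multiple input splits.
    public static boolean isSplitable(JobContext context, Path file) {
        CompressionCodec codec =
                new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
        if (codec == null) {
            return true;                                       // uncompressed: split freely
        }
        return codec instanceof SplittableCompressionCodec;   // e.g. bzip2 output can be split
    }
}

The patch takes the simpler route of never splitting compressed input, which is always safe, at the cost of losing parallelism on large splittable archives.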