This repository has been archived by the owner on Jan 29, 2022. It is now read-only.

Add compression support to BSONFileInputFormat. #82

Open
nlaveaucriteo wants to merge 1 commit into master

Conversation

nlaveaucriteo

  • Prevent BSONSplitter from trying to split compressed files.
  • BSONFileRecordReader detects whether a file is compressed and, if so,
    uses the proper codec. This code is heavily inspired by the one from
    TextInputFormat (a rough sketch of this approach is given below).

@alangalvino

Hi, what's the status of this pull request? Will it be available in the next release?

What about LZO compression with BSON files?

@alangalvino

I tested this pull request and it worked well, but the compressed BSON file isn't splittable.

I created this project (https://github.com/alangalvino/BSON-Splitter) to split BSON files, because a large compressed BSON file isn't splittable in Hadoop MapReduce.

My approach was to split large BSON files (> 1GB) into small pieces (250MB) and compress them. What do you think about it?
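
A rough sketch of that pre-splitting idea (this is not the BSON-Splitter code; it assumes the legacy org.bson BasicBSONDecoder/BasicBSONEncoder API and a local input file, and the class name is made up): read documents one by one, roll over to a new gzip file every ~250MB, and cut only on document boundaries so each piece stays valid BSON.

    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.util.zip.GZIPOutputStream;

    import org.bson.BSONDecoder;
    import org.bson.BSONEncoder;
    import org.bson.BSONObject;
    import org.bson.BasicBSONDecoder;
    import org.bson.BasicBSONEncoder;

    // Illustrative only: split one large .bson file into ~250MB gzip-compressed
    // pieces, cutting only between documents.
    public class BsonChunker {

        private static final long CHUNK_BYTES = 250L * 1024 * 1024;

        public static void main(String[] args) throws IOException {
            String input = args[0];    // e.g. big.bson
            String prefix = args[1];   // e.g. chunk-  ->  chunk-0.bson.gz, chunk-1.bson.gz, ...

            BSONDecoder decoder = new BasicBSONDecoder();
            BSONEncoder encoder = new BasicBSONEncoder();

            try (InputStream in = new BufferedInputStream(new FileInputStream(input))) {
                OutputStream out = null;
                long written = CHUNK_BYTES;  // forces the first chunk to be opened
                int part = 0;

                // available() is an acceptable end-of-file check for a local file.
                while (in.available() > 0) {
                    BSONObject doc = decoder.readObject(in);
                    byte[] bytes = encoder.encode(doc);

                    if (written + bytes.length > CHUNK_BYTES) {
                        if (out != null) {
                            out.close();
                        }
                        out = new GZIPOutputStream(
                                new FileOutputStream(prefix + (part++) + ".bson.gz"));
                        written = 0;
                    }
                    out.write(bytes);
                    written += bytes.length;   // count uncompressed bytes per chunk
                }
                if (out != null) {
                    out.close();
                }
            }
        }
    }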

@alangalvino

Running with a gzip file I'm getting an error:

java.io.IOException: unexpected EOF
    at org.bson.BasicBSONDecoder$BSONInput._need(BasicBSONDecoder.java:416)
    at org.bson.BasicBSONDecoder$BSONInput.readInt(BasicBSONDecoder.java:429)
    at org.bson.BasicBSONDecoder._decode(BasicBSONDecoder.java:149)
    at org.bson.BasicBSONDecoder.decode(BasicBSONDecoder.java:132)
    at inlocomedia.atlas.util.CompressBSONFileRecordReader.nextKeyValue(CompressBSONFileRecordReader.java:132)
    at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:533)
    at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

With bzip everything seems right.

@alangalvino

With bz2 I'm losing data. For example, running a map reduce on an uncompressed BSON file I get 1006 records, but running on the same BSON file compressed with bz2 I get 1000 records.

The gzip error described above does not influence the map reduce result: with gzip I get 1006 records, the same as with the plain BSON file.

@nlaveaucriteo
Author

I'll try to see what's wrong with bz2 as soon as I can, as well as updating the pull request to the latest mongo-hadoop version.

Your approach with BSON-Splitter is similar to what I am doing :)

@alangalvino

Great.

I tried to figure out what's wrong with gzip (the exception described above). The result was that, on the last decode, the BSONDecoder expected an integer (4 bytes) but got only 1 byte and threw the EOF exception. Here is the code:

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        try {
            if (filePosition.getPos() >= this.fileSplit.getStart() + this.fileSplit.getLength()
                    && (codec == null || in.available() == 0)) {
                try {
                    this.close();
                } catch (Exception e) {
                } finally {
                    return false;
                }
            }

            callback.reset();

            // The problem was in this line: the decoder expects a 4-byte document
            // length but only 1 byte is left in the gzip stream.
            int bytesRead = decoder.decode(in, callback);

            value = (BSONObject) callback.get();
            numDocsRead++;
            if (numDocsRead % 10000 == 0) {
                log.debug("read " + numDocsRead + " docs from " + this.fileSplit.toString()
                        + " at " + filePosition.getPos());
            }
            return true;
        } catch (Exception e) {
            try {
                // Ignore the spurious "unexpected EOF" that gzip produces at the end
                // of the stream; log anything else.
                if (!("unexpected EOF".equals(e.getMessage())
                        && codec instanceof org.apache.hadoop.io.compress.GzipCodec)) {
                    e.printStackTrace();
                    log.warn("Error reading key/value from bson file: " + e.getMessage());
                }
                this.close();
            } catch (Exception e2) {
            } finally {
                return false;
            }
        }
    }

@llvtt added the core label on Apr 1, 2015
@llvtt

llvtt commented Aug 21, 2015

Splittable BSON compression is a feature I'd like to have in mongo-hadoop for the 1.5 release. @nlaveaucriteo, @alangalvino, have either of you gotten compression to work while keeping BSON files splittable? If so, would one of you like to update this pull request or make a new one that does this?

I'm happy to implement this myself, but I don't want to undermine the efforts of anyone who's already completed this feature.

@alangalvino
Copy link

Hi Luke,

I'm still splitting my BSON files (more than 1GB) with https://github.com/alangalvino/BSON-Splitter and compressing them with gzip (files of 256MB). Before running my job I decompress and split again using this file, CompressBSONFileInputFormat.java (https://gist.github.com/alangalvino/8bd2842935cd43a536e4), with code borrowed from this pull request.
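
(For anyone following along, this is roughly how such an input format plugs into a job driver. CompressBSONFileInputFormat refers to the class in the gist above, and MyBsonMapper is a placeholder for whatever mapper the job uses; treat this as a sketch of the wiring, not the actual setup.)

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    // Sketch of a driver wiring in a compression-aware BSON input format.
    public class CompressedBsonJobDriver {

        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "compressed-bson-job");
            job.setJarByClass(CompressedBsonJobDriver.class);

            // Read .bson / .bson.gz splits through the compression-aware reader
            // (class from the gist linked above).
            job.setInputFormatClass(CompressBSONFileInputFormat.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));

            job.setMapperClass(MyBsonMapper.class);  // placeholder mapper
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }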
