From 03e4326cc016680403065114f849f8048b81f54c Mon Sep 17 00:00:00 2001 From: Geoff Minerbo Date: Thu, 7 Aug 2014 15:27:57 -0700 Subject: [PATCH 1/2] Changes required to support highly nested Mongo documents, and enable schema retrieval. --- .../com/mongodb/hadoop/pig/BSONLoader.java | 34 ++++++++++++++++--- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/pig/src/main/java/com/mongodb/hadoop/pig/BSONLoader.java b/pig/src/main/java/com/mongodb/hadoop/pig/BSONLoader.java index a9f19ec9..c6ab4a8b 100644 --- a/pig/src/main/java/com/mongodb/hadoop/pig/BSONLoader.java +++ b/pig/src/main/java/com/mongodb/hadoop/pig/BSONLoader.java @@ -8,9 +8,12 @@ import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.pig.Expression; import org.apache.pig.LoadFunc; +import org.apache.pig.LoadMetadata; import org.apache.pig.ResourceSchema; import org.apache.pig.ResourceSchema.ResourceFieldSchema; +import org.apache.pig.ResourceStatistics; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.BagFactory; @@ -29,7 +32,7 @@ import java.util.HashMap; import java.util.Map; -public class BSONLoader extends LoadFunc { +public class BSONLoader extends LoadFunc implements LoadMetadata { private static TupleFactory tupleFactory = TupleFactory.getInstance(); private static BagFactory bagFactory = BagFactory.getInstance(); @@ -133,7 +136,7 @@ protected static Object readField(final Object obj, final ResourceFieldSchema fi ResourceFieldSchema[] fs = s.getFields(); Tuple t = tupleFactory.newTuple(fs.length); - BasicDBObject val = (BasicDBObject) obj; + BasicBSONObject val = (BasicBSONObject) obj; for (int j = 0; j < fs.length; j++) { t.set(j, readField(val.get(fs[j].getName()), fs[j])); @@ -150,12 +153,12 @@ protected static Object readField(final Object obj, final 
ResourceFieldSchema fi DataBag bag = bagFactory.newDefaultBag(); - BasicDBList vals = (BasicDBList) obj; + BasicBSONList vals = (BasicBSONList) obj; for (Object val1 : vals) { t = tupleFactory.newTuple(fs.length); for (int k = 0; k < fs.length; k++) { - t.set(k, readField(((BasicDBObject) val1).get(fs[k].getName()), fs[k])); + t.set(k, readField(((BasicBSONObject) val1).get(fs[k].getName()), fs[k])); } bag.add(t); } @@ -220,5 +223,28 @@ public static Object convertBSONtoPigType(final Object o) throws ExecException { } } + + @Override + public ResourceSchema getSchema(String location, Job job) + throws IOException { + return this.schema; + } + + @Override + public ResourceStatistics getStatistics(String location, Job job) + throws IOException { + return null; + } + + @Override + public String[] getPartitionKeys(String location, Job job) + throws IOException { + return null; + } + + @Override + public void setPartitionFilter(Expression partitionFilter) + throws IOException { + } } From 0cdd20089af400f30329f5f21839b6eed32fa3fd Mon Sep 17 00:00:00 2001 From: Geoff Minerbo Date: Tue, 12 Aug 2014 16:05:03 -0700 Subject: [PATCH 2/2] Adding unit test explicitly passing a BasicBSONObject to the BSONLoader class. 
--- build.gradle | 3 +- .../com/mongodb/hadoop/pig/BSONLoader.java | 2 - .../mongodb/hadoop/pig/BSONLoaderTest.java | 66 +++++++++++++++++++ 3 files changed, 68 insertions(+), 3 deletions(-) create mode 100644 pig/src/test/java/com/mongodb/hadoop/pig/BSONLoaderTest.java diff --git a/build.gradle b/build.gradle index 43449fa3..7f96fc48 100644 --- a/build.gradle +++ b/build.gradle @@ -106,6 +106,7 @@ configure(subprojects) { testCompile 'org.zeroturnaround:zt-exec:1.6' testCompile 'com.jayway.awaitility:awaitility:1.6.0' testCompile 'commons-daemon:commons-daemon:1.0.15' + testCompile 'org.easymock:easymock:3.2' testCompile "org.apache.hadoop:hadoop-hdfs:${hadoopVersion}" testCompile "org.apache.hadoop:hadoop-hdfs:${hadoopVersion}:tests" @@ -394,4 +395,4 @@ task enronEmails(dependsOn: [downloadEnronEmails, configureCluster]) << { "com.mongodb.hadoop.examples.enron.EnronMail", ["mongo.input.split_size=64"]) } -apply from: 'gradle/maven-deployment.gradle' \ No newline at end of file +apply from: 'gradle/maven-deployment.gradle' diff --git a/pig/src/main/java/com/mongodb/hadoop/pig/BSONLoader.java b/pig/src/main/java/com/mongodb/hadoop/pig/BSONLoader.java index c6ab4a8b..f61ba303 100644 --- a/pig/src/main/java/com/mongodb/hadoop/pig/BSONLoader.java +++ b/pig/src/main/java/com/mongodb/hadoop/pig/BSONLoader.java @@ -1,7 +1,5 @@ package com.mongodb.hadoop.pig; -import com.mongodb.BasicDBList; -import com.mongodb.BasicDBObject; import com.mongodb.hadoop.BSONFileInputFormat; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; diff --git a/pig/src/test/java/com/mongodb/hadoop/pig/BSONLoaderTest.java b/pig/src/test/java/com/mongodb/hadoop/pig/BSONLoaderTest.java new file mode 100644 index 00000000..ce8a8986 --- /dev/null +++ b/pig/src/test/java/com/mongodb/hadoop/pig/BSONLoaderTest.java @@ -0,0 +1,66 @@ +package com.mongodb.hadoop.pig; + +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static 
org.junit.Assert.assertEquals; + +import javax.xml.bind.DatatypeConverter; + +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.pig.data.Tuple; +import org.bson.BasicBSONObject; +import org.bson.types.BasicBSONList; +import org.bson.types.ObjectId; +import org.easymock.EasyMock; +import org.junit.Test; + +public class BSONLoaderTest { + + @Test + public void testSimpleObject() throws Exception { + + String schema = "id:chararray," + + "name:chararray," + + "books:bag{tuple(title:chararray)}"; + /* + The goal of this test is to pass an instance of BasicBSONObject, + NOT its child BasicDBObject, to the BSONLoader class. It's possible + to do the following + + // Creates a BasicDBObject which we don't want + BasicBSONObject topLevelObject = (BasicBSONObject) JSON.parse(jsonDataStr); + + yet this creates a BasicDBObject under the hood. This is why the + following BSON object is constructed longhand below. + + Test BSON object: + {"_id" : ObjectId("53ea61edc2e62b61021c2f2e"), + "name" : "Jane", + "books": [{"title":"Lord of the Rings"}] + } + */ + BasicBSONObject book1 = new BasicBSONObject(); + book1.append("title", "Lord of the Rings"); + BasicBSONList bookList = new BasicBSONList(); + bookList.add(book1); + BasicBSONObject simpleObject = new BasicBSONObject(); + simpleObject.append("_id", new ObjectId(DatatypeConverter.parseHexBinary("53ea61edc2e62b61021c2f2e"))); + simpleObject.append("name", "Jane"); + simpleObject.append("books", bookList); + + RecordReader recordReader = EasyMock.createNiceMock(RecordReader.class); + expect(recordReader.nextKeyValue()).andReturn(true).once(); + expect(recordReader.nextKeyValue()).andReturn(false); + expect(recordReader.getCurrentValue()).andReturn(simpleObject).once(); + expect(recordReader.getCurrentValue()).andReturn(null); + replay(recordReader); + + BSONLoader bsonLoader = new BSONLoader("id", schema); + bsonLoader.prepareToRead(recordReader, null); + Tuple t = bsonLoader.getNext(); + + String expected = 
"(53ea61edc2e62b61021c2f2e,Jane,{(Lord of the Rings)})"; + assertEquals(expected, t.toString()); + } + +} \ No newline at end of file