From 5140bf861c707996f957a3e79ea71c2383388e13 Mon Sep 17 00:00:00 2001
From: Jeremy Karn
Date: Fri, 14 Mar 2014 12:44:14 -0400
Subject: [PATCH 1/9] Mortar changes for mongo-hadoop connector
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Fix bugs with MongoOutputFormat’s updateKeys and multiUpdate.
* Better error message when people can’t connect to Mongo.
* Added tests for the splitting code.
* bytearray is converted to a Pig DataByteArray.
* If we don’t know the field type of an inner object, use convertBSONtoPigType.
* Handle inner bags without a schema.
* In schemaless mode, convert a list to a bag.
* Add an option for loading the whole record as a chararray.
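For context on the first bullet, a minimal, hypothetical sketch of how a job could opt into the repaired update behavior. The output URI and the MongoConfigUtil.setOutputURI wiring are illustrative assumptions; only the two-argument MongoOutputFormat constructor below is actually introduced by this patch.

    import com.mongodb.hadoop.mapred.MongoOutputFormat;
    import com.mongodb.hadoop.util.MongoConfigUtil;
    import org.apache.hadoop.mapred.JobConf;

    public class UpdateKeysJobSketch {
        public static void main(String[] args) {
            JobConf conf = new JobConf();
            // Point the job at the output collection (assumed helper; URI is illustrative).
            MongoConfigUtil.setOutputURI(conf, "mongodb://localhost:27017/db.out");
            // Upsert on the "name" field instead of saving whole documents; the second
            // argument asks Mongo to apply each $set to every matching document.
            MongoOutputFormat outputFormat = new MongoOutputFormat(new String[]{"name"}, true);
            // (Handing outputFormat to the job runner is omitted from this sketch.)
        }
    }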
---
 .../com/mongodb/hadoop/MongoOutputFormat.java |  2 +-
 .../hadoop/mapred/MongoOutputFormat.java      | 13 ++-
 .../mapred/output/MongoRecordWriter.java      | 31 ++++++-
 .../hadoop/output/MongoRecordWriter.java      |  2 +-
 .../splitter/MongoCollectionSplitter.java     | 13 ++-
 .../splitter/ShardChunkMongoSplitter.java     | 14 +++
 .../splitter/ShardChunkMongoSplitterTest.java | 78 ++++++++++++++++
 .../hadoop/splitter/TestMongoInputSplit.java  | 11 +++
 .../com/mongodb/hadoop/pig/BSONLoader.java    | 73 +++++++++------
 .../com/mongodb/hadoop/pig/MongoLoader.java   | 64 +++++++++++---
 .../com/mongodb/hadoop/pig/MongoStorage.java  |  4 +
 .../mongodb/hadoop/pig/MongoLoaderTest.java   | 88 ++++++++++++++++++-
 12 files changed, 344 insertions(+), 49 deletions(-)
 create mode 100644 core/src/test/java/com/mongodb/hadoop/splitter/ShardChunkMongoSplitterTest.java
 create mode 100644 core/src/test/java/com/mongodb/hadoop/splitter/TestMongoInputSplit.java

diff --git a/core/src/main/java/com/mongodb/hadoop/MongoOutputFormat.java b/core/src/main/java/com/mongodb/hadoop/MongoOutputFormat.java
index cc915d58..6a0f040f 100644
--- a/core/src/main/java/com/mongodb/hadoop/MongoOutputFormat.java
+++ b/core/src/main/java/com/mongodb/hadoop/MongoOutputFormat.java
@@ -54,7 +54,7 @@ public OutputCommitter getOutputCommitter(final TaskAttemptContext context) {
      * Get the record writer that points to the output collection.
      */
     public RecordWriter getRecordWriter(final TaskAttemptContext context) {
-        return new MongoRecordWriter(MongoConfigUtil.getOutputCollections(context.getConfiguration()), context);
+        return new MongoRecordWriter(MongoConfigUtil.getOutputCollections(context.getConfiguration()), context, updateKeys, multiUpdate);
     }
 
     public MongoOutputFormat() {

diff --git a/core/src/main/java/com/mongodb/hadoop/mapred/MongoOutputFormat.java b/core/src/main/java/com/mongodb/hadoop/mapred/MongoOutputFormat.java
index f9368037..af48b853 100644
--- a/core/src/main/java/com/mongodb/hadoop/mapred/MongoOutputFormat.java
+++ b/core/src/main/java/com/mongodb/hadoop/mapred/MongoOutputFormat.java
@@ -20,6 +20,7 @@
 import com.mongodb.hadoop.mapred.output.MongoOutputCommitter;
 import com.mongodb.hadoop.mapred.output.MongoRecordWriter;
 import com.mongodb.hadoop.util.MongoConfigUtil;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCommitter;
@@ -33,7 +34,17 @@
 @SuppressWarnings("deprecation")
 public class MongoOutputFormat implements OutputFormat {
+
+    private final String[] updateKeys;
+    private final boolean multiUpdate;
+
     public MongoOutputFormat() {
+        this(null, false);
+    }
+
+    public MongoOutputFormat(String[] updateKeys, boolean multiUpdate) {
+        this.updateKeys = updateKeys;
+        this.multiUpdate = multiUpdate;
     }
 
     public void checkOutputSpecs(final FileSystem ignored, final JobConf job) throws IOException {
@@ -50,7 +61,7 @@ public OutputCommitter getOutputCommitter(final TaskAttemptContext context) {
     public RecordWriter getRecordWriter(final FileSystem ignored, final JobConf job, final String name,
                                         final Progressable progress) {
-        return new MongoRecordWriter(MongoConfigUtil.getOutputCollections(job), job);
+        return new MongoRecordWriter(MongoConfigUtil.getOutputCollections(job), job, updateKeys, multiUpdate);
     }
 }

diff --git a/core/src/main/java/com/mongodb/hadoop/mapred/output/MongoRecordWriter.java b/core/src/main/java/com/mongodb/hadoop/mapred/output/MongoRecordWriter.java
index a154f162..5ccb664a 100644
--- a/core/src/main/java/com/mongodb/hadoop/mapred/output/MongoRecordWriter.java
+++ b/core/src/main/java/com/mongodb/hadoop/mapred/output/MongoRecordWriter.java
@@ -23,6 +23,7 @@
 import com.mongodb.MongoException;
 import com.mongodb.hadoop.MongoOutput;
 import com.mongodb.hadoop.io.BSONWritable;
+
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
@@ -38,11 +39,21 @@ public class MongoRecordWriter implements RecordWriter {
     private final List collections;
     private final JobConf configuration;
-
+
+    private final String[] updateKeys;
+    private final boolean multiUpdate;
+
     public MongoRecordWriter(final List c, final JobConf conf) {
+        this(c, conf, null, false);
+    }
+
+    public MongoRecordWriter(final List c, final JobConf conf, String[] updateKeys, boolean multiUpdate) {
         collections = c;
         configuration = conf;
         numberOfHosts = c.size();
+        this.updateKeys = updateKeys;
+        this.multiUpdate = multiUpdate;
     }
 
@@ -75,7 +86,23 @@ public void write(final K key, final V value) throws IOException {
         try {
             DBCollection dbCollection = getDbCollectionByRoundRobin();
-            dbCollection.save(o);
+
+            if (updateKeys == null) {
+                dbCollection.save(o);
+            } else {
+                // Form the query fields
+                DBObject query = new BasicDBObject(updateKeys.length);
+                for (String updateKey : updateKeys) {
+                    query.put(updateKey, o.get(updateKey));
+                    o.removeField(updateKey);
+                }
+                // If _id is null, remove it; we don't want to overwrite an existing _id with null.
+                if (o.get("_id") == null) {
+                    o.removeField("_id");
+                }
+                DBObject set = new BasicDBObject().append("$set", o);
+                dbCollection.update(query, set, true, multiUpdate);
+            }
         } catch (final MongoException e) {
             throw new IOException("can't write to mongo", e);
         }
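Concretely, for a record {name: "a", count: 5} written with updateKeys = {"name"}, the writer above now issues the equivalent of the following driver call. This is a sketch that only restates the hunk above; the collection handle is assumed.

    import com.mongodb.BasicDBObject;
    import com.mongodb.DBCollection;
    import com.mongodb.DBObject;

    class UpsertShapeSketch {
        // Mirrors MongoRecordWriter.write(): the update keys become the query,
        // and the remaining fields are applied via $set, as an upsert.
        static void upsert(DBCollection collection, boolean multiUpdate) {
            DBObject query = new BasicDBObject("name", "a");
            DBObject set = new BasicDBObject().append("$set", new BasicDBObject("count", 5));
            collection.update(query, set, true /* upsert */, multiUpdate);
        }
    }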
diff --git a/core/src/main/java/com/mongodb/hadoop/output/MongoRecordWriter.java b/core/src/main/java/com/mongodb/hadoop/output/MongoRecordWriter.java
index 685770b5..cc037fe8 100644
--- a/core/src/main/java/com/mongodb/hadoop/output/MongoRecordWriter.java
+++ b/core/src/main/java/com/mongodb/hadoop/output/MongoRecordWriter.java
@@ -74,7 +74,7 @@ public MongoRecordWriter(final List c, final TaskAttemptContext ct
         collections = new ArrayList(c);
         context = ctx;
         this.updateKeys = updateKeys;
-        this.multiUpdate = false;
+        this.multiUpdate = multi;
         this.numberOfHosts = c.size();
 
         //authenticate if necessary - but don't auth twice on same DB

diff --git a/core/src/main/java/com/mongodb/hadoop/splitter/MongoCollectionSplitter.java b/core/src/main/java/com/mongodb/hadoop/splitter/MongoCollectionSplitter.java
index de99a181..4f25f35b 100644
--- a/core/src/main/java/com/mongodb/hadoop/splitter/MongoCollectionSplitter.java
+++ b/core/src/main/java/com/mongodb/hadoop/splitter/MongoCollectionSplitter.java
@@ -122,8 +122,17 @@ public MongoCollectionSplitter(final Configuration conf) {
     protected void init() {
         MongoURI inputURI = MongoConfigUtil.getInputURI(conf);
-        this.inputCollection = MongoConfigUtil.getCollection(inputURI);
-        DB db = this.inputCollection.getDB();
+
+        DB db;
+        try {
+            this.inputCollection = MongoConfigUtil.getCollection(inputURI);
+            db = this.inputCollection.getDB();
+        } catch (Exception e) {
+            String message = e.getMessage() + "\n\nMongo connection strings are required to be of the form:\n"
+                + "    mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]]/database.collection";
+            throw new IllegalStateException(message, e);
+        }
+
         this.mongo = db.getMongo();
         MongoURI authURI = MongoConfigUtil.getAuthURI(conf);
         if (authURI != null) {

diff --git a/core/src/main/java/com/mongodb/hadoop/splitter/ShardChunkMongoSplitter.java b/core/src/main/java/com/mongodb/hadoop/splitter/ShardChunkMongoSplitter.java
index 92c667d2..cb50b9d3 100644
--- a/core/src/main/java/com/mongodb/hadoop/splitter/ShardChunkMongoSplitter.java
+++ b/core/src/main/java/com/mongodb/hadoop/splitter/ShardChunkMongoSplitter.java
@@ -123,6 +123,20 @@ public List calculateSplits() throws SplitFailedException {
             numChunks++;
         }
 
+        return createSplitList(numChunks, shardToSplits);
+    }
+
+    /**
+     * Round-robins splits across shards. The splits end up as map tasks processed
+     * in the same order as the splits, so we want consecutive map tasks to land on
+     * separate shards; that way, as map tasks complete, the work stays spread
+     * evenly across the shard machines.
+     *
+     * @param numChunks     the number of chunks
+     * @param shardToSplits a map of shard name to the list of splits on that shard
+     */
+    protected static List<InputSplit> createSplitList(int numChunks,
+                                                      Map<String, LinkedList<InputSplit>> shardToSplits) {
         final List<InputSplit> splits = new ArrayList<InputSplit>(numChunks);
         int splitIndex = 0;
         while (splitIndex < numChunks) {
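The rest of the method body is unchanged and not shown in this hunk. As a reading aid, here is a standalone sketch of the interleaving it performs, assuming (as the tests below do) that numChunks equals the total number of splits across all shards; which shard goes first depends on the map's iteration order.

    import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.mapreduce.InputSplit;

    class RoundRobinSketch {
        // Takes one split from each shard's list per pass, so consecutive entries
        // in the result come from different shards whenever possible.
        static List<InputSplit> interleave(int numChunks, Map<String, LinkedList<InputSplit>> shardToSplits) {
            List<InputSplit> splits = new ArrayList<InputSplit>(numChunks);
            while (splits.size() < numChunks) {
                boolean took = false;
                for (LinkedList<InputSplit> shardSplits : shardToSplits.values()) {
                    if (!shardSplits.isEmpty()) {
                        splits.add(shardSplits.removeFirst());
                        took = true;
                    }
                }
                if (!took) {
                    break;   // defensive: every shard's list is exhausted
                }
            }
            return splits;
        }
    }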
diff --git a/core/src/test/java/com/mongodb/hadoop/splitter/ShardChunkMongoSplitterTest.java b/core/src/test/java/com/mongodb/hadoop/splitter/ShardChunkMongoSplitterTest.java
new file mode 100644
index 00000000..b311367b
--- /dev/null
+++ b/core/src/test/java/com/mongodb/hadoop/splitter/ShardChunkMongoSplitterTest.java
@@ -0,0 +1,78 @@
+package com.mongodb.hadoop.splitter;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.junit.Test;
+
+import com.mongodb.MongoURI;
+
+@SuppressWarnings("deprecation")
+public class ShardChunkMongoSplitterTest {
+    @Test
+    public void testCreateSplitList_oneShard() {
+        int numChunks = 2;
+        InputSplit split1 = new TestMongoInputSplit(new MongoURI("mongodb://split1"));
+        InputSplit split2 = new TestMongoInputSplit(new MongoURI("mongodb://split2"));
+        LinkedList<InputSplit> shardSplits = new LinkedList<InputSplit>(Arrays.asList(split1, split2));
+
+        Map<String, LinkedList<InputSplit>> shardToSplits = new HashMap<String, LinkedList<InputSplit>>();
+        shardToSplits.put("shard1", shardSplits);
+
+        List<InputSplit> splits = ShardChunkMongoSplitter.createSplitList(numChunks, shardToSplits);
+        assertEquals(split1, splits.get(0));
+        assertEquals(split2, splits.get(1));
+    }
+
+    @Test
+    public void testCreateSplitList_twoEvenShards() {
+        int numChunks = 4;
+        InputSplit split1 = new TestMongoInputSplit(new MongoURI("mongodb://split1"));
+        InputSplit split2 = new TestMongoInputSplit(new MongoURI("mongodb://split2"));
+        InputSplit split3 = new TestMongoInputSplit(new MongoURI("mongodb://split3"));
+        InputSplit split4 = new TestMongoInputSplit(new MongoURI("mongodb://split4"));
+        LinkedList<InputSplit> shardSplits1 = new LinkedList<InputSplit>(Arrays.asList(split1, split2));
+        LinkedList<InputSplit> shardSplits2 = new LinkedList<InputSplit>(Arrays.asList(split3, split4));
+
+        Map<String, LinkedList<InputSplit>> shardToSplits = new HashMap<String, LinkedList<InputSplit>>();
+        shardToSplits.put("shard1", shardSplits1);
+        shardToSplits.put("shard2", shardSplits2);
+
+        List<InputSplit> splits = ShardChunkMongoSplitter.createSplitList(numChunks, shardToSplits);
+        assertEquals(split1, splits.get(0));
+        assertEquals(split3, splits.get(1));
+        assertEquals(split2, splits.get(2));
+        assertEquals(split4, splits.get(3));
+    }
+
+    @Test
+    public void testCreateSplitList_twoUnevenShards() {
+        int numChunks = 6;
+        InputSplit split1 = new TestMongoInputSplit(new MongoURI("mongodb://split1"));
+        InputSplit split2 = new TestMongoInputSplit(new MongoURI("mongodb://split2"));
+        InputSplit split3 = new TestMongoInputSplit(new MongoURI("mongodb://split3"));
+        InputSplit split4 = new TestMongoInputSplit(new MongoURI("mongodb://split4"));
+        InputSplit split5 = new TestMongoInputSplit(new MongoURI("mongodb://split5"));
+        InputSplit split6 = new TestMongoInputSplit(new MongoURI("mongodb://split6"));
+        LinkedList<InputSplit> shardSplits1 = new LinkedList<InputSplit>(Arrays.asList(split1, split2));
+        LinkedList<InputSplit> shardSplits2 = new LinkedList<InputSplit>(Arrays.asList(split3, split4, split5, split6));
+
+        Map<String, LinkedList<InputSplit>> shardToSplits = new HashMap<String, LinkedList<InputSplit>>();
+        shardToSplits.put("shard1", shardSplits1);
+        shardToSplits.put("shard2", shardSplits2);
+
+        List<InputSplit> splits = ShardChunkMongoSplitter.createSplitList(numChunks, shardToSplits);
+        assertEquals(split1, splits.get(0));
+        assertEquals(split3, splits.get(1));
+        assertEquals(split2, splits.get(2));
+        assertEquals(split4, splits.get(3));
+        assertEquals(split5, splits.get(4));
+        assertEquals(split6, splits.get(5));
+    }
+}

diff --git a/core/src/test/java/com/mongodb/hadoop/splitter/TestMongoInputSplit.java b/core/src/test/java/com/mongodb/hadoop/splitter/TestMongoInputSplit.java
new file mode 100644
index 00000000..0d19e125
--- /dev/null
+++ b/core/src/test/java/com/mongodb/hadoop/splitter/TestMongoInputSplit.java
@@ -0,0 +1,11 @@
+package com.mongodb.hadoop.splitter;
+
+import com.mongodb.MongoURI;
+import com.mongodb.hadoop.input.MongoInputSplit;
+
+public class TestMongoInputSplit extends MongoInputSplit {
+
+    public TestMongoInputSplit(MongoURI inputURI) {
+        this.inputURI = inputURI;
+    }
+}
\ No newline at end of file

diff --git a/pig/src/main/java/com/mongodb/hadoop/pig/BSONLoader.java b/pig/src/main/java/com/mongodb/hadoop/pig/BSONLoader.java
index 03c48be6..f4c5a425 100644
--- a/pig/src/main/java/com/mongodb/hadoop/pig/BSONLoader.java
+++ b/pig/src/main/java/com/mongodb/hadoop/pig/BSONLoader.java
@@ -1,8 +1,10 @@
 package com.mongodb.hadoop.pig;
 
-import com.mongodb.BasicDBList;
-import com.mongodb.BasicDBObject;
-import com.mongodb.hadoop.BSONFileInputFormat;
+import java.io.IOException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -15,6 +17,7 @@
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -24,10 +27,7 @@
 import org.bson.types.BasicBSONList;
 import org.bson.types.ObjectId;
 
-import java.io.IOException;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
+import com.mongodb.hadoop.BSONFileInputFormat;
 
 public class BSONLoader extends LoadFunc {
 
@@ -112,9 +112,10 @@ protected static Object readField(final Object obj, final ResourceFieldSchema fi
         try {
             if (field == null) {
-                return obj;
+                //If we don't know the type we're reading, try to convert it directly.
+                return convertBSONtoPigType(obj);
             }
 
             switch (field.getType()) {
                 case DataType.INTEGER:
                     return Integer.parseInt(obj.toString());
@@ -125,7 +126,7 @@ protected static Object readField(final Object obj, final ResourceFieldSchema fi
                 case DataType.DOUBLE:
                     return Double.parseDouble(obj.toString());
                 case DataType.BYTEARRAY:
-                    return BSONLoader.convertBSONtoPigType(obj);
+                    return new DataByteArray(obj.toString());
                 case DataType.CHARARRAY:
                     return obj.toString();
                 case DataType.TUPLE:
@@ -133,7 +134,7 @@ protected static Object readField(final Object obj, final ResourceFieldSchema fi
                     s = field.getSchema();
                     ResourceFieldSchema[] fs = s.getFields();
                     Tuple t = tupleFactory.newTuple(fs.length);
 
-                    BasicDBObject val = (BasicDBObject) obj;
+                    BasicBSONObject val = (BasicBSONObject) obj;
 
                     for (int j = 0; j < fs.length; j++) {
                         t.set(j, readField(val.get(fs[j].getName()), fs[j]));
@@ -142,22 +143,34 @@ protected static Object readField(final Object obj, final ResourceFieldSchema fi
                     return t;
                 case DataType.BAG:
+                    //We already know the bag has a schema of length 1, which is
+                    //a tuple; skip that schema and get the schema of the tuple.
                     s = field.getSchema();
-                    fs = s.getFields();
+                    ResourceFieldSchema[] bagFields = s.getFields();
 
-                    s = fs[0].getSchema();
-                    fs = s.getFields();
+                    s = bagFields[0].getSchema();
 
                     DataBag bag = bagFactory.newDefaultBag();
-
-                    BasicDBList vals = (BasicDBList) obj;
-
-                    for (Object val1 : vals) {
-                        t = tupleFactory.newTuple(fs.length);
-                        for (int k = 0; k < fs.length; k++) {
-                            t.set(k, readField(((BasicDBObject) val1).get(fs[k].getName()), fs[k]));
+                    BasicBSONList vals = (BasicBSONList) obj;
+
+                    if (s == null) {
+                        //Handle the lack of a schema: create a separate tuple for each item in this bag.
+                        for (int j = 0; j < vals.size(); j++) {
+                            t = tupleFactory.newTuple(1);
+                            t.set(0, readField(vals.get(j), null));
+                            bag.add(t);
+                        }
+                    } else {
+                        fs = s.getFields();
+                        for (Object val1 : vals) {
+                            t = tupleFactory.newTuple(fs.length);
+
+                            for (int k = 0; k < fs.length; k++) {
+                                String fieldName = fs[k].getName();
+                                t.set(k, readField(((BasicBSONObject) val1).get(fieldName), fs[k]));
+                            }
+                            bag.add(t);
                         }
-                        bag.add(t);
                     }
 
                     return bag;
@@ -172,7 +185,7 @@ protected static Object readField(final Object obj, final ResourceFieldSchema fi
                         if (fs != null) {
                             outputMap.put(key, readField(inputMap.get(key), fs[0]));
                         } else {
-                            outputMap.put(key, readField(inputMap.get(key), null));
+                            outputMap.put(key, convertBSONtoPigType(inputMap.get(key)));
                         }
                     }
                     return outputMap;
@@ -187,10 +200,8 @@ protected static Object readField(final Object obj, final ResourceFieldSchema fi
             LOG.warn("Type " + type + " for field " + fieldName + " can not be applied to " + obj.getClass().toString());
             return null;
         }
-
     }
-
     public static Object convertBSONtoPigType(final Object o) throws ExecException {
         if (o == null) {
             return null;
@@ -202,11 +213,15 @@ public static Object convertBSONtoPigType(final Object o) throws ExecException {
             return o.toString();
         } else if (o instanceof BasicBSONList) {
             BasicBSONList bl = (BasicBSONList) o;
-            Tuple t = tupleFactory.newTuple(bl.size());
-            for (int i = 0; i < bl.size(); i++) {
-                t.set(i, convertBSONtoPigType(bl.get(i)));
+            DataBag bag = bagFactory.newDefaultBag();
+
+            for (int i = 0; i < bl.size(); i++) {
+                Tuple t = tupleFactory.newTuple(1);
+                t.set(0, convertBSONtoPigType(bl.get(i)));
+                bag.add(t);
             }
-            return t;
+
+            return bag;
         } else if (o instanceof Map) {
             //TODO make this more efficient for lazy objects?
             Map fieldsMap = (Map) o;
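A quick illustration of the new schemaless behavior, grounded in the tests later in this series: a BSON list now converts to a Pig bag of single-field tuples rather than to one wide tuple.

    import com.mongodb.BasicDBList;
    import com.mongodb.hadoop.pig.BSONLoader;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;

    class ListToBagSketch {
        static void demo() throws Exception {
            BasicDBList list = new BasicDBList();
            list.add("v1");
            list.add("v2");
            // Before this patch the list became a single tuple ("v1", "v2");
            // now it becomes a bag: {("v1"), ("v2")}.
            DataBag bag = (DataBag) BSONLoader.convertBSONtoPigType(list);
            for (Tuple t : bag) {
                System.out.println(t);   // prints (v1), then (v2)
            }
        }
    }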
diff --git a/pig/src/main/java/com/mongodb/hadoop/pig/MongoLoader.java b/pig/src/main/java/com/mongodb/hadoop/pig/MongoLoader.java
index 0d9dbdbb..40c813bb 100644
--- a/pig/src/main/java/com/mongodb/hadoop/pig/MongoLoader.java
+++ b/pig/src/main/java/com/mongodb/hadoop/pig/MongoLoader.java
@@ -1,7 +1,12 @@
 package com.mongodb.hadoop.pig;
 
-import com.mongodb.hadoop.MongoInputFormat;
-import com.mongodb.hadoop.util.MongoConfigUtil;
+import java.io.IOException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -15,12 +20,15 @@
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.ResourceStatistics;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.util.Utils;
 import org.bson.BSONObject;
 
-import java.io.IOException;
+import com.mongodb.hadoop.MongoInputFormat;
+import com.mongodb.hadoop.util.MongoConfigUtil;
+import com.mongodb.util.JSON;
 
 public class MongoLoader extends LoadFunc implements LoadMetadata {
     private static final Log LOG = LogFactory.getLog(MongoStorage.class);
@@ -34,6 +42,10 @@
     private ResourceFieldSchema[] fields;
     private String idAlias = null;
 
+    private final static CommandLineParser parser = new GnuParser();
+    private final Options validOptions = new Options();
+    boolean loadAsChararray = false;
+
     @Override
     public void setUDFContextSignature(final String signature) {
     }
@@ -44,24 +56,46 @@
     public MongoLoader() {
         this.fields = null;
     }
 
-    public ResourceFieldSchema[] getFields() {
-        return this.fields;
+    public MongoLoader(final String userSchema) {
+        this(userSchema, null);
     }
 
     public MongoLoader(final String userSchema, final String idAlias) {
-        this.idAlias = idAlias;
+        initializeSchema(userSchema, idAlias);
+    }
+
+    public MongoLoader(final String userSchema, final String idAlias, final String options) {
+        String[] optsArr = options.split(" ");
+        populateValidOptions();
+        try {
+            CommandLine configuredOptions = parser.parse(validOptions, optsArr);
+            loadAsChararray = configuredOptions.hasOption("loadaschararray");
+            initializeSchema(userSchema, idAlias);
+        } catch (ParseException e) {
+            throw new IllegalArgumentException("Invalid Options " + e.getMessage(), e);
+        }
+    }
+
+    private void initializeSchema(final String userSchema, final String idAlias) {
         try {
-            schema = new ResourceSchema(Utils.getSchemaFromString(userSchema));
+            //Remove whitespace and escaped newlines from the schema string.
+            schema = new ResourceSchema(Utils.getSchemaFromString(userSchema.replaceAll("\\s|\\\\n", "")));
             fields = schema.getFields();
         } catch (Exception e) {
-            throw new IllegalArgumentException("Invalid Schema Format");
+            throw new IllegalArgumentException("Invalid Schema Format", e);
         }
     }
-
-    public MongoLoader(final String userSchema) {
-        this(userSchema, null);
+
+    private void populateValidOptions() {
+        validOptions.addOption("loadaschararray", false, "Loads the entire record as a chararray");
     }
 
+    public ResourceFieldSchema[] getFields() {
+        return this.fields;
+    }
+
     @Override
     public void setLocation(final String location, final Job job) throws IOException {
         MongoConfigUtil.setInputURI(job.getConfiguration(), location);
@@ -94,6 +128,14 @@ public Tuple getNext() throws IOException {
         }
         Tuple t;
+        if (loadAsChararray) {
+            if (fields != null && (fields.length != 1 || fields[0].getType() != DataType.CHARARRAY)) {
+                throw new IllegalArgumentException("Invalid schema. If the -loadaschararray option is used, the schema must be a single chararray field.");
+            }
+            t = tupleFactory.newTuple(1);
+            t.set(0, JSON.serialize(val));
+            return t;
+        }
         if (this.fields == null) {
             // Schemaless mode - just output a tuple with a single element,
             // which is a map storing the keys/values in the document
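A usage sketch for the new option, mirroring the MongoLoaderTest added later in this series; the URI, relation name, and database are illustrative. With -loadaschararray, each document arrives as one JSON-serialized chararray.

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    class LoadAsChararraySketch {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL);
            // The third constructor argument carries loader options; the schema
            // must then be a single chararray field holding the whole record.
            pig.registerQuery(
                "A = LOAD 'mongodb://localhost:27017/db.collection' "
                + "USING com.mongodb.hadoop.pig.MongoLoader('s:chararray', '', '--loadaschararray');");
        }
    }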
diff --git a/pig/src/main/java/com/mongodb/hadoop/pig/MongoStorage.java b/pig/src/main/java/com/mongodb/hadoop/pig/MongoStorage.java
index 20a4e521..3e778cfe 100644
--- a/pig/src/main/java/com/mongodb/hadoop/pig/MongoStorage.java
+++ b/pig/src/main/java/com/mongodb/hadoop/pig/MongoStorage.java
@@ -244,4 +244,8 @@ public void setStoreLocation(final String location, final Job job) throws IOExce
     public void setStoreFuncUDFContextSignature(final String signature) {
         udfContextSignature = signature;
     }
+
+    public void cleanupOnFailure(String location, Job job) throws IOException {
+        LOG.error("Store operation failed (see logged exception). Your Mongo collection will retain any records "
+            + "inserted or updated by MongoStorage before it failed.");
+    }
 }

diff --git a/pig/src/test/java/com/mongodb/hadoop/pig/MongoLoaderTest.java b/pig/src/test/java/com/mongodb/hadoop/pig/MongoLoaderTest.java
index 15ba096b..5d8e4b17 100644
--- a/pig/src/test/java/com/mongodb/hadoop/pig/MongoLoaderTest.java
+++ b/pig/src/test/java/com/mongodb/hadoop/pig/MongoLoaderTest.java
@@ -2,6 +2,7 @@
 import com.mongodb.BasicDBList;
 import com.mongodb.BasicDBObject;
+
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.junit.Test;
@@ -14,8 +15,7 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 
-
-
+@SuppressWarnings("rawtypes")
 public class MongoLoaderTest {
     @Test
     public void testSimpleChararray() throws IOException {
@@ -226,4 +226,88 @@ public void testMapWithTuple() throws Exception {
         assertEquals("t21 value", t2.get(0));
         */
     }
+
+    @Test
+    public void mapWithMap_noSchema() throws Exception {
+        BasicDBObject v1 = new BasicDBObject()
+            .append("t1", "t11 value")
+            .append("t2", 12);
+        BasicDBObject v2 = new BasicDBObject()
+            .append("t1", "t21 value")
+            .append("t2", 22);
+        BasicDBObject obj = new BasicDBObject()
+            .append("v1", v1)
+            .append("v2", v2);
+
+        Map m = (Map) BSONLoader.convertBSONtoPigType(obj);
+
+        assertEquals(2, m.size());
+
+        Map m1 = (Map) m.get("v1");
+        assertEquals("t11 value", m1.get("t1"));
+        assertEquals(12, m1.get("t2"));
+
+        Map m2 = (Map) m.get("v2");
+        assertEquals("t21 value", m2.get("t1"));
+    }
+
+    @Test
+    public void mapWithList() throws Exception {
+        BasicDBObject v1 = new BasicDBObject()
+            .append("t1", "t1 value")
+            .append("t2", 12);
+        BasicDBObject v2 = new BasicDBObject()
+            .append("t1", "t1 value")
+            .append("t2", 22);
+        BasicDBList vl = new BasicDBList();
+        vl.add(v1);
+        vl.add(v2);
+
+        BasicDBObject obj = new BasicDBObject()
+            .append("some_list", vl);
+
+        Map m = (Map) BSONLoader.convertBSONtoPigType(obj);
+        assertEquals(1, m.size());
+
+        DataBag bag = (DataBag) m.get("some_list");
+        assertEquals(2, bag.size());
+
+        Iterator<Tuple> bit = bag.iterator();
+        Tuple t = bit.next();
+
+        assertEquals(1, t.size());
+
+        Map innerMap = (Map) t.get(0);
+        assertEquals("t1 value", innerMap.get("t1"));
+    }
+
+    @Test
+    public void testReadField_mapWithSimpleList_noSchema() throws Exception {
+        BasicDBList vl = new BasicDBList();
+        vl.add("v1");
+        vl.add("v2");
+
+        BasicDBObject obj = new BasicDBObject()
+            .append("some_list", vl);
+
+        Map m = (Map) BSONLoader.convertBSONtoPigType(obj);
+
+        assertEquals(1, m.size());
+
+        DataBag bag = (DataBag) m.get("some_list");
+        assertEquals(2, bag.size());
+
+        Iterator<Tuple> bit = bag.iterator();
+        Tuple t = bit.next();
+
+        assertEquals(1, t.size());
+        assertEquals("v1", t.get(0));
+
+        t = bit.next();
+        assertEquals(1, t.size());
+        assertEquals("v2", t.get(0));
+    }
+
 }

From 0d6b8bed34a506b21b8b4e8767f4ecd04b2d3e65 Mon Sep 17 00:00:00 2001
From: Jeremy Karn
Date: Tue, 18 Mar 2014 14:28:42 -0400
Subject: [PATCH 2/9] Convert all embedded fields to BSON-friendly data types.
---
 .../com/mongodb/hadoop/pig/BSONStorage.java   |  8 ++-
 .../mongodb/hadoop/pig/BSONStorageTest.java   | 63 ++++++++++++++++++-
 2 files changed, 67 insertions(+), 4 deletions(-)

diff --git a/pig/src/main/java/com/mongodb/hadoop/pig/BSONStorage.java b/pig/src/main/java/com/mongodb/hadoop/pig/BSONStorage.java
index 8e086255..b9a2011b 100644
--- a/pig/src/main/java/com/mongodb/hadoop/pig/BSONStorage.java
+++ b/pig/src/main/java/com/mongodb/hadoop/pig/BSONStorage.java
@@ -18,6 +18,7 @@
 import com.mongodb.BasicDBObjectBuilder;
 import com.mongodb.hadoop.BSONFileOutputFormat;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -32,6 +33,7 @@
 import org.apache.pig.StoreFunc;
 import org.apache.pig.StoreMetadata;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.UDFContext;
@@ -99,6 +101,8 @@ public static Object getTypeForBSON(final Object o, final ResourceSchema.Resourc
         if (dataType == DataType.BYTEARRAY && o instanceof Map) {
             dataType = DataType.MAP;
+        } else if (o instanceof DataByteArray) {
+            dataType = DataType.BYTEARRAY;
         }
 
         switch (dataType) {
@@ -153,13 +157,13 @@ public static Object getTypeForBSON(final Object o, final ResourceSchema.Resourc
             // <*> -> can be any string since the field name of the tuple in a bag should be ignored
             if (fs.length == 1 && fs[0].getName().equals(toIgnore)) {
                 for (Tuple t : (DataBag) o) {
-                    a.add(t.get(0));
+                    a.add(getTypeForBSON(t.get(0), fs[0], toIgnore));
                 }
             } else {
                 for (Tuple t : (DataBag) o) {
                     Map ma = new LinkedHashMap();
                     for (int j = 0; j < fs.length; j++) {
-                        ma.put(fs[j].getName(), t.get(j));
+                        ma.put(fs[j].getName(), getTypeForBSON(t.get(j), fs[j], toIgnore));
                     }
                     a.add(ma);
                 }
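The effect of the recursion, sketched with the call from the hunk above (schema string and values follow the tests below): a DataByteArray nested inside a tuple is now converted on its way into BSON instead of leaking through as a raw Pig object.

    import java.util.Map;
    import org.apache.pig.ResourceSchema;
    import org.apache.pig.data.DataByteArray;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;
    import org.apache.pig.impl.util.Utils;
    import com.mongodb.hadoop.pig.BSONStorage;

    class EmbeddedConversionSketch {
        static void demo() throws Exception {
            ResourceSchema schema = new ResourceSchema(Utils.getSchemaFromString("t:tuple(b:bytearray)"));
            Tuple t = TupleFactory.getInstance().newTuple(1);
            t.set(0, new DataByteArray("byte"));
            // getTypeForBSON now recurses into embedded fields as well.
            Map result = (Map) BSONStorage.getTypeForBSON(t, schema.getFields()[0], null);
            System.out.println(result.get("b"));   // "byte"
        }
    }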
diff --git a/pig/src/test/java/com/mongodb/hadoop/pig/BSONStorageTest.java b/pig/src/test/java/com/mongodb/hadoop/pig/BSONStorageTest.java
index 56d74ca6..910d1282 100644
--- a/pig/src/test/java/com/mongodb/hadoop/pig/BSONStorageTest.java
+++ b/pig/src/test/java/com/mongodb/hadoop/pig/BSONStorageTest.java
@@ -1,17 +1,76 @@
 package com.mongodb.hadoop.pig;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Map;
+
 import org.apache.pig.ResourceSchema;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.util.Utils;
 import org.junit.Test;
 
-import static org.junit.Assert.assertNull;
-
 public class BSONStorageTest {
+    TupleFactory tf = TupleFactory.getInstance();
+
     @Test
     public void testNullMap() throws Exception {
         ResourceSchema schema = new ResourceSchema(Utils.getSchemaFromString("m:map[]"));
 
         assertNull(BSONStorage.getTypeForBSON(null, schema.getFields()[0], null));
     }
+
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testEmbeddedObject() throws Exception {
+        ResourceSchema schema = new ResourceSchema(Utils.getSchemaFromString("t:tuple(b:bytearray, it:tuple(c:chararray))"));
+
+        Tuple innerTuple = tf.newTuple(1);
+        innerTuple.set(0, "char");
+
+        Tuple outerTuple = tf.newTuple(2);
+        outerTuple.set(0, new DataByteArray("byte"));
+        outerTuple.set(1, innerTuple);
+
+        Map resultTuple = (Map) BSONStorage.getTypeForBSON(outerTuple, schema.getFields()[0], null);
+        assertEquals(2, resultTuple.keySet().size());
+        assertEquals("byte", resultTuple.get("b"));
+
+        Map resultInnerTuple = (Map) resultTuple.get("it");
+        assertEquals(1, resultInnerTuple.size());
+        assertEquals("char", resultInnerTuple.get("c"));
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testEmbeddedBag() throws Exception {
+        ResourceSchema schema = new ResourceSchema(Utils.getSchemaFromString(
+            "t:(group:bytearray, joined:bag{t:(clean_searches::user_id, clean_searches::timestamp)})"));
+
+        DataByteArray group = new DataByteArray("bytey");
+
+        Tuple innerTuple = tf.newTuple(2);
+        innerTuple.set(0, new DataByteArray("user_id"));
+        innerTuple.set(1, new DataByteArray("timestamp"));
+
+        DataBag b = BagFactory.getInstance().newDefaultBag(Arrays.asList(innerTuple));
+
+        Tuple outerTuple = tf.newTuple(2);
+        outerTuple.set(0, group);
+        outerTuple.set(1, b);
+
+        Map resultTuple = (Map) BSONStorage.getTypeForBSON(outerTuple, schema.getFields()[0], null);
+        assertEquals(2, resultTuple.keySet().size());
+        assertEquals("bytey", resultTuple.get("group"));
+
+        ArrayList resultBag = (ArrayList) resultTuple.get("joined");
+        assertEquals(1, resultBag.size());
+        assertEquals("user_id", ((Map) resultBag.get(0)).get("clean_searches::user_id"));
+    }
 }

From cfc6acd07c7d3886c0fc156aad741435a5030331 Mon Sep 17 00:00:00 2001
From: Jeremy Karn
Date: Wed, 19 Mar 2014 13:07:15 -0400
Subject: [PATCH 3/9] Implement the LoadMetadata interface so that illustrate works.
---
 .../com/mongodb/hadoop/pig/BSONLoader.java    | 35 ++++++++++++++++++-
 1 file changed, 34 insertions(+), 1 deletion(-)

diff --git a/pig/src/main/java/com/mongodb/hadoop/pig/BSONLoader.java b/pig/src/main/java/com/mongodb/hadoop/pig/BSONLoader.java
index f4c5a425..e149dbc3 100644
--- a/pig/src/main/java/com/mongodb/hadoop/pig/BSONLoader.java
+++ b/pig/src/main/java/com/mongodb/hadoop/pig/BSONLoader.java
@@ -10,9 +10,12 @@
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.pig.Expression;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.ResourceStatistics;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.BagFactory;
@@ -29,7 +32,7 @@
 import com.mongodb.hadoop.BSONFileInputFormat;
 
-public class BSONLoader extends LoadFunc {
+public class BSONLoader extends LoadFunc implements LoadMetadata {
 
     private static TupleFactory tupleFactory = TupleFactory.getInstance();
     private static BagFactory bagFactory = BagFactory.getInstance();
@@ -236,4 +239,34 @@
     }
 
+    @Override
+    public ResourceSchema getSchema(String location, Job job) throws IOException {
+        if (schema != null) {
+            return schema;
+        }
+        return null;
+    }
+
+    @Override
+    public ResourceStatistics getStatistics(String location, Job job) throws IOException {
+        // No statistics available. In the future we could maybe construct
+        // something from db.collection.stats() here, but the class/API for
+        // this is unstable anyway, so this is unlikely to be high priority.
+        return null;
+    }
+
+    @Override
+    public String[] getPartitionKeys(String location, Job job) throws IOException {
+        // No partition keys.
+        return null;
+    }
+
+    @Override
+    public void setPartitionFilter(Expression partitionFilter) throws IOException {
+    }
 }

From 22374c7128c9a4feb69908ffc34a5b4c643d86e4 Mon Sep 17 00:00:00 2001
From: Jeremy Karn
Date: Mon, 24 Mar 2014 08:53:05 -0400
Subject: [PATCH 4/9] Make illustrate work in schemaless mode.
---
 .../main/java/com/mongodb/hadoop/pig/MongoLoader.java | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/pig/src/main/java/com/mongodb/hadoop/pig/MongoLoader.java b/pig/src/main/java/com/mongodb/hadoop/pig/MongoLoader.java
index 40c813bb..e5e4f32c 100644
--- a/pig/src/main/java/com/mongodb/hadoop/pig/MongoLoader.java
+++ b/pig/src/main/java/com/mongodb/hadoop/pig/MongoLoader.java
@@ -24,6 +24,7 @@
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.util.Utils;
+import org.apache.pig.parser.ParserException;
 import org.bson.BSONObject;
 
 import com.mongodb.hadoop.MongoInputFormat;
@@ -165,8 +166,15 @@
     public ResourceSchema getSchema(final String location, final Job job) throws IOException {
         if (schema != null) {
             return schema;
+        } else {
+            try {
+                //If we didn't have a schema, we loaded the document as a map.
+                return new ResourceSchema(Utils.getSchemaFromString("document:map[]"));
+            } catch (ParserException e) {
+                //Should never get here, but just return null to indicate the lack of a schema.
+                return null;
+            }
         }
-        return null;
     }
 
     @Override
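The synthetic "document:map[]" schema parses to a single map field, which matches what schemaless mode actually emits. A quick standalone check, for the curious:

    import org.apache.pig.ResourceSchema;
    import org.apache.pig.impl.util.Utils;

    class SchemalessSchemaSketch {
        public static void main(String[] args) throws Exception {
            ResourceSchema schema = new ResourceSchema(Utils.getSchemaFromString("document:map[]"));
            System.out.println(schema.getFields()[0].getName());   // document
            System.out.println(schema.getFields()[0].getType());   // numeric code for DataType.MAP
        }
    }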
From 1e5b8f49d7963330ed8a6d42271a8041b39b9a98 Mon Sep 17 00:00:00 2001
From: Jeremy Karn
Date: Mon, 24 Mar 2014 09:49:37 -0400
Subject: [PATCH 5/9] Make illustrate work in schemaless mode for BSONLoader
---
 .../main/java/com/mongodb/hadoop/pig/BSONLoader.java | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/pig/src/main/java/com/mongodb/hadoop/pig/BSONLoader.java b/pig/src/main/java/com/mongodb/hadoop/pig/BSONLoader.java
index e149dbc3..22c81f82 100644
--- a/pig/src/main/java/com/mongodb/hadoop/pig/BSONLoader.java
+++ b/pig/src/main/java/com/mongodb/hadoop/pig/BSONLoader.java
@@ -25,6 +25,7 @@
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.util.Utils;
+import org.apache.pig.parser.ParserException;
 import org.bson.BSONObject;
 import org.bson.BasicBSONObject;
 import org.bson.types.BasicBSONList;
@@ -244,8 +245,15 @@
     public ResourceSchema getSchema(String location, Job job) throws IOException {
         if (schema != null) {
             return schema;
+        } else {
+            try {
+                //If we didn't have a schema, we loaded the document as a map.
+                return new ResourceSchema(Utils.getSchemaFromString("document:map[]"));
+            } catch (ParserException e) {
+                //Should never get here, but just return null to indicate the lack of a schema.
+                return null;
+            }
         }
-        return null;
     }

From 0c1a19e0da4001bad91818b199ba612a918b3ba0 Mon Sep 17 00:00:00 2001
From: Jeremy Karn
Date: Mon, 24 Mar 2014 15:23:35 -0400
Subject: [PATCH 6/9] Create a constructor that just takes idField
---
 .../main/java/com/mongodb/hadoop/pig/MongoInsertStorage.java | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/pig/src/main/java/com/mongodb/hadoop/pig/MongoInsertStorage.java b/pig/src/main/java/com/mongodb/hadoop/pig/MongoInsertStorage.java
index 953e2ee1..e136c124 100644
--- a/pig/src/main/java/com/mongodb/hadoop/pig/MongoInsertStorage.java
+++ b/pig/src/main/java/com/mongodb/hadoop/pig/MongoInsertStorage.java
@@ -57,6 +57,10 @@
 
     public MongoInsertStorage() {
     }
+
+    public MongoInsertStorage(final String idField) {
+        this.idField = idField;
+    }
 
     public MongoInsertStorage(final String idField, final String useUpsert) {
         this.idField = idField;
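A hypothetical usage sketch for the new one-argument constructor, modeled on the MongoInsertStorageTest added later in this series; the input file, field names, and URI are illustrative, and idField keeps the semantics it has in the existing two-argument constructor.

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    class InsertStorageSketch {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL);
            pig.setBatchOn();
            pig.registerQuery("A = LOAD 'simple_input' AS (f1:chararray, f2:int);");
            // New one-argument form; previously callers also had to pass useUpsert.
            pig.registerQuery("STORE A INTO 'mongodb://localhost:27017/db.insert_simple' "
                + "USING com.mongodb.hadoop.pig.MongoInsertStorage('f1');");
            pig.executeBatch();
        }
    }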
--- .../mongodb/hadoop/pig/BSONLoaderTest.java | 401 ++++++++++++++++ .../hadoop/pig/MongoInsertStorageTest.java | 57 +++ .../mongodb/hadoop/pig/MongoLoaderTest.java | 429 ++++++------------ .../hadoop/pig/MongoUpdateStorageTest.java | 76 ++++ .../test/resources/testEmbeddedObject.bson | Bin 0 -> 103 bytes .../test/resources/testSimpleChararray.bson | Bin 0 -> 72 bytes 6 files changed, 675 insertions(+), 288 deletions(-) create mode 100644 pig/src/test/java/com/mongodb/hadoop/pig/BSONLoaderTest.java create mode 100644 pig/src/test/java/com/mongodb/hadoop/pig/MongoInsertStorageTest.java create mode 100644 pig/src/test/java/com/mongodb/hadoop/pig/MongoUpdateStorageTest.java create mode 100644 pig/src/test/resources/testEmbeddedObject.bson create mode 100644 pig/src/test/resources/testSimpleChararray.bson diff --git a/pig/src/test/java/com/mongodb/hadoop/pig/BSONLoaderTest.java b/pig/src/test/java/com/mongodb/hadoop/pig/BSONLoaderTest.java new file mode 100644 index 00000000..c2df1fa5 --- /dev/null +++ b/pig/src/test/java/com/mongodb/hadoop/pig/BSONLoaderTest.java @@ -0,0 +1,401 @@ +package com.mongodb.hadoop.pig; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.Map; + +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.ResourceSchema; +import org.apache.pig.ResourceSchema.ResourceFieldSchema; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.util.Utils; +import org.bson.types.ObjectId; +import org.junit.Test; + +import com.mongodb.BasicDBList; +import com.mongodb.BasicDBObject; + +public class BSONLoaderTest { + private static PigServer pigServerLocal = null; + + @Test + public void testSimpleChararray_fullRun() throws Exception { + //Expected Data: + // { "_id" : ObjectId("5334bd1f300498022c34dc21"), "a" : "value1" } + // { "_id" : ObjectId("5334bd1f300498022c34dc22"), "a" : "value2" } + + pigServerLocal = new PigServer(ExecType.LOCAL); + + //Test no schema + pigServerLocal.registerQuery( + "A = LOAD 'pig/src/test/resources/testSimpleChararray.bson' using com.mongodb.hadoop.pig.BSONLoader();"); + + Iterator iter = pigServerLocal.openIterator("A"); + assertTrue(iter.hasNext()); + Tuple result1 = iter.next(); + assertTrue(iter.hasNext()); + Tuple result2 = iter.next(); + assertFalse(iter.hasNext()); + + Map map1 = (Map) result1.get(0); + assertEquals("value1", map1.get("a")); + assertEquals(new ObjectId("5334bd1f300498022c34dc21"), map1.get("_id")); + + Map map2 = (Map) result2.get(0); + assertEquals("value2", map2.get("a")); + assertEquals(new ObjectId("5334bd1f300498022c34dc22"), map2.get("_id")); + + + //Test Schema and idAlias + pigServerLocal.registerQuery( + "B = LOAD 'pig/src/test/resources/testSimpleChararray.bson' using com.mongodb.hadoop.pig.BSONLoader('mongo_id', 'mongo_id:chararray, a:chararray');"); + + iter = pigServerLocal.openIterator("B"); + assertTrue(iter.hasNext()); + result1 = iter.next(); + assertTrue(iter.hasNext()); + result2 = iter.next(); + assertFalse(iter.hasNext()); + + assertEquals(new ObjectId("5334bd1f300498022c34dc21"), result1.get(0)); + assertEquals("value1", result1.get(1)); + assertEquals(new ObjectId("5334bd1f300498022c34dc22"), result2.get(0)); + assertEquals("value2", result2.get(1)); + } + + @Test + public void testEmbeddedObject_fullRun() throws Exception { + // 
Input Data: {"_id":ObjectId("5334bd2b300498022c34dc23"), "f1":"v1", "f2":2, "f3": [1,2,3], "f4":{"i1":"inner1","i2":"inner2"}} + + pigServerLocal = new PigServer(ExecType.LOCAL); + + //Test no schema + pigServerLocal.registerQuery( + "A = LOAD 'pig/src/test/resources/testEmbeddedObject.bson' using com.mongodb.hadoop.pig.BSONLoader();"); + + Iterator iter = pigServerLocal.openIterator("A"); + assertTrue(iter.hasNext()); + Tuple result1 = iter.next(); + assertFalse(iter.hasNext()); + + Map map1 = (Map) result1.get(0); + assertEquals(new ObjectId("5334bd2b300498022c34dc23"), map1.get("_id")); + assertEquals("v1", map1.get("f1")); + assertEquals(2, map1.get("f2")); + + DataBag bag = (DataBag) map1.get("f3"); + assertEquals(3, bag.size()); + assertEquals(1, bag.iterator().next().get(0)); + + Map innerMap = (Map) map1.get("f4"); + assertEquals("inner1", innerMap.get("i1")); + assertEquals(2, innerMap.get("i2")); + + + //Test Schema and idAlias + pigServerLocal.registerQuery(String.format( + "B = LOAD 'pig/src/test/resources/testEmbeddedObject.bson' using com.mongodb.hadoop.pig.BSONLoader('mongo_id', 'mongo_id:chararray, f1:chararray, f2:int, f3:bag{}, f4:(i1:chararray, i2:int)');")); + + iter = pigServerLocal.openIterator("B"); + assertTrue(iter.hasNext()); + result1 = iter.next(); + assertFalse(iter.hasNext()); + + assertEquals(new ObjectId("5334bd2b300498022c34dc23"), result1.get(0)); + assertEquals("v1", result1.get(1)); + assertEquals(2, result1.get(2)); + + bag = (DataBag) result1.get(3); + assertEquals(3, bag.size()); + assertEquals(1, bag.iterator().next().get(0)); + + Tuple innerTuple = (Tuple) result1.get(4); + assertEquals("inner1", innerTuple.get(0)); + assertEquals(2, innerTuple.get(1)); + } + + @Test + public void testSimpleChararray() throws Exception { + Object result = BSONLoader.readField("value", getSchema("d:chararray")); + assertEquals("value", result); + } + + @Test + public void testSimpleFloat() throws Exception { + Object result = BSONLoader.readField(1.1F, getSchema("d:float")); + assertEquals(1.1F, result); + } + + @Test + public void testSimpleFloatAsDouble() throws Exception { + Object result = BSONLoader.readField(1.1D, getSchema("d:float")); + assertEquals(1.1F, result); + } + + @Test + public void testSimpleTuple() throws Exception { + String userSchema = "t:tuple(t1:chararray, t2:chararray)"; + Object val = new BasicDBObject() + .append("t1", "t1_value") + .append("t2", "t2_value"); + + Object result = BSONLoader.readField(val, getSchema(userSchema)); + + Tuple t = (Tuple) result; + assertEquals(2, t.size()); + assertEquals("t1_value", t.get(0)); + assertEquals("t2_value", t.get(1)); + } + + @Test + public void testSimpleTupleMissingField() throws Exception { + String userSchema = "t:tuple(t1:chararray, t2:chararray, t3:chararray)"; + Object val = new BasicDBObject() + .append("t1", "t1_value") + .append("t2", "t2_value"); + + Object result = BSONLoader.readField(val, getSchema(userSchema)); + + Tuple t = (Tuple) result; + assertEquals(3, t.size()); + assertEquals("t1_value", t.get(0)); + assertEquals("t2_value", t.get(1)); + assertNull(t.get(2)); + } + + @Test + public void testSimpleTupleIncorrectFieldType() throws Exception { + String userSchema = "t:tuple(t1:chararray, t2:float)"; + Object val = new BasicDBObject() + .append("t1", "t1_value") + .append("t2", "t2_value"); + + Object result = BSONLoader.readField(val, getSchema(userSchema)); + + Tuple t = (Tuple) result; + assertEquals(2, t.size()); + assertEquals("t1_value", t.get(0)); + assertNull(t.get(1)); + 
} + + @Test + public void testSimpleBag() throws Exception { + String userSchema = "b:{t:tuple(t1:chararray, t2:chararray)}"; + BasicDBList bag = new BasicDBList(); + bag.add(new BasicDBObject() + .append("t1", "t11_value") + .append("t2", "t12_value")); + bag.add(new BasicDBObject() + .append("t1", "t21_value") + .append("t2", "t22_value")); + + Object result = BSONLoader.readField(bag, getSchema(userSchema)); + + DataBag b = (DataBag) result; + Iterator bit = b.iterator(); + + Tuple firstInnerT = bit.next(); + assertEquals(2, firstInnerT.size()); + assertEquals("t11_value", firstInnerT.get(0)); + assertEquals("t12_value", firstInnerT.get(1)); + + Tuple secondInnerT = bit.next(); + assertEquals(2, secondInnerT.size()); + assertEquals("t21_value", secondInnerT.get(0)); + assertEquals("t22_value", secondInnerT.get(1)); + + assertFalse(bit.hasNext()); + } + + @Test + public void testBagThatIsNotABag() throws Exception { + String userSchema = "b:{t:tuple(t1:chararray, t2:chararray)}"; + BasicDBObject notABag = new BasicDBObject(); + notABag.append("f1", new BasicDBObject() + .append("t1", "t11_value") + .append("t2", "t12_value")); + notABag.append("f2", new BasicDBObject() + .append("t1", "t21_value") + .append("t2", "t22_value")); + + Object result = BSONLoader.readField(notABag, getSchema(userSchema)); + assertNull(result); + } + + @Test + public void testDeepness() throws Exception { + String userSchema = "b:{t:tuple(t1:chararray, b:{t:tuple(i1:int, i2:int)})}"; + + BasicDBList innerBag = new BasicDBList(); + innerBag.add(new BasicDBObject() + .append("i1", 1) + .append("i2", 2)); + innerBag.add(new BasicDBObject() + .append("i1", 3) + .append("i2", 4)); + + BasicDBList bag = new BasicDBList(); + bag.add(new BasicDBObject() + .append("t1", "t1_value") + .append("b", innerBag)); + + DataBag result = (DataBag) BSONLoader.readField(bag, getSchema(userSchema)); + assertEquals(1, result.size()); + + Iterator bit = result.iterator(); + Tuple t = bit.next(); + + assertEquals(2, t.size()); + + DataBag innerBagResult = (DataBag) t.get(1); + assertEquals(2, innerBagResult.size()); + + Iterator innerBit = innerBagResult.iterator(); + Tuple innerT = innerBit.next(); + + assertEquals(2, innerT.get(1)); + } + + @Test + public void testSimpleMap() throws Exception { + //String userSchema = "m:[int]"; + // Note: before pig 0.9, explicitly setting the type for + // map keys was not allowed, so can't test that here :( + String userSchema = "m:[]"; + BasicDBObject obj = new BasicDBObject() + .append("k1", 1) + .append("k2", 2); + + Map m = (Map) BSONLoader.readField(obj, getSchema(userSchema)); + + assertEquals(2, m.size()); + assertEquals(1, m.get("k1")); + assertEquals(2, m.get("k2")); + } + + @Test + public void testMapWithTuple() throws Exception { + //String userSchema = "m:[(t1:chararray, t2:int)]"; + // Note: before pig 0.9, explicitly setting the type for + // map keys was not allowed, so can't test that here :( + String userSchema = "m:[]"; + BasicDBObject v1 = new BasicDBObject() + .append("t1", "t11 value") + .append("t2", 12); + BasicDBObject v2 = new BasicDBObject() + .append("t1", "t21 value") + .append("t2", 22); + BasicDBObject obj = new BasicDBObject() + .append("v1", v1) + .append("v2", v2); + + Map m = (Map) BSONLoader.readField(obj, getSchema(userSchema)); + + assertEquals(2, m.size()); + + /* We can't safely cast to Tuple here + * because pig < 0.9 doesn't allow setting types. + * Skip for now. 
+ + Tuple t1 = (Tuple) m.get("v1"); + assertEquals("t11 value", t1.get(0)); + assertEquals(12, t1.get(1)); + + Tuple t2 = (Tuple) m.get("v2"); + assertEquals("t21 value", t2.get(0)); + */ + } + + @Test + public void mapWithMap_noSchema() throws Exception { + BasicDBObject v1 = new BasicDBObject() + .append("t1", "t11 value") + .append("t2", 12); + BasicDBObject v2 = new BasicDBObject() + .append("t1", "t21 value") + .append("t2", 22); + BasicDBObject obj = new BasicDBObject() + .append("v1", v1) + .append("v2", v2); + + Map m = (Map) BSONLoader.convertBSONtoPigType(obj); + + assertEquals(2, m.size()); + + Map m1 = (Map) m.get("v1"); + assertEquals("t11 value", m1.get("t1")); + assertEquals(12, m1.get("t2")); + + Map m2 = (Map) m.get("v2"); + assertEquals("t21 value", m2.get("t1")); + } + + @Test + public void mapWithList() throws Exception { + BasicDBObject v1 = new BasicDBObject() + .append("t1", "t1 value") + .append("t2", 12); + BasicDBObject v2 = new BasicDBObject() + .append("t1", "t1 value") + .append("t2", 22); + BasicDBList vl = new BasicDBList(); + vl.add(v1); + vl.add(v2); + + BasicDBObject obj = new BasicDBObject() + .append("some_list", vl); + + + Map m = (Map) BSONLoader.convertBSONtoPigType(obj); + assertEquals(1, m.size()); + + DataBag bag = (DataBag) m.get("some_list"); + assertEquals(2, bag.size()); + + Iterator bit = bag.iterator(); + Tuple t = bit.next(); + + assertEquals(1, t.size()); + + Map innerMap = (Map) t.get(0); + assertEquals("t1 value", innerMap.get("t1")); + } + + @Test + public void testReadField_mapWithSimpleList_noSchema() throws Exception { + BasicDBList vl = new BasicDBList(); + vl.add("v1"); + vl.add("v2"); + + BasicDBObject obj = new BasicDBObject() + .append("some_list", vl); + + Map m = (Map) BSONLoader.convertBSONtoPigType(obj); + + assertEquals(1, m.size()); + + DataBag bag = (DataBag) m.get("some_list"); + assertEquals(2, bag.size()); + + Iterator bit = bag.iterator(); + Tuple t = bit.next(); + + assertEquals(1, t.size()); + assertEquals("v1", t.get(0)); + + t = bit.next(); + assertEquals(1, t.size()); + assertEquals("v2", t.get(0)); + } + + private ResourceFieldSchema getSchema(String schema) throws Exception { + return new ResourceSchema(Utils.getSchemaFromString(schema)).getFields()[0]; + } +} diff --git a/pig/src/test/java/com/mongodb/hadoop/pig/MongoInsertStorageTest.java b/pig/src/test/java/com/mongodb/hadoop/pig/MongoInsertStorageTest.java new file mode 100644 index 00000000..f05dbd50 --- /dev/null +++ b/pig/src/test/java/com/mongodb/hadoop/pig/MongoInsertStorageTest.java @@ -0,0 +1,57 @@ +package com.mongodb.hadoop.pig; + +import static org.junit.Assert.assertEquals; + +import java.util.Date; + +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.test.Util; +import org.junit.AfterClass; +import org.junit.Test; + +import com.mongodb.DBCollection; +import com.mongodb.DBCursor; +import com.mongodb.DBObject; +import com.mongodb.MongoClient; + +public class MongoInsertStorageTest { + private static PigServer pigServerLocal = null; + private static String dbName = "MongoInsertStorageTest-" + new Date().getTime(); + + @AfterClass + public static void tearDown() throws Exception { + new MongoClient().dropDatabase(dbName); + } + + @Test + public void testInsert() throws Exception { + String[] input = { + "f11\t1", + "f12\t2" + }; + Util.createLocalInputFile("simple_input", input); + + pigServerLocal = new PigServer(ExecType.LOCAL); + pigServerLocal.registerQuery("A = LOAD 'simple_input' as (f1:chararray, 
f2:int);"); + pigServerLocal.registerQuery(String.format( + "STORE A INTO 'mongodb://localhost:27017/%s.%s' USING com.mongodb.hadoop.pig.MongoInsertStorage();", dbName, "insert_simple")); + pigServerLocal.setBatchOn(); + pigServerLocal.executeBatch(); + + MongoClient mc = new MongoClient(); + DBCollection col = mc.getDB(dbName).getCollection("insert_simple"); + + DBCursor cursor = col.find(); + + assertEquals(2, cursor.size()); + DBObject result1 = cursor.next(); + assertEquals("f11", result1.get("f1")); + assertEquals(1, result1.get("f2")); + DBObject result2 = cursor.next(); + assertEquals("f12", result2.get("f1")); + assertEquals(2, result2.get("f2")); + } + + +} diff --git a/pig/src/test/java/com/mongodb/hadoop/pig/MongoLoaderTest.java b/pig/src/test/java/com/mongodb/hadoop/pig/MongoLoaderTest.java index 5d8e4b17..26d0e2d2 100644 --- a/pig/src/test/java/com/mongodb/hadoop/pig/MongoLoaderTest.java +++ b/pig/src/test/java/com/mongodb/hadoop/pig/MongoLoaderTest.java @@ -1,312 +1,165 @@ package com.mongodb.hadoop.pig; -import com.mongodb.BasicDBList; -import com.mongodb.BasicDBObject; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.Date; +import java.util.Iterator; +import java.util.Map; +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; +import org.junit.AfterClass; import org.junit.Test; -import java.io.IOException; -import java.util.Iterator; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; +import com.mongodb.BasicDBList; +import com.mongodb.BasicDBObject; +import com.mongodb.DBCollection; +import com.mongodb.MongoClient; @SuppressWarnings("rawtypes") public class MongoLoaderTest { - @Test - public void testSimpleChararray() throws IOException { - String userSchema = "d:chararray"; - MongoLoader ml = new MongoLoader(userSchema); + private static PigServer pigServerLocal = null; + private static String dbName = "MongoLoaderTest-" + new Date().getTime(); - Object result = BSONLoader.readField("value", ml.getFields()[0]); - assertEquals("value", result); + @AfterClass + public static void tearDown() throws Exception { + new MongoClient().dropDatabase(dbName); } - @Test - public void testSimpleFloat() throws IOException { - String userSchema = "d:float"; - MongoLoader ml = new MongoLoader(userSchema); - - Object result = BSONLoader.readField(1.1F, ml.getFields()[0]); - assertEquals(1.1F, result); + private void insertData(String collectionName, BasicDBObject... 
objs) throws Exception { + MongoClient mc = new MongoClient(); + DBCollection col = mc.getDB(dbName).getCollection(collectionName); + col.insert(objs); } - - @Test - public void testSimpleFloatAsDouble() throws IOException { - String userSchema = "d:float"; - MongoLoader ml = new MongoLoader(userSchema); - Object result = BSONLoader.readField(1.1D, ml.getFields()[0]); - assertEquals(1.1F, result); - } - @Test - public void testSimpleTuple() throws IOException { - String userSchema = "t:tuple(t1:chararray, t2:chararray)"; - Object val = new BasicDBObject() - .append("t1", "t1_value") - .append("t2", "t2_value"); - MongoLoader ml = new MongoLoader(userSchema); - - Object result = BSONLoader.readField(val, ml.getFields()[0]); - - Tuple t = (Tuple) result; - assertEquals(2, t.size()); - assertEquals("t1_value", t.get(0)); - assertEquals("t2_value", t.get(1)); + public void testSimpleChararray_fullRun() throws Exception { + BasicDBObject obj1 = new BasicDBObject("a", "value1"); + BasicDBObject obj2 = new BasicDBObject("a", "value2"); + insertData("testSimpleChararray", obj1, obj2); + + pigServerLocal = new PigServer(ExecType.LOCAL); + + //Test no schema + pigServerLocal.registerQuery(String.format( + "A = LOAD 'mongodb://localhost:27017/%s.%s' using com.mongodb.hadoop.pig.MongoLoader();", dbName, "testSimpleChararray")); + + Iterator iter = pigServerLocal.openIterator("A"); + assertTrue(iter.hasNext()); + Tuple result1 = iter.next(); + assertTrue(iter.hasNext()); + Tuple result2 = iter.next(); + assertFalse(iter.hasNext()); + + Map map1 = (Map) result1.get(0); + assertEquals(obj1.getString("a"), map1.get("a")); + assertEquals(obj1.getObjectId("_id"), map1.get("_id")); + + Map map2 = (Map) result2.get(0); + assertEquals(obj2.getString("a"), map2.get("a")); + assertEquals(obj2.getObjectId("_id"), map2.get("_id")); + + + //Test Schema and idAlias + pigServerLocal.registerQuery(String.format( + "B = LOAD 'mongodb://localhost:27017/%s.%s' using com.mongodb.hadoop.pig.MongoLoader('mongo_id:chararray, a:chararray', 'mongo_id');", dbName, "testSimpleChararray")); + + iter = pigServerLocal.openIterator("B"); + assertTrue(iter.hasNext()); + result1 = iter.next(); + assertTrue(iter.hasNext()); + result2 = iter.next(); + assertFalse(iter.hasNext()); + + assertEquals(obj1.getObjectId("_id"), result1.get(0)); + assertEquals(obj1.getString("a"), result1.get(1)); + assertEquals(obj2.getObjectId("_id"), result2.get(0)); + assertEquals(obj2.getString("a"), result2.get(1)); + + + //Test loadaschararray + pigServerLocal.registerQuery(String.format( + "C = LOAD 'mongodb://localhost:27017/%s.%s' using com.mongodb.hadoop.pig.MongoLoader('s:chararray', '', '--loadaschararray');", dbName, "testSimpleChararray")); + + iter = pigServerLocal.openIterator("C"); + assertTrue(iter.hasNext()); + result1 = iter.next(); + assertTrue(iter.hasNext()); + result2 = iter.next(); + assertFalse(iter.hasNext()); + + assertEquals( String.format("{ \"_id\" : { \"$oid\" : \"%s\"} , \"a\" : \"%s\"}", obj1.getObjectId("_id"), obj1.getString("a")) + , result1.get(0)); + assertEquals( String.format("{ \"_id\" : { \"$oid\" : \"%s\"} , \"a\" : \"%s\"}", obj2.getObjectId("_id"), obj2.getString("a")) + , result2.get(0)); } @Test - public void testSimpleTupleMissingField() throws IOException { - String userSchema = "t:tuple(t1:chararray, t2:chararray, t3:chararray)"; - Object val = new BasicDBObject() - .append("t1", "t1_value") - .append("t2", "t2_value"); - MongoLoader ml = new MongoLoader(userSchema); - - Object result = 
BSONLoader.readField(val, ml.getFields()[0]);
-
-        Tuple t = (Tuple) result;
-        assertEquals(3, t.size());
-        assertEquals("t1_value", t.get(0));
-        assertEquals("t2_value", t.get(1));
-        assertNull(t.get(2));
-    }
-
-    @Test
-    public void testSimpleTupleIncorrectFieldType() throws IOException {
-        String userSchema = "t:tuple(t1:chararray, t2:float)";
-        Object val = new BasicDBObject()
-            .append("t1", "t1_value")
-            .append("t2", "t2_value");
-        MongoLoader ml = new MongoLoader(userSchema);
-
-        Object result = BSONLoader.readField(val, ml.getFields()[0]);
-
-        Tuple t = (Tuple) result;
-        assertEquals(2, t.size());
-        assertEquals("t1_value", t.get(0));
-        assertNull(t.get(1));
-    }
-
-    @Test
-    public void testSimpleBag() throws IOException {
-        String userSchema = "b:{t:tuple(t1:chararray, t2:chararray)}";
-        BasicDBList bag = new BasicDBList();
-        bag.add(new BasicDBObject()
-            .append("t1", "t11_value")
-            .append("t2", "t12_value"));
-        bag.add(new BasicDBObject()
-            .append("t1", "t21_value")
-            .append("t2", "t22_value"));
-        MongoLoader ml = new MongoLoader(userSchema);
-
-        Object result = BSONLoader.readField(bag, ml.getFields()[0]);
-
-        DataBag b = (DataBag) result;
-        Iterator bit = b.iterator();
-
-        Tuple firstInnerT = bit.next();
-        assertEquals(2, firstInnerT.size());
-        assertEquals("t11_value", firstInnerT.get(0));
-        assertEquals("t12_value", firstInnerT.get(1));
-
-        Tuple secondInnerT = bit.next();
-        assertEquals(2, secondInnerT.size());
-        assertEquals("t21_value", secondInnerT.get(0));
-        assertEquals("t22_value", secondInnerT.get(1));
-
-        assertFalse(bit.hasNext());
-    }
-
-    @Test
-    public void testBagThatIsNotABag() throws IOException {
-        String userSchema = "b:{t:tuple(t1:chararray, t2:chararray)}";
-        BasicDBObject notABag = new BasicDBObject();
-        notABag.append("f1", new BasicDBObject()
-            .append("t1", "t11_value")
-            .append("t2", "t12_value"));
-        notABag.append("f2", new BasicDBObject()
-            .append("t1", "t21_value")
-            .append("t2", "t22_value"));
-        MongoLoader ml = new MongoLoader(userSchema);
-
-        Object result = BSONLoader.readField(notABag, ml.getFields()[0]);
-        assertNull(result);
-    }
-
-    @Test
-    public void testDeepness() throws IOException {
-        String userSchema = "b:{t:tuple(t1:chararray, b:{t:tuple(i1:int, i2:int)})}";
-
+    public void testEmbeddedObject_fullRun() throws Exception {
+        // Input Data: {"f1":"v1", "f2":2, "f3": [1,2,3], "f4":{"i1":"inner1","i2":2}}
         BasicDBList innerBag = new BasicDBList();
-        innerBag.add(new BasicDBObject()
-            .append("i1", 1)
-            .append("i2", 2));
-        innerBag.add(new BasicDBObject()
-            .append("i1", 3)
-            .append("i2", 4));
-
-        BasicDBList bag = new BasicDBList();
-        bag.add(new BasicDBObject()
-            .append("t1", "t1_value")
-            .append("b", innerBag));
-
-        MongoLoader ml = new MongoLoader(userSchema);
-
-        DataBag result = (DataBag) BSONLoader.readField(bag, ml.getFields()[0]);
-        assertEquals(1, result.size());
-
-        Iterator bit = result.iterator();
-        Tuple t = bit.next();
-
-        assertEquals(2, t.size());
-
-        DataBag innerBagResult = (DataBag) t.get(1);
-        assertEquals(2, innerBagResult.size());
-
-        Iterator innerBit = innerBagResult.iterator();
-        Tuple innerT = innerBit.next();
-
-        assertEquals(2, innerT.get(1));
-    }
-
-    @Test
-    public void testSimpleMap() throws Exception {
-        //String userSchema = "m:[int]";
-        // Note: before pig 0.9, explicitly setting the type for
-        // map keys was not allowed, so can't test that here :(
-        String userSchema = "m:[]";
+        innerBag.addAll(Arrays.asList(1,2,3));
+        BasicDBObject innerObj = new BasicDBObject()
+            .append("i1", "inner1")
+            
.append("i2", 2); BasicDBObject obj = new BasicDBObject() - .append("k1", 1) - .append("k2", 2); - - MongoLoader ml = new MongoLoader(userSchema); - Map m = (Map) BSONLoader.readField(obj, ml.getFields()[0]); - - assertEquals(2, m.size()); - assertEquals(1, m.get("k1")); - assertEquals(2, m.get("k2")); - } - - @Test - public void testMapWithTuple() throws Exception { - //String userSchema = "m:[(t1:chararray, t2:int)]"; - // Note: before pig 0.9, explicitly setting the type for - // map keys was not allowed, so can't test that here :( - String userSchema = "m:[]"; - BasicDBObject v1 = new BasicDBObject() - .append("t1", "t11 value") - .append("t2", 12); - BasicDBObject v2 = new BasicDBObject() - .append("t1", "t21 value") - .append("t2", 22); - BasicDBObject obj = new BasicDBObject() - .append("v1", v1) - .append("v2", v2); - - MongoLoader ml = new MongoLoader(userSchema); - Map m = (Map) BSONLoader.readField(obj, ml.getFields()[0]); - - assertEquals(2, m.size()); - - /* We can't safely cast to Tuple here - * because pig < 0.9 doesn't allow setting types. - * Skip for now. - - Tuple t1 = (Tuple) m.get("v1"); - assertEquals("t11 value", t1.get(0)); - assertEquals(12, t1.get(1)); - - Tuple t2 = (Tuple) m.get("v2"); - assertEquals("t21 value", t2.get(0)); - */ - } - - @Test - public void mapWithMap_noSchema() throws Exception { - BasicDBObject v1 = new BasicDBObject() - .append("t1", "t11 value") - .append("t2", 12); - BasicDBObject v2 = new BasicDBObject() - .append("t1", "t21 value") - .append("t2", 22); - BasicDBObject obj = new BasicDBObject() - .append("v1", v1) - .append("v2", v2); - - Map m = (Map) BSONLoader.convertBSONtoPigType(obj); - - assertEquals(2, m.size()); - - Map m1 = (Map) m.get("v1"); - assertEquals("t11 value", m1.get("t1")); - assertEquals(12, m1.get("t2")); - - Map m2 = (Map) m.get("v2"); - assertEquals("t21 value", m2.get("t1")); - } - - @Test - public void mapWithList() throws Exception { - BasicDBObject v1 = new BasicDBObject() - .append("t1", "t1 value") - .append("t2", 12); - BasicDBObject v2 = new BasicDBObject() - .append("t1", "t1 value") - .append("t2", 22); - BasicDBList vl = new BasicDBList(); - vl.add(v1); - vl.add(v2); - - BasicDBObject obj = new BasicDBObject() - .append("some_list", vl); - - - Map m = (Map) BSONLoader.convertBSONtoPigType(obj); - assertEquals(1, m.size()); - - DataBag bag = (DataBag) m.get("some_list"); - assertEquals(2, bag.size()); - - Iterator bit = bag.iterator(); - Tuple t = bit.next(); - - assertEquals(1, t.size()); - - Map innerMap = (Map) t.get(0); - assertEquals("t1 value", innerMap.get("t1")); - } - - @Test - public void testReadField_mapWithSimpleList_noSchema() throws Exception { - BasicDBList vl = new BasicDBList(); - vl.add("v1"); - vl.add("v2"); - - BasicDBObject obj = new BasicDBObject() - .append("some_list", vl); - - Map m = (Map) BSONLoader.convertBSONtoPigType(obj); - - assertEquals(1, m.size()); - - DataBag bag = (DataBag) m.get("some_list"); - assertEquals(2, bag.size()); - - Iterator bit = bag.iterator(); - Tuple t = bit.next(); - - assertEquals(1, t.size()); - assertEquals("v1", t.get(0)); - - t = bit.next(); - assertEquals(1, t.size()); - assertEquals("v2", t.get(0)); + .append("f1", "v1") + .append("f2", 2) + .append("f3", innerBag) + .append("f4", innerObj) + ; + insertData("testEmbeddedObject", obj); + + pigServerLocal = new PigServer(ExecType.LOCAL); + + //Test no schema + pigServerLocal.registerQuery(String.format( + "A = LOAD 'mongodb://localhost:27017/%s.%s' using com.mongodb.hadoop.pig.MongoLoader();", 
dbName, "testEmbeddedObject")); + + Iterator iter = pigServerLocal.openIterator("A"); + assertTrue(iter.hasNext()); + Tuple result1 = iter.next(); + assertFalse(iter.hasNext()); + + Map map1 = (Map) result1.get(0); + assertEquals(obj.getString("_id"), map1.get("_id")); + assertEquals(obj.getString("f1"), map1.get("f1")); + assertEquals(obj.getInt("f2"), map1.get("f2")); + + DataBag bag = (DataBag) map1.get("f3"); + assertEquals(innerBag.size(), bag.size()); + assertEquals(1, bag.iterator().next().get(0)); + + Map innerMap = (Map) map1.get("f4"); + assertEquals(innerObj.get("i1"), innerMap.get("i1")); + assertEquals(innerObj.get("i2"), innerMap.get("i2")); + + + //Test Schema and idAlias + pigServerLocal.registerQuery(String.format( + "B = LOAD 'mongodb://localhost:27017/%s.%s' using com.mongodb.hadoop.pig.MongoLoader('mongo_id:chararray, f1:chararray, f2:int, f3:bag{}, f4:(i1:chararray, i2:int)', 'mongo_id');", dbName, "testEmbeddedObject")); + + iter = pigServerLocal.openIterator("B"); + assertTrue(iter.hasNext()); + result1 = iter.next(); + assertFalse(iter.hasNext()); + + assertEquals(obj.getObjectId("_id"), result1.get(0)); + assertEquals(obj.getString("f1"), result1.get(1)); + assertEquals(obj.get("f2"), result1.get(2)); + + bag = (DataBag) result1.get(3); + assertEquals(innerBag.size(), bag.size()); + assertEquals(1, bag.iterator().next().get(0)); + + Tuple innerTuple = (Tuple) result1.get(4); + assertEquals(innerObj.get("i1"), innerTuple.get(0)); + assertEquals(innerObj.get("i2"), innerTuple.get(1)); } diff --git a/pig/src/test/java/com/mongodb/hadoop/pig/MongoUpdateStorageTest.java b/pig/src/test/java/com/mongodb/hadoop/pig/MongoUpdateStorageTest.java new file mode 100644 index 00000000..031ed2d9 --- /dev/null +++ b/pig/src/test/java/com/mongodb/hadoop/pig/MongoUpdateStorageTest.java @@ -0,0 +1,76 @@ +package com.mongodb.hadoop.pig; + +import static org.junit.Assert.assertEquals; + +import java.util.Date; + +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.test.Util; +import org.junit.AfterClass; +import org.junit.Test; + +import com.mongodb.BasicDBObject; +import com.mongodb.DBCollection; +import com.mongodb.DBCursor; +import com.mongodb.DBObject; +import com.mongodb.MongoClient; + +public class MongoUpdateStorageTest { + private static PigServer pigServerLocal = null; + private static String dbName = "MongoUpdateStorageTest-" + new Date().getTime(); + + @AfterClass + public static void tearDown() throws Exception { + new MongoClient().dropDatabase(dbName); + } + + @Test + public void testUpdate() throws Exception { + BasicDBObject obj1 = new BasicDBObject() + .append("f1", "a") + .append("f2", "value1"); + BasicDBObject obj2 = new BasicDBObject() + .append("f1", "b") + .append("f2", "value2"); + insertData("testUpdate", obj1, obj2); + + String[] input = { + "a\tnewValue1\t1", + "b\tnewValue2\t2" + }; + Util.createLocalInputFile("simple_input", input); + + pigServerLocal = new PigServer(ExecType.LOCAL); + pigServerLocal.registerQuery("A = LOAD 'simple_input' as (f1:chararray, f2:chararray, f3:int);"); + pigServerLocal.registerQuery(String.format( + "STORE A INTO 'mongodb://localhost:27017/%s.%s' USING com.mongodb.hadoop.pig.MongoUpdateStorage(" + + " '{f1:\"\\\\$f1\"}'," + + " '{\\\\$set:{f2:\"\\\\$f2\", f3:\"\\\\$f3\"}}'," + + " 'f1:chararray, f2:chararray, f3:int'" + + ");", dbName, "update_simple")); + pigServerLocal.setBatchOn(); + pigServerLocal.executeBatch(); + + MongoClient mc = new MongoClient(); + DBCollection col = 
mc.getDB(dbName).getCollection("update_simple"); + + DBCursor cursor = col.find(); + + assertEquals(2, cursor.size()); + DBObject result1 = cursor.next(); + assertEquals("a", result1.get("f1")); + assertEquals("newValue1", result1.get("f2")); + assertEquals(1, result1.get("f3")); + DBObject result2 = cursor.next(); + assertEquals("b", result2.get("f1")); + assertEquals("newValue2", result2.get("f2")); + assertEquals(2, result2.get("f3")); + } + + private void insertData(String collectionName, BasicDBObject... objs) throws Exception { + MongoClient mc = new MongoClient(); + DBCollection col = mc.getDB(dbName).getCollection(collectionName); + col.insert(objs); + } +} diff --git a/pig/src/test/resources/testEmbeddedObject.bson b/pig/src/test/resources/testEmbeddedObject.bson new file mode 100644 index 0000000000000000000000000000000000000000..e39f8607616f5b7a4471ce0ee0c60a71e93fc7cf GIT binary patch literal 103 zcmYdiU|?X6&rD$mHrcCfz%qkL$K;MOQ<@nWJxn-kOI;I1`LcK m$`C9nU<6jjz?^2nAOjR;$~0tP2hy2&d8tJ}&6yC*3=9AZe+>En literal 0 HcmV?d00001 diff --git a/pig/src/test/resources/testSimpleChararray.bson b/pig/src/test/resources/testSimpleChararray.bson new file mode 100644 index 0000000000000000000000000000000000000000..ec732313f6c3d8f00c46ad107216500804a32e81 GIT binary patch literal 72 zcmY#kU|?X6&rD$mHrXq0z%qkL$K;M8Qz8R9P`oTLr!>`&fk6d}8YN^kMhpx9BV7+Y literal 0 HcmV?d00001 From 70dfaae7c3aaaf0e18378361102821e64ba789d9 Mon Sep 17 00:00:00 2001 From: Jeremy Karn Date: Thu, 3 Apr 2014 13:45:45 -0400 Subject: [PATCH 9/9] Remove dependency on Pig test class --- .../hadoop/pig/MongoInsertStorageTest.java | 3 -- .../hadoop/pig/MongoUpdateStorageTest.java | 1 - .../java/com/mongodb/hadoop/pig/Util.java | 43 +++++++++++++++++++ 3 files changed, 43 insertions(+), 4 deletions(-) create mode 100644 pig/src/test/java/com/mongodb/hadoop/pig/Util.java diff --git a/pig/src/test/java/com/mongodb/hadoop/pig/MongoInsertStorageTest.java b/pig/src/test/java/com/mongodb/hadoop/pig/MongoInsertStorageTest.java index f05dbd50..337355f3 100644 --- a/pig/src/test/java/com/mongodb/hadoop/pig/MongoInsertStorageTest.java +++ b/pig/src/test/java/com/mongodb/hadoop/pig/MongoInsertStorageTest.java @@ -6,7 +6,6 @@ import org.apache.pig.ExecType; import org.apache.pig.PigServer; -import org.apache.pig.test.Util; import org.junit.AfterClass; import org.junit.Test; @@ -52,6 +51,4 @@ public void testInsert() throws Exception { assertEquals("f12", result2.get("f1")); assertEquals(2, result2.get("f2")); } - - } diff --git a/pig/src/test/java/com/mongodb/hadoop/pig/MongoUpdateStorageTest.java b/pig/src/test/java/com/mongodb/hadoop/pig/MongoUpdateStorageTest.java index 031ed2d9..86666f96 100644 --- a/pig/src/test/java/com/mongodb/hadoop/pig/MongoUpdateStorageTest.java +++ b/pig/src/test/java/com/mongodb/hadoop/pig/MongoUpdateStorageTest.java @@ -6,7 +6,6 @@ import org.apache.pig.ExecType; import org.apache.pig.PigServer; -import org.apache.pig.test.Util; import org.junit.AfterClass; import org.junit.Test; diff --git a/pig/src/test/java/com/mongodb/hadoop/pig/Util.java b/pig/src/test/java/com/mongodb/hadoop/pig/Util.java new file mode 100644 index 00000000..d22c037d --- /dev/null +++ b/pig/src/test/java/com/mongodb/hadoop/pig/Util.java @@ -0,0 +1,43 @@ +/* + * Copyright 2011 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.hadoop.pig;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+
+public class Util {
+    public static File createLocalInputFile(String filename, String[] inputData)
+            throws IOException {
+        File f = new File(filename);
+        f.deleteOnExit();
+        writeToFile(f, inputData);
+        return f;
+    }
+
+    public static void writeToFile(File f, String[] inputData)
+            throws IOException {
+        PrintWriter pw = new PrintWriter(new OutputStreamWriter(new
+            FileOutputStream(f), "UTF-8"));
+        for (int i=0; i<inputData.length; i++) {
+            pw.println(inputData[i]);
+        }
+        pw.close();
+    }
+}
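
Note for reviewers (not part of the patch): a minimal sketch of how the new Util helper is meant to be driven, mirroring MongoUpdateStorageTest above. The class name UtilUsageSketch, the file name sketch_input, and the sample rows are invented for illustration; it assumes Pig is on the test classpath, as in the tests in this series.

package com.mongodb.hadoop.pig;

import java.util.Iterator;

import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;

public class UtilUsageSketch {
    public static void main(String[] args) throws Exception {
        // Write tab-separated rows to a local file; Util marks it for deletion on JVM exit.
        String[] rows = {"a\tnewValue1\t1", "b\tnewValue2\t2"};
        Util.createLocalInputFile("sketch_input", rows);

        // Load the file with Pig in local mode, as the storage tests do;
        // LOAD with no USING clause defaults to tab-delimited PigStorage.
        PigServer pig = new PigServer(ExecType.LOCAL);
        pig.registerQuery("A = LOAD 'sketch_input' AS (f1:chararray, f2:chararray, f3:int);");

        // Print each loaded tuple, e.g. (a,newValue1,1).
        Iterator<Tuple> it = pig.openIterator("A");
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}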