This repository has been archived by the owner on Jan 29, 2022. It is now read-only.

Fix MongoInsertStorage documentation #105

Open
wants to merge 12 commits into master
6 changes: 3 additions & 3 deletions examples/README.md
@@ -84,11 +84,11 @@ Amazon Piggybank jar: http://aws.amazon.com/code/Elastic-MapReduce/2730
by_year = GROUP parsed_year BY (chararray)year;
year_10yearavg = FOREACH by_year GENERATE group, AVG(parsed_year.bc) as tenyear_avg;

- -- Args to MongoInsertStorage are: schema for output doc, field to use as '_id'.
+ -- Arg to MongoInsertStorage: field to use as '_id'.
STORE year_10yearavg
INTO 'mongodb://localhost:27017/demo.asfkjabfa'
USING
- com.mongodb.hadoop.pig.MongoInsertStorage('group:chararray,tenyear_avg:float', 'group');
+ com.mongodb.hadoop.pig.MongoInsertStorage('group');



@@ -315,4 +315,4 @@ After phase two is finished, the result documents look like this (the `logs_coun
],
"logs_count": 1050616
}
- ```
+ ```
10 changes: 5 additions & 5 deletions pig/README.md
@@ -96,12 +96,12 @@ file, you will also need to set `fs.s3.awsAccessKeyId` and `fs.s3.awsSecretAcces

To insert each output record into a MongoDB collection, use the `MongoInsertStorage` class, supplying the output URI.
For example:
+ ```
+ STORE <aliasname> INTO 'mongodb://localhost:27017/<db>.<collection>'
+ USING com.mongodb.hadoop.pig.MongoInsertStorage('<idAlias>');
+ ```

- STORE dates_averages INTO 'mongodb://localhost:27017/demo.yield_aggregated' USING com.mongodb.hadoop.pig.MongoInsertStorage('', '' );

- The `MongoInsertStorage` class also takes two args: an `idAlias` and a `schema` as described above. If `schema` is left blank, it will
- attempt to infer the output schema from the data using the strategy described above. If `idAlias` is left blank, an `ObjectId` will be
- generated for the value of the `_id` field in each output document.
+ The `MongoInsertStorage` class takes a single argument, `idAlias`, as described above. If `idAlias` is left blank, an `ObjectId` will be generated for the value of the `_id` field in each output document.
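
For contrast, a hedged sketch of the blank-`idAlias` case, reusing the alias and collection names from the old example (jar registration omitted): with no `idAlias`, the connector generates an `ObjectId` for each document's `_id`.

```
-- Assumes the mongo-hadoop core and pig jars are already REGISTERed.
-- Blank idAlias: each output document gets a generated ObjectId as _id.
STORE dates_averages INTO 'mongodb://localhost:27017/demo.yield_aggregated'
    USING com.mongodb.hadoop.pig.MongoInsertStorage('');
```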


### Updating a MongoDB collection
15 changes: 11 additions & 4 deletions pig/src/main/java/com/mongodb/hadoop/pig/BSONStorage.java
@@ -37,6 +37,9 @@
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;

+ import org.joda.time.DateTime;
+
+ import java.util.Date;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -58,7 +61,7 @@ public class BSONStorage extends StoreFunc implements StoreMetadata {
private String idField = null;

private final BSONFileOutputFormat outputFormat = new BSONFileOutputFormat();

public BSONStorage() {
}

@@ -113,6 +116,8 @@ public static Object getTypeForBSON(final Object o, final ResourceFieldSchema fi
return o.toString();
case DataType.CHARARRAY:
return o;
+ case DataType.DATETIME:
+ return ((DateTime) o).toDate();

//Given a TUPLE, create a Map so BSONEncoder will eat it
case DataType.TUPLE:
@@ -123,7 +128,8 @@ public static Object getTypeForBSON(final Object o, final ResourceFieldSchema fi
ResourceFieldSchema[] fs = s.getFields();
Map<String, Object> m = new LinkedHashMap<String, Object>();
for (int j = 0; j < fs.length; j++) {
- m.put(fs[j].getName(), getTypeForBSON(((Tuple) o).get(j), fs[j], toIgnore));
+ String fn = FieldUtils.getEscFieldName(fs[j].getName());
+ m.put(fn, getTypeForBSON(((Tuple) o).get(j), fs[j], toIgnore));
}
return m;

@@ -159,7 +165,8 @@ public static Object getTypeForBSON(final Object o, final ResourceFieldSchema fi
for (Tuple t : (DataBag) o) {
Map<String, Object> ma = new LinkedHashMap<String, Object>();
for (int j = 0; j < fs.length; j++) {
- ma.put(fs[j].getName(), t.get(j));
+ String fn = FieldUtils.getEscFieldName(fs[j].getName());
+ ma.put(fn, t.get(j));
}
a.add(ma);
}
@@ -184,7 +191,7 @@ public static Object getTypeForBSON(final Object o, final ResourceFieldSchema fi
@SuppressWarnings("unchecked")
protected void writeField(final BasicDBObjectBuilder builder, final ResourceFieldSchema field, final Object d) throws IOException {
Object convertedType = getTypeForBSON(d, field, null);
- String fieldName = field != null ? field.getName() : "value";
+ String fieldName = field != null ? FieldUtils.getEscFieldName(field.getName()) : "value";

if (convertedType instanceof Map) {
for (Map.Entry<String, Object> mapentry : ((Map<String, Object>) convertedType).entrySet()) {
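The new `DataType.DATETIME` branch converts Pig datetime values (joda-time `DateTime` objects at runtime) to `java.util.Date` before BSON encoding. A hedged Pig sketch of the write this enables, with hypothetical paths and aliases:

```
-- Assumes the mongo-hadoop jars are already REGISTERed.
logs = LOAD '/tmp/logs.tsv' AS (ts:chararray, msg:chararray);
-- ToDate() yields a Pig datetime, which BSONStorage now writes as a BSON date.
stamped = FOREACH logs GENERATE ToDate(ts, 'yyyy-MM-dd') AS ts_date, msg;
STORE stamped INTO 'file:///tmp/logs.bson'
    USING com.mongodb.hadoop.pig.BSONStorage();
```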
13 changes: 13 additions & 0 deletions pig/src/main/java/com/mongodb/hadoop/pig/FieldUtils.java
@@ -0,0 +1,13 @@
+ package com.mongodb.hadoop.pig;
+
+ public class FieldUtils {
+     // Field names starting with "esc_" have the prefix stripped before writing.
+     private static final String ESC_PREFIX = "esc_";
+
+     public static String getEscFieldName(String fieldName) {
+         if (fieldName.startsWith(ESC_PREFIX)) {
+             // substring(), not replace(), so only the leading prefix is removed
+             fieldName = fieldName.substring(ESC_PREFIX.length());
+         }
+         return fieldName;
+     }
+ }
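
A hedged sketch of the convention this helper enables (hypothetical aliases, URI, and data): a Pig alias carrying the `esc_` prefix is written to MongoDB with the prefix stripped.

```
-- Assumes the mongo-hadoop jars are already REGISTERed.
-- The alias esc_rating is stored as the field "rating" in each document.
reviews = LOAD '/tmp/reviews.tsv' AS (esc_rating:int, title:chararray);
STORE reviews INTO 'mongodb://localhost:27017/demo.reviews'
    USING com.mongodb.hadoop.pig.MongoInsertStorage('');
```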
14 changes: 11 additions & 3 deletions pig/src/main/java/com/mongodb/hadoop/pig/MongoInsertStorage.java
@@ -52,24 +52,32 @@ public class MongoInsertStorage extends StoreFunc implements StoreMetadata {

private String udfcSignature = null;
private String idField = null;
+ private String toIgnore = null;


private final MongoOutputFormat outputFormat = new MongoOutputFormat();

public MongoInsertStorage() {
}
Contributor review comment: removing (or really here you're just renaming) `useUpsert` is a breaking change. You'll need to add `toIgnore` as a third parameter.

- public MongoInsertStorage(final String idField, final String useUpsert) {
+ public MongoInsertStorage(final String idField) {
this.idField = idField;
}

+ public MongoInsertStorage(final String idField, final String toIgnore) {
+ this.idField = idField;
+ this.toIgnore = toIgnore;
+ }

protected void writeField(final BasicDBObjectBuilder builder,
final ResourceSchema.ResourceFieldSchema field,
final Object d) throws IOException {
- Object convertedType = BSONStorage.getTypeForBSON(d, field, null);
+ Object convertedType = BSONStorage.getTypeForBSON(d, field, this.toIgnore);
if (field.getName() != null && field.getName().equals(this.idField)) {
builder.add("_id", convertedType);
} else {
- builder.add(field.getName(), convertedType);
+ String fieldName = FieldUtils.getEscFieldName(field.getName());
+ builder.add(fieldName, convertedType);
}

}
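For reference, a hedged sketch of how the two constructors surface in Pig, reusing the `year_10yearavg` relation from the examples README (URIs hypothetical); note the reviewer's warning above that the two-argument form collides with the old `(idField, useUpsert)` signature:

```
-- One-arg form: the alias 'group' supplies each document's _id.
STORE year_10yearavg INTO 'mongodb://localhost:27017/demo.avgs'
    USING com.mongodb.hadoop.pig.MongoInsertStorage('group');

-- Two-arg form added here: the second argument is passed through to
-- BSONStorage.getTypeForBSON() as toIgnore.
STORE year_10yearavg INTO 'mongodb://localhost:27017/demo.avgs_trimmed'
    USING com.mongodb.hadoop.pig.MongoInsertStorage('group', 'tenyear_avg');
```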
28 changes: 16 additions & 12 deletions pig/src/main/java/com/mongodb/hadoop/pig/MongoStorage.java
@@ -116,9 +116,11 @@ protected void writeField(final BasicDBObjectBuilder builder,
final ResourceSchema.ResourceFieldSchema field,
final Object d) throws IOException {

+ String fieldName = FieldUtils.getEscFieldName(field.getName());
+
// If the field is missing or the value is null, write a null
if (d == null) {
- builder.add(field.getName(), null);
+ builder.add(fieldName, null);
return;
}

@@ -127,35 +129,37 @@ protected void writeField(final BasicDBObjectBuilder builder,
// Based on the field's type, write it out
byte i = field.getType();
if (i == DataType.INTEGER) {
- builder.add(field.getName(), d);
+ builder.add(fieldName, d);
} else if (i == DataType.LONG) {
- builder.add(field.getName(), d);
+ builder.add(fieldName, d);
} else if (i == DataType.FLOAT) {
- builder.add(field.getName(), d);
+ builder.add(fieldName, d);
} else if (i == DataType.DOUBLE) {
- builder.add(field.getName(), d);
+ builder.add(fieldName, d);
+ } else if (i == DataType.DATETIME) {
+ builder.add(fieldName, d);
} else if (i == DataType.BYTEARRAY) {
- builder.add(field.getName(), d.toString());
+ builder.add(fieldName, d.toString());
} else if (i == DataType.CHARARRAY) {
- builder.add(field.getName(), d);
+ builder.add(fieldName, d);
} else if (i == DataType.TUPLE) {
// Given a TUPLE, create a Map so BSONEncoder will eat it
if (s == null) {
throw new IOException("Schemas must be fully specified to use this storage function. No schema found for field "
- + field.getName());
+ + fieldName);
}
ResourceFieldSchema[] fs = s.getFields();
Map<String, Object> m = new LinkedHashMap<String, Object>();
for (int j = 0; j < fs.length; j++) {
m.put(fs[j].getName(), ((Tuple) d).get(j));
}
- builder.add(field.getName(), (Map) m);
+ builder.add(fieldName, (Map) m);
} else if (i == DataType.BAG) {
// Given a BAG, create an Array so BSONEncoder will eat it.
ResourceFieldSchema[] fs;
if (s == null) {
throw new IOException("Schemas must be fully specified to use this storage function. No schema found for field "
- + field.getName());
+ + fieldName);
}
fs = s.getFields();
if (fs.length != 1 || fs[0].getType() != DataType.TUPLE) {
Expand All @@ -166,7 +170,7 @@ protected void writeField(final BasicDBObjectBuilder builder,
s = fs[0].getSchema();
if (s == null) {
throw new IOException("Schemas must be fully specified to use this storage function. No schema found for field "
- + field.getName());
+ + fieldName);
}
fs = s.getFields();

Expand All @@ -179,7 +183,7 @@ protected void writeField(final BasicDBObjectBuilder builder,
a.add(ma);
}

- builder.add(field.getName(), a);
+ builder.add(fieldName, a);
} else if (i == DataType.MAP) {
Map map = (Map) d;
for (Object key : map.keySet()) {
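
`MongoStorage` receives the same two changes, a `DATETIME` branch and `esc_` prefix stripping. A hedged sketch with hypothetical aliases and URI; per the exception messages above, `MongoStorage` requires the relation's schema to be fully specified:

```
-- Assumes the mongo-hadoop jars are already REGISTERed.
raw = LOAD '/tmp/events.tsv' AS (day:chararray, esc_host:chararray);
-- Datetime values are now written directly as BSON dates, and the esc_
-- prefix is stripped, so fields come out as "day_date" and "host".
events = FOREACH raw GENERATE ToDate(day, 'yyyy-MM-dd') AS day_date, esc_host;
STORE events INTO 'mongodb://localhost:27017/demo.events'
    USING com.mongodb.hadoop.pig.MongoStorage();
```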