Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support entity canon zone / Workaround Duplicate Key / Support hint / Support BSON / Remove Mongo 3 Warnings #71

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# ...

* Support multiple mongodb databases usage based on entity canon zone. Using a default DB when no zone are available in entity canon.
* Workaround the duplicate key issue of mongo in saving with upsert by automatically retry save (see https://jira.mongodb.org/browse/SERVER-14322)
* Support hint$ in query to help mongo to select the best index to use.

# 1.4.0

# 1.1.0 - 27.08.2016

* Added Seneca 3 and Node 6 support
Expand Down
124 changes: 75 additions & 49 deletions mongo-store.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

var _ = require('lodash')
var Mongo = require('mongodb')
var Dot = require('mongo-dot-notation')
var MongoClient = Mongo.MongoClient
var ObjectID = Mongo.ObjectID

Expand Down Expand Up @@ -83,9 +82,14 @@ function metaquery(qent, q) {
}

if (q.fields$) {
mq.fields = q.fields$
mq.projection = q.fields$
}
} else {

if (q.hint$) {
mq.hint = q.hint$
}
}
else {
mq = _.isArray(q.native$) ? q.native$[1] : mq
}

Expand All @@ -96,8 +100,8 @@ module.exports = function(opts) {
var seneca = this
var desc

var dbinst = null
var dbclient = null
var defaultDB
var collmap = {}

function error(args, err, cb) {
Expand All @@ -124,19 +128,19 @@ module.exports = function(opts) {
conf.uri += conf.port ? ':' + conf.port : ':27017'
}

conf.db = conf.db || conf.name
// Keeping trace of the default database name to use throughout the plugin if canon entity's zone is not defined
defaultDB = conf.db = conf.db || conf.name

// Connect using the URI
MongoClient.connect(
conf.uri,
{ useNewUrlParser: true },
function(err, client) {
if (err) {
return seneca.die('connect', err, conf)
}
dbclient = client
// Set the instance to use throughout the plugin
dbinst = client.db(conf.db)
seneca.log.debug('init', 'db open', conf.db)
seneca.log.debug('init', 'db connect', conf.uri)
cb(null)
}
)
Expand All @@ -145,17 +149,23 @@ module.exports = function(opts) {
function getcoll(args, ent, cb) {
var canon = ent.canon$({ object: true })

var zone = canon.zone ? canon.zone : defaultDB
var collname = (canon.base ? canon.base + '_' : '') + canon.name

if (!collmap[collname]) {
dbinst.collection(collname, function(err, coll) {
if (_.isEmpty(zone)) cb(new Error('No canon/zone define in entity ' + collname + ' and no default database defined.'))

collmap[zone] = collmap[zone] || {}
if (!collmap[zone][collname]) {
let db = dbclient.db(zone)
db.collection(collname, function (err, coll) {
if (!error(args, err, cb)) {
collmap[collname] = coll
cb(null, coll)
collmap[zone][collname] = coll
cb(null, coll, db)
}
})
} else {
cb(null, collmap[collname])
}
else {
cb(null, collmap[zone][collname])
}
}

Expand Down Expand Up @@ -215,11 +225,11 @@ module.exports = function(opts) {
var func = 'replaceOne'

if (shouldMerge) {
set = Dot.flatten(entp)
set = { $set: entp }
func = 'updateOne'
}

coll[func](q, set, { upsert: true }, function(err) {
var handle = function (err) {
if (!error(args, err, cb)) {
seneca.log.debug('save/update', ent, desc)
coll.findOne(q, {}, function(err, entu) {
Expand All @@ -234,13 +244,22 @@ module.exports = function(opts) {
}
})
}
}

coll[func](q, set, { upsert: true }, function(err) {
// https://jira.mongodb.org/browse/SERVER-14322 => catch duplicate key and retry one time
if (err && err.message.includes('E11000')) {
seneca.log.warn('Duplicate key caught:', err.message)
// retry
coll[func](q, set, { upsert: true }, handle)
} else handle(err)
})
}
}
})
},

load: function(args, cb) {
load: function (args, cb) {
var qent = args.qent
var q = args.q

Expand Down Expand Up @@ -278,19 +297,17 @@ module.exports = function(opts) {
if (!error(args, err, cb)) {
var list = []

cur.each(function(err, entp) {
if (!error(args, err, cb)) {
if (entp) {
var fent = null
entp.id = idstr(entp._id)
delete entp._id
fent = qent.make$(entp)
list.push(fent)
} else {
seneca.log.debug('list', q, list.length, list[0], desc)
cb(null, list)
}
}
cur.forEach(function(entp) {
var fent = null
entp.id = idstr(entp._id)
delete entp._id
fent = qent.make$(entp)
list.push(fent)
}).then(function() {
seneca.log.debug('list', q, list.length, list[0], desc)
cb(null, list)
}).catch(function(err) {
error(args, err, cb)
})
}
})
Expand All @@ -316,24 +333,24 @@ module.exports = function(opts) {
var list = []
var toDelete = []

cur.each(function(err, entp) {
if (!error(args, err, cb)) {
cur.forEach(function(entp) {
if (entp) {
var fent = null
if (entp) {
var fent = null
if (entp) {
toDelete.push(entp._id)
entp.id = idstr(entp._id)
delete entp._id
fent = qent.make$(entp)
}
list.push(fent)
} else {
coll.remove({ _id: { $in: toDelete } }, function(err) {
seneca.log.debug('remove/all', q, desc)
cb(err, null)
})
toDelete.push(entp._id)
entp.id = idstr(entp._id)
delete entp._id
fent = qent.make$(entp)
}
list.push(fent)
}
}).then(function() {
coll.deleteMany({ _id: { $in: toDelete } }, function(err) {
seneca.log.debug('remove/all', q, desc)
cb(err, null)
})
}).catch(function(err) {
error(args, err, cb)
})
}
})
Expand All @@ -342,6 +359,8 @@ module.exports = function(opts) {
if (!error(args, err, cb)) {
if (entp) {
coll.deleteOne({ _id: entp._id }, {}, function(err) {
entp.id = idstr(entp._id)
delete entp._id
seneca.log.debug('remove/one', q, entp, desc)
var ent = load ? entp : null
cb(err, ent)
Expand All @@ -354,13 +373,20 @@ module.exports = function(opts) {
})
},

native: function(args, done) {
dbinst.collection('seneca', function(err, coll) {
native: function (args, done) {
var zone = defaultDB
if (args.ent) {
var canon = args.ent.canon$({object: true})
zone = canon.zone ? canon.zone : defaultDB
}
var db = dbclient.db(zone)
db.collection('seneca', function (err, coll) {
if (!error(args, err, done)) {
coll.findOne({}, {}, function(err) {
if (!error(args, err, done)) {
done(null, dbinst)
} else {
done(null, db)
}
else {
done(err)
}
})
Expand Down
5 changes: 2 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
"description": "Seneca data store plugin for MongoDB",
"main": "mongo-store.js",
"scripts": {
"test": "lab -v -P test -L -t 85 -I URL,URLSearchParams,SharedArrayBuffer,Atomics,BigUint64Array,BigInt64Array,BigInt",
"test": "lab -v -P test -L -t 84 -I URL,URLSearchParams,SharedArrayBuffer,Atomics,BigUint64Array,BigInt64Array,BigInt",
"prettier": "prettier --write --no-semi --single-quote *.js lib/*.js test/*.js",
"coveralls": "lab -s -P test -r lcov | coveralls",
"coverage": "lab -v -P test -L -t 85 -r html > docs/coverage.html",
"coverage": "lab -v -P test -L -t 84 -r html > docs/coverage.html",
"build": "docker-compose build",
"start": "docker-compose up",
"stop": "docker-compose kill",
Expand Down Expand Up @@ -42,7 +42,6 @@
"license": "MIT",
"dependencies": {
"lodash": "4",
"mongo-dot-notation": "1",
"mongodb": "3"
},
"devDependencies": {
Expand Down