Skip to content
This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Commit

Permalink
Merge pull request #13 from WW-Digital/filter
Browse files Browse the repository at this point in the history
Ability to ignore filtered oplog docs
  • Loading branch information
will123195 authored Dec 3, 2018
2 parents c4c6bfe + b55d6b0 commit 230450e
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 3 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ const connectionStrings = [
'mongodb://shard1-secondary1/local'
];

const reader = new MongoOplogReader({
const reader = new MongoOplogReader({
redisClient,
workersPerOplog: 1 // total # of redundant workers per oplog (respected across all processes)
});
reader.filter(oplogDocument => oplogDocument.op !== 'u'); // ignore 'update' operations
reader.setConnectionStrings(connectionStrings);
reader.onEvent(data => {
// return a promise to apply backpressure on the oplog stream to prevent a
Expand Down
7 changes: 7 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,17 @@ class MongoOplogReader {
this.eventHandlers = [];
this.throttleDelay = 0;
this.throttleMemUsageThreshold = options.throttleMemUsageThreshold; // decimal percent format, i.e. 0.85
this.filterOp = () => true; // don't filter any ops by default

if (!Number.isInteger(this.workersPerOplog) || this.workersPerOplog > 10 || this.workersPerOplog < 1) {
throw new Error(`workersPerOplog '${this.workersPerOplog}' must be an integer between 1 and 10.`);
}
}

filter(fn) {
this.filterOp = fn;
}

start() {
if (!Array.isArray(this.connectionStrings) || !this.connectionStrings.length) {
throw new Error('There are no connectionStrings.');
Expand Down Expand Up @@ -350,6 +355,8 @@ class MongoOplogReader {
}

processOp(data, replSetName, memberName) {
const skip = !this.filterOp(data);
if (skip) return Promise.resolve();
debug(`processOp: ${replSetName} ${memberName} %o`, data);
const opId = this.getOpId(data);
return this.isOpMajorityDetected(replSetName, memberName, opId)
Expand Down
12 changes: 10 additions & 2 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ const reader = new MongoOplogReader({
redisClient: redis.createClient()
});

reader.filter(oplogDocument => oplogDocument.op !== 'u'); // ignore 'update' operations

let opCount = 0;
let asyncOpCount = 0;

Expand All @@ -40,8 +42,14 @@ client.connect().then(() => {
.then(() => db.collection('books').insert({ title: 'Hello 1', rand: Math.random() }))
.then(() => console.log('inserted document 1'))
.then(() => reader.start())
.then(() => db.collection('books').insert({ title: 'Hello 2', rand: Math.random() }))
.then(() => console.log('inserted document 2'))
.then(() =>
db.collection('books').insert({ title: 'Hello 2', rand: Math.random() })
.then(result => {
const query = { _id: result.insertedIds[0] };
console.log('inserted document 2')
return db.collection('books').findOneAndUpdate(query, { $set: { note: 'hi!' } });
})
)
.delay(3000)
.then(() => db.collection('books').insert({ title: 'Hello 3', rand: Math.random() }))
.then(() => console.log('inserted document 3'))
Expand Down

0 comments on commit 230450e

Please sign in to comment.