Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
mkdbns committed Jul 6, 2017
1 parent 9752e82 commit 1c90a16
Showing 1 changed file with 82 additions and 0 deletions.
82 changes: 82 additions & 0 deletions lib/modules/directChannels.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
const _ = require('lodash')
const { Transform } = require('stream')
const Factory = require('../factory')
const Utils = require('./utils')

module.exports = function(context) {
return new Promise(function(resolve /*, reject */) {
//
// Array to accumulate the unique
// pair of users
//
var members = []
//
// Set up the transform stream
//
const toDirectChannel = new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform: function(pairs, encoding, callback) {
//
// Convert the jabber jid to usernames and
// sort them so it's easier to dedup. We
// purposely allow an uncaught exception to be
// thrown if the user lookup fails since
// it should never happen. We want to terminate
// the export
//
let sorted = _.sortBy(
_.map(pairs, function(value) {
return context.values.users[Utils.realJID(value)].username
})
)
//
// Ensure that we don't have a pair with the same
// usernames
//
if(sorted[0] !== sorted[1]) {
members.push(sorted)
}
//
// Invoke the call to mark that we are
// done with the chunk
//
return callback()
}
}).on('finish', function() {
_.map(
_.uniqBy(members, function(value) {
return `${value[0]}|${value}[1]`
}), function(value) {
context.output.write(
Factory.directChannel({
members: value
})
)
}
)

console.log('direct channels: ... finished')

resolve(context)
})

console.log('direct channels: streaming records')

//
// Query messages from Jabber and pipe
// through the post transform and
// then to the output. We use pipe to
// handle very large data sets using
// streams
//
context.jabber
.pipe(`
SELECT DISTINCT to_jid, from_jid FROM dbo.jm
WHERE msg_type = 'c'
AND direction = 'I'
AND (body_string != '' or datalength(body_text) > 0)`,
toDirectChannel
)
})
}

0 comments on commit 1c90a16

Please sign in to comment.