Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
mkdbns committed Jul 7, 2017
1 parent 91429fb commit 45cb129
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 48 deletions.
17 changes: 6 additions & 11 deletions lib/modules/directChannels.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
const _ = require('lodash')
const { Transform } = require('stream')
const Factory = require('../factory')
const Utils = require('./utils')
Expand All @@ -18,17 +17,13 @@ module.exports = function(context) {
writableObjectMode: true,
transform: function(result, encoding, callback) {
//
// Convert the jabber jid to usernames and
// sort them so it's easier to dedup. We
// purposely allow an uncaught exception to be
// thrown if the user lookup fails since
// it should never happen. We want to terminate
// the export
// Generate the members array for the
// direct channel
//
let members = _.sortBy(
_.map(result, function(jid) {
return context.values.users[Utils.realJID(jid)].username
})
let members = Utils.members(
context.values.users,
result.to_jid,
result.from_jid
)
//
// If we haven't processed this pair already
Expand Down
59 changes: 59 additions & 0 deletions lib/modules/directPosts.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
const { Transform } = require('stream')
const Factory = require('../factory')
const Utils = require('./utils')

module.exports = function(context) {
return new Promise(function(resolve /*, reject */) {
//
// Set up the transform stream
//
const toDirectPost = new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform: function(message, encoding, callback) {
//
// Write the direct post to the
// output
//
context.output.write(
Factory.directPost({
channel_members: Utils.members(
context.values.users,
message.to_jid,
message.from_jid
),
user: Utils.username(context.values.users, message.from_jid),
message: Utils.body(message),
create_at: Utils.millis(message.sent_date)
})
)
//
// Invoke the call to mark that we are
// done with the chunk
//
return callback()
}
}).on('finish', function() {
console.log('direct posts: ... finished')
resolve(context)
})

console.log('direct posts: streaming records')

//
// Query messages from Jabber and pipe
// through the post transform and
// then to the output. We use pipe to
// handle very large data sets using
// streams
//
context.jabber
.pipe(`
SELECT to_jid, from_jid, sent_date, body_string, body_text FROM dbo.jm
WHERE msg_type = 'c'
AND direction = 'I'
AND (body_string != '' or datalength(body_text) > 0)`,
toDirectPost
)
})
}
54 changes: 27 additions & 27 deletions lib/modules/posts.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,29 @@ module.exports = function(context) {
//
// Lookup the channel
//
var channel = lookup(
var channel = Utils.lookup(
'channel', context.values.channels, message.to_jid
)
//
// Lookup the user
//
var user = lookup(
'user', context.values.users, Utils.realJID(message.from_jid)
var username = Utils.username(
context.values.users, message.from_jid
)
//
// Get the body
//
var body = getBody(message)
var body = Utils.body(message)
//
// If we have enough data, write the
// post to the output
//
if(channel && user && body) {
if(channel && username && body) {
context.output.write(
Factory.post({
team: context.values.team.name,
channel: channel.name,
user: user.username,
user: username,
message: body,
create_at: new Date(message.sent_date).getTime()
})
Expand Down Expand Up @@ -76,28 +76,28 @@ module.exports = function(context) {
})
}

// //
// // Lookup values
// //
// const lookup = function (type, map, key) {
// var found = map[key]
//
// Lookup values
// if(!found) {
// console.log(`posts: ... ${type} ${key} not found`)
// }
//
const lookup = function (type, map, key) {
var found = map[key]

if(!found) {
console.log(`posts: ... ${type} ${key} not found`)
}

return found
}

// return found
// }
//
// Find the message body
// //
// // Find the message body
// //
// const getBody = function (message) {
// var body = message.body_string || message.body_text
//
const getBody = function (message) {
var body = message.body_string || message.body_text

if(!body) {
console.log('posts: ... message body is empty')
}

return body
}
// if(!body) {
// console.log('posts: ... message body is empty')
// }
//
// return body
// }
12 changes: 12 additions & 0 deletions lib/modules/transform.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
const { Transform } = require('stream')

module.exports = function(name, transform, callback) {
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform,
}).on('finish', function() {
console.log(`${name} ... finished`)
callback(context)
})
}
2 changes: 1 addition & 1 deletion lib/modules/users.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ module.exports = function(context) {
// Look up the channel based on the
// room id
//
var channel = context.values.channels[record.room_jid]
var channel = Utils.lookup('channel', context.values.channels, record.room_jid)
//
// Add it to the user
//
Expand Down
79 changes: 70 additions & 9 deletions lib/modules/utils.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,72 @@
module.exports = {
//
// Removes trailing /<string> suffixes that
// may exist in the user ids. This happens
// if a user logs in to jabber more than
// once at the same time
//
realJID: function (jid='') {
return jid.split('/')[0].replace(/\\20/g, '.')
const _ = require('lodash')

//
// Declare utils object
//
const utils = {}

//
// Removes trailing /<string> suffixes that
// may exist in the user ids. This happens
// if a user logs in to jabber more than
// once at the same time
//
utils.realJID = function (jid='') {
return jid.split('/')[0].replace(/\\20/g, '.')
}

//
// Lookup values
//
utils.lookup = function (type, map, key) {
var found = map[key]

if(!found) {
console.log(`... ${type} ${key} not found`)
}

return found
}

//
// Obtain the username from a jid
//
utils.username = function (users, jid) {
return utils.lookup('user', users, utils.realJID(jid)).username
}

//
// Find the message body
//
utils.body = function (message) {
var body = message.body_string || message.body_text

if(!body) {
console.log('... message body is empty')
}

return body
}

//
// Coverts the to / from JIDs to a members
// array
//
utils.members = function(users, to, from) {
return _.sortBy([
utils.username(users, to),
utils.username(users, from),
])
}

//
// Convert ISO to millis
//
utils.millis = function(date) {
return new Date(date).getTime()
}

//
// Export the functions
//
module.exports = utils

0 comments on commit 45cb129

Please sign in to comment.