Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
mkdbns committed Jul 7, 2017
1 parent 45cb129 commit aa9768f
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 121 deletions.
57 changes: 28 additions & 29 deletions lib/modules/directChannels.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,34 @@
const { Transform } = require('stream')
const Factory = require('../factory')
const transform = require('./transform')
const Utils = require('./utils')

module.exports = function(context) {
return new Promise(function(resolve /*, reject */) {
console.log('direct channels: streaming records')
//
// Array to accumulate the channel
// member pairs
//
var channels = {}
//
// Set up the transform stream
// Query messages from Jabber and pipe
// through the post transform and
// then to the output. We use pipe to
// handle very large data sets using
// streams
//
const toDirectChannel = new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform: function(result, encoding, callback) {
context.jabber.pipe(
//
// Define the query
//
`SELECT DISTINCT to_jid, from_jid FROM dbo.jm
WHERE msg_type = 'c'
AND direction = 'I'
AND (body_string != '' or datalength(body_text) > 0)`,
//
// Define the tranform
//
transform('direct channels', function(result, encoding, callback) {
//
// Generate the members array for the
// direct channel
Expand Down Expand Up @@ -44,28 +57,14 @@ module.exports = function(context) {
// done with the chunk
//
return callback()
}
}).on('finish', function() {
console.log('direct channels: ... finished')
resolve(context)
})

console.log('direct channels: streaming records')

//
// Query messages from Jabber and pipe
// through the post transform and
// then to the output. We use pipe to
// handle very large data sets using
// streams
//
context.jabber
.pipe(`
SELECT DISTINCT to_jid, from_jid FROM dbo.jm
WHERE msg_type = 'c'
AND direction = 'I'
AND (body_string != '' or datalength(body_text) > 0)`,
toDirectChannel
)
},
//
// Define the callback to be invoked
// on finish
//
function() {
resolve(context)
})
)
})
}
57 changes: 28 additions & 29 deletions lib/modules/directPosts.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,29 @@
const { Transform } = require('stream')
const Factory = require('../factory')
const transform = require('./transform')
const Utils = require('./utils')

module.exports = function(context) {
return new Promise(function(resolve /*, reject */) {
console.log('direct posts: streaming records')
//
// Set up the transform stream
// Query messages from Jabber and pipe
// through the post transform and
// then to the output. We use pipe to
// handle very large data sets using
// streams
//
const toDirectPost = new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform: function(message, encoding, callback) {
context.jabber.pipe(
//
// Define the query
//
`SELECT to_jid, from_jid, sent_date, body_string, body_text FROM dbo.jm
WHERE msg_type = 'c'
AND direction = 'I'
AND (body_string != '' or datalength(body_text) > 0)`,
//
// Define the tranform
//
transform('direct channels', function(message, encoding, callback) {
//
// Write the direct post to the
// output
Expand All @@ -32,28 +45,14 @@ module.exports = function(context) {
// done with the chunk
//
return callback()
}
}).on('finish', function() {
console.log('direct posts: ... finished')
resolve(context)
})

console.log('direct posts: streaming records')

//
// Query messages from Jabber and pipe
// through the post transform and
// then to the output. We use pipe to
// handle very large data sets using
// streams
//
context.jabber
.pipe(`
SELECT to_jid, from_jid, sent_date, body_string, body_text FROM dbo.jm
WHERE msg_type = 'c'
AND direction = 'I'
AND (body_string != '' or datalength(body_text) > 0)`,
toDirectPost
)
},
//
// Define the callback to be invoked
// on finish
//
function() {
resolve(context)
})
)
})
}
96 changes: 36 additions & 60 deletions lib/modules/posts.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,33 @@
const { Transform } = require('stream')
const Factory = require('../factory')
const transform = require('./transform')
const Utils = require('./utils')

module.exports = function(context) {
return new Promise(function(resolve /*, reject */) {
console.log('posts: streaming records')
//
// Set up the transform stream
// Query messages from Jabber and pipe
// through the post transform and
// then to the output. We use pipe to
// handle very large data sets using
// streams
//
const toPost = new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform: function(message, encoding, callback) {
context.jabber.pipe(
//
// Define the query
//
`SELECT
msg_id,
to_jid,
from_jid,
sent_date,
body_string,
body_text
FROM dbo.tc_msgarchive WHERE msg_type = 'g' AND (body_string != '' OR datalength(body_text) > 0) `,
//
// Define the tranform
//
transform('direct channels', function(message, encoding, callback) {
//
// Lookup the channel
//
Expand Down Expand Up @@ -44,60 +61,19 @@ module.exports = function(context) {
} else {
console.log(`posts: ... [!] skipping message ${message.msg_id}`)
}

//
// Invoke the call to mark that we are
// done with the chunk
//
return callback()
}
}).on('finish', function() {
console.log('posts: ... finished')
resolve(context)
})

console.log('posts: streaming records')

//
// Query messages from Jabber and pipe
// through the post transform and
// then to the output. We use pipe to
// handle very large data sets using
// streams
//
context.jabber
.pipe(
`SELECT
msg_id,
to_jid,
from_jid,
sent_date,
body_string,
body_text
FROM dbo.tc_msgarchive WHERE msg_type = 'g' AND (body_string != '' OR datalength(body_text) > 0) `,
toPost
)
},
//
// Define the callback to be invoked
// on finish
//
function() {
resolve(context)
})
)
})
}

// //
// // Lookup values
// //
// const lookup = function (type, map, key) {
// var found = map[key]
//
// if(!found) {
// console.log(`posts: ... ${type} ${key} not found`)
// }
//
// return found
// }
//
// //
// // Find the message body
// //
// const getBody = function (message) {
// var body = message.body_string || message.body_text
//
// if(!body) {
// console.log('posts: ... message body is empty')
// }
//
// return body
// }
2 changes: 1 addition & 1 deletion lib/modules/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ module.exports = function(name, transform, callback) {
transform,
}).on('finish', function() {
console.log(`${name} ... finished`)
callback(context)
callback()
})
}
3 changes: 1 addition & 2 deletions lib/modules/users.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,11 @@ module.exports = function(context) {
channels: []
}]
})

//
// Look up the channel based on the
// room id
//
var channel = Utils.lookup('channel', context.values.channels, record.room_jid)
var channel = context.values.channels[record.room_jid]
//
// Add it to the user
//
Expand Down

0 comments on commit aa9768f

Please sign in to comment.