From aa9768fc089a8a93c0cde08dbc1471d8a4e7ca6e Mon Sep 17 00:00:00 2001 From: Michael DeBonis Date: Fri, 7 Jul 2017 16:52:25 -0500 Subject: [PATCH] checkpoint --- lib/modules/directChannels.js | 57 ++++++++++----------- lib/modules/directPosts.js | 57 ++++++++++----------- lib/modules/posts.js | 96 +++++++++++++---------------------- lib/modules/transform.js | 2 +- lib/modules/users.js | 3 +- 5 files changed, 94 insertions(+), 121 deletions(-) diff --git a/lib/modules/directChannels.js b/lib/modules/directChannels.js index ced2452..a232dcb 100644 --- a/lib/modules/directChannels.js +++ b/lib/modules/directChannels.js @@ -1,21 +1,34 @@ -const { Transform } = require('stream') const Factory = require('../factory') +const transform = require('./transform') const Utils = require('./utils') module.exports = function(context) { return new Promise(function(resolve /*, reject */) { + console.log('direct channels: streaming records') // // Array to accumulate the channel // member pairs // var channels = {} // - // Set up the transform stream + // Query messages from Jabber and pipe + // through the post transform and + // then to the output. We use pipe to + // handle very large data sets using + // streams // - const toDirectChannel = new Transform({ - readableObjectMode: true, - writableObjectMode: true, - transform: function(result, encoding, callback) { + context.jabber.pipe( + // + // Define the query + // + `SELECT DISTINCT to_jid, from_jid FROM dbo.jm + WHERE msg_type = 'c' + AND direction = 'I' + AND (body_string != '' or datalength(body_text) > 0)`, + // + // Define the tranform + // + transform('direct channels', function(result, encoding, callback) { // // Generate the members array for the // direct channel @@ -44,28 +57,14 @@ module.exports = function(context) { // done with the chunk // return callback() - } - }).on('finish', function() { - console.log('direct channels: ... finished') - resolve(context) - }) - - console.log('direct channels: streaming records') - - // - // Query messages from Jabber and pipe - // through the post transform and - // then to the output. We use pipe to - // handle very large data sets using - // streams - // - context.jabber - .pipe(` - SELECT DISTINCT to_jid, from_jid FROM dbo.jm - WHERE msg_type = 'c' - AND direction = 'I' - AND (body_string != '' or datalength(body_text) > 0)`, - toDirectChannel - ) + }, + // + // Define the callback to be invoked + // on finish + // + function() { + resolve(context) + }) + ) }) } diff --git a/lib/modules/directPosts.js b/lib/modules/directPosts.js index 4a39d9c..4641829 100644 --- a/lib/modules/directPosts.js +++ b/lib/modules/directPosts.js @@ -1,16 +1,29 @@ -const { Transform } = require('stream') const Factory = require('../factory') +const transform = require('./transform') const Utils = require('./utils') module.exports = function(context) { return new Promise(function(resolve /*, reject */) { + console.log('direct posts: streaming records') // - // Set up the transform stream + // Query messages from Jabber and pipe + // through the post transform and + // then to the output. We use pipe to + // handle very large data sets using + // streams // - const toDirectPost = new Transform({ - readableObjectMode: true, - writableObjectMode: true, - transform: function(message, encoding, callback) { + context.jabber.pipe( + // + // Define the query + // + `SELECT to_jid, from_jid, sent_date, body_string, body_text FROM dbo.jm + WHERE msg_type = 'c' + AND direction = 'I' + AND (body_string != '' or datalength(body_text) > 0)`, + // + // Define the tranform + // + transform('direct channels', function(message, encoding, callback) { // // Write the direct post to the // output @@ -32,28 +45,14 @@ module.exports = function(context) { // done with the chunk // return callback() - } - }).on('finish', function() { - console.log('direct posts: ... finished') - resolve(context) - }) - - console.log('direct posts: streaming records') - - // - // Query messages from Jabber and pipe - // through the post transform and - // then to the output. We use pipe to - // handle very large data sets using - // streams - // - context.jabber - .pipe(` - SELECT to_jid, from_jid, sent_date, body_string, body_text FROM dbo.jm - WHERE msg_type = 'c' - AND direction = 'I' - AND (body_string != '' or datalength(body_text) > 0)`, - toDirectPost - ) + }, + // + // Define the callback to be invoked + // on finish + // + function() { + resolve(context) + }) + ) }) } diff --git a/lib/modules/posts.js b/lib/modules/posts.js index db88fa8..a4027c6 100644 --- a/lib/modules/posts.js +++ b/lib/modules/posts.js @@ -1,16 +1,33 @@ -const { Transform } = require('stream') const Factory = require('../factory') +const transform = require('./transform') const Utils = require('./utils') module.exports = function(context) { return new Promise(function(resolve /*, reject */) { + console.log('posts: streaming records') // - // Set up the transform stream + // Query messages from Jabber and pipe + // through the post transform and + // then to the output. We use pipe to + // handle very large data sets using + // streams // - const toPost = new Transform({ - readableObjectMode: true, - writableObjectMode: true, - transform: function(message, encoding, callback) { + context.jabber.pipe( + // + // Define the query + // + `SELECT + msg_id, + to_jid, + from_jid, + sent_date, + body_string, + body_text + FROM dbo.tc_msgarchive WHERE msg_type = 'g' AND (body_string != '' OR datalength(body_text) > 0) `, + // + // Define the tranform + // + transform('direct channels', function(message, encoding, callback) { // // Lookup the channel // @@ -44,60 +61,19 @@ module.exports = function(context) { } else { console.log(`posts: ... [!] skipping message ${message.msg_id}`) } - + // + // Invoke the call to mark that we are + // done with the chunk + // return callback() - } - }).on('finish', function() { - console.log('posts: ... finished') - resolve(context) - }) - - console.log('posts: streaming records') - - // - // Query messages from Jabber and pipe - // through the post transform and - // then to the output. We use pipe to - // handle very large data sets using - // streams - // - context.jabber - .pipe( - `SELECT - msg_id, - to_jid, - from_jid, - sent_date, - body_string, - body_text - FROM dbo.tc_msgarchive WHERE msg_type = 'g' AND (body_string != '' OR datalength(body_text) > 0) `, - toPost - ) + }, + // + // Define the callback to be invoked + // on finish + // + function() { + resolve(context) + }) + ) }) } - -// // -// // Lookup values -// // -// const lookup = function (type, map, key) { -// var found = map[key] -// -// if(!found) { -// console.log(`posts: ... ${type} ${key} not found`) -// } -// -// return found -// } -// -// // -// // Find the message body -// // -// const getBody = function (message) { -// var body = message.body_string || message.body_text -// -// if(!body) { -// console.log('posts: ... message body is empty') -// } -// -// return body -// } diff --git a/lib/modules/transform.js b/lib/modules/transform.js index e75bed3..42d009d 100644 --- a/lib/modules/transform.js +++ b/lib/modules/transform.js @@ -7,6 +7,6 @@ module.exports = function(name, transform, callback) { transform, }).on('finish', function() { console.log(`${name} ... finished`) - callback(context) + callback() }) } diff --git a/lib/modules/users.js b/lib/modules/users.js index 4c56c25..a402ccc 100644 --- a/lib/modules/users.js +++ b/lib/modules/users.js @@ -50,12 +50,11 @@ module.exports = function(context) { channels: [] }] }) - // // Look up the channel based on the // room id // - var channel = Utils.lookup('channel', context.values.channels, record.room_jid) + var channel = context.values.channels[record.room_jid] // // Add it to the user //