From 979f599fda488b749687d5d6ba05b78c160d97a0 Mon Sep 17 00:00:00 2001 From: Haydar KULEKCI Date: Sun, 29 Oct 2017 23:01:18 +0300 Subject: [PATCH] added queue support for insert update and delete operation --- README.md | 10 ++++- docker-compose.yml | 14 +++++++ libraries/redis-client.js | 4 ++ package-lock.json | 35 ++++++++++++------ package.json | 6 ++- queue-listener.js | 77 +++++++++++++++++++++++++++++++++++++++ routes/products.js | 57 ++++++----------------------- 7 files changed, 143 insertions(+), 60 deletions(-) create mode 100644 queue-listener.js diff --git a/README.md b/README.md index f41cde1..100ea1e 100644 --- a/README.md +++ b/README.md @@ -4,8 +4,8 @@ ``` docker-compose up elasticsearch kibana -docker-compose up mysql -docker-compose up app +docker-compose up mysql redis +docker-compose up app listener ``` ## Database Structure and Initialization @@ -21,6 +21,12 @@ Port : 33060 And import `data/sample_data.sql` file for our sample data. +Redis configuration : + +``` +Host: redis +``` + ## Elastic Integration diff --git a/docker-compose.yml b/docker-compose.yml index f21a18e..6c7ab44 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -59,3 +59,17 @@ services: - elasticsearch - mysql - redis + + listener: + image: "node:8" + user: "node" + working_dir: /home/node/app + environment: + - NODE_ENV=production + volumes: + - ./:/home/node/app + command: "npm run listen" + links: + - elasticsearch + - mysql + - redis diff --git a/libraries/redis-client.js b/libraries/redis-client.js index dbc5f2f..5143884 100644 --- a/libraries/redis-client.js +++ b/libraries/redis-client.js @@ -2,6 +2,10 @@ var redis = require('redis'); client = redis.createClient({'host': 'redis'}); var RedisClient = { + getClient: function() { + return client; + }, + push: function(key, value, callback) { client.lpush(key, value, function(err, result) { callback(err, result); diff --git a/package-lock.json b/package-lock.json index 9e5afdf..6708414 100644 --- a/package-lock.json +++ b/package-lock.json @@ -163,9 +163,9 @@ } }, "bignumber.js": { - "version": "4.0.2", - "resolved": "https://registry.npmjs.org/bignumber.js/-/bignumber.js-4.0.2.tgz", - "integrity": "sha1-LR3DfuWWiGfs6pC22k0W5oYI0h0=" + "version": "4.0.4", + "resolved": "https://registry.npmjs.org/bignumber.js/-/bignumber.js-4.0.4.tgz", + "integrity": "sha512-LDXpJKVzEx2/OqNbG9mXBNvHuiRL4PzHCGfnANHMJ+fv68Ads3exDVJeGDJws+AoNEuca93bU3q+S0woeUaCdg==" }, "binary-extensions": { "version": "1.10.0", @@ -2162,14 +2162,14 @@ "integrity": "sha1-VgiurfwAvmwpAd9fmGF4jeDVl8g=" }, "mysql": { - "version": "2.14.1", - "resolved": "https://registry.npmjs.org/mysql/-/mysql-2.14.1.tgz", - "integrity": "sha512-ZPXqQeYH7L1QPDyC77Rcp32cNCQnNjz8Y4BbF17tOjm5yhSfjFa3xS4PvuxWJtEEmwVc4ccI7sSntj4eyYRq0A==", + "version": "2.15.0", + "resolved": "https://registry.npmjs.org/mysql/-/mysql-2.15.0.tgz", + "integrity": "sha512-C7tjzWtbN5nzkLIV+E8Crnl9bFyc7d3XJcBAvHKEVkjrYjogz3llo22q6s/hw+UcsE4/844pDob9ac+3dVjQSA==", "requires": { - "bignumber.js": "4.0.2", + "bignumber.js": "4.0.4", "readable-stream": "2.3.3", "safe-buffer": "5.1.1", - "sqlstring": "2.2.0" + "sqlstring": "2.3.0" } }, "nan": { @@ -2669,6 +2669,19 @@ "resolved": "https://registry.npmjs.org/signal-exit/-/signal-exit-3.0.2.tgz", "integrity": "sha1-tf3AjxKH6hF4Yo5BXiUTK3NkbG0=" }, + "simple-redis-queue": { + "version": "git+ssh://git@github.com/hkulekci/node-simple-redis-queue.git#623f02369aa8544f1675e895c12944b10dfc8b83", + "requires": { + "redis": "0.12.1" + }, + "dependencies": { + "redis": { + "version": "0.12.1", + "resolved": "https://registry.npmjs.org/redis/-/redis-0.12.1.tgz", + "integrity": "sha1-ZN92rQ/IrOuuvSoGReikj6xJGF4=" + } + } + }, "sntp": { "version": "2.0.2", "resolved": "https://registry.npmjs.org/sntp/-/sntp-2.0.2.tgz", @@ -2693,9 +2706,9 @@ } }, "sqlstring": { - "version": "2.2.0", - "resolved": "https://registry.npmjs.org/sqlstring/-/sqlstring-2.2.0.tgz", - "integrity": "sha1-wxNcTqirzX5+50GklmqJHYak8ZE=" + "version": "2.3.0", + "resolved": "https://registry.npmjs.org/sqlstring/-/sqlstring-2.3.0.tgz", + "integrity": "sha1-UluKT9Jtb3GqYegipsr5dtMa0qg=" }, "sshpk": { "version": "1.13.1", diff --git a/package.json b/package.json index 0cc955c..e6dfeeb 100644 --- a/package.json +++ b/package.json @@ -3,7 +3,8 @@ "version": "0.0.0", "private": true, "scripts": { - "start": "nodemon -e js,twig,css ./bin/www" + "start": "nodemon -e js,twig,css ./bin/www", + "listen": "nodemon -e queue-listener.js ./queue-listener.js" }, "dependencies": { "async": "^2.5.0", @@ -16,11 +17,12 @@ "express": "~4.15.5", "less-middleware": "~2.2.1", "morgan": "~1.9.0", - "mysql": "^2.14.1", + "mysql": "^2.15.0", "node-datetime": "^2.0.3", "nodemon": "^1.12.1", "redis": "^2.8.0", "serve-favicon": "~2.4.5", + "simple-redis-queue": "git+ssh://git@github.com/hkulekci/node-simple-redis-queue.git", "twig": "~0.10.3" } } diff --git a/queue-listener.js b/queue-listener.js new file mode 100644 index 0000000..90b4568 --- /dev/null +++ b/queue-listener.js @@ -0,0 +1,77 @@ +#!/usr/bin/env node + +require('dotenv').config(); +var productService = require('./services/productService'); +var categoryService = require('./services/categoryService'); +var productSearchService = require('./services/productSearchService'); +var redisClient = require('./libraries/redis-client'); +var RedisQueue = require("simple-redis-queue"); +var waterfall = require('async/waterfall'); + +var pop_queue = new RedisQueue(redisClient.getClient()); + +var upsertOperation = function(productId, functionCallback) { + waterfall([ + // Insert Operation + function(waterfallCallback) { // Getting Product Category From DB (Preparing for Elasticsearch) + var result = {}; + categoryService.getProductCategories(productId, function(err, productCategories) { + if (err) {} + result.productCategories = productCategories; + waterfallCallback(false, result); + }); + }, + function(result, waterfallCallback) { // Getting Product Data From MySQL DB (Preparing for Elasticsearch) + productService.getRecord(productId, function(err, product) { + if (err) {} + result.product = product; + waterfallCallback(false, result); + }); + }, + function(result, waterfallCallback) { // Saving Product to Elasticsearch + var product = result.product; + product.categories = result.productCategories; + + productSearchService.insert(product, function() { + waterfallCallback(false, product); + }); + }, + function(err, result) { + functionCallback(err, result); + } + ]); +}; + +var deleteOperation = function(productId, functionCallback) { + // Delete Operation + productSearchService.delete(productId, function(err, result) { + functionCallback(false); + }); +}; + + +/***** Redis Message Queue Listener ******/ +pop_queue.on("message", function (queueName, payload) { + var messageData = JSON.parse(payload); + if (messageData.action == 'update' || messageData.action == 'insert') { + upsertOperation(messageData.productId, function() { + console.log('[' + queueName + '] - Processed! - ' + payload); + pop_queue.next("product_updates"); + }); + } else if (messageData.action == 'delete') { + deleteOperation(messageData.productId, function() { + console.log('[' + queueName + '] - Processed! - ' + payload); + pop_queue.next("product_updates"); + }); + } else { + console.log('[' + queueName + '] - Not Processed! - ' + payload); + pop_queue.next("product_updates"); + } +}); + +// Listen for errors +pop_queue.on("error", function (error) { + console.log("pop_queue Error : " + error); +}); + +pop_queue.next("product_updates"); \ No newline at end of file diff --git a/routes/products.js b/routes/products.js index 15d2955..7acad84 100644 --- a/routes/products.js +++ b/routes/products.js @@ -6,6 +6,9 @@ var waterfall = require('async/waterfall'); var productSearchService = require('../services/productSearchService'); var redisClient = require('../libraries/redis-client'); var md5 = require('blueimp-md5'); +var RedisQueue = require("simple-redis-queue"); + +var push_queue = new RedisQueue(redisClient.getClient()); /* GET products listing. */ router.get('/', function(req, res, next) { @@ -80,32 +83,15 @@ router.post('/new', function(req, res, next) { }) }); }, - function(result, waterfallCallback) { // Getting Product Category From DB (Preparing for Elasticsearch) - categoryService.getProductCategories(result.insertId, function(err, productCategories) { - if (err) {} - result.productCategories = productCategories; - waterfallCallback(false, result); - }); - }, - function(result, waterfallCallback) { // Getting Product Data From MySQL DB (Preparing for Elasticsearch) - productService.getRecord(result.insertId, function(err, product) { - if (err) {} - result.product = product; + function(result, waterfallCallback) { + push_queue.push('product_updates', {'action':'insert', 'productId': result.insertId}, function(err, res) { waterfallCallback(false, result); }); - }, - function(result, waterfallCallback) { // Saving Product to Elasticsearch - var product = result.product; - product.categories = result.productCategories; - - productSearchService.insert(product, function() { - waterfallCallback(false, product); - }); } ], - function(err, product) { + function(err, result) { if (err) {} - res.redirect('/product/id/'+product.id); + res.redirect('/product/id/'+result.insertId); } ); }); @@ -122,8 +108,7 @@ router.get('/:id/delete', function(req, res, next) { }); }, function(waterfallCallback) { - productSearchService.delete(params.id, function(err, result) { - //TODO: check error status + push_queue.push('product_updates', {'action':'delete', 'productId': params.id}, function(err, res) { waterfallCallback(false); }); } @@ -210,33 +195,15 @@ router.post('/:id/edit', function(req, res, next) { }) }); }, - function(waterfallCallback) { // Getting Product Categories Data from MySQL (Preparing for Elasticsearch) - var result = {}; - categoryService.getProductCategories(params.id, function(err, productCategories) { - if (err) {} - result.productCategories = productCategories; - waterfallCallback(false, result); - }); - }, - function(result, waterfallCallback) { // Getting Product Data from MySQL (Preparing for Elasticsearch) - productService.getRecord(params.id, function(err, product) { - if (err) {} - result.product = product; - waterfallCallback(false, result); - }); - }, - function(result, waterfallCallback) { // Saving Data to Elasticsearch - var product = result.product; - product.categories = result.productCategories; - - productSearchService.insert(product, function() { - waterfallCallback(false, product); + function(waterfallCallback) { + push_queue.push('product_updates', {'action':'update', 'productId': queryParams.id}, function(err, res) { + waterfallCallback(false); }); } ], function(err, product) { if (err) {} - res.redirect('/product/id/' + product.id); + res.redirect('/product/id/' + queryParams.id); return; } );