From a4bdf1d8e5251b3521a35a821c7f29e99079a989 Mon Sep 17 00:00:00 2001 From: Shrey Gupta Date: Mon, 22 Feb 2021 18:13:57 +0530 Subject: [PATCH] Added Async Push to Receive Order Signed-off-by: Shrey Gupta --- common/models/org-model.js | 43 ++++- common/models/stock-order-lineitem-model.js | 38 +++- common/models/stock-order-lineitem-model.json | 3 + workers/sqsWorker.js | 16 ++ .../receive-consignment-vend.js | 12 +- .../receive-line-item-vend.js | 164 ++++++++++++++++++ 6 files changed, 267 insertions(+), 9 deletions(-) create mode 100644 workers/workers-v2/receive-consignment/receive-line-item-vend.js diff --git a/common/models/org-model.js b/common/models/org-model.js index 1a169773..267c1270 100644 --- a/common/models/org-model.js +++ b/common/models/org-model.js @@ -10,6 +10,8 @@ var validate = Promise.promisify(require('joi').validate); var vendSdk = require('vend-nodejs-sdk')({}); const rp = require('request-promise'); const sql = require('mssql'); +const workerUtils = require("../utils/workers") + module.exports = function (OrgModel) { @@ -847,7 +849,7 @@ module.exports = function (OrgModel) { returns: {arg: 'status', type: 'object', root: true} }); - OrgModel.updateAllStockOrderLineItemModels = function (id, reportModelId, lineItemIds, data, options, cb) { + OrgModel.updateAllStockOrderLineItemModels = function (id, reportModelId, lineItemIds, data, options) { logger.debug({ message: 'Will update these line items for order', data, @@ -864,7 +866,15 @@ module.exports = function (OrgModel) { inq: lineItemIds }; } - return OrgModel.app.models.StockOrderLineitemModel.updateAll(filter, data) + + return OrgModel.app.models.StockOrderLineitemModel.updateAll(filter, { + $set: data, + // Worker will update it to true / false + $unset: { + asyncPushSuccess: false // false is irrelevant + }, + }, + {allowExtendedOperators: true}) .catch(function (error) { logger.error({ message: 'Could not update these line items', @@ -874,6 +884,35 @@ module.exports = function (OrgModel) { error }); return Promise.reject('Could not update stock order line items'); + }) + .then(function (updateResult) { + // If received is getting invoked + if (data.hasOwnProperty('received') || data.hasOwnProperty('receivedQuantity')) { + const payload = { + stockOrderLineItemIds: lineItemIds, + reportModelId: reportModelId, + op: 'receiveLineItemVend', + eventType: workerUtils.messageFor.MESSAGE_FOR_CLIENT, + callId: options.accessToken.userId, + }; + // Push the updates to vend + return workerUtils.sendPayLoad(payload) + .catch(function (error) { + logger.error({ + message: 'Could not send receiveLineItemVend to worker', + options, + error, + functionName: 'updateAllStockOrderLineItemModels' + }); + // QUESTION: Should we fail it instead ? + return Promise.resolve(daupdateResultta); + }) + .then(function (){ + return Promise.resolve(updateResult); + }); + } else { + return Promise.resolve(updateResult); + } }); }; diff --git a/common/models/stock-order-lineitem-model.js b/common/models/stock-order-lineitem-model.js index e6a36aa4..f5c6bbd0 100644 --- a/common/models/stock-order-lineitem-model.js +++ b/common/models/stock-order-lineitem-model.js @@ -5,7 +5,7 @@ var _ = require('underscore'); var path = require('path'); var fileName = path.basename(__filename, '.js'); // gives the filename without the .js extension const logger = require('sp-json-logger')({fileName: 'common:models:' + fileName}); - +var workerUtils = require('../utils/workers'); module.exports = function (StockOrderLineitemModel) { @@ -175,7 +175,11 @@ module.exports = function (StockOrderLineitemModel) { updateSetObject = { $inc: { receivedQuantity: 1 - } + }, + // Worker will update it to true / false + $unset: { + asyncPushSuccess: false // false is irrelevant + }, }; }else { logger.debug({ @@ -220,7 +224,6 @@ module.exports = function (StockOrderLineitemModel) { }); return Promise.reject(error); }) - .then(function ([obj, stockOrderLineItemId]) { logger.debug({ functionName: 'scanBarcodeStockOrder', @@ -250,9 +253,34 @@ module.exports = function (StockOrderLineitemModel) { }); return Promise.reject(error); }) - .then(function ([obj, stockLineItem]) { - return Promise.resolve(Object.assign({}, obj, stockLineItem.toJSON())); + var payload = { + stockOrderLineItemIds: [stockLineItem.id], + op: 'receiveLineItemVend', + reportModelId: reportModelId, + eventType: workerUtils.messageFor.MESSAGE_FOR_CLIENT, + callId: options.accessToken.userId, + }; + return workerUtils.sendPayLoad(payload) + .then(function (response) { + logger.debug({ + message: 'Sent receiveLineItemVend operation to worker', + options, + response, + functionName: 'scanBarcodeStockOrder' + }); + return Promise.resolve(Object.assign({}, obj, stockLineItem.toJSON())); + }) + .catch(function (error) { + logger.error({ + message: 'Could not send receiveConsignmentAsyncVend to worker', + options, + error, + functionName: 'scanBarcodeStockOrder' + }); + // return Promise.reject('Error in creating stock order'); + return Promise.resolve(Object.assign({}, obj, stockLineItem.toJSON())); + }); }); }; }; diff --git a/common/models/stock-order-lineitem-model.json b/common/models/stock-order-lineitem-model.json index de191c13..60208fb1 100644 --- a/common/models/stock-order-lineitem-model.json +++ b/common/models/stock-order-lineitem-model.json @@ -82,6 +82,9 @@ "vendDeletedAt": { "type": "date", "required": false + }, + "asyncPushSuccess": { + "type": "boolean" } }, "validations": [], diff --git a/workers/sqsWorker.js b/workers/sqsWorker.js index 0b535b2a..55c62f05 100755 --- a/workers/sqsWorker.js +++ b/workers/sqsWorker.js @@ -598,6 +598,22 @@ function routeToWorker(payload, config, taskId, messageId, receiptHandle) { return Promise.reject('Internal Server Error'); }); } + else if (payload.op === 'receiveLineItemVend') { + logger.tag('Routed').info({ + messageId: messageId, + message: 'routed to receiveLineItemVend' + }); + var receiveLineItem = require('./workers-v2/receive-consignment/receive-line-item-vend'); + return receiveLineItem.run(payload, config, taskId, messageId) + .then(function () { + logger.debug({messageId: messageId, message: 'received line item in Vend successfully'}); + return Promise.resolve(receiptHandle); + }) + .catch(function (error) { + logger.error({err: error, messageId: messageId}); + return Promise.reject('Internal Server Error'); + }); + } else if (payload.op === 'importVendOrderFromFile') { logger.tag('Routed').info({ messageId: messageId, diff --git a/workers/workers-v2/receive-consignment/receive-consignment-vend.js b/workers/workers-v2/receive-consignment/receive-consignment-vend.js index 13e66d21..37080736 100644 --- a/workers/workers-v2/receive-consignment/receive-consignment-vend.js +++ b/workers/workers-v2/receive-consignment/receive-consignment-vend.js @@ -78,12 +78,20 @@ var runMe = function (payload, config, taskId, messageId) { .then(function (response) { reportModelInstance = response; logger.debug({ - message: 'Found report model instance, will look for store and supplier model', + message: 'Found report model instance, will look for not pushed line items', response, messageId }); return db.collection('StockOrderLineitemModel').find({ - reportModelId: ObjectId(payload.reportModelId) + reportModelId: ObjectId(payload.reportModelId), + $or:[ + // Failed to Async Push + {asyncPushSuccess: false}, + // Not Async Pushed + {asyncPushSuccess: { $exists: false }}, + // Received Quantity is 0 + {receivedQuantity: 0} + ], }).toArray(); }) .catch(function (error) { diff --git a/workers/workers-v2/receive-consignment/receive-line-item-vend.js b/workers/workers-v2/receive-consignment/receive-line-item-vend.js new file mode 100644 index 00000000..19077b6c --- /dev/null +++ b/workers/workers-v2/receive-consignment/receive-line-item-vend.js @@ -0,0 +1,164 @@ +const path = require('path'); +const commandName = path.basename(__filename, '.js'); // gives the filename without the .js extension +const logger = require('sp-json-logger')({fileName: 'workers:workers-v2:' + commandName}); +const dbUrl = process.env.DB_URL; +const MongoClient = require('mongodb').MongoClient; +const ObjectId = require('mongodb').ObjectID; +var db = null; //database connected +const utils = require('../../jobs/utils/utils.js'); +const Promise = require('bluebird'); + +var runMe = function (payload, config, taskId, messageId) { + var stockOrderLineItemIds = payload.stockOrderLineItemIds; + var reportModelId = payload.reportModelId; + var reportModelInstance; + try { + logger.debug({ + commandName: commandName, + argv: process.argv, + stockOrderLineItemIds, + messageId, + payload + }); + return Promise.resolve() + .then(function () { + logger.debug({ + message: 'Will connect to Mongo DB', + commandName, + messageId + }); + return MongoClient.connect(dbUrl, {promiseLibrary: Promise}); + }) + .catch(function (error) { + logger.error({ + message: 'Could not connect to Mongo DB', + error, + commandName, + messageId + }); + return Promise.reject('Could not connect to Mongo DB'); + }) + .then(function (dbInstance) { + db = dbInstance; + logger.debug({ + message: 'Connected to Mongo DB, will look for stockLineItem model', + commandName, + messageId + }); + return db.collection('ReportModel').findOne({ + _id: ObjectId(reportModelId) + }); + }) + .catch(function (error) { + logger.error({ + message: 'Could not find report model instance', + stockOrderLineItemIds, + reportModelId, + error, + commandName, + messageId + }); + return Promise.reject('Could not find report instances'); + }) + .then(function (reportModel) { + logger.debug({ + message: 'Found Report Model Instance', + reportModel, + messageId + }); + reportModelInstance = reportModel; + if (!utils.notReceivedStates.includes(reportModel.state)) { + // No need to proceed if report is not in receiving state + return Promise.resolve([]); + } + return db.collection('StockOrderLineitemModel').find({ + _id: { + $in: stockOrderLineItemIds.map(function (stockOrderLineItemId) { + return ObjectId(stockOrderLineItemId); + }), + }, + reportModelId: ObjectId(reportModelId) + }).toArray(); + }) + .catch(function (error) { + logger.error({ + message: 'Could not find StockOrderLineitemModel instances', + stockOrderLineItemIds, + error, + commandName, + messageId + }); + return Promise.reject('Could not find StockOrderLineitemModel instances'); + }) + .then(function (lineItems){ + logger.debug({ + message: 'Found line items to push to vend', + lineItems, + messageId + }); + return Promise.map(lineItems, function (lineItem) { + return Promise.delay(1000) + .then(function () { + logger.debug({ + message: 'Will push a line item to vend', + lineItem, + messageId + }); + return utils.updateStockOrderLineitemForVend(db, reportModelInstance, lineItem, messageId); + }) + .catch(function (error){ + logger.error({ + message: 'Could not push line item to vend, will set error status to true', + lineItem, + error, + commandName, + messageId + }); + return db.collection('StockOrderLineitemModel').updateOne({ + _id: ObjectId(lineItem._id) + }, { + $set: { + asyncPushSuccess: false + } + }); + }) + .then(function () { + logger.debug({ + message: 'Pushed item to vend successfully', + lineItem, + messageId + }); + return db.collection('StockOrderLineitemModel').updateOne({ + _id: ObjectId(lineItem._id) + }, { + $set: { + asyncPushSuccess: true + } + }); + }); + }, { concurrency: 1 }); + + }) + .catch(function (error) { + logger.error({ + commandName, + error, + reason: error, + message: 'Could not update receiving quantities for line item', + messageId + }); + return Promise.reject('Could not update receiving quantities for line item'); + }); + // return Promise.resolve(); + } catch (e) { + logger.error({ + message: 'last catch block', err: e, + messageId + }); + throw e; + } +}; + +module.exports = { + run: runMe +};