Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Async Push to Vend #482

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 41 additions & 2 deletions common/models/org-model.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ var validate = Promise.promisify(require('joi').validate);
var vendSdk = require('vend-nodejs-sdk')({});
const rp = require('request-promise');
const sql = require('mssql');
const workerUtils = require("../utils/workers")

module.exports = function (OrgModel) {


Expand Down Expand Up @@ -847,7 +849,7 @@ module.exports = function (OrgModel) {
returns: {arg: 'status', type: 'object', root: true}
});

OrgModel.updateAllStockOrderLineItemModels = function (id, reportModelId, lineItemIds, data, options, cb) {
OrgModel.updateAllStockOrderLineItemModels = function (id, reportModelId, lineItemIds, data, options) {
logger.debug({
message: 'Will update these line items for order',
data,
Expand All @@ -864,7 +866,15 @@ module.exports = function (OrgModel) {
inq: lineItemIds
};
}
return OrgModel.app.models.StockOrderLineitemModel.updateAll(filter, data)

return OrgModel.app.models.StockOrderLineitemModel.updateAll(filter, {
$set: data,
// Worker will update it to true / false
$unset: {
asyncPushSuccess: false // false is irrelevant
},
},
{allowExtendedOperators: true})
.catch(function (error) {
logger.error({
message: 'Could not update these line items',
Expand All @@ -874,6 +884,35 @@ module.exports = function (OrgModel) {
error
});
return Promise.reject('Could not update stock order line items');
})
.then(function (updateResult) {
// If received is getting invoked
if (data.hasOwnProperty('received') || data.hasOwnProperty('receivedQuantity')) {
const payload = {
stockOrderLineItemIds: lineItemIds,
reportModelId: reportModelId,
op: 'receiveLineItemVend',
eventType: workerUtils.messageFor.MESSAGE_FOR_CLIENT,
callId: options.accessToken.userId,
};
// Push the updates to vend
return workerUtils.sendPayLoad(payload)
.catch(function (error) {
logger.error({
message: 'Could not send receiveLineItemVend to worker',
options,
error,
functionName: 'updateAllStockOrderLineItemModels'
});
// QUESTION: Should we fail it instead ?
return Promise.resolve(daupdateResultta);
})
.then(function (){
return Promise.resolve(updateResult);
});
} else {
return Promise.resolve(updateResult);
}
});
};

Expand Down
38 changes: 33 additions & 5 deletions common/models/stock-order-lineitem-model.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ var _ = require('underscore');
var path = require('path');
var fileName = path.basename(__filename, '.js'); // gives the filename without the .js extension
const logger = require('sp-json-logger')({fileName: 'common:models:' + fileName});

var workerUtils = require('../utils/workers');

module.exports = function (StockOrderLineitemModel) {

Expand Down Expand Up @@ -175,7 +175,11 @@ module.exports = function (StockOrderLineitemModel) {
updateSetObject = {
$inc: {
receivedQuantity: 1
}
},
// Worker will update it to true / false
$unset: {
asyncPushSuccess: false // false is irrelevant
},
};
}else {
logger.debug({
Expand Down Expand Up @@ -220,7 +224,6 @@ module.exports = function (StockOrderLineitemModel) {
});
return Promise.reject(error);
})

.then(function ([obj, stockOrderLineItemId]) {
logger.debug({
functionName: 'scanBarcodeStockOrder',
Expand Down Expand Up @@ -250,9 +253,34 @@ module.exports = function (StockOrderLineitemModel) {
});
return Promise.reject(error);
})

.then(function ([obj, stockLineItem]) {
return Promise.resolve(Object.assign({}, obj, stockLineItem.toJSON()));
var payload = {
stockOrderLineItemIds: [stockLineItem.id],
op: 'receiveLineItemVend',
reportModelId: reportModelId,
eventType: workerUtils.messageFor.MESSAGE_FOR_CLIENT,
callId: options.accessToken.userId,
};
return workerUtils.sendPayLoad(payload)
.then(function (response) {
logger.debug({
message: 'Sent receiveLineItemVend operation to worker',
options,
response,
functionName: 'scanBarcodeStockOrder'
});
return Promise.resolve(Object.assign({}, obj, stockLineItem.toJSON()));
})
.catch(function (error) {
logger.error({
message: 'Could not send receiveConsignmentAsyncVend to worker',
options,
error,
functionName: 'scanBarcodeStockOrder'
});
// return Promise.reject('Error in creating stock order');
return Promise.resolve(Object.assign({}, obj, stockLineItem.toJSON()));
});
});
};
};
3 changes: 3 additions & 0 deletions common/models/stock-order-lineitem-model.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@
"vendDeletedAt": {
"type": "date",
"required": false
},
"asyncPushSuccess": {
"type": "boolean"
}
},
"validations": [],
Expand Down
16 changes: 16 additions & 0 deletions workers/sqsWorker.js
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,22 @@ function routeToWorker(payload, config, taskId, messageId, receiptHandle) {
return Promise.reject('Internal Server Error');
});
}
else if (payload.op === 'receiveLineItemVend') {
logger.tag('Routed').info({
messageId: messageId,
message: 'routed to receiveLineItemVend'
});
var receiveLineItem = require('./workers-v2/receive-consignment/receive-line-item-vend');
return receiveLineItem.run(payload, config, taskId, messageId)
.then(function () {
logger.debug({messageId: messageId, message: 'received line item in Vend successfully'});
return Promise.resolve(receiptHandle);
})
.catch(function (error) {
logger.error({err: error, messageId: messageId});
return Promise.reject('Internal Server Error');
});
}
else if (payload.op === 'importVendOrderFromFile') {
logger.tag('Routed').info({
messageId: messageId,
Expand Down
12 changes: 10 additions & 2 deletions workers/workers-v2/receive-consignment/receive-consignment-vend.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,20 @@ var runMe = function (payload, config, taskId, messageId) {
.then(function (response) {
reportModelInstance = response;
logger.debug({
message: 'Found report model instance, will look for store and supplier model',
message: 'Found report model instance, will look for not pushed line items',
response,
messageId
});
return db.collection('StockOrderLineitemModel').find({
reportModelId: ObjectId(payload.reportModelId)
reportModelId: ObjectId(payload.reportModelId),
$or:[
// Failed to Async Push
{asyncPushSuccess: false},
// Not Async Pushed
{asyncPushSuccess: { $exists: false }},
// Received Quantity is 0
{receivedQuantity: 0}
],
}).toArray();
})
.catch(function (error) {
Expand Down
164 changes: 164 additions & 0 deletions workers/workers-v2/receive-consignment/receive-line-item-vend.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
const path = require('path');
const commandName = path.basename(__filename, '.js'); // gives the filename without the .js extension
const logger = require('sp-json-logger')({fileName: 'workers:workers-v2:' + commandName});
const dbUrl = process.env.DB_URL;
const MongoClient = require('mongodb').MongoClient;
const ObjectId = require('mongodb').ObjectID;
var db = null; //database connected
const utils = require('../../jobs/utils/utils.js');
const Promise = require('bluebird');

var runMe = function (payload, config, taskId, messageId) {
var stockOrderLineItemIds = payload.stockOrderLineItemIds;
var reportModelId = payload.reportModelId;
var reportModelInstance;
try {
logger.debug({
commandName: commandName,
argv: process.argv,
stockOrderLineItemIds,
messageId,
payload
});
return Promise.resolve()
.then(function () {
logger.debug({
message: 'Will connect to Mongo DB',
commandName,
messageId
});
return MongoClient.connect(dbUrl, {promiseLibrary: Promise});
})
.catch(function (error) {
logger.error({
message: 'Could not connect to Mongo DB',
error,
commandName,
messageId
});
return Promise.reject('Could not connect to Mongo DB');
})
.then(function (dbInstance) {
db = dbInstance;
logger.debug({
message: 'Connected to Mongo DB, will look for stockLineItem model',
commandName,
messageId
});
return db.collection('ReportModel').findOne({
_id: ObjectId(reportModelId)
});
})
.catch(function (error) {
logger.error({
message: 'Could not find report model instance',
stockOrderLineItemIds,
reportModelId,
error,
commandName,
messageId
});
return Promise.reject('Could not find report instances');
})
.then(function (reportModel) {
logger.debug({
message: 'Found Report Model Instance',
reportModel,
messageId
});
reportModelInstance = reportModel;
if (!utils.notReceivedStates.includes(reportModel.state)) {
// No need to proceed if report is not in receiving state
return Promise.resolve([]);
}
return db.collection('StockOrderLineitemModel').find({
_id: {
$in: stockOrderLineItemIds.map(function (stockOrderLineItemId) {
return ObjectId(stockOrderLineItemId);
}),
},
reportModelId: ObjectId(reportModelId)
}).toArray();
})
.catch(function (error) {
logger.error({
message: 'Could not find StockOrderLineitemModel instances',
stockOrderLineItemIds,
error,
commandName,
messageId
});
return Promise.reject('Could not find StockOrderLineitemModel instances');
})
.then(function (lineItems){
logger.debug({
message: 'Found line items to push to vend',
lineItems,
messageId
});
return Promise.map(lineItems, function (lineItem) {
return Promise.delay(1000)
.then(function () {
logger.debug({
message: 'Will push a line item to vend',
lineItem,
messageId
});
return utils.updateStockOrderLineitemForVend(db, reportModelInstance, lineItem, messageId);
})
.catch(function (error){
logger.error({
message: 'Could not push line item to vend, will set error status to true',
lineItem,
error,
commandName,
messageId
});
return db.collection('StockOrderLineitemModel').updateOne({
_id: ObjectId(lineItem._id)
}, {
$set: {
asyncPushSuccess: false
}
});
})
.then(function () {
logger.debug({
message: 'Pushed item to vend successfully',
lineItem,
messageId
});
return db.collection('StockOrderLineitemModel').updateOne({
_id: ObjectId(lineItem._id)
}, {
$set: {
asyncPushSuccess: true
}
});
});
}, { concurrency: 1 });

})
.catch(function (error) {
logger.error({
commandName,
error,
reason: error,
message: 'Could not update receiving quantities for line item',
messageId
});
return Promise.reject('Could not update receiving quantities for line item');
});
// return Promise.resolve();
} catch (e) {
logger.error({
message: 'last catch block', err: e,
messageId
});
throw e;
}
};

module.exports = {
run: runMe
};