Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle down services #16

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 51 additions & 31 deletions app.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import { app, uuid } from 'mu';
import request from 'request';
import fetch from 'node-fetch';
import services from '/config/rules.js';
import bodyParser from 'body-parser';
import dns from 'dns';

const IP_LOOKUP_CACHE = new Map();
const IP_LOOKUP_CACHE_RETRY_TIMEOUT = 15000;

// Also parse application/json as json
app.use( bodyParser.json( {
type: function(req) {
Expand Down Expand Up @@ -152,61 +155,78 @@ async function sendRequest( entry, changeSets, muCallIdTrail, muSessionId ) {
const method = entry.callback.method;
const url = entry.callback.url;
const headers = { "Content-Type": "application/json", "MU-AUTH-ALLOWED-GROUPS": changeSets[0].allowedGroups, "mu-call-id-trail": muCallIdTrail, "mu-call-id": uuid() , "mu-session-id": muSessionId };

if( entry.options && entry.options.resourceFormat ) {
// we should send contents
const body = formatChangesetBody( changeSets, entry.options );

// TODO: we now assume the mu-auth-allowed-groups will be the same
// for each changeSet. that's a simplification and we should not
// depend on it.

requestObject = {
url, method,
headers,
body: body
};
} else {
// we should only inform
requestObject = { url, method, headers };
}
// TODO: we now assume the mu-auth-allowed-groups will be the same
// for each changeSet. that's a simplification and we should not
// depend on it.
const body = entry.options && entry.options.resourceFormat ? formatChangesetBody( changeSets, entry.options ): null;


if( process.env["DEBUG_DELTA_SEND"] )
console.log(`Executing send ${method} to ${url}`);

request( requestObject, function( error, response, body ) {
if( error ) {
try {
const response = await fetch(url, {
headers,
method,
body,
});
if(await response) {
// const respText = await response.text();
//console.log(respText);
}
} catch(e) {
console.log(`Could not send request ${method} ${url}`);
console.log(error);
console.log(`NOT RETRYING`); // TODO: retry a few times when delta's fail to send
}

if( response ) {
// console.log( body );
}
});

}

async function filterMatchesForOrigin( changeSets, entry ) {
if( ! entry.options || !entry.options.ignoreFromSelf ) {
return changeSets;
} else {
const originIpAddress = await getServiceIp( entry );
return changeSets.filter( (changeSet) => changeSet.origin != originIpAddress );
try {
const originIpAddress = await getServiceIp( entry );
if(originIpAddress) {
return changeSets.filter( (changeSet) => changeSet.origin != originIpAddress );
} else{
// we couldn't figure what's the ip address, thus we filter everything. not great, but this is just
// for experimenting
return [];
}
} catch(e) {
// handle the case when a service is down and the dns lookup cannot be performed.
console.log(`something went wrong for changeSets ${changeSets} and entry ${entry} during lookup ${e}`);
return []; // service is down anyway, don't send.

}
}
}

function hostnameForEntry( entry ) {
return (new URL(entry.callback.url)).hostname;
}

async function getServiceIp(entry) {
async function getServiceIp(entry, retry=false) {

const hostName = hostnameForEntry( entry );
if(!retry && IP_LOOKUP_CACHE.has(hostName)) {
return IP_LOOKUP_CACHE.get(hostName);
}
return new Promise( (resolve, reject) => {
dns.lookup( hostName, { family: 4 }, ( err, address) => {
if( err )
if( err ) {
IP_LOOKUP_CACHE.set(hostName, false);
setTimeout(async ()=> {
try {
await getServiceIp(entry, true);
}catch (e) {
}
}, IP_LOOKUP_CACHE_RETRY_TIMEOUT); // retry to put it in cache after x seconds
reject( err );
else
} else
IP_LOOKUP_CACHE.set(hostName, address);
resolve( address );
} );
} );
Expand Down
83 changes: 83 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@
},
"homepage": "https://github.com/mu-semtech/delta-notifier#readme",
"dependencies": {
"request": "^2.88.0"
"node-fetch": "^2.6.1"
}
}