diff --git a/README.md b/README.md index cb44e1f..95759f1 100644 --- a/README.md +++ b/README.md @@ -69,6 +69,8 @@ The exported property contains an array of definitions, each linking a match to - `options.resourceFormat`: Version format describing the format of the contents. Keys may be added to this format, but they may not be removed. Filter the properties as needed. - `options.gracePeriod`: Only send the response after a certain amount of time. This will group changes in the future. - `options.ignoreFromSelf`: Don't inform about changes that originated from the microservice to be informed (based on the hostname). + - `options.optOutMuScopeIds`: List of mu-scope-ids (as URIs) to ignore. + - `options.optInMuScopeIds`: List of mu-scope-ids (as URIs) for exclusive subscription. ## Delta formats @@ -95,7 +97,7 @@ v0.0.1 is the latest format of the delta messages. It may be extended with autho Genesis format as described by the initial Delta service PoC. It looks like: ```json - { + { "delta": { "inserts": [{"s": "http://mu.semte.ch/", "p": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type", @@ -115,7 +117,7 @@ Debugging can be enabled in the service by setting environment variables. The f - `DEBUG_DELTA_SEND`: Logs all delta messages that are being sent to clients - `DEBUG_DELTA_MATCH`: Logs a check for each target block, indicating a check will occur - `DEBUG_TRIPLE_MATCHES_SPEC`: Extensive logging for triples matching a given specification. Handy when requests are unexpectedly not sent. - + ## Extending You are encouraged to help figure out how to best extend this service. Fork this repository. Run an experiment. Open an issue or PR describing your experiment. Feel free to open up an issue if you would like to discuss a possible extension. diff --git a/app.js b/app.js index 97c710a..20c416a 100644 --- a/app.js +++ b/app.js @@ -33,19 +33,21 @@ app.post( '/', function( req, res ) { const originalMuCallId = req.get('mu-call-id'); const muCallIdTrail = JSON.stringify( [...originalMuCallIdTrail, originalMuCallId] ); + const muCallScopeId = req.get('mu-call-scope-id'); + changeSets.forEach( (change) => { change.insert = change.insert || []; change.delete = change.delete || []; } ); // inform watchers - informWatchers( changeSets, res, muCallIdTrail ); + informWatchers( changeSets, res, muCallIdTrail, muCallScopeId ); // push relevant data to interested actors res.status(204).send(); } ); -async function informWatchers( changeSets, res, muCallIdTrail ){ +async function informWatchers( changeSets, res, muCallIdTrail, muCallScopeId ){ services.map( async (entry) => { // for each entity if( process.env["DEBUG_DELTA_MATCH"] ) @@ -53,38 +55,42 @@ async function informWatchers( changeSets, res, muCallIdTrail ){ const matchSpec = entry.match; - const originFilteredChangeSets = await filterMatchesForOrigin( changeSets, entry ); - if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf ) - console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`); + let filteredChangeSets = filterMatchesForMuScopeId( changeSets, entry, muCallScopeId ); - let allInserts = []; - let allDeletes = []; + if(filteredChangeSets.length ){ + filteredChangeSets = await filterMatchesForOrigin( filteredChangeSets, entry ); + if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf ) + console.log(`There are ${filteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`); - originFilteredChangeSets.forEach( (change) => { - allInserts = [...allInserts, ...change.insert]; - allDeletes = [...allDeletes, ...change.delete]; - } ); + let allInserts = []; + let allDeletes = []; + + filteredChangeSets.forEach( (change) => { + allInserts = [...allInserts, ...change.insert]; + allDeletes = [...allDeletes, ...change.delete]; + } ); - const changedTriples = [...allInserts, ...allDeletes]; + const changedTriples = [...allInserts, ...allDeletes]; - const someTripleMatchedSpec = - changedTriples - .some( (triple) => tripleMatchesSpec( triple, matchSpec ) ); + const someTripleMatchedSpec = + changedTriples + .some( (triple) => tripleMatchesSpec( triple, matchSpec ) ); - if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] ) - console.log(`Triple matches spec? ${someTripleMatchedSpec}`); + if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] ) + console.log(`Triple matches spec? ${someTripleMatchedSpec}`); - if( someTripleMatchedSpec ) { - // inform matching entities - if( process.env["DEBUG_DELTA_SEND"] ) - console.log(`Going to send ${entry.callback.method} to ${entry.callback.url}`); + if( someTripleMatchedSpec ) { + // inform matching entities + if( process.env["DEBUG_DELTA_SEND"] ) + console.log(`Going to send ${entry.callback.method} to ${entry.callback.url}`); - if( entry.options && entry.options.gracePeriod ) { - setTimeout( - () => sendRequest( entry, originFilteredChangeSets, muCallIdTrail ), - entry.options.gracePeriod ); - } else { - sendRequest( entry, originFilteredChangeSets, muCallIdTrail ); + if( entry.options && entry.options.gracePeriod ) { + setTimeout( + () => sendRequest( entry, filteredChangeSets, muCallIdTrail ), + entry.options.gracePeriod ); + } else { + sendRequest( entry, filteredChangeSets, muCallIdTrail ); + } } } } ); @@ -195,6 +201,36 @@ async function filterMatchesForOrigin( changeSets, entry ) { } } +function filterMatchesForMuScopeId( changeSets, entry, muCallScopeId ){ + if( !muCallScopeId ){ + return changeSets; + } + //Note: if optIn and optOut are configured at the same time, optIn wins. + else if( hasConfiguredMuScopeOptIn(entry) ){ + if( entry.options.optInMuScopeIds.indexOf(muCallScopeId) > -1 ){ + return changeSets; + } + else return []; + } + else if( hasConfiguredMuScopeOptOut(entry) ){ + if( entry.options.optOutMuScopeIds.indexOf(muCallScopeId) > -1 ){ + return []; + } + else return changeSets; + } + else { + return changeSets; + } +} + +function hasConfiguredMuScopeOptOut( entry ){ + return entry.options && entry.options.optOutMuScopeIds && entry.options.optOutMuScopeIds.length; +} + +function hasConfiguredMuScopeOptIn( entry ){ + return entry.options && entry.options.optInMuScopeIds && entry.options.optInMuScopeIds.length; +} + function hostnameForEntry( entry ) { return (new URL(entry.callback.url)).hostname; }