Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support mu-scope-id #6

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ The exported property contains an array of definitions, each linking a match to
- `options.resourceFormat`: Version format describing the format of the contents. Keys may be added to this format, but they may not be removed. Filter the properties as needed.
- `options.gracePeriod`: Only send the response after a certain amount of time. This will group changes in the future.
- `options.ignoreFromSelf`: Don't inform about changes that originated from the microservice to be informed (based on the hostname).
- `options.optOutMuScopeIds`: List of mu-scope-ids (as URIs) to ignore.
- `options.optInMuScopeIds`: List of mu-scope-ids (as URIs) for exclusive subscription.

## Delta formats

Expand All @@ -95,7 +97,7 @@ v0.0.1 is the latest format of the delta messages. It may be extended with autho
Genesis format as described by the initial Delta service PoC. It looks like:

```json
{
{
"delta": {
"inserts": [{"s": "http://mu.semte.ch/",
"p": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
Expand All @@ -115,7 +117,7 @@ Debugging can be enabled in the service by setting environment variables. The f
- `DEBUG_DELTA_SEND`: Logs all delta messages that are being sent to clients
- `DEBUG_DELTA_MATCH`: Logs a check for each target block, indicating a check will occur
- `DEBUG_TRIPLE_MATCHES_SPEC`: Extensive logging for triples matching a given specification. Handy when requests are unexpectedly not sent.

## Extending

You are encouraged to help figure out how to best extend this service. Fork this repository. Run an experiment. Open an issue or PR describing your experiment. Feel free to open up an issue if you would like to discuss a possible extension.
90 changes: 63 additions & 27 deletions app.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,58 +33,64 @@ app.post( '/', function( req, res ) {
const originalMuCallId = req.get('mu-call-id');
const muCallIdTrail = JSON.stringify( [...originalMuCallIdTrail, originalMuCallId] );

const muCallScopeId = req.get('mu-call-scope-id');

changeSets.forEach( (change) => {
change.insert = change.insert || [];
change.delete = change.delete || [];
} );

// inform watchers
informWatchers( changeSets, res, muCallIdTrail );
informWatchers( changeSets, res, muCallIdTrail, muCallScopeId );

// push relevant data to interested actors
res.status(204).send();
} );

async function informWatchers( changeSets, res, muCallIdTrail ){
async function informWatchers( changeSets, res, muCallIdTrail, muCallScopeId ){
services.map( async (entry) => {
// for each entity
if( process.env["DEBUG_DELTA_MATCH"] )
console.log(`Checking if we want to send to ${entry.callback.url}`);

const matchSpec = entry.match;

const originFilteredChangeSets = await filterMatchesForOrigin( changeSets, entry );
if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf )
console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`);
let filteredChangeSets = filterMatchesForMuScopeId( changeSets, entry, muCallScopeId );

let allInserts = [];
let allDeletes = [];
if(filteredChangeSets.length ){
filteredChangeSets = await filterMatchesForOrigin( filteredChangeSets, entry );
if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf )
console.log(`There are ${filteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`);

originFilteredChangeSets.forEach( (change) => {
allInserts = [...allInserts, ...change.insert];
allDeletes = [...allDeletes, ...change.delete];
} );
let allInserts = [];
let allDeletes = [];

filteredChangeSets.forEach( (change) => {
allInserts = [...allInserts, ...change.insert];
allDeletes = [...allDeletes, ...change.delete];
} );

const changedTriples = [...allInserts, ...allDeletes];
const changedTriples = [...allInserts, ...allDeletes];

const someTripleMatchedSpec =
changedTriples
.some( (triple) => tripleMatchesSpec( triple, matchSpec ) );
const someTripleMatchedSpec =
changedTriples
.some( (triple) => tripleMatchesSpec( triple, matchSpec ) );

if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] )
console.log(`Triple matches spec? ${someTripleMatchedSpec}`);
if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] )
console.log(`Triple matches spec? ${someTripleMatchedSpec}`);

if( someTripleMatchedSpec ) {
// inform matching entities
if( process.env["DEBUG_DELTA_SEND"] )
console.log(`Going to send ${entry.callback.method} to ${entry.callback.url}`);
if( someTripleMatchedSpec ) {
// inform matching entities
if( process.env["DEBUG_DELTA_SEND"] )
console.log(`Going to send ${entry.callback.method} to ${entry.callback.url}`);

if( entry.options && entry.options.gracePeriod ) {
setTimeout(
() => sendRequest( entry, originFilteredChangeSets, muCallIdTrail ),
entry.options.gracePeriod );
} else {
sendRequest( entry, originFilteredChangeSets, muCallIdTrail );
if( entry.options && entry.options.gracePeriod ) {
setTimeout(
() => sendRequest( entry, filteredChangeSets, muCallIdTrail ),
entry.options.gracePeriod );
} else {
sendRequest( entry, filteredChangeSets, muCallIdTrail );
}
}
}
} );
Expand Down Expand Up @@ -195,6 +201,36 @@ async function filterMatchesForOrigin( changeSets, entry ) {
}
}

function filterMatchesForMuScopeId( changeSets, entry, muCallScopeId ){
if( !muCallScopeId ){
return changeSets;
}
//Note: if optIn and optOut are configured at the same time, optIn wins.
else if( hasConfiguredMuScopeOptIn(entry) ){
if( entry.options.optInMuScopeIds.indexOf(muCallScopeId) > -1 ){
return changeSets;
}
else return [];
}
else if( hasConfiguredMuScopeOptOut(entry) ){
if( entry.options.optOutMuScopeIds.indexOf(muCallScopeId) > -1 ){
return [];
}
else return changeSets;
}
else {
return changeSets;
}
}

function hasConfiguredMuScopeOptOut( entry ){
return entry.options && entry.options.optOutMuScopeIds && entry.options.optOutMuScopeIds.length;
}

function hasConfiguredMuScopeOptIn( entry ){
return entry.options && entry.options.optInMuScopeIds && entry.options.optInMuScopeIds.length;
}

function hostnameForEntry( entry ) {
return (new URL(entry.callback.url)).hostname;
}
Expand Down