diff --git a/README.md b/README.md index c46ef84..b23ba5d 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,6 @@ # Arrivals and Departures to Connections -__Status__: not functional - work in progress - -This javascript library transforms a stream of "arrivals and departures" to "connections". +This javascript library transforms a [JSON-LD stream](https://github.com/pietercolpaert/jsonld-stream) of "arrivals and departures" to "connections". This code is useful for route planning applications where arrivals and departures are easy to generate, but connections are difficult to get right in the same processing. @@ -17,10 +15,16 @@ npm install -g arrdep2connections Now you can use the code on top of 2 files which follow the [JSON-LD stream specification](https://github.com/pietercolpaert/jsonld-stream): ```bash -arrdep2connections arrivalsfile.json departurefile.json -i context.json > connections.json +node arrdep2connections --arrivals arrivals.jsonldstream --departures departures.jsonldstream [--inbound context.json] > connections.jsonldstream ``` -Optionally, you can specify a different inbound `context.json` using the `-i` flag, and a different outbound context by using the `-o` flag. +Optionally, you can specify a different inbound `context.json` using the `--inbound` flag, and a different outbound context by using the `--outbound` flag. + +You can also load connections into MongoDB using the `--mongodb` flag. See `config/development.json` for more configuration options. + +```bash +node arrdep2connections --arrivals arrivals.jsonldstream --departures departures.jsonldstream --mongodb +``` ### NodeJS library diff --git a/arrdep2connections.js b/arrdep2connections.js index ea7804b..1eb2267 100755 --- a/arrdep2connections.js +++ b/arrdep2connections.js @@ -1,33 +1,45 @@ -var ReadStream = require('./lib/ReadStream.js'), - ArrDep2Connections = require('./lib/arrdep2connections.js'), +var ArrDep2Connections = require('./lib/arrdep2connections.js'), stringify = require('JSONStream').stringify(false), + jsonldstream = require('jsonld-stream'), + param = require('param'), + MongoClient = require('mongodb').MongoClient, + StreamToMongo = require('./lib/StreamToMongo.js'), fs = require('fs'); -// Read filename arrivals.json and departure.json from parameters -var arrivalsFilename = './' + process.argv[2]; -var arrivalData = require(arrivalsFilename); -var departuresFilename = './' + process.argv[3]; -var departureData = require(departuresFilename); +// Read filename arrivals.jsonldstream and departure.jsonldstream from parameters +var arrivals = param('arrivals'); +var departures = param('departures'); -// Initialise streams -var arrivalStream = new ReadStream(arrivalData); -var departureStream = new ReadStream(departureData); -var arrdep2connections = new ArrDep2Connections(arrivalStream); // our transform stream +// Read extra configuration parameters +var options = {}; +options['mongoDb'] = param('mongodb'); +options['mongoDbConfig'] = param('mongoDbConfig'); +options['inbound'] = param('inbound'); +options['outbound'] = param('outbound'); -departureStream.pipe(arrdep2connections).pipe(stringify).pipe(process.stdout); +// Initialise streams +var arrivalStream = fs.createReadStream(arrivals, {encoding: 'utf8'}).pipe(new jsonldstream.Deserializer()); +var departureStream = fs.createReadStream(departures, {encoding: 'utf8'}).pipe(new jsonldstream.Deserializer()); +var arrdep2connections = new ArrDep2Connections(arrivalStream, options); // our transform stream -// Check parameter 4 for optional inbound or outbound -if (process.argv[4] && process.argv[4] === '-i') { - var inbound = process.argv[5]; -} else if (process.argv[4] && process.argv[4] === '-o') { - var outbound = process.argv[5]; -} +// Load in MongoDB +if (options.mongoDb === true ) { + var url = 'mongodb://' + options.mongoDbConfig.host + ':' + options.mongoDbConfig.port + '/' + options.mongoDbConfig.database; -// Check parameter 6 for optional inbound or outbound -if (process.argv[6] && process.argv[6] === '-i') { - var inbound = process.argv[7]; -} else if (process.argv[6] && process.argv[6] === '-o') { - var outbound = process.argv[7]; -} + // First empty collection + MongoClient.connect(url, function(err, db) { + if (err) { + die("Wasn't able to connect to MongoDB server. Check if your server is running.", url); + } -// TODO: taking care of in/outbound \ No newline at end of file + var collection = db.collection(options.mongoDbConfig.collection); + collection.remove(); // empty the collection + var streamToMongo = new StreamToMongo(collection); + var stream = departureStream.pipe(arrdep2connections).pipe(streamToMongo).on('finish', function () { + db.close(); // close connection + }); + }); +} else { + // Write to stdout + departureStream.pipe(arrdep2connections).pipe(stringify).pipe(process.stdout); +} \ No newline at end of file diff --git a/arrdepcontext.json b/arrdepcontext.json index 3cf7b85..e3e3f7f 100644 --- a/arrdepcontext.json +++ b/arrdepcontext.json @@ -1,6 +1,8 @@ { "@context" : { "st" : "http://semweb.mmlab.be/ns/stoptimes#", + "Arrival" : "st:Arrival", + "Departure" : "st:Departure", "gtfs" : "http://vocab.gtfs.org/terms#", "arrivalTime" : "gtfs:arrivalTime", "departureTime" : "gtfs:departureTime", @@ -8,6 +10,15 @@ "stop" : { "@type" : "@id", "@id" : "gtfs:stop" - } + }, + "trip" : { + "@type" : "@id", + "@id" : "gtfs:trip" + }, + "route" : { + "@type" : "@id", + "@id" : "gtfs:route" + }, + "stopSequence" : "gtfs:stopSequence" } -} +} \ No newline at end of file diff --git a/config/development.json b/config/development.json new file mode 100644 index 0000000..a0ac17f --- /dev/null +++ b/config/development.json @@ -0,0 +1,14 @@ +{ + "mongoDbConfig": { + "host": "localhost", + "port": "27017", + "database": "gtfs", + "collection": "connections" + }, + "mongodb": false, + "inbound": "arrdepcontext.json", + "outbound": "connectionscontext.json", + "arrivals": "./arrivals.jsonldstream", + "departures": "./departures.jsonldstream", + "self": "{./development.json}" +} \ No newline at end of file diff --git a/connectionscontext.json b/connectionscontext.json index 8a7f76a..a5e76ad 100644 --- a/connectionscontext.json +++ b/connectionscontext.json @@ -2,6 +2,9 @@ "@context" : { "lc" : "http://semweb.mmlab.be/ns/linkedconnections#", "gtfs" : "http://vocab.gtfs.org/terms#", + "Connection" : "http://semweb.mmlab.be/ns/linkedconnections#Connection", + "dct" : "http://purl.org/dc/terms/", + "date" : "dct:date", "arrivalTime" : "lc:arrivalTime", "departureTime" : "lc:departureTime", "arrivalStop" : { @@ -11,6 +14,15 @@ "departureStop" : { "@type" : "@id", "@id" : "http://semweb.mmlab.be/ns/linkedconnections#departureStop" - } + }, + "trip" : { + "@type" : "@id", + "@id" : "gtfs:trip" + }, + "route" : { + "@type" : "@id", + "@id" : "gtfs:route" + }, + "headsign" : "gtfs:headsign" } } diff --git a/lib/ReadStream.js b/lib/ReadStream.js deleted file mode 100644 index 7f6fedf..0000000 --- a/lib/ReadStream.js +++ /dev/null @@ -1,19 +0,0 @@ -var Readable = require('stream').Readable, - util = require('util'); - -var ReadStream = function(data) { - Readable.call(this, {objectMode: true}); - this.data = data; - this.curIndex = 0; -}; -util.inherits(ReadStream, Readable); - -ReadStream.prototype._read = function() { - if (this.curIndex === this.data.length) - return this.push(null); - - var data = this.data[this.curIndex++]; - this.push(data); -}; - -module.exports = ReadStream; \ No newline at end of file diff --git a/lib/StreamIterator.js b/lib/StreamIterator.js index fb3e719..a16872b 100644 --- a/lib/StreamIterator.js +++ b/lib/StreamIterator.js @@ -6,6 +6,10 @@ EventEmitter.prototype._maxListeners = 1000; var StreamIterator = function (stream) { this._stream = stream; this["@context"] = {}; + var self = this; + this._stream.on("end", function () { + self._cb(); + }); }; util.inherits(StreamIterator, EventEmitter); @@ -18,6 +22,7 @@ StreamIterator.prototype.next = function (callback) { object = null; } if (!object) { + this._cb = callback; this._stream.once("readable", function () {; self.next(callback); }); diff --git a/lib/StreamToMongo.js b/lib/StreamToMongo.js new file mode 100644 index 0000000..c8aaa6a --- /dev/null +++ b/lib/StreamToMongo.js @@ -0,0 +1,23 @@ +var Writable = require('stream').Writable; +var util = require('util'); + +function StreamToMongo(collection) { + Writable.call(this, { objectMode: true }); + + this._collection = collection; +} + +util.inherits(StreamToMongo, Writable); + +StreamToMongo.prototype._write = function (obj, encoding, done) { + this._collection.insert(obj, function(err, result) { + if (!err) { + //console.log(result); + done(); + } else { + done(err); + } + }); +}; + +module.exports = StreamToMongo; \ No newline at end of file diff --git a/lib/arrdep2connections.js b/lib/arrdep2connections.js index fc1808b..621ef34 100644 --- a/lib/arrdep2connections.js +++ b/lib/arrdep2connections.js @@ -1,6 +1,7 @@ var Transform = require('stream').Transform, util = require('util'), moment = require('moment'), + fs = require('fs'), StreamIterator = require('./StreamIterator.js'); util.inherits(ArrDep2Connections, Transform); @@ -12,25 +13,16 @@ function ArrDep2Connections (arrivalsStream, options) { this._arrivalsQueue = []; //chronologically sorted list of arrivals this._arrivalsQueueIndex = 0; this._departuresContext = {"@context" : {}}; + //This is a jsonldstream as well and has a this["@context"] - this["@context"] = { - "gtfs": "http://vocab.gtfs.org/terms#", - "lc" : "http://semweb.mmlab.be/ns/linkedconnections#", - "Connection" : "http://semweb.mmlab.be/ns/linkedconnections#Connection", - "dct" : "http://purl.org/dc/terms/", - "data" : "dct:date", - "arrivalTime" : "lc:arrivalTime", - "arrivalStop" : "lc:arrivalStop", - "departureTime" : "lc:departureTime", - "departureStop" : "lc:departureStop" - }; - this.push({"@context":this["@context"]}); + var context = JSON.parse(fs.readFileSync(options.outbound, 'utf8')); + this.push(context); } ArrDep2Connections.prototype._flush = function (done) { //Try to match the remainder of the departure/arrival pairs //console.log(this._arrivalsQueue); - done(); + done(); }; ArrDep2Connections.prototype._transform = function (departure, encoding, done) { @@ -38,32 +30,57 @@ ArrDep2Connections.prototype._transform = function (departure, encoding, done) { this._departuresContext = departure["@context"]; done(); } else { - departure["lc:departureTime"] = new Date(this._normalizeISO8601(departure["date"] + 'T' + departure["gtfs:departureTime"])); + departure["departureTime"] = new Date(this._normalizeISO8601(departure["date"] + 'T' + departure["departureTime"])); var self = this; this._arrivalsQueueIndex = 0; + this._arrivalTimeOffsetKey = 'minutes'; + this._arrivalTimeOffsetUnits = 240; // amount of time that arrivals are possible after a departure + this._maxArrivalTime = null; // holds maximum arrivalTime that is possible for a departure this._getNextArrival(function (arrival) { if (arrival) { var processNextArrival = function () { self._getNextArrival(function (nextArrival) { if (nextArrival) { - //TODO: check if gtfs:stopSequence and maxStopSequence is set... otherwise, ignore this feature? - if (departure["gtfs:stopSequence"] != departure["maxStopSequence"]) { + if (self._maxArrivalTime != null) { + // Is arrival still possible + if (nextArrival["arrivalTime"].getTime() <= self._maxArrivalTime.getTime()) { + setImmediate(function () { + self._processArrival(departure, nextArrival, processNextArrival, done); + }); + } else { + // No possible arrival found + this._arrivalsQueueIndex = 0; // Turn back index to process arrivals again for next departure + this._maxArrivalTime = null; // reset + done(); // next departure + } + } else if (departure["stopSequence"] != departure["maxStopSequence"]) { setImmediate(function () { - debugger; self._processArrival(departure, nextArrival, processNextArrival, done); }); } else { + this._arrivalsQueueIndex = 0; // Turn back index to process arrivals again for next departure + this._maxArrivalTime = null; // reset done(); // next departure } } else { - console.error("no next arrival"); + // console.error("no next arrival"); + done(); } }); }; + // Is stopSequence feature available to know if end of trip is reached? + if (typeof departure["stopSequence"] === "undefined" || typeof departure["maxStopSequence"] === "undefined") { + // Use time offset of arrivalTimes that are possible + this._maxArrivalTime = moment(departure["departureTime"]).add(self._arrivalTimeOffsetUnits, self._arrivalTimeOffsetKey).toDate(); + } else { + this._maxArrivalTime = null; + } + //we can call nextArrival if no connections have been found, or done when a connection has been found self._processArrival(departure, arrival, processNextArrival, done); } else { - console.error("No beginning arrival found..."); + // console.error("No beginning arrival found..."); + done(); } }); } @@ -72,14 +89,14 @@ ArrDep2Connections.prototype._transform = function (departure, encoding, done) { ArrDep2Connections.prototype._createConnection = function (arrival, departure) { var connection = {}; connection["@type"] = "Connection"; - connection["arrivalTime"] = arrival["lc:arrivalTime"]; - connection["arrivalStop"] = arrival["gtfs:stop"]; - connection["departureTime"] = departure["lc:departureTime"]; - connection["departureStop"] = departure["gtfs:stop"]; - connection["gtfs:trip"] = departure["gtfs:trip"]; - if (departure["gtfs:headsign"] || arrival["gtfs:headsign"]) - connection["gtfs:headsign"] = departure["gtfs:headsign"] || arrival["gtfs:headsign"]; - connection["gtfs:route"] = departure["gtfs:route"]; + connection["arrivalTime"] = arrival["arrivalTime"]; + connection["arrivalStop"] = arrival["stop"]; + connection["departureTime"] = departure["departureTime"]; + connection["departureStop"] = departure["stop"]; + connection["trip"] = departure["trip"]; + if (departure["headsign"] || arrival["headsign"]) + connection["headsign"] = departure["headsign"] || arrival["headsign"]; + connection["route"] = departure["route"]; //TODO: extend with other... return connection; }; @@ -118,37 +135,77 @@ ArrDep2Connections.prototype._getNextArrival = function (cb) { cb(arrivalQueueItem); } else { this._arrivalsIterator.next(function (arrival) { - arrival["lc:arrivalTime"] = new Date(self._normalizeISO8601(arrival["date"] + 'T' + arrival["gtfs:arrivalTime"])); - self._arrivalsQueue.push(arrival); - cb(arrival); + if (arrival) { + arrival["arrivalTime"] = new Date(self._normalizeISO8601(arrival["date"] + 'T' + arrival["arrivalTime"])); + self._arrivalsQueue.push(arrival); + cb(arrival); + } else if (self._arrivalsQueue.length > 0) { + // Only possible when stopSequence feature is not available + // and there are connections with the same departure- and arrivalTime + self._arrivalsQueueIndex = 0; + cb(null); // No next arrival, so drop departure + } else { + cb(); + self.end(null,null, function () { + //console.error("finished"); + }); + } }); } } ArrDep2Connections.prototype._processArrival = function (departure, arrival, next, done) { - //check if arrival has the same trip id, if it does, we've got a winner: first thing always wins - var departureTime = departure["lc:departureTime"]; // e.g.: Date object 2015-09-09T00:01 - var arrivalTime = arrival["lc:arrivalTime"]; - var departureDateTime = new Date(departureTime); - var arrivalDateTime = new Date(arrivalTime); + console.error(arrival); + if (arrival) { + //check if arrival has the same trip id, if it does, we've got a winner: first thing always wins + var departureTime = departure["departureTime"]; // e.g.: Date object 2015-09-09T00:01 + var arrivalTime = arrival["arrivalTime"]; + var departureDateTime = new Date(departureTime); + var arrivalDateTime = new Date(arrivalTime); + + // Connections with equal arrival- and departuretimes are possible (e.g. De Lijn) + // Keep those to stay consistent with stops + if (arrivalDateTime < departureDateTime) { + //discart it (as this is only possible for the first X items, we can do shift and bring the arrivalsQueueIndex back to 0 + for (var i = 0; i < this._arrivalsQueueIndex; i++) { + this._arrivalsQueue.shift(); + } + this._arrivalsQueueIndex = 0; + next(); + } else if (departure["trip"] === arrival["trip"]) { + // If feature with stopSequence is available, we can make connections with departureTime equal to arrivalTime + if (departure['stopSequence'] && arrival['stopSequence'] && (departure['stopSequence'] == (arrival['stopSequence'] - 1))) { + var connection = this._createConnection(arrival, departure); + this._arrivalsQueueIndex = 0; + this.push(connection); + done(); + } else if (departureTime < arrivalTime) { + // first one to encounter each other: it's a match! + var connection = this._createConnection(arrival, departure); + this._arrivalsQueueIndex = 0; + this.push(connection); + done(); + } else { + // arrival is part of previous connection or feature stopSequence is not available + next(); + } + } else { + // Set maximum arrivalTime of connection, so no all arrivals have to be checked. (prevent memory issues) + if (!this._maxArrivalTime) { + this._maxArrivalTime = moment(departure["departureTime"]).add(this._arrivalTimeOffsetUnits, this._arrivalTimeOffsetKey).toDate(); + } - if (arrivalDateTime <= departureDateTime) { - //discart it (as this is only possible for the first X items, we can do shift and bring the arrivalsQueueIndex back to 0 - for (var i = 0; i < this._arrivalsQueueIndex; i++) { - this._arrivalsQueue.shift(); + if (arrival["arrivalTime"].getTime() <= this._maxArrivalTime.getTime()) { + //arrival is part of the next one part of another trip. + next(); + } else { + done(); + } } - this._arrivalsQueueIndex = 0; - next(); - } else if (departure["gtfs:trip"] === arrival["gtfs:trip"] && parseInt(departure["gtfs:stopSequence"])+1 === parseInt(arrival["gtfs:stopSequence"])) { - //TODO: what if stopSequence is not set? Can we still calculate arrivals and departures? - //first one to encounter each other: it's a match! - var connection = this._createConnection(arrival, departure); - this.push(connection); - done(); } else { - //arrival is part of the next one part of another trip. - next(); - //done(); + debugger; + // No matching arrival found + done(); } } diff --git a/package.json b/package.json index 7b04238..b3fc1c2 100644 --- a/package.json +++ b/package.json @@ -16,7 +16,9 @@ "JSONStream": "^1.0.4", "jsonld": "^0.3.25", "jsonld-stream": "^1.0.2", - "moment": "^2.10.6" + "moment": "^2.10.6", + "mongodb": "^2.0.43", + "param": "^1.0.1" }, "devDependencies": { "should": "^6.0.3", diff --git a/test/data/arrivals.jsonldstream b/test/data/arrivals.jsonldstream index 5eff88e..65c18d5 100644 --- a/test/data/arrivals.jsonldstream +++ b/test/data/arrivals.jsonldstream @@ -1,8 +1,8 @@ -{ "@context" : {"gtfs": "http://vocab.gtfs.org/terms#", "Arrival" : "http://semweb.mmlab.be/ns/stoptimes#Arrival", "dct" : "http://purl.org/dc/terms/", "arrival" : "http://example.org/arrivals/", "trip" : "http://example.org/trips/", "stop":"http://example.org/stops/" } } -{ "@type" : "Arrival", "@id" : "arrival:1", "gtfs:arrivalTime" : "12:14", "dct:date" : "20160101", "gtfs:stop" : "stop:3", "gtfs:trip" : "trip:1", "maxStopSequence" : 4 , "gtfs:stopSequence" : 1 } -{ "@type" : "Arrival", "@id" : "arrival:2", "gtfs:arrivalTime" : "12:16", "dct:date" : "20160101", "gtfs:stop" : "stop:1", "gtfs:trip" : "trip:1", "maxStopSequence" : 4 , "gtfs:stopSequence" : 2 } -{ "@type" : "Arrival", "@id" : "arrival:3", "gtfs:arrivalTime" : "12:19", "dct:date" : "20160101", "gtfs:stop" : "stop:2", "gtfs:trip" : "trip:2", "maxStopSequence" : 2 , "gtfs:stopSequence" : 1 } -{ "@type" : "Arrival", "@id" : "arrival:4", "gtfs:arrivalTime" : "12:25", "dct:date" : "20160101", "gtfs:stop" : "stop:1", "gtfs:trip" : "trip:1","maxStopSequence" : 4 , "gtfs:stopSequence" : 3 } -{ "@type" : "Arrival", "@id" : "arrival:5", "gtfs:arrivalTime" : "12:30", "dct:date" : "20160101", "gtfs:stop" : "stop:2", "gtfs:trip" : "trip:1","maxStopSequence" : 4 , "gtfs:stopSequence" : 4 } -{ "@type" : "Arrival", "@id" : "arrival:6", "gtfs:arrivalTime" : "12:45", "dct:date" : "20160101", "gtfs:stop" : "stop:2", "gtfs:trip" : "trip:2", "maxStopSequence" : 2 , "gtfs:stopSequence" : 2 } -{ "@type" : "Arrival", "@id" : "arrival:7", "gtfs:arrivalTime" : "36:50", "dct:date" : "20153101", "gtfs:stop" : "stop:2", "gtfs:trip" : "trip:1","maxStopSequence" : 4 , "gtfs:stopSequence" : 4 } +{ "@context" : {"gtfs": "http://vocab.gtfs.org/terms#", "Arrival" : "http://semweb.mmlab.be/ns/stoptimes#Arrival", "dct" : "http://purl.org/dc/terms/", "date":"dct:date", "arrival" : "http://example.org/arrivals/", "trip" : "gtfs:trip", "stop":"gtfs:stop", "stopSequence":"gtfs:stopSequence", "arrivalTime":"gtfs:arrivalTime" } } +{ "@type" : "Arrival", "@id" : "arrival:1", "arrivalTime" : "12:14", "date" : "2016-01-01", "stop" : "stop3", "trip" : "trip1", "maxStopSequence" : 4 , "stopSequence" : 1 } +{ "@type" : "Arrival", "@id" : "arrival:2", "arrivalTime" : "12:16", "date" : "2016-01-01", "stop" : "stop1", "trip" : "trip1", "maxStopSequence" : 4 , "stopSequence" : 2 } +{ "@type" : "Arrival", "@id" : "arrival:3", "arrivalTime" : "12:19", "date" : "2016-01-01", "stop" : "stop2", "trip" : "trip2", "maxStopSequence" : 2 , "stopSequence" : 1 } +{ "@type" : "Arrival", "@id" : "arrival:4", "arrivalTime" : "12:25", "date" : "2016-01-01", "stop" : "stop1", "trip" : "trip1","maxStopSequence" : 4 , "stopSequence" : 3 } +{ "@type" : "Arrival", "@id" : "arrival:5", "arrivalTime" : "12:30", "date" : "2016-01-01", "stop" : "stop2", "trip" : "trip1","maxStopSequence" : 4 , "stopSequence" : 4 } +{ "@type" : "Arrival", "@id" : "arrival:6", "arrivalTime" : "12:45", "date" : "2016-01-01", "stop" : "stop2", "trip" : "trip2", "maxStopSequence" : 2 , "stopSequence" : 2 } +{ "@type" : "Arrival", "@id" : "arrival:7", "arrivalTime" : "12:50", "date" : "2016-31-01", "stop" : "stop2", "trip" : "trip1","maxStopSequence" : 4 , "stopSequence" : 4 } diff --git a/test/data/departures.jsonldstream b/test/data/departures.jsonldstream index 4f56796..a42607c 100644 --- a/test/data/departures.jsonldstream +++ b/test/data/departures.jsonldstream @@ -1,7 +1,6 @@ -{ "@context" : {"gtfs": "http://vocab.gtfs.org/terms#", "Departure" : "http://semweb.mmlab.be/ns/stoptimes#Departure", "dct" : "http://purl.org/dc/terms/", "departure" : "http://example.org/departures/", "trip" : "http://example.org/trips/", "stop":"http://example.org/stops/" } } -{ "@type" : "Departure", "@id" : "departure:1", "gtfs:departureTime" : "12:09", "dct:date" : "20160101", "gtfs:stop" : "stop:1", "gtfs:trip" : "trip:1", "maxStopSequence" : 4 , "gtfs:stopSequence" : 1 } -{ "@type" : "Departure", "@id" : "departure:2", "gtfs:departureTime" : "36:11", "dct:date" : "20153101", "gtfs:stop" : "stop:3", "gtfs:trip" : "trip:1", "maxStopSequence" : 4 , "gtfs:stopSequence" : 3 } -{ "@type" : "Departure", "@id" : "departure:3", "gtfs:departureTime" : "12:14", "dct:date" : "20160101", "gtfs:stop" : "stop:1", "gtfs:trip" : "trip:2","maxStopSequence" : 2 , "gtfs:stopSequence" : 2 } -{ "@type" : "Departure", "@id" : "departure:4", "gtfs:departureTime" : "12:20", "dct:date" : "20160101", "gtfs:stop" : "stop:2", "gtfs:trip" : "trip:1","maxStopSequence" : 4 , "gtfs:stopSequence" : 3 } -{ "@type" : "Departure", "@id" : "departure:5", "gtfs:departureTime" : "12:25", "dct:date" : "20160101", "gtfs:stop" : "stop:1", "gtfs:trip" : "trip:1","maxStopSequence" : 4 , "gtfs:stopSequence" : 4 } -{ "@type" : "Departure", "@id" : "departure:6", "gtfs:departureTime" : "12:40", "dct:date" : "20160101", "gtfs:stop" : "stop:3", "gtfs:trip" : "trip:2","maxStopSequence" : 2 , "gtfs:stopSequence" : 2 } +{ "@context" : {"gtfs": "http://vocab.gtfs.org/terms#", "Departure" : "http://semweb.mmlab.be/ns/stoptimes#Departure", "dct" : "http://purl.org/dc/terms/", "date":"dct:date", "departure" : "http://example.org/departure/", "trip" : "gtfs:trip", "stop":"gtfs:stop", "stopSequence":"gtfs:stopSequence", "arrivalTime":"gtfs:arrivalTime" } } +{ "@type" : "Departure", "@id" : "departure:1", "departureTime" : "12:09", "date" : "2016-01-01", "stop" : "stop1", "trip" : "trip1", "maxStopSequence" : 4 , "stopSequence" : 1 } +{ "@type" : "Departure", "@id" : "departure:3", "departureTime" : "12:14", "date" : "2016-01-01", "stop" : "stop1", "trip" : "trip2", "maxStopSequence" : 2 , "stopSequence" : 2 } +{ "@type" : "Departure", "@id" : "departure:4", "departureTime" : "12:20", "date" : "2016-01-01", "stop" : "stop2", "trip" : "trip1", "maxStopSequence" : 4 , "stopSequence" : 3 } +{ "@type" : "Departure", "@id" : "departure:5", "departureTime" : "12:25", "date" : "2016-01-01", "stop" : "stop1", "trip" : "trip1", "maxStopSequence" : 4 , "stopSequence" : 4 } +{ "@type" : "Departure", "@id" : "departure:6", "departureTime" : "12:40", "date" : "2016-01-01", "stop" : "stop3", "trip" : "trip2", "maxStopSequence" : 2 , "stopSequence" : 2 } diff --git a/test/test.js b/test/test.js index e9e5e69..d0e6863 100644 --- a/test/test.js +++ b/test/test.js @@ -1,6 +1,7 @@ var fs = require('fs'), should = require('should'), jsonldstream = require('jsonld-stream'), + param = require('param'), ArrDep2Connections = require('../lib/arrdep2connections.js'); describe('Creating lists', function () { @@ -10,17 +11,23 @@ describe('Creating lists', function () { var departuresStream = fs.createReadStream('./test/data/departures.jsonldstream', {encoding: 'utf8'}).pipe(new jsonldstream.Deserializer()); //Now, they should be able to emit connections when combined in the code it('should be emitting connections', function (done) { - - var transformer = new ArrDep2Connections(arrivalsStream); + // Read standard configuration parameters + var options = {}; + options['mongoDb'] = param('mongodb'); + options['mongoDbConfig'] = param('mongoDbConfig'); + options['inbound'] = param('inbound'); + options['outbound'] = param('outbound'); + + var transformer = new ArrDep2Connections(arrivalsStream, options); var stream = departuresStream.pipe(transformer); var count = 0; stream.on("data", function (connection) { if (connection && !connection["@context"]) { - count ++; + count++; } }); stream.on("end", function () { - count.should.be.exactly(6); + count.should.be.exactly(2); done(); }); });