Skip to content

Commit

Permalink
Merge pull request #5 from brechtvdv/master
Browse files Browse the repository at this point in the history
Improvements
  • Loading branch information
Pieter Colpaert committed Oct 7, 2015
2 parents f795ce7 + 84516a2 commit e81eb21
Show file tree
Hide file tree
Showing 13 changed files with 249 additions and 122 deletions.
14 changes: 9 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
# Arrivals and Departures to Connections

__Status__: not functional - work in progress

This javascript library transforms a stream of "arrivals and departures" to "connections".
This javascript library transforms a [JSON-LD stream](https://github.com/pietercolpaert/jsonld-stream) of "arrivals and departures" to "connections".

This code is useful for route planning applications where arrivals and departures are easy to generate, but connections are difficult to get right in the same processing.

Expand All @@ -17,10 +15,16 @@ npm install -g arrdep2connections
Now you can use the code on top of 2 files which follow the [JSON-LD stream specification](https://github.com/pietercolpaert/jsonld-stream):

```bash
arrdep2connections arrivalsfile.json departurefile.json -i context.json > connections.json
node arrdep2connections --arrivals arrivals.jsonldstream --departures departures.jsonldstream [--inbound context.json] > connections.jsonldstream
```

Optionally, you can specify a different inbound `context.json` using the `-i` flag, and a different outbound context by using the `-o` flag.
Optionally, you can specify a different inbound `context.json` using the `--inbound` flag, and a different outbound context by using the `--outbound` flag.

You can also load connections into MongoDB using the `--mongodb` flag. See `config/development.json` for more configuration options.

```bash
node arrdep2connections --arrivals arrivals.jsonldstream --departures departures.jsonldstream --mongodb
```

### NodeJS library

Expand Down
62 changes: 37 additions & 25 deletions arrdep2connections.js
Original file line number Diff line number Diff line change
@@ -1,33 +1,45 @@
var ReadStream = require('./lib/ReadStream.js'),
ArrDep2Connections = require('./lib/arrdep2connections.js'),
var ArrDep2Connections = require('./lib/arrdep2connections.js'),
stringify = require('JSONStream').stringify(false),
jsonldstream = require('jsonld-stream'),
param = require('param'),
MongoClient = require('mongodb').MongoClient,
StreamToMongo = require('./lib/StreamToMongo.js'),
fs = require('fs');

// Read filename arrivals.json and departure.json from parameters
var arrivalsFilename = './' + process.argv[2];
var arrivalData = require(arrivalsFilename);
var departuresFilename = './' + process.argv[3];
var departureData = require(departuresFilename);
// Read filename arrivals.jsonldstream and departure.jsonldstream from parameters
var arrivals = param('arrivals');
var departures = param('departures');

// Initialise streams
var arrivalStream = new ReadStream(arrivalData);
var departureStream = new ReadStream(departureData);
var arrdep2connections = new ArrDep2Connections(arrivalStream); // our transform stream
// Read extra configuration parameters
var options = {};
options['mongoDb'] = param('mongodb');
options['mongoDbConfig'] = param('mongoDbConfig');
options['inbound'] = param('inbound');
options['outbound'] = param('outbound');

departureStream.pipe(arrdep2connections).pipe(stringify).pipe(process.stdout);
// Initialise streams
var arrivalStream = fs.createReadStream(arrivals, {encoding: 'utf8'}).pipe(new jsonldstream.Deserializer());
var departureStream = fs.createReadStream(departures, {encoding: 'utf8'}).pipe(new jsonldstream.Deserializer());
var arrdep2connections = new ArrDep2Connections(arrivalStream, options); // our transform stream

// Check parameter 4 for optional inbound or outbound
if (process.argv[4] && process.argv[4] === '-i') {
var inbound = process.argv[5];
} else if (process.argv[4] && process.argv[4] === '-o') {
var outbound = process.argv[5];
}
// Load in MongoDB
if (options.mongoDb === true ) {
var url = 'mongodb://' + options.mongoDbConfig.host + ':' + options.mongoDbConfig.port + '/' + options.mongoDbConfig.database;

// Check parameter 6 for optional inbound or outbound
if (process.argv[6] && process.argv[6] === '-i') {
var inbound = process.argv[7];
} else if (process.argv[6] && process.argv[6] === '-o') {
var outbound = process.argv[7];
}
// First empty collection
MongoClient.connect(url, function(err, db) {
if (err) {
die("Wasn't able to connect to MongoDB server. Check if your server is running.", url);
}

// TODO: taking care of in/outbound
var collection = db.collection(options.mongoDbConfig.collection);
collection.remove(); // empty the collection
var streamToMongo = new StreamToMongo(collection);
var stream = departureStream.pipe(arrdep2connections).pipe(streamToMongo).on('finish', function () {
db.close(); // close connection
});
});
} else {
// Write to stdout
departureStream.pipe(arrdep2connections).pipe(stringify).pipe(process.stdout);
}
15 changes: 13 additions & 2 deletions arrdepcontext.json
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
{
"@context" : {
"st" : "http://semweb.mmlab.be/ns/stoptimes#",
"Arrival" : "st:Arrival",
"Departure" : "st:Departure",
"gtfs" : "http://vocab.gtfs.org/terms#",
"arrivalTime" : "gtfs:arrivalTime",
"departureTime" : "gtfs:departureTime",
"date" : "http://purl.org/dc/terms/date",
"stop" : {
"@type" : "@id",
"@id" : "gtfs:stop"
}
},
"trip" : {
"@type" : "@id",
"@id" : "gtfs:trip"
},
"route" : {
"@type" : "@id",
"@id" : "gtfs:route"
},
"stopSequence" : "gtfs:stopSequence"
}
}
}
14 changes: 14 additions & 0 deletions config/development.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"mongoDbConfig": {
"host": "localhost",
"port": "27017",
"database": "gtfs",
"collection": "connections"
},
"mongodb": false,
"inbound": "arrdepcontext.json",
"outbound": "connectionscontext.json",
"arrivals": "./arrivals.jsonldstream",
"departures": "./departures.jsonldstream",
"self": "{./development.json}"
}
14 changes: 13 additions & 1 deletion connectionscontext.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
"@context" : {
"lc" : "http://semweb.mmlab.be/ns/linkedconnections#",
"gtfs" : "http://vocab.gtfs.org/terms#",
"Connection" : "http://semweb.mmlab.be/ns/linkedconnections#Connection",
"dct" : "http://purl.org/dc/terms/",
"date" : "dct:date",
"arrivalTime" : "lc:arrivalTime",
"departureTime" : "lc:departureTime",
"arrivalStop" : {
Expand All @@ -11,6 +14,15 @@
"departureStop" : {
"@type" : "@id",
"@id" : "http://semweb.mmlab.be/ns/linkedconnections#departureStop"
}
},
"trip" : {
"@type" : "@id",
"@id" : "gtfs:trip"
},
"route" : {
"@type" : "@id",
"@id" : "gtfs:route"
},
"headsign" : "gtfs:headsign"
}
}
19 changes: 0 additions & 19 deletions lib/ReadStream.js

This file was deleted.

5 changes: 5 additions & 0 deletions lib/StreamIterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ EventEmitter.prototype._maxListeners = 1000;
var StreamIterator = function (stream) {
this._stream = stream;
this["@context"] = {};
var self = this;
this._stream.on("end", function () {
self._cb();
});
};

util.inherits(StreamIterator, EventEmitter);
Expand All @@ -18,6 +22,7 @@ StreamIterator.prototype.next = function (callback) {
object = null;
}
if (!object) {
this._cb = callback;
this._stream.once("readable", function () {;
self.next(callback);
});
Expand Down
23 changes: 23 additions & 0 deletions lib/StreamToMongo.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
var Writable = require('stream').Writable;
var util = require('util');

function StreamToMongo(collection) {
Writable.call(this, { objectMode: true });

this._collection = collection;
}

util.inherits(StreamToMongo, Writable);

StreamToMongo.prototype._write = function (obj, encoding, done) {
this._collection.insert(obj, function(err, result) {
if (!err) {
//console.log(result);
done();
} else {
done(err);
}
});
};

module.exports = StreamToMongo;
Loading

0 comments on commit e81eb21

Please sign in to comment.