Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
demian85 committed May 27, 2012
1 parent 1df59bc commit 283c2d4
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 41 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
node_modules
lib/cache/*
19 changes: 19 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
Copyright (c) 2010 Demián Rodriguez ([email protected])

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
69 changes: 50 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,18 +1,60 @@
gnip
====

Connect to Gnip streaming API and manage rules.
Connect to Gnip streaming API and manage rules.
You must have a Gnip account with any data source available, like Twitter Power Track.

JSON is the only supported stream format, so you must enable data normalization in your admin panel.
Currenly, this module only supports JSON activity stream format, so you must enable data normalization in your admin panel.

# Gnip.Stream
This class is an EventEmitter and allows you to connect to the stream and start receiving data.

## API methods

#### stream.start()
Connect to the stream and start receiving data. At this point you should have registered at least one event listener for any of these events: 'data', 'object' or 'tweet'.

#### stream.end()
Terminates the connection.

## Events

###### ready
###### data
###### error
###### object
###### tweet
###### delete
###### end

# Gnip.Rules
This class allows you to manage an unlimited number of tracking rules.

## API methods

#### rules.getAll(Function callback)
Get cached rules.

#### rules.update(Array rules, Function callback)
Creates or replaces the live tracking rules.
Rules are sent in batches of 5000 (API limit), so you can pass an unlimited number of rules.
The current tracking rules are stored in a local JSON file so you can update the existing rules efficiently without having to remove them all.

**The following methods uses Gnip API directly and ignores the local cache. Avoid when possible.**
#### rules.live.update(Array rules, Function callback)
#### rules.live.add(Array rules, Function callback)
#### rules.live.remove(Array rules, Function callback)
#### rules.live.getAll(Function callback)
#### rules.live.removeAll(Function callback)


Example Usage
====
var Gnip = require('gnip');

var stream = new Gnip.Stream({
url : 'https://stream.gnip.com:443/accounts/xxx/publishers/twitter/streams/track/prod.json',
username : 'xxx',
user : 'xxx',
password : 'xxx'
});
stream.on('ready', function() {
Expand All @@ -25,26 +67,15 @@ Example Usage
console.error(err);
});

var rules = new Gnip.Rules('https://api.gnip.com:443/accounts/xxx/publishers/twitter/streams/track/prod/rules.json', 'xxx', 'xxx');
var rules = new Gnip.Rules({
url : 'https://api.gnip.com:443/accounts/xxx/publishers/twitter/streams/track/prod/rules.json',
user : 'xxx',
password : 'xxx'
});

// create or replace tracking rules and update local cache.
// rules are sent in batches of 5000 (API limit), so you can pass an unlimited number of rules.
rules.update(['#hashtag', 'keyword', '@user'], function(err) {
if (err) throw err;
stream.start();
});

API methods
====
stream.start()
stream.end()

rules.getAll(Function callback)
rules.update(Array rules, Function callback)
rules.live.update(Array rules, Function callback)
rules.live.add(Array rules, Function callback)
rules.live.remove(Array rules, Function callback)
rules.live.getAll(Function callback)
rules.live.removeAll(Function callback)

More details and tests soon...
24 changes: 9 additions & 15 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -1,26 +1,20 @@
/**
* Connects to Gnip streaming api.
*
* @author Demián Rodriguez <[email protected]>
* @version 0.1.0
*/
var EventEmitter = require('events').EventEmitter,
JSONParser = require('./JSONParser').JSONParser,
util = require('util'),
GnipRules = require('./rules'),
url = require('url'),
zlib = require('zlib'),
_ = require('underscore'),
https = require('https');

/**
* Connects to Gnip streaming api and tracks keywords.
* All errors are emitted as 'error' events.
*
* @param options Object with the following properties:
* - (String) username
* - (String) user
* - (String) password
* - (String) userAgent
* - (String) streamURL
* - (String) url
*
* Events:
* - data: function(String data) {...}
Expand All @@ -36,11 +30,11 @@ var GnipStream = function(options) {

var self = this;

self.options = Object.merge({
username : '',
self.options = _.extend({
user : '',
password : '',
userAgent : null,
streamURL : null
url : null
}, options || {});

self._req = null;
Expand All @@ -66,13 +60,13 @@ GnipStream.prototype._basicAuth = function(user, pass) {
GnipStream.prototype.start = function() {
var self = this;

if (!self.options.streamURL) throw new Error('Invalid end point specified!');
if (!self.options.url) throw new Error('Invalid end point specified!');

if (self._req) self.end();

var streamUrl = require('url').parse(self.options.streamURL);
var streamUrl = require('url').parse(self.options.url);
var headers = {
'Authorization' : self._basicAuth(self.options.username, self.options.password),
'Authorization' : self._basicAuth(self.options.user, self.options.password),
'Accept-Encoding' : 'gzip',
'Connection' : 'keep-alive'
};
Expand Down
27 changes: 22 additions & 5 deletions lib/rules.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
var request = require('request'),
crypto = require('crypto'),
_ = require('underscore'),
fs = require('fs'),
path = require('path'),
async = require('async');

var LiveRules = function(endPoint, user, password) {
Expand Down Expand Up @@ -104,11 +107,21 @@ LiveRules.prototype.removeAll = function(cb) {



var GnipRules = function(endPoint, user, password) {
this._api = endPoint;
this._auth = "Basic " + new Buffer(user + ':' + password).toString('base64');
this._cacheFile = __dirname + '/' + crypto.createHash('md5').update(endPoint).digest('hex') + '.json';
this.live = new LiveRules(endPoint, user, password);
var GnipRules = function(options) {
this.options = _.extend({
user : '',
password : '',
url : null
}, options || {});

this._api = this.options.url;
this._auth = "Basic " + new Buffer(this.options.user + ':' + this.options.password).toString('base64');
this._cacheFile = __dirname + '/cache/' + crypto.createHash('md5').update(this._api).digest('hex') + '.json';
this.live = new LiveRules(this._api, this.options.user, this.options.password);

if (!path.existsSync(this._cacheFile)) {
fs.writeFileSync(this._cacheFile, '[]', 'utf8');
}
};

GnipRules.prototype._deleteRulesBatch = function(rules, max, cb) {
Expand Down Expand Up @@ -150,6 +163,10 @@ GnipRules.prototype.getAll = function(cb) {
});
};

GnipRules.prototype.clearCache = function(cb) {
fs.writeFile(this._cacheFile, '[]', 'utf8', cb);
};

GnipRules.prototype.update = function(rules, cb) {

var self = this;
Expand Down
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
"node": ">=0.6"
},
"dependencies": {
"request" : ">=2.9.100",
"async" : ">=0.1.1"
"request" : "2.9.x",
"async" : "0.1.x",
"underscore" : "1.3.x"
},
"devDependencies": {}
}
70 changes: 70 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
var Gnip = require('gnip'),
async = require('async');

var stream = new Gnip.Stream({
url : 'https://stream.gnip.com:443/accounts/xxx/publishers/twitter/streams/track/dev.json',
user : 'xxx',
password : 'xxx'
});
stream.on('ready', function() {
console.log('Stream ready!');
});
stream.on('tweet', function(tweet) {
console.log('New tweet: ' + tweet.body);
});
stream.on('error', function(err) {
console.error(err);
});

var rules = new Gnip.Rules({
url : 'https://api.gnip.com:443/accounts/xxx/publishers/twitter/streams/track/dev/rules.json',
user : 'xxx',
password : 'xxx'
});

rules.live.removeAll(function(err) {
if (err) throw err;

rules.clearCache();

rules.update(['nodeJS'], function(err) {
if (err) throw err;

stream.start();

async.series([
async.apply(test, ['ford', 'peugeot']),
async.apply(test, ['peugeot']),
async.apply(test, ['volkswagen', 'audi', {value:'ferrari'}])
], function(err) {
if (err) throw err;
});
});
});

function test(r, cb) {
console.log('Testing rule update...');

rules.update(r, function(err) {
if (err) throw err;

async.series([
function(cb) {
rules.getAll(function(err, rules) {
if (err) throw err;
console.log('Local rules:')
console.log(rules)
cb();
});
},
function(cb) {
rules.live.getAll(function(err, rules) {
if (err) throw err;
console.log('Live rules:')
console.log(rules)
cb();
})
}
], cb);
});
}

0 comments on commit 283c2d4

Please sign in to comment.