Skip to content

Commit

Permalink
Working pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
rocket-ron committed Aug 21, 2016
1 parent 248b217 commit f1ff14c
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 162 deletions.
5 changes: 5 additions & 0 deletions mongodb/roles/mongod/tasks/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,8 @@
sudo: yes
service: name=mongod state=started

- name: Create the tweet sentiment collection
shell: mongo tweets --eval "db.createCollection('sentiment')

- name: Create the sentiment geospatial index
shell: mongo tweets --eval 'db.sentiment.createIndex({ coordinates: "2dsphere" })'
268 changes: 115 additions & 153 deletions rest/.idea/workspace.xml

Large diffs are not rendered by default.

11 changes: 6 additions & 5 deletions rest/rest.py → rest/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from flask_restful import Resource, Api
import unittest
import json
from pymongo import MongoClient


"""
Expand All @@ -12,7 +13,7 @@
longitude
radius (meters)
GET http://localhost:5000/sentiment?lat=40.7128N?lon=74.0059W?radius=5000
GET http://localhost:5000/sentiment?lat=40.7128?lon=74.0059?radius=5000
The response is a JSON object with the following structure
Expand Down Expand Up @@ -52,8 +53,8 @@ def get(self):
api.add_resource(Sentiment, '/')

# run the app server
# if __name__ == '__main__':
# app.run(debug=True)
if __name__ == '__main__':
app.run(debug=True)


"""
Expand Down Expand Up @@ -104,5 +105,5 @@ def test_bad_path(self):


# Run the unit tests
if __name__ == '__main__':
unittest.main()
#if __name__ == '__main__':
# unittest.main()
38 changes: 38 additions & 0 deletions rest/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
aniso8601==1.1.0
awsebcli==3.7.7
blessed==1.9.5
botocore==1.4.48
cement==2.8.2
click==6.6
colorama==0.3.7
docker-py==1.7.2
dockerpty==0.4.1
docopt==0.6.2
docutils==0.12
Flask==0.11.1
Flask-RESTful==0.3.5
itsdangerous==0.24
Jinja2==2.8
jmespath==0.9.0
kafka==1.3.1
MarkupSafe==0.23
nltk==3.2.1
oauthlib==1.1.2
pathspec==0.3.4
pymongo==3.3.0
python-dateutil==2.5.3
pytz==2016.6.1
PyYAML==3.11
redis==2.10.5
requests==2.9.1
requests-oauthlib==0.6.2
rq==0.6.0
semantic-version==2.5.0
six==1.10.0
termcolor==1.1.0
textblob==0.11.1
texttable==0.8.4
tweepy==3.5.0
wcwidth==0.1.7
websocket-client==0.37.0
Werkzeug==0.11.10
2 changes: 1 addition & 1 deletion twitter/sentimentConsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@

for message in consumer:
try:
job = q.enqueue(process_tweet, args=(message, args.mongohost, args.mongoport, 'tweets'))
job = q.enqueue(process_tweet, args=(message.value, args.mongohost, args.mongoport, 'tweets'))

except KeyboardInterrupt:
consumer.close()
3 changes: 2 additions & 1 deletion twitter/sentiments/sentimentProcessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@


def process_tweet(tweet, host, port, db):
print ('processing...')
if 'text' in tweet:
print 'processing...'
mongo_client = MongoClient(host=host, port=port, db=db)
mongo_client = MongoClient(host=host, port=port)
mongodb = mongo_client[db]
tweet_text_blob = TextBlob(tweet['text'])
result = dict(text=tweet['text'],
Expand Down
5 changes: 3 additions & 2 deletions twitter/twitter_utils/twitterStreamListener.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ def on_data(self, data):
if not self.caughtInterrupt:
tweet = json.loads(data)
if self.geo_only and 'coordinates' in tweet and tweet['lang'] == 'en':
self.serializer.write(tweet)
return True
if tweet['coordinates'] is not None:
self.serializer.write(tweet)
return True
else:
self.serializer.end()
return False
Expand Down

0 comments on commit f1ff14c

Please sign in to comment.