diff --git a/README.md b/README.md
index 7ea3904..ce94651 100644
--- a/README.md
+++ b/README.md
@@ -19,14 +19,30 @@ Build and deploy a system in Python for sentiment analysis of public, geolocated
Set up a simple consumer of the Twitter Streaming API to get a stream of all public, geolocated tweets (filter out any post without geo). Tweepy (http://tweepy.readthedocs.org/en/v3.5.0/streaming_how_to.html) is a good library for consuming the streaming API. These tweets should be dispatched to your sentiment analysis workers through a message broker like RabbitMQ, Redis, or Kafka.
+Kafka was chosen for its ease of setup on AWS EC2. A Vagrantfile and an Ansible playbook that configures ZooKeeper and Kafka are used to create and start the Kafka server.
+
+The Twitter stream processor listens to the Twitter Streaming API and places tweets that carry geolocation data and are in English onto the Kafka topic.
+
### Sentiment Analysis Pipeline
Set up a worker that will consume tweets from your message broker and perform sentiment analysis using TextBlob (https://textblob.readthedocs.org/en/dev/quickstart.html#sentiment-analysis) or a similar library. We recommend using a task queueing system such as Celery (http://www.celeryproject.org/) or RQ (http://python-rq.org/) though you can also use the message broker directly. This component should be designed to scale to multiple machines easily and tolerate instance failures. The results of sentiment analysis should be stored in a MongoDB collection for retrieval from the API. See Sentiment API doc below to figure out which fields will need to be persisted.
+An AWS ElastiCache cluster backed by Redis was created for its ease of creation and management, and the Python RQ library was chosen to work with it, likewise for its simplicity.
+
+A Python command-line program consumes messages from the Kafka topic (`tweets`) and queues them onto an RQ queue for sentiment processing with TextBlob.
+
+Another Python program implements the sentiment processor worker, which dequeues each job, runs the sentiment analysis, and writes the results to MongoDB. The results consist of the sentiment scores, the tweet text, and the location data.
+
+Both of these Python programs are simple, and multiple copies can be run on machines with available cores, which allows quick horizontal scaling. However, more management tooling is needed to run larger sets of parallel processes for queueing and dequeueing these tasks. Currently each program runs on its own t2.micro instance.
+
### Database
Set up a MongoDB collection with indexing to support the structure of the API response as presented below.
+MongoDB runs on a single AWS EC2 m3.large instance with SSD storage; a modest server size keeps costs down. A single `sentiment` collection holds one document per analyzed tweet, containing the tweet text, sentiment scores, and location.
+A 2dsphere geolocation index is created on the `coordinates` field of the documents to enable MongoDB's geo-query functionality. An aggregation query can then compute the average, minimum, and maximum sentiment scores for tweets that fall within a sphere of a given radius.
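+
+A minimal sketch of the index creation and geo query with `pymongo` (the host and port are placeholders):
+
+```python
+from pymongo import MongoClient
+
+sentiment = MongoClient('localhost', 27017)['tweets']['sentiment']
+
+# 2dsphere index over the GeoJSON point stored in each document
+sentiment.create_index([('coordinates', '2dsphere')])
+
+# $centerSphere takes its radius in radians, so divide the distance
+# in kilometers by the Earth's radius (~6371 km).
+radius_km = 100
+cursor = sentiment.find({
+    'coordinates': {
+        '$geoWithin': {'$centerSphere': [[-75.0, 40.9], radius_km / 6371.0]}
+    }
+})
+```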
+
+
### Sentiment API
Design a RESTful API with Flask (http://flask.pocoo.org/), Bottle (http://bottlepy.org/docs/dev/index.html), or a similar library that allows a user to query the average sentiment at a location by providing a latitude, longitude, and radius. The API should provide a JSON response in the following format:
@@ -43,8 +59,14 @@ Design a RESTful API with Flask (http://flask.pocoo.org/), Bottle (http://bottle
"coordinates": [-75.14311344, 40.05701716]
}
}
-
+
### Deployment
Project should be hosted in a public repository on Github or Bitbucket. The system should be deployed on AWS EC2, Elasticbeanstalk, Heroku, Google AppEngine, or a similar service.
+The REST API is available at `http://develop.8hum4jfqxp.us-west-1.elasticbeanstalk.com`.
+
+A sample query is `http://develop.8hum4jfqxp.us-west-1.elasticbeanstalk.com/sentiment?lat=40.9&lon=-75.0&dist=100`, which looks for tweets within a 100 km radius of the geographic point given by the lat/lon coordinates.
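+
+A minimal client sketch using the `requests` library (not part of the deployed system; the response values depend on the tweets currently stored):
+
+```python
+import requests
+
+# average sentiment within 100 km of the given point
+response = requests.get(
+    'http://develop.8hum4jfqxp.us-west-1.elasticbeanstalk.com/sentiment',
+    params={'lat': 40.9, 'lon': -75.0, 'dist': 100})
+print(response.json())  # tweets, average_polarity, most_positive, most_negative
+```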
+
diff --git a/rest/.ebignore b/rest/.ebignore
new file mode 100644
index 0000000..0e317d8
--- /dev/null
+++ b/rest/.ebignore
@@ -0,0 +1,7 @@
+# pycharm project files
+.idea/*
+
+# Elastic Beanstalk Files
+.elasticbeanstalk/*
+!.elasticbeanstalk/*.cfg.yml
+!.elasticbeanstalk/*.global.yml
diff --git a/rest/.gitignore b/rest/.gitignore
new file mode 100644
index 0000000..bca646a
--- /dev/null
+++ b/rest/.gitignore
@@ -0,0 +1,5 @@
+
+# Elastic Beanstalk Files
+.elasticbeanstalk/*
+!.elasticbeanstalk/*.cfg.yml
+!.elasticbeanstalk/*.global.yml
diff --git a/rest/application.py b/rest/application.py
index f46401b..1aaa318 100644
--- a/rest/application.py
+++ b/rest/application.py
@@ -1,10 +1,19 @@
from flask import Flask
-from flask_restful import Resource, Api
+from flask_restful import Resource, Api, reqparse
import unittest
import json
+import os
from pymongo import MongoClient
+# The Mongo host can be overridden via the environment (see config.py);
+# it falls back to the EC2 instance hosting the database.
+mongo_client = MongoClient(host=os.environ.get('MONGO_CONNECTION_STRING',
+                                               'ec2-54-193-5-118.us-west-1.compute.amazonaws.com'))
+mongodb = mongo_client['tweets']
+
+parser = reqparse.RequestParser()
+parser.add_argument('lat', type=float, required=True)
+parser.add_argument('lon', type=float, required=True)
+parser.add_argument('dist', type=float, required=True)
+
"""
RESTful API for Sentiment Analysis
@@ -13,7 +22,7 @@
longitude
-    radius (meters)
+    distance (kilometers)
- GET http://localhost:5000/sentiment?lat=40.7128?lon=74.0059?radius=5000
+    GET http://localhost:5000/sentiment?lat=40.7128&lon=-74.0059&dist=5000
The response is a JSON object with the following structure
@@ -30,31 +39,91 @@
}
}
+ db.sentiment.find({ coordinates: {$geoWithin: {$centerSphere: [ [ -74, 40.74 ], 10/6371] } } } )
+
+ radius of earth in kilometers = 6371
+
"""
# Implement the sentiment REST GET query
class Sentiment(Resource):
def get(self):
- return {'tweets': 100,
- 'average_polarity': 0.4,
- 'most_positive': {
- 'text': 'what a great day!',
- 'coordinates': [-75.14310264, 40.05701649]
- },
- 'most_negative': {
- 'text': 'worst lunch ever!',
- 'coordinates': [-75.14311344, 40.05701716]
- }}
-
-app = Flask(__name__)
-api = Api(app)
-
-api.add_resource(Sentiment, '/')
+        # reqparse aborts with HTTP 400 if lat, lon, or dist is missing,
+        # and has already converted each argument to float
+        args = parser.parse_args()
+        latitude = args['lat']
+        longitude = args['lon']
+        # $centerSphere expects its radius in radians: the requested distance
+        # in kilometers divided by the Earth's radius (~6371 km)
+        distance = args['dist'] / 6371
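+        # One aggregation pass: $match keeps tweets inside the circle, $group
+        # counts them, averages polarity, and records the extreme polarities
+        # while pushing every document, and $project maps each extreme
+        # polarity back to the document that produced it.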
+ cursor = mongodb['sentiment'].aggregate(
+ [{"$match": {"coordinates": {"$geoWithin": {"$centerSphere": [[longitude, latitude], distance]}}}},
+ {"$group": {
+ "_id": "null",
+ "tweets": {"$sum": 1},
+ "average_polarity": {"$avg": "$polarity"},
+ "most_positive": {"$max": "$polarity"},
+ "most_negative": {"$min": "$polarity"},
+ "docs": {"$push": {
+ "polarity": "$polarity",
+ "text": "$text",
+ "coordinates": "$coordinates.coordinates"
+ }}
+ }},
+ {"$project": {
+ "tweets": 1,
+ "average_polarity": 1,
+ "most_positive": {
+ "$setDifference": [
+ {"$map": {
+ "input": "$docs",
+ "as": "doc",
+ "in": {
+ "$cond": [
+ {"$eq": ["$most_positive", "$$doc.polarity"]},
+ "$$doc",
+ "false"
+ ]}
+ }},
+ ["false"]
+ ]
+ },
+ "most_negative": {
+ "$setDifference": [
+ {"$map": {
+ "input": "$docs",
+ "as": "doc",
+ "in": {
+ "$cond": [
+ {"$eq": ["$most_negative", "$$doc.polarity"]},
+ "$$doc",
+ "false"
+ ]
+ }
+ }},
+ ["false"]
+ ]}
+ }}
+ ])
+        for result in cursor:
+            return result
+        # the cursor is empty when no tweets fall inside the circle
+        return {'tweets': 0, 'average_polarity': None,
+                'most_positive': None, 'most_negative': None}
+
+
+application = Flask(__name__)
+api = Api(application)
+
+api.add_resource(Sentiment, '/sentiment')
+
# run the app server
if __name__ == '__main__':
- app.run(debug=True)
+ application.run()
"""
@@ -76,7 +145,7 @@ def tearDownClass(cls):
def setUp(self):
# creates a Flask test client
- self.app = app.test_client()
+ self.app = application.test_client()
        # propagate exceptions to the test client
self.app.testing = True
diff --git a/rest/config.py b/rest/config.py
new file mode 100644
index 0000000..ec81578
--- /dev/null
+++ b/rest/config.py
@@ -0,0 +1,25 @@
+import os
+
+class Config:
+ MONGO_CONNECTION_STRING = os.environ.get('MONGO_CONNECTION_STRING')
+
+ @staticmethod
+ def init_app(app):
+ pass
+
+class DevelopmentConfig(Config):
+ DEBUG = True
+
+class TestingConfig(Config):
+ TESTING = True
+
+class ProductionConfig(Config):
+ pass
+
+
+config = {
+ 'development': DevelopmentConfig,
+ 'testing': TestingConfig,
+ 'production': ProductionConfig,
+ 'default': DevelopmentConfig
+}
\ No newline at end of file
diff --git a/twitter/README.md b/twitter/README.md
new file mode 100644
index 0000000..4b62660
--- /dev/null
+++ b/twitter/README.md
@@ -0,0 +1,27 @@
+# Twitter Stream Processors
+
+There are three components in the Twitter stream processing pipeline:
+
+- The Twitter stream listener, which registers for the Twitter stream and places tweets in a Kafka topic
+
+- The Kafka consumer, which takes each tweet off the Kafka topic and queues the sentiment processor job for the tweet
+
+- The Sentiment processor, which measures the tweet sentiment polarity and subjectivity scores and writes the results to MongoDB
+
+## Twitter Stream Listener
+
+The Twitter stream listener is a simple Python program that uses a single Twitter API key to register as a streaming API consumer. The program runs from the command line with arguments
+that specify the Twitter API key, the type of stream events of interest (such as tracking terms or locations), and the Kafka connection information.
+
+The stream listener registers for tweets that contain location data and are in English. However, not every tweet delivered by the stream meets these criteria, so the program additionally
+checks each tweet and keeps only those that carry geolocation data and an English language tag. Once those criteria are met, the tweet is placed on the Kafka topic.
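+
+A minimal sketch of the listener, assuming Tweepy 3.x and the `kafka-python` client (the credentials and broker address are placeholders; the real values come from the command line):
+
+```python
+import json
+
+import tweepy
+from kafka import KafkaProducer
+
+auth = tweepy.OAuthHandler('CONSUMER_KEY', 'CONSUMER_SECRET')
+auth.set_access_token('ACCESS_TOKEN', 'ACCESS_SECRET')
+producer = KafkaProducer(bootstrap_servers='localhost:9092')
+
+class GeoEnglishListener(tweepy.StreamListener):
+    def on_status(self, status):
+        # keep only tweets with point coordinates and an English language tag
+        if status.coordinates and status.lang == 'en':
+            producer.send('tweets', json.dumps(status._json).encode('utf-8'))
+
+    def on_error(self, status_code):
+        return status_code != 420  # disconnect on rate limiting
+
+stream = tweepy.Stream(auth, GeoEnglishListener())
+stream.filter(locations=[-180, -90, 180, 90])  # any geotagged tweet
+```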
+
+## Kafka Tweet Consumer
+
+The Kafka tweet consumer is a simple Python program that subscribes to the `tweets` topic on the Kafka server. For every tweet consumed from the topic, a job is scheduled on a Python RQ
+task queue. This decouples the sentiment analysis and the storage of its results in MongoDB from consumption of the tweet stream.
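+
+A minimal sketch of the consumer, assuming `kafka-python`, `redis`, and `rq` (the hostnames are placeholders for the Kafka broker and the ElastiCache Redis endpoint):
+
+```python
+import json
+
+from kafka import KafkaConsumer
+from redis import Redis
+from rq import Queue
+
+consumer = KafkaConsumer('tweets', bootstrap_servers='localhost:9092')
+queue = Queue('sentiment', connection=Redis(host='localhost'))
+
+for message in consumer:
+    tweet = json.loads(message.value.decode('utf-8'))
+    # enqueue by dotted path so this process never imports the worker module;
+    # `worker.analyze_tweet` is a hypothetical module/function name
+    queue.enqueue('worker.analyze_tweet', tweet)
+```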
+
+## Sentiment Processor
+
+The RQ task runs in a simple worker program that performs the sentiment analysis on the tweet text using TextBlob, producing a polarity and a subjectivity score. Both scores are added to a dictionary
+along with the tweet text and coordinates and stored as a BSON document in MongoDB.
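+
+A minimal sketch of the worker function (the Mongo host is a placeholder, and `analyze_tweet` is the hypothetical job named in the consumer sketch above):
+
+```python
+from pymongo import MongoClient
+from textblob import TextBlob
+
+sentiment = MongoClient('localhost')['tweets']['sentiment']
+
+def analyze_tweet(tweet):
+    # TextBlob returns a (polarity, subjectivity) named tuple
+    scores = TextBlob(tweet['text']).sentiment
+    sentiment.insert_one({
+        'text': tweet['text'],
+        'polarity': scores.polarity,
+        'subjectivity': scores.subjectivity,
+        'coordinates': tweet['coordinates'],  # GeoJSON point from Twitter
+    })
+```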