diff --git a/kafka/Vagrantfile b/kafka/Vagrantfile
index 68c791e..3bf560d 100644
--- a/kafka/Vagrantfile
+++ b/kafka/Vagrantfile
@@ -11,7 +11,7 @@ Vagrant.configure("2") do |config|
aws.ami = 'ami-a2490dc2'
aws.region = "us-west-1"
aws.instance_type = 'm3.large'
- aws.security_groups = "mongodb"
+ aws.security_groups = "sentiment"
aws.iam_instance_profile_name = 'mongodb'
aws.tags = {
diff --git a/kafka/roles/zookeeper/templates/zoo.cfg.j2 b/kafka/roles/zookeeper/templates/zoo.cfg.j2
index c1239c8..2370ad3 100644
--- a/kafka/roles/zookeeper/templates/zoo.cfg.j2
+++ b/kafka/roles/zookeeper/templates/zoo.cfg.j2
@@ -1,3 +1,5 @@
tickTime=2000
dataDir=/var/lib/zookeeper
-clientPort=2181
\ No newline at end of file
+clientPort=2181
+admin.enableServer=true
+clientPortAddress=0.0.0.0
\ No newline at end of file
diff --git a/mongodb/Vagrantfile b/mongodb/Vagrantfile
index 494dece..4aaccde 100644
--- a/mongodb/Vagrantfile
+++ b/mongodb/Vagrantfile
@@ -11,7 +11,7 @@ Vagrant.configure("2") do |config|
aws.ami = 'ami-a2490dc2'
aws.region = "us-west-1"
aws.instance_type = 'm3.large'
- aws.security_groups = "mongodb"
+ aws.security_groups = "sentiment"
aws.iam_instance_profile_name = 'mongodb'
aws.tags = {
@@ -29,7 +29,7 @@ Vagrant.configure("2") do |config|
end
config.vm.provision "ansible" do |ansible|
- ansible.verbose = 'vvvv'
+ # ansible.verbose = 'vvvv'
ansible.playbook = "mongodb.yml"
end
end
diff --git a/rest/.idea/workspace.xml b/rest/.idea/workspace.xml
index 8afea3c..5367619 100644
--- a/rest/.idea/workspace.xml
+++ b/rest/.idea/workspace.xml
@@ -5,38 +5,16 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
@@ -49,8 +27,11 @@
+
+
+
@@ -65,44 +46,71 @@
-
-
+
+
+
+
+
-
-
+
+
-
-
+
+
-
+
-
-
+
+
-
-
+
+
-
-
+
+
-
-
+
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -135,8 +143,6 @@
-
-
@@ -148,15 +154,19 @@
-
-
+
+
+
+
-
+
+
+
@@ -167,7 +177,7 @@
DEFINITION_ORDER
-
+
@@ -227,7 +237,7 @@
-
+
@@ -237,7 +247,11 @@
-
+
+
+
+
+
@@ -247,11 +261,11 @@
-
+
-
+
@@ -261,15 +275,7 @@
-
-
-
-
-
-
-
-
-
+
@@ -282,25 +288,13 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
+
@@ -312,7 +306,7 @@
-
+
@@ -594,10 +588,48 @@
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -614,23 +646,23 @@
-
-
+
+
-
-
-
+
+
+
-
+
-
+
@@ -640,27 +672,27 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -676,6 +708,13 @@
+
+
+ file://$PROJECT_DIR$/../twitter/sentimentConsumer.py
+ 42
+
+
+
@@ -683,45 +722,11 @@
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -771,7 +776,7 @@
-
+
@@ -841,132 +846,122 @@
-
+
-
-
+
+
-
+
-
-
+
+
-
+
-
-
+
+
-
+
-
-
+
+
-
+
-
-
+
+
-
-
-
-
-
-
-
-
-
-
-
+
-
-
+
+
-
+
-
-
+
+
-
+
-
+
-
+
-
-
+
+
-
+
-
-
+
+
-
+
-
-
+
+
-
+
-
-
+
+
-
+
-
-
+
+
-
+
-
-
+
+
-
+
-
-
+
+
@@ -979,68 +974,97 @@
-
+
-
-
+
+
-
+
-
-
+
+
-
+
-
-
+
+
-
+
-
-
+
+
-
+
-
-
+
+
-
+
-
-
+
+
+
+
+
+
+
+
+
+
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
+
+
@@ -1049,21 +1073,41 @@
-
-
+
+
-
+
-
+
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/twitter/sentimentConsumer.py b/twitter/sentimentConsumer.py
new file mode 100644
index 0000000..1e01d79
--- /dev/null
+++ b/twitter/sentimentConsumer.py
@@ -0,0 +1,57 @@
+#!/usr/bin/python
+
+import argparse
+import json
+from kafka import KafkaConsumer
+from redis import Redis
+from rq import Queue
+from sentiments.sentimentProcessor import process_tweet
+
+parser = argparse.ArgumentParser(
+ description='Attach to the twitter stream API and send the tweets to a destination',
+ add_help=True,
+ formatter_class=argparse.RawDescriptionHelpFormatter)
+
+parser.add_argument('--kafkahost',
+ type=str,
+ required=False,
+ help='The host name of the Kafka broker if using Kafka serialization')
+parser.add_argument('--kafkaport',
+ type=int,
+ required=False,
+ help='The port of the Kafka broker if using Kafka serialization')
+parser.add_argument('--mongohost',
+ type=str,
+ required=False,
+ help='The host name of the mongoDB server')
+parser.add_argument('--mongoport',
+ type=int,
+ required=False,
+ help='The port number of the mongoDB server')
+parser.add_argument('--redishost',
+ type=str,
+ required=False,
+ help='The host name of the Redis cache')
+parser.add_argument('--redisport',
+ type=int,
+ required=False,
+ help='The port of the Redis Cache')
+
+args = parser.parse_args()
+
+
+kafka_server = "{0}:{1}".format(args.kafkahost, args.kafkaport)
+
+redis_conn = Redis(host=args.redishost, port=args.redisport)
+q = Queue(connection=redis_conn)
+
+consumer = KafkaConsumer('tweets',
+ value_deserializer=lambda m: json.loads(m.decode('utf-8')),
+ bootstrap_servers=[kafka_server])
+
+for message in consumer:
+ try:
+ job = q.enqueue(process_tweet, args=(message, args.mongohost, args.mongoport, 'tweets'))
+
+ except KeyboardInterrupt:
+ consumer.close()
diff --git a/twitter/sentiments/__init__.py b/twitter/sentiments/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/twitter/sentiments/sentimentProcessor.py b/twitter/sentiments/sentimentProcessor.py
new file mode 100644
index 0000000..bb191eb
--- /dev/null
+++ b/twitter/sentiments/sentimentProcessor.py
@@ -0,0 +1,19 @@
+from textblob import TextBlob
+from pymongo import MongoClient
+
+
+def process_tweet(tweet, host, port, db):
+ if 'text' in tweet:
+ print 'processing...'
+ mongo_client = MongoClient(host=host, port=port, db=db)
+ mongodb = mongo_client[db]
+ tweet_text_blob = TextBlob(tweet['text'])
+ result = dict(text=tweet['text'],
+ coordinates=tweet['coordinates'],
+ polarity=tweet_text_blob.polarity,
+ subjectivity=tweet_text_blob.subjectivity)
+
+ mongodb.sentiment.save(result)
+
+
+
diff --git a/twitter/tweet_serializers/kafkaTweetSerializer.py b/twitter/tweet_serializers/kafkaTweetSerializer.py
index fca76df..ffa558f 100644
--- a/twitter/tweet_serializers/kafkaTweetSerializer.py
+++ b/twitter/tweet_serializers/kafkaTweetSerializer.py
@@ -14,9 +14,13 @@ class KafkaTweetSerializer:
def __init__(self, host='localhost', port='9092'):
kafka_server = "{0}:{1}".format(host, str(port))
- self._producer = KafkaProducer(bootstrap_servers=kafka_server,
+ self._producer = KafkaProducer(bootstrap_servers=[kafka_server],
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
def write(self, message):
self._producer.send(topic='tweets', value=message)
self._producer.flush()
+ print "Tweet!"
+
+ def end(self):
+ self._producer.close()
diff --git a/twitter/twitter.py b/twitter/twitter.py
index 4dcc39f..7d8242e 100644
--- a/twitter/twitter.py
+++ b/twitter/twitter.py
@@ -40,7 +40,7 @@
help='The host name of the Kafka broker if using Kafka serialization')
parser.add_argument('--kafkaport',
type=int,
- require=False,
+ required=False,
help='The port of the Kafka broker if using Kafka serialization')
args = parser.parse_args()
@@ -60,7 +60,10 @@
if args.sample:
listener.sample()
elif args.locations:
- listener.locate(locations=[-180, -90, 180, 90])
+ try:
+ listener.locate(locations=[-180, -90, 180, 90])
+ except KeyboardInterrupt:
+ listener.disconnect()
else:
if args.track is not None:
listener.track(args.track)
diff --git a/twitter/twitter_utils/twitterStreamListener.py b/twitter/twitter_utils/twitterStreamListener.py
index d8b5e22..dc32f56 100644
--- a/twitter/twitter_utils/twitterStreamListener.py
+++ b/twitter/twitter_utils/twitterStreamListener.py
@@ -27,13 +27,17 @@ def on_data(self, data):
with self._lock:
if not self.caughtInterrupt:
tweet = json.loads(data)
- if self.geo_only and 'coordinates' in tweet:
+ if self.geo_only and 'coordinates' in tweet and tweet['lang'] == 'en':
self.serializer.write(tweet)
return True
else:
self.serializer.end()
return False
+ def on_status(self, status):
+ print status
+ return True
+
def on_error(self, status_code):
if status_code == 420:
self.retryCount += 1
@@ -48,6 +52,8 @@ def interrupt(self, signum, frame):
print "CTRL-C caught, closing..."
with self._lock:
self.caughtInterrupt = True
+ self.serializer.end()
+ self.disconnect()
def track(self, track):
self.twitterStream = Stream(self.auth, self)
@@ -62,3 +68,8 @@ def locate(self, locations):
self.geo_only = True
self.twitterStream = Stream(self.auth, self)
self.twitterStream.filter(locations=locations)
+
+ def disconnect(self):
+ self.serializer.end()
+
+