From 248b217d2c1461c0b810ff24ded96e722f28f12d Mon Sep 17 00:00:00 2001 From: Ron Cordell Date: Sun, 21 Aug 2016 13:48:24 -0700 Subject: [PATCH] adding in the redis cache queuing and sentiment analyzers --- kafka/Vagrantfile | 2 +- kafka/roles/zookeeper/templates/zoo.cfg.j2 | 4 +- mongodb/Vagrantfile | 4 +- rest/.idea/workspace.xml | 494 ++++++++++-------- twitter/sentimentConsumer.py | 57 ++ twitter/sentiments/__init__.py | 0 twitter/sentiments/sentimentProcessor.py | 19 + .../tweet_serializers/kafkaTweetSerializer.py | 6 +- twitter/twitter.py | 7 +- .../twitter_utils/twitterStreamListener.py | 13 +- 10 files changed, 373 insertions(+), 233 deletions(-) create mode 100644 twitter/sentimentConsumer.py create mode 100644 twitter/sentiments/__init__.py create mode 100644 twitter/sentiments/sentimentProcessor.py diff --git a/kafka/Vagrantfile b/kafka/Vagrantfile index 68c791e..3bf560d 100644 --- a/kafka/Vagrantfile +++ b/kafka/Vagrantfile @@ -11,7 +11,7 @@ Vagrant.configure("2") do |config| aws.ami = 'ami-a2490dc2' aws.region = "us-west-1" aws.instance_type = 'm3.large' - aws.security_groups = "mongodb" + aws.security_groups = "sentiment" aws.iam_instance_profile_name = 'mongodb' aws.tags = { diff --git a/kafka/roles/zookeeper/templates/zoo.cfg.j2 b/kafka/roles/zookeeper/templates/zoo.cfg.j2 index c1239c8..2370ad3 100644 --- a/kafka/roles/zookeeper/templates/zoo.cfg.j2 +++ b/kafka/roles/zookeeper/templates/zoo.cfg.j2 @@ -1,3 +1,5 @@ tickTime=2000 dataDir=/var/lib/zookeeper -clientPort=2181 \ No newline at end of file +clientPort=2181 +admin.enableServer=true +clientPortAddress=0.0.0.0 \ No newline at end of file diff --git a/mongodb/Vagrantfile b/mongodb/Vagrantfile index 494dece..4aaccde 100644 --- a/mongodb/Vagrantfile +++ b/mongodb/Vagrantfile @@ -11,7 +11,7 @@ Vagrant.configure("2") do |config| aws.ami = 'ami-a2490dc2' aws.region = "us-west-1" aws.instance_type = 'm3.large' - aws.security_groups = "mongodb" + aws.security_groups = "sentiment" aws.iam_instance_profile_name = 'mongodb' aws.tags = { @@ -29,7 +29,7 @@ Vagrant.configure("2") do |config| end config.vm.provision "ansible" do |ansible| - ansible.verbose = 'vvvv' + # ansible.verbose = 'vvvv' ansible.playbook = "mongodb.yml" end end diff --git a/rest/.idea/workspace.xml b/rest/.idea/workspace.xml index 8afea3c..5367619 100644 --- a/rest/.idea/workspace.xml +++ b/rest/.idea/workspace.xml @@ -5,38 +5,16 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + @@ -49,8 +27,11 @@ + + + @@ -65,44 +46,71 @@ - - + + + + + - - + + - - + + - + - - + + - - + + - - + + - - + + - + + + + + + + + + + + + + + + + + + + + + + + + + @@ -135,8 +143,6 @@ @@ -167,7 +177,7 @@ DEFINITION_ORDER - - + @@ -312,7 +306,7 @@ - + - + + + + + + + @@ -614,23 +646,23 @@ - - + + - - - + + + - + - + @@ -640,27 +672,27 @@ - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + @@ -676,6 +708,13 @@ + + + file://$PROJECT_DIR$/../twitter/sentimentConsumer.py + 42 + + @@ -683,45 +722,11 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -771,7 +776,7 @@ - + @@ -841,132 +846,122 @@ - + - - + + - + - - + + - + - - + + - + - - + + - + - - + + - - - - - - - - - - - + - - + + - + - - + + - + - + - + - - + + - + - - + + - + - - + + - + - - + + - + - - + + - + - - + + - + - - + + @@ -979,68 +974,97 @@ - + - - + + - + - - + + - + - - + + - + - - + + - + - - + + - + - - + + + + + + + + + + - - + + + + + + + + + + + + + + + + + + + + + + + - - + + @@ -1049,21 +1073,41 @@ - - + + - + - + - - + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/twitter/sentimentConsumer.py b/twitter/sentimentConsumer.py new file mode 100644 index 0000000..1e01d79 --- /dev/null +++ b/twitter/sentimentConsumer.py @@ -0,0 +1,57 @@ +#!/usr/bin/python + +import argparse +import json +from kafka import KafkaConsumer +from redis import Redis +from rq import Queue +from sentiments.sentimentProcessor import process_tweet + +parser = argparse.ArgumentParser( + description='Attach to the twitter stream API and send the tweets to a destination', + add_help=True, + formatter_class=argparse.RawDescriptionHelpFormatter) + +parser.add_argument('--kafkahost', + type=str, + required=False, + help='The host name of the Kafka broker if using Kafka serialization') +parser.add_argument('--kafkaport', + type=int, + required=False, + help='The port of the Kafka broker if using Kafka serialization') +parser.add_argument('--mongohost', + type=str, + required=False, + help='The host name of the mongoDB server') +parser.add_argument('--mongoport', + type=int, + required=False, + help='The port number of the mongoDB server') +parser.add_argument('--redishost', + type=str, + required=False, + help='The host name of the Redis cache') +parser.add_argument('--redisport', + type=int, + required=False, + help='The port of the Redis Cache') + +args = parser.parse_args() + + +kafka_server = "{0}:{1}".format(args.kafkahost, args.kafkaport) + +redis_conn = Redis(host=args.redishost, port=args.redisport) +q = Queue(connection=redis_conn) + +consumer = KafkaConsumer('tweets', + value_deserializer=lambda m: json.loads(m.decode('utf-8')), + bootstrap_servers=[kafka_server]) + +for message in consumer: + try: + job = q.enqueue(process_tweet, args=(message, args.mongohost, args.mongoport, 'tweets')) + + except KeyboardInterrupt: + consumer.close() diff --git a/twitter/sentiments/__init__.py b/twitter/sentiments/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/twitter/sentiments/sentimentProcessor.py b/twitter/sentiments/sentimentProcessor.py new file mode 100644 index 0000000..bb191eb --- /dev/null +++ b/twitter/sentiments/sentimentProcessor.py @@ -0,0 +1,19 @@ +from textblob import TextBlob +from pymongo import MongoClient + + +def process_tweet(tweet, host, port, db): + if 'text' in tweet: + print 'processing...' + mongo_client = MongoClient(host=host, port=port, db=db) + mongodb = mongo_client[db] + tweet_text_blob = TextBlob(tweet['text']) + result = dict(text=tweet['text'], + coordinates=tweet['coordinates'], + polarity=tweet_text_blob.polarity, + subjectivity=tweet_text_blob.subjectivity) + + mongodb.sentiment.save(result) + + + diff --git a/twitter/tweet_serializers/kafkaTweetSerializer.py b/twitter/tweet_serializers/kafkaTweetSerializer.py index fca76df..ffa558f 100644 --- a/twitter/tweet_serializers/kafkaTweetSerializer.py +++ b/twitter/tweet_serializers/kafkaTweetSerializer.py @@ -14,9 +14,13 @@ class KafkaTweetSerializer: def __init__(self, host='localhost', port='9092'): kafka_server = "{0}:{1}".format(host, str(port)) - self._producer = KafkaProducer(bootstrap_servers=kafka_server, + self._producer = KafkaProducer(bootstrap_servers=[kafka_server], value_serializer=lambda v: json.dumps(v).encode('utf-8')) def write(self, message): self._producer.send(topic='tweets', value=message) self._producer.flush() + print "Tweet!" + + def end(self): + self._producer.close() diff --git a/twitter/twitter.py b/twitter/twitter.py index 4dcc39f..7d8242e 100644 --- a/twitter/twitter.py +++ b/twitter/twitter.py @@ -40,7 +40,7 @@ help='The host name of the Kafka broker if using Kafka serialization') parser.add_argument('--kafkaport', type=int, - require=False, + required=False, help='The port of the Kafka broker if using Kafka serialization') args = parser.parse_args() @@ -60,7 +60,10 @@ if args.sample: listener.sample() elif args.locations: - listener.locate(locations=[-180, -90, 180, 90]) + try: + listener.locate(locations=[-180, -90, 180, 90]) + except KeyboardInterrupt: + listener.disconnect() else: if args.track is not None: listener.track(args.track) diff --git a/twitter/twitter_utils/twitterStreamListener.py b/twitter/twitter_utils/twitterStreamListener.py index d8b5e22..dc32f56 100644 --- a/twitter/twitter_utils/twitterStreamListener.py +++ b/twitter/twitter_utils/twitterStreamListener.py @@ -27,13 +27,17 @@ def on_data(self, data): with self._lock: if not self.caughtInterrupt: tweet = json.loads(data) - if self.geo_only and 'coordinates' in tweet: + if self.geo_only and 'coordinates' in tweet and tweet['lang'] == 'en': self.serializer.write(tweet) return True else: self.serializer.end() return False + def on_status(self, status): + print status + return True + def on_error(self, status_code): if status_code == 420: self.retryCount += 1 @@ -48,6 +52,8 @@ def interrupt(self, signum, frame): print "CTRL-C caught, closing..." with self._lock: self.caughtInterrupt = True + self.serializer.end() + self.disconnect() def track(self, track): self.twitterStream = Stream(self.auth, self) @@ -62,3 +68,8 @@ def locate(self, locations): self.geo_only = True self.twitterStream = Stream(self.auth, self) self.twitterStream.filter(locations=locations) + + def disconnect(self): + self.serializer.end() + +