Skip to content

Commit

Permalink
Add Redis cache queuing and sentiment analyzers
Browse files Browse the repository at this point in the history
  • Loading branch information
rocket-ron committed Aug 21, 2016
1 parent 0f68f1f commit 248b217
Show file tree
Hide file tree
Showing 10 changed files with 373 additions and 233 deletions.
2 changes: 1 addition & 1 deletion kafka/Vagrantfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Vagrant.configure("2") do |config|
aws.ami = 'ami-a2490dc2'
aws.region = "us-west-1"
aws.instance_type = 'm3.large'
aws.security_groups = "mongodb"
aws.security_groups = "sentiment"
aws.iam_instance_profile_name = 'mongodb'

aws.tags = {
Expand Down
4 changes: 3 additions & 1 deletion kafka/roles/zookeeper/templates/zoo.cfg.j2
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
clientPort=2181
admin.enableServer=true
clientPortAddress=0.0.0.0
4 changes: 2 additions & 2 deletions mongodb/Vagrantfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Vagrant.configure("2") do |config|
aws.ami = 'ami-a2490dc2'
aws.region = "us-west-1"
aws.instance_type = 'm3.large'
aws.security_groups = "mongodb"
aws.security_groups = "sentiment"
aws.iam_instance_profile_name = 'mongodb'

aws.tags = {
Expand All @@ -29,7 +29,7 @@ Vagrant.configure("2") do |config|
end

config.vm.provision "ansible" do |ansible|
ansible.verbose = 'vvvv'
# ansible.verbose = 'vvvv'
ansible.playbook = "mongodb.yml"
end
end
494 changes: 269 additions & 225 deletions rest/.idea/workspace.xml

Large diffs are not rendered by default.

57 changes: 57 additions & 0 deletions twitter/sentimentConsumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#!/usr/bin/python
"""Consume tweets from Kafka and queue sentiment-analysis jobs via Redis.

Reads JSON-encoded tweets from the Kafka 'tweets' topic and hands each one
to an RQ work queue (backed by Redis), where
sentiments.sentimentProcessor.process_tweet scores it with TextBlob and
stores the result in MongoDB.
"""

import argparse
import json
from kafka import KafkaConsumer
from redis import Redis
from rq import Queue
from sentiments.sentimentProcessor import process_tweet

parser = argparse.ArgumentParser(
    description='Attach to the twitter stream API and send the tweets to a destination',
    add_help=True,
    formatter_class=argparse.RawDescriptionHelpFormatter)

parser.add_argument('--kafkahost',
                    type=str,
                    required=False,
                    help='The host name of the Kafka broker if using Kafka serialization')
parser.add_argument('--kafkaport',
                    type=int,
                    required=False,
                    help='The port of the Kafka broker if using Kafka serialization')
parser.add_argument('--mongohost',
                    type=str,
                    required=False,
                    help='The host name of the mongoDB server')
parser.add_argument('--mongoport',
                    type=int,
                    required=False,
                    help='The port number of the mongoDB server')
parser.add_argument('--redishost',
                    type=str,
                    required=False,
                    help='The host name of the Redis cache')
parser.add_argument('--redisport',
                    type=int,
                    required=False,
                    help='The port of the Redis Cache')

args = parser.parse_args()


# host:port address of the Kafka broker the consumer connects to
kafka_server = "{0}:{1}".format(args.kafkahost, args.kafkaport)

# RQ queue backed by Redis; separate worker processes pop jobs off this queue
redis_conn = Redis(host=args.redishost, port=args.redisport)
q = Queue(connection=redis_conn)

consumer = KafkaConsumer('tweets',
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                         bootstrap_servers=[kafka_server])

# BUG FIX: the original try/except wrapped only the enqueue call inside the
# loop body, so a Ctrl-C delivered while blocked waiting for the next Kafka
# message (where this process spends nearly all of its time) escaped uncaught
# and the consumer was never closed.  Wrap the whole consume loop instead and
# close the consumer on every exit path.
try:
    for message in consumer:
        q.enqueue(process_tweet, args=(message, args.mongohost, args.mongoport, 'tweets'))
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
Empty file added twitter/sentiments/__init__.py
Empty file.
19 changes: 19 additions & 0 deletions twitter/sentiments/sentimentProcessor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from textblob import TextBlob
from pymongo import MongoClient


def process_tweet(tweet, host, port, db):
    """Score a tweet's sentiment with TextBlob and store the result in MongoDB.

    :param tweet: decoded tweet dict; only tweets with a 'text' field are processed
    :param host: MongoDB server host name
    :param port: MongoDB server port number
    :param db: name of the MongoDB database to write the 'sentiment' collection into
    """
    if 'text' in tweet:
        print('processing...')  # parenthesized so it is valid in Python 2 and 3
        # BUG FIX: MongoClient takes no 'db' connection option -- the database
        # is selected below via mongo_client[db] -- so drop the stray kwarg.
        mongo_client = MongoClient(host=host, port=port)
        try:
            mongodb = mongo_client[db]
            text_blob = TextBlob(tweet['text'])
            result = dict(text=tweet['text'],
                          # BUG FIX: 'coordinates' may be absent even when
                          # 'text' is present; .get() avoids a KeyError.
                          coordinates=tweet.get('coordinates'),
                          polarity=text_blob.polarity,
                          subjectivity=text_blob.subjectivity)
            # NOTE(review): Collection.save() is deprecated in newer pymongo;
            # kept for compatibility with the pymongo version this project pins.
            mongodb.sentiment.save(result)
        finally:
            # BUG FIX: the client was never closed, leaking one connection
            # pool per processed tweet in the RQ worker.
            mongo_client.close()



6 changes: 5 additions & 1 deletion twitter/tweet_serializers/kafkaTweetSerializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@ class KafkaTweetSerializer:

def __init__(self, host='localhost', port='9092'):
kafka_server = "{0}:{1}".format(host, str(port))
self._producer = KafkaProducer(bootstrap_servers=kafka_server,
self._producer = KafkaProducer(bootstrap_servers=[kafka_server],
value_serializer=lambda v: json.dumps(v).encode('utf-8'))

def write(self, message):
    """Publish one tweet to the 'tweets' Kafka topic as JSON.

    flush() forces the message out immediately instead of letting the
    producer batch it, so each tweet is visible downstream as it arrives.
    """
    self._producer.send(topic='tweets', value=message)
    self._producer.flush()
    # BUG FIX: parenthesized print -- the bare Python-2 print statement is a
    # syntax error under Python 3; this form behaves identically on both.
    print("Tweet!")

def end(self):
    """Shut down the underlying Kafka producer for this serializer."""
    producer = self._producer
    producer.close()
7 changes: 5 additions & 2 deletions twitter/twitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
help='The host name of the Kafka broker if using Kafka serialization')
parser.add_argument('--kafkaport',
type=int,
require=False,
required=False,
help='The port of the Kafka broker if using Kafka serialization')

args = parser.parse_args()
Expand All @@ -60,7 +60,10 @@
if args.sample:
listener.sample()
elif args.locations:
listener.locate(locations=[-180, -90, 180, 90])
try:
listener.locate(locations=[-180, -90, 180, 90])
except KeyboardInterrupt:
listener.disconnect()
else:
if args.track is not None:
listener.track(args.track)
Expand Down
13 changes: 12 additions & 1 deletion twitter/twitter_utils/twitterStreamListener.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@ def on_data(self, data):
with self._lock:
if not self.caughtInterrupt:
tweet = json.loads(data)
if self.geo_only and 'coordinates' in tweet:
if self.geo_only and 'coordinates' in tweet and tweet['lang'] == 'en':
self.serializer.write(tweet)
return True
else:
self.serializer.end()
return False

def on_status(self, status):
    """Tweepy stream callback for status events: log the status and return
    True so the stream stays connected.
    """
    # BUG FIX: parenthesized print -- the bare Python-2 print statement is a
    # syntax error under Python 3; this form behaves identically on both.
    print(status)
    return True

def on_error(self, status_code):
if status_code == 420:
self.retryCount += 1
Expand All @@ -48,6 +52,8 @@ def interrupt(self, signum, frame):
print "CTRL-C caught, closing..."
with self._lock:
self.caughtInterrupt = True
self.serializer.end()
self.disconnect()

def track(self, track):
self.twitterStream = Stream(self.auth, self)
Expand All @@ -62,3 +68,8 @@ def locate(self, locations):
self.geo_only = True
self.twitterStream = Stream(self.auth, self)
self.twitterStream.filter(locations=locations)

def disconnect(self):
    """Close out the tweet serializer when the stream is torn down."""
    serializer = self.serializer
    serializer.end()


0 comments on commit 248b217

Please sign in to comment.