Skip to content

Commit

Permalink
Adding Twitter stream implemenation to console serializer
Browse files Browse the repository at this point in the history
  • Loading branch information
rocket-ron committed Aug 20, 2016
1 parent 2c6eefe commit 4ec18ff
Show file tree
Hide file tree
Showing 16 changed files with 666 additions and 17 deletions.
1 change: 1 addition & 0 deletions rest/.idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion rest/.idea/rest.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

353 changes: 337 additions & 16 deletions rest/.idea/workspace.xml

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions twitter/.idea/encodings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions twitter/.idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions twitter/.idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions twitter/.idea/twitter.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 47 additions & 0 deletions twitter/.idea/workspace.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Empty file.
25 changes: 25 additions & 0 deletions twitter/tweet_serializers/consoleTweetSerializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from termcolor import colored


class ConsoleTweetSerializer:
def __init__(self):
pass

@staticmethod
def write(tweet):
try:
print '@%s: %s' % (
colored(tweet['user']['screen_name'], 'yellow'),
colored(tweet['text'].encode('ascii', 'ignore'), 'green'))
except:
pass

def end(self):
pass


if __name__ == '__main__':
s = ConsoleTweetSerializer()
u = {'screen_name': 'joe blow'}
t = {'user': u, 'text': 'this is a test. This is only a test...'}
s.write(t)
48 changes: 48 additions & 0 deletions twitter/tweet_serializers/kafkaTweetSerializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import threading
import logging
import time

from kafka import KafkaConsumer, KafkaProducer


class Producer(threading.Thread):
daemon = True

def run(self):
producer = KafkaProducer(bootstrap_servers='localhost:9092')

while True:
producer.send('my-topic', b"test")
producer.send('my-topic', b"\xc2Hola, mundo!")
time.sleep(1)


class Consumer(threading.Thread):
daemon = True

def run(self):
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
auto_offset_reset='earliest')
consumer.subscribe(['my-topic'])

for message in consumer:
print (message)


def main():
threads = [
Producer(),
Consumer()
]

for t in threads:
t.start()

time.sleep(10)

if __name__ == "__main__":
logging.basicConfig(
format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
level=logging.INFO
)
main()
54 changes: 54 additions & 0 deletions twitter/twitter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#!/usr/bin/python

from twitter_utils.apikeys import apikeys
from twitter_utils.twitterStreamListener import TwitterStreamListener
from tweet_serializers.consoleTweetSerializer import ConsoleTweetSerializer
import argparse
import time


parser = argparse.ArgumentParser(
description='Attach to the twitter stream API and send the tweets to a destination',
add_help=True,
formatter_class=argparse.RawDescriptionHelpFormatter)

parser.add_argument('--key',
type=str,
required=True,
choices=apikeys.keys(),
help='the Twitter API key to use')
parser.add_argument('--track',
type=str,
required=False,
help='Twitter phrases to track')
parser.add_argument('--locations',
action='store_true',
required=False,
help='Filter the Twitter stream for geo-located tweets')
parser.add_argument('--sample',
action='store_true',
required=False,
help='Sample the Twitter stream')

args = parser.parse_args()


# fetchSize = 1500

print "Writing tweets to console..."
serializer = ConsoleTweetSerializer()
fetchSize = 10

startTime = time.time()

listener = TwitterStreamListener(serializer, apikeys[args.key])

if args.sample:
listener.sample()
elif args.locations:
listener.locate(locations=[-180, -90, 180, 90])
else:
if args.track is not None:
listener.track(args.track)

print "Tracking stopped after " + str(time.time() - startTime) + " seconds."
Empty file.
17 changes: 17 additions & 0 deletions twitter/twitter_utils/apikeys.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
'''
These constants define a mapping to the environment variables that define the actual
Twitter API keys. The keys are assumed to have the form TWITTER_[string]_KEYNAME
where [string] is mapped to the constants, below
'''

apikeys = {
'W205P0' : 'W205_PROJECT',
'W205P1' : 'W205_P1',
'W205P2' : 'W205_P2',
'W205P3' : 'W205_P3',
'W205P4' : 'W205_P4',
'W205P5' : 'W205_P5',
'W205P6' : 'W205_P6',
'W205A3' : 'W205_A3F',
'NONE' : ''
}
35 changes: 35 additions & 0 deletions twitter/twitter_utils/twitterAuth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import tweepy
import os


def get_app_auth(app):
keys = get_auth_keys(app)
if len(keys) < 4:
print "Error retrieving Twitter keys for authorization..."
return None
else:
return tweepy.AppAuthHandler(keys['app_key'], keys['app_secret'])


def get_oauth(app):
keys = get_auth_keys(app)
auth = tweepy.OAuthHandler(keys['app_key'], keys['app_secret'])
auth.set_access_token(keys['access_token'], keys['token_secret'])
return auth


# keys are environment variables with the patterns
# TWITTER_[app]_APP_KEY
# TWITTER_[app]_CONSUMER_SECRET
# etc
def get_auth_keys(app):
if app:
app += '_'
else:
app = ''

keys = dict(app_key=os.getenv('TWITTER_' + app + 'APP_KEY'),
app_secret=os.getenv('TWITTER_' + app + 'CONSUMER_SECRET'),
access_token=os.getenv('TWITTER_' + app + 'ACCESS_TOKEN'),
token_secret=os.getenv('TWITTER_' + app + 'ACCESS_TOKEN_SECRET'))
return keys
64 changes: 64 additions & 0 deletions twitter/twitter_utils/twitterStreamListener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from tweepy.streaming import StreamListener
from tweepy import Stream
import twitterAuth
import json
import time
import signal
import threading


class TwitterStreamListener(StreamListener):

def __init__(self, serializer, key):
signal.signal(signal.SIGINT, self.interrupt)
self._lock = threading.RLock()

self.serializer = serializer
self.auth = twitterAuth.get_oauth(key)
self.retryCount = 0
self.retryTime = 1
self.retryMax = 5
self.caughtInterrupt = False
self.twitterStream = None
self.geo_only = False

def on_data(self, data):
# process data
with self._lock:
if not self.caughtInterrupt:
tweet = json.loads(data)
if self.geo_only and 'coordinates' in tweet:
self.serializer.write(tweet)
return True
else:
self.serializer.end()
return False

def on_error(self, status_code):
if status_code == 420:
self.retryCount += 1
if self.retryCount > self.retryMax:
return False
else:
time.wait(self.retryTime)
self.retryTime *= 2
return True

def interrupt(self, signum, frame):
print "CTRL-C caught, closing..."
with self._lock:
self.caughtInterrupt = True

def track(self, track):
self.twitterStream = Stream(self.auth, self)
self.twitterStream.filter(track=[track])

def sample(self, geo_only=False):
self.geo_only = geo_only
self.twitterStream = Stream(self.auth, self)
self.twitterStream.sample(async=False)

def locate(self, locations):
self.geo_only = True
self.twitterStream = Stream(self.auth, self)
self.twitterStream.filter(locations=locations)

0 comments on commit 4ec18ff

Please sign in to comment.