diff --git a/rest/.idea/modules.xml b/rest/.idea/modules.xml index 33bfed6..7896e0d 100644 --- a/rest/.idea/modules.xml +++ b/rest/.idea/modules.xml @@ -3,6 +3,7 @@ + \ No newline at end of file diff --git a/rest/.idea/rest.iml b/rest/.idea/rest.iml index 59cb225..cf813a6 100644 --- a/rest/.idea/rest.iml +++ b/rest/.idea/rest.iml @@ -2,10 +2,11 @@ - + + @@ -59,9 +144,9 @@ DEFINITION_ORDER - @@ -99,6 +184,44 @@ - + - + - + + + + + + + @@ -378,7 +539,7 @@ - + @@ -387,22 +548,45 @@ - - + + + - + - + + + + + + + + + + + + + + + + + + + + + + + - + + + + + + + + - + @@ -430,5 +623,133 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/twitter/.idea/encodings.xml b/twitter/.idea/encodings.xml new file mode 100644 index 0000000..97626ba --- /dev/null +++ b/twitter/.idea/encodings.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/twitter/.idea/misc.xml b/twitter/.idea/misc.xml new file mode 100644 index 0000000..ec5f9da --- /dev/null +++ b/twitter/.idea/misc.xml @@ -0,0 +1,14 @@ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/twitter/.idea/modules.xml b/twitter/.idea/modules.xml new file mode 100644 index 0000000..f350cc1 --- /dev/null +++ b/twitter/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/twitter/.idea/twitter.iml b/twitter/.idea/twitter.iml new file mode 100644 index 0000000..0c73e5a --- /dev/null +++ b/twitter/.idea/twitter.iml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/twitter/.idea/workspace.xml b/twitter/.idea/workspace.xml new file mode 100644 index 0000000..d2bf3c0 --- /dev/null +++ b/twitter/.idea/workspace.xml @@ -0,0 +1,47 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + 1471721493100 + + + + + + + + + + \ No newline at end of file diff --git a/twitter/tweet_serializers/__init__.py b/twitter/tweet_serializers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/twitter/tweet_serializers/consoleTweetSerializer.py b/twitter/tweet_serializers/consoleTweetSerializer.py new file mode 100644 index 0000000..c1a153a --- /dev/null +++ b/twitter/tweet_serializers/consoleTweetSerializer.py @@ -0,0 +1,25 @@ +from termcolor import colored + + +class ConsoleTweetSerializer: + def __init__(self): + pass + + @staticmethod + def write(tweet): + try: + print '@%s: %s' % ( + colored(tweet['user']['screen_name'], 'yellow'), + colored(tweet['text'].encode('ascii', 'ignore'), 'green')) + except: + pass + + def end(self): + pass + + +if __name__ == '__main__': + s = ConsoleTweetSerializer() + u = {'screen_name': 'joe blow'} + t = {'user': u, 'text': 'this is a test. This is only a test...'} + s.write(t) diff --git a/twitter/tweet_serializers/kafkaTweetSerializer.py b/twitter/tweet_serializers/kafkaTweetSerializer.py new file mode 100644 index 0000000..7f2a745 --- /dev/null +++ b/twitter/tweet_serializers/kafkaTweetSerializer.py @@ -0,0 +1,48 @@ +import threading +import logging +import time + +from kafka import KafkaConsumer, KafkaProducer + + +class Producer(threading.Thread): + daemon = True + + def run(self): + producer = KafkaProducer(bootstrap_servers='localhost:9092') + + while True: + producer.send('my-topic', b"test") + producer.send('my-topic', b"\xc2Hola, mundo!") + time.sleep(1) + + +class Consumer(threading.Thread): + daemon = True + + def run(self): + consumer = KafkaConsumer(bootstrap_servers='localhost:9092', + auto_offset_reset='earliest') + consumer.subscribe(['my-topic']) + + for message in consumer: + print (message) + + +def main(): + threads = [ + Producer(), + Consumer() + ] + + for t in threads: + t.start() + + time.sleep(10) + +if __name__ == "__main__": + logging.basicConfig( + format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', + level=logging.INFO + ) + main() \ No newline at end of file diff --git a/twitter/twitter.py b/twitter/twitter.py new file mode 100644 index 0000000..1b1c238 --- /dev/null +++ b/twitter/twitter.py @@ -0,0 +1,54 @@ +#!/usr/bin/python + +from twitter_utils.apikeys import apikeys +from twitter_utils.twitterStreamListener import TwitterStreamListener +from tweet_serializers.consoleTweetSerializer import ConsoleTweetSerializer +import argparse +import time + + +parser = argparse.ArgumentParser( + description='Attach to the twitter stream API and send the tweets to a destination', + add_help=True, + formatter_class=argparse.RawDescriptionHelpFormatter) + +parser.add_argument('--key', + type=str, + required=True, + choices=apikeys.keys(), + help='the Twitter API key to use') +parser.add_argument('--track', + type=str, + required=False, + help='Twitter phrases to track') +parser.add_argument('--locations', + action='store_true', + required=False, + help='Filter the Twitter stream for geo-located tweets') +parser.add_argument('--sample', + action='store_true', + required=False, + help='Sample the Twitter stream') + +args = parser.parse_args() + + +# fetchSize = 1500 + +print "Writing tweets to console..." +serializer = ConsoleTweetSerializer() +fetchSize = 10 + +startTime = time.time() + +listener = TwitterStreamListener(serializer, apikeys[args.key]) + +if args.sample: + listener.sample() +elif args.locations: + listener.locate(locations=[-180, -90, 180, 90]) +else: + if args.track is not None: + listener.track(args.track) + +print "Tracking stopped after " + str(time.time() - startTime) + " seconds." diff --git a/twitter/twitter_utils/__init__.py b/twitter/twitter_utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/twitter/twitter_utils/apikeys.py b/twitter/twitter_utils/apikeys.py new file mode 100644 index 0000000..9bb9e96 --- /dev/null +++ b/twitter/twitter_utils/apikeys.py @@ -0,0 +1,17 @@ +''' +These constants define a mapping to the environment variables that define the actual +Twitter API keys. The keys are assumed to have the form TWITTER_[string]_KEYNAME +where [string] is mapped to the constants, below +''' + +apikeys = { + 'W205P0' : 'W205_PROJECT', + 'W205P1' : 'W205_P1', + 'W205P2' : 'W205_P2', + 'W205P3' : 'W205_P3', + 'W205P4' : 'W205_P4', + 'W205P5' : 'W205_P5', + 'W205P6' : 'W205_P6', + 'W205A3' : 'W205_A3F', + 'NONE' : '' +} \ No newline at end of file diff --git a/twitter/twitter_utils/twitterAuth.py b/twitter/twitter_utils/twitterAuth.py new file mode 100644 index 0000000..144be58 --- /dev/null +++ b/twitter/twitter_utils/twitterAuth.py @@ -0,0 +1,35 @@ +import tweepy +import os + + +def get_app_auth(app): + keys = get_auth_keys(app) + if len(keys) < 4: + print "Error retrieving Twitter keys for authorization..." + return None + else: + return tweepy.AppAuthHandler(keys['app_key'], keys['app_secret']) + + +def get_oauth(app): + keys = get_auth_keys(app) + auth = tweepy.OAuthHandler(keys['app_key'], keys['app_secret']) + auth.set_access_token(keys['access_token'], keys['token_secret']) + return auth + + +# keys are environment variables with the patterns +# TWITTER_[app]_APP_KEY +# TWITTER_[app]_CONSUMER_SECRET +# etc +def get_auth_keys(app): + if app: + app += '_' + else: + app = '' + + keys = dict(app_key=os.getenv('TWITTER_' + app + 'APP_KEY'), + app_secret=os.getenv('TWITTER_' + app + 'CONSUMER_SECRET'), + access_token=os.getenv('TWITTER_' + app + 'ACCESS_TOKEN'), + token_secret=os.getenv('TWITTER_' + app + 'ACCESS_TOKEN_SECRET')) + return keys diff --git a/twitter/twitter_utils/twitterStreamListener.py b/twitter/twitter_utils/twitterStreamListener.py new file mode 100644 index 0000000..d8b5e22 --- /dev/null +++ b/twitter/twitter_utils/twitterStreamListener.py @@ -0,0 +1,64 @@ +from tweepy.streaming import StreamListener +from tweepy import Stream +import twitterAuth +import json +import time +import signal +import threading + + +class TwitterStreamListener(StreamListener): + + def __init__(self, serializer, key): + signal.signal(signal.SIGINT, self.interrupt) + self._lock = threading.RLock() + + self.serializer = serializer + self.auth = twitterAuth.get_oauth(key) + self.retryCount = 0 + self.retryTime = 1 + self.retryMax = 5 + self.caughtInterrupt = False + self.twitterStream = None + self.geo_only = False + + def on_data(self, data): + # process data + with self._lock: + if not self.caughtInterrupt: + tweet = json.loads(data) + if self.geo_only and 'coordinates' in tweet: + self.serializer.write(tweet) + return True + else: + self.serializer.end() + return False + + def on_error(self, status_code): + if status_code == 420: + self.retryCount += 1 + if self.retryCount > self.retryMax: + return False + else: + time.wait(self.retryTime) + self.retryTime *= 2 + return True + + def interrupt(self, signum, frame): + print "CTRL-C caught, closing..." + with self._lock: + self.caughtInterrupt = True + + def track(self, track): + self.twitterStream = Stream(self.auth, self) + self.twitterStream.filter(track=[track]) + + def sample(self, geo_only=False): + self.geo_only = geo_only + self.twitterStream = Stream(self.auth, self) + self.twitterStream.sample(async=False) + + def locate(self, locations): + self.geo_only = True + self.twitterStream = Stream(self.auth, self) + self.twitterStream.filter(locations=locations)