#!/usr/bin/env python
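"""Crawl the timelines of tracked Twitter users.

CrawlMaster reads users who are due for a crawl out of CouchDB and
feeds their ids to CrawlSlave worker processes through a shared
JoinableQueue; each slave fetches new tweets, reschedules the user,
and reports the finished id back on a second queue.
"""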
from datetime import datetime, timedelta
import logging
import multiprocessing
import pdb
import Queue
import signal
import time

from maroon import *
from models import User, Tweet
from twitter import TwitterResource
from settings import settings
from procs import LocalProc, create_slaves
HALT = False

def set_halt(x=None, y=None):
    """Signal handler: ask the master loop and the slaves to wind down."""
    print "halting"
    global HALT
    HALT = True

# the handlers are currently left disabled:
#signal.signal(signal.SIGINT, set_halt)
#signal.signal(signal.SIGUSR1, set_halt)
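
# Master/slave layout: CrawlMaster fills self.todo with user ids and
# drains self.done; create_slaves() starts CrawlSlave processes that
# loop on those two shared queues.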

class CrawlMaster(LocalProc):
    """Finds users who are due for a crawl and queues them for the slaves."""

    def __init__(self):
        LocalProc.__init__(self, "crawl")
        self.waiting = set()
        self.todo = multiprocessing.JoinableQueue(30)
        self.done = multiprocessing.Queue()

    def run(self):
        print "started crawl"
        logging.info("started crawl")
        try:
            while not HALT:
                self.queue_crawl()
                print "naptime"
                time.sleep(600)
        except Exception:
            logging.exception("exception caused HALT")
        print "done"

    def queue_crawl(self):
        """Queue every user whose next_crawl_date has passed."""
        logging.info("queue_crawl")
        now = datetime.utcnow().timetuple()[0:6]
        for user in User.database.paged_view('user/next_crawl', endkey=now):
            uid = user['id']
            if HALT: break
            if uid in self.waiting: continue  # they are already queued
            self.waiting.add(uid)
            self.todo.put(uid)
            if len(self.waiting) % 100 == 0:
                # let the queue empty a bit
                self.read_crawled()

    def read_crawled(self):
        """Drain the done queue so finished users can be queued again."""
        logging.info("read_crawled, %d", len(self.waiting))
        try:
            while True:
                uid = self.done.get_nowait()
                self.waiting.remove(uid)
        except Queue.Empty:
            return

class CrawlSlave(LocalProc):
    """Worker process: pulls user ids off the queue and crawls them."""

    def __init__(self, slave_id, todo, done):
        LocalProc.__init__(self, 'crawl', slave_id)
        self.twitter = TwitterResource()
        self.todo = todo
        self.done = done

    def run(self):
        Tweet.database = CouchDB(settings.couchdb_root+"hou_new_tweet", True)
        #pdb.Pdb(stdin=open('/dev/stdin', 'r+'), stdout=open('/dev/stdout', 'r+')).set_trace()
        while not HALT:
            user = None
            try:
                uid = self.todo.get()
                user = User.get_id(uid)
                self.crawl(user)
                self.done.put(uid)
                self.todo.task_done()
                if self.twitter.remaining < 10:
                    # out of API calls - sleep until the rate limit resets
                    dt = self.twitter.reset_time - datetime.utcnow()
                    logging.info("goodnight for %r", dt)
                    time.sleep(dt.seconds)
            except Exception:
                if user:
                    logging.exception("exception for user %s", user.to_d())
                else:
                    logging.exception("exception and user is None")
                logging.info("api calls remaining: %d", self.twitter.remaining)
        print "slave is done"

    def crawl(self, user):
        """Fetch a user's new tweets and schedule the next visit."""
        logging.debug("visiting %s - %s", user._id, user.screen_name)
        tweets = self.twitter.save_timeline(user._id, user.last_tid)
        if tweets:
            user.last_tid = tweets[0]._id
        now = datetime.utcnow()
        last = user.last_crawl_date if user.last_crawl_date is not None else datetime(2010, 11, 12)
        delta = now - last
        seconds = delta.seconds + delta.days*24*3600
        # average the observed rate into the stored tweets_per_hour
        # estimate, then schedule the next crawl so it should pick up
        # about settings.tweets_per_crawl tweets, capped at max_hours
        tph = (3600.0*len(tweets)/seconds + user.tweets_per_hour)/2
        user.tweets_per_hour = tph
        hours = min(settings.tweets_per_crawl/tph, settings.max_hours)
        user.next_crawl_date = now + timedelta(hours=hours)
        user.last_crawl_date = now
        user.save()

if __name__ == '__main__':
    User.database = CouchDB(settings.couchdb_root+"houtx_user", True)
    proc = CrawlMaster()
    create_slaves(CrawlSlave, proc.todo, proc.done)
    proc.run()