-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: main.py
129 lines (108 loc) · 4.15 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import argparse
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from app.analysis.metrics import reapply_sent
from app.analysis.preprocessing import reprocess_headlines
from app.analysis.topics import analyze_all_topics
from app.registry import Scrapers
from app.scraper import SeleniumScraper, SeleniumResourceManager, Scraper
from app.builder import build
from app.utils import Config, get_logger
from utils.emailer import send_notification
import threading
logger = get_logger(__name__)
class SeleniumThread(threading.Thread):
    """Runs selenium-based scrapers sequentially on one background thread.

    Selenium scrapers share browser resources, so they are serialized here
    rather than dispatched through the thread pool.
    """

    def __init__(self, seleniums):
        # Initialize the Thread machinery first, then our own state.
        super().__init__()
        self.seleniums = seleniums  # scraper classes, instantiated in run()
        self.scrapers = []          # instances whose run() completed without error

    def run(self):
        t = time.time()
        for sel in self.seleniums:
            scraper = sel()
            try:
                scraper.run()
            except Exception as e:
                # Best-effort: one failing scraper must not stop the rest.
                logger.error("Failed to run %s: %s", scraper, e)
                continue
            else:
                self.scrapers.append(scraper)
        # Tear down the shared selenium/browser resources once all runs finish.
        SeleniumResourceManager().quit()
        logger.info("Finished seleniums in %s seconds", time.time() - t)

    def post_run(self):
        """Run post-processing for every scraper that succeeded."""
        num = len(self.scrapers)
        for i, sel in enumerate(self.scrapers, start=1):
            sel.post_run()
            logger.info("Finished %s (%s of %s)", sel, i, num)
class Queue:
    """Collects scraper classes and executes them: plain scrapers go through
    a thread pool, selenium-based ones are serialized on a SeleniumThread."""

    def __init__(self, args):
        self.threads = []    # instantiated non-selenium scrapers
        self.seleniums = []  # selenium scraper classes, instantiated later
        self.args = args

    def run(self):
        """Run all queued scrapers, then post-process the successful ones."""
        selenium_worker = SeleniumThread(self.seleniums)
        if Config.run_selenium and self.args.run_selenium:
            logger.info("Running seleniums")
            selenium_worker.start()
        total = len(self.threads)
        with ThreadPoolExecutor(max_workers=Config.max_threads) as pool:
            pending = {pool.submit(s.run): s for s in self.threads}
            for done, future in enumerate(as_completed(pending), start=1):
                scraper: Scraper = pending[future]
                if scraper.success:
                    scraper.post_run()
                logger.info(f"Finished {scraper} ({done} of {total})")
        if Config.run_selenium and self.args.run_selenium:
            logger.info("Waiting for seleniums")
            selenium_worker.join()
            selenium_worker.post_run()

    def add(self, scraper):
        """Route a scraper class to the selenium queue or the thread pool."""
        if issubclass(scraper, SeleniumScraper):
            self.seleniums.append(scraper)
        else:
            self.threads.append(scraper())
def scrape(args, scrapers):
    """Queue up and run the given scraper classes.

    Args:
        args: parsed CLI namespace (forwarded to the Queue).
        scrapers: iterable of scraper classes to execute.

    Raises:
        ValueError: if no scrapers were provided.
    """
    # Truthiness check is the idiomatic form of `len(...) == 0`.
    if not scrapers:
        raise ValueError("No scrapers provided")
    queue = Queue(args)
    logger.info("Initializing queue")
    logger.info("Scrapers: %s", scrapers)
    for scraper in scrapers:
        queue.add(scraper)
    queue.run()
def main(args: argparse.Namespace):
    """Entry point: dispatch on CLI flags, optionally scrape, then build.

    The analysis/maintenance flags are mutually short-circuiting: the first
    one set runs alone and returns. Otherwise scrapers run (unless
    --skip-scrape) and the site is rebuilt.
    """
    t = time.time()
    if args.analyze_topics:
        analyze_all_topics(True)
        return
    if args.analyze_sentiment is not None:
        # 'all' anywhere in the value reprocesses everything, not just new items.
        reapply_sent('all' in args.analyze_sentiment)
        return
    if args.reprocess is not None:
        reprocess_headlines('all' in args.reprocess)
        return
    if args.email_newsletter:
        # Explicit encoding so the newsletter is read identically on any platform.
        with open(Config.newsletter, 'rt', encoding='utf-8') as f:
            send_notification(f.read())
        return
    if not args.skip_scrape:
        # --scraper restricts the run to a single agency by name.
        scrapers = [s for s in Scrapers if s.agency == args.scraper] if args.scraper else Scrapers
        scrape(args, scrapers)
    build()
    # %.2f formats directly; round()+%f printed spurious trailing zeros.
    logger.info("Finished in %.2f minutes", (time.time() - t) / 60)
def get_args() -> argparse.Namespace:
    """Parse command-line options; flips global debug config when requested."""
    cli = argparse.ArgumentParser()
    cli.add_argument('--skip-scrape', action='store_true')
    cli.add_argument('--scraper', type=str, default=None)
    cli.add_argument('--email-newsletter', action='store_true')
    cli.add_argument('--run-selenium', action='store_true')
    cli.add_argument('--analyze-topics', action='store_true')
    cli.add_argument('--analyze-sentiment', action='store', type=str)
    cli.add_argument('--reprocess', action='store', type=str)
    cli.add_argument('--debug', action='store_true')
    parsed = cli.parse_args()
    if parsed.debug:
        # Debug mode must be enabled before anything else reads Config.
        Config.set_debug()
    return parsed
if __name__ == '__main__':
    # Script entry point: parse CLI flags, then run the pipeline.
    main(get_args())