From c763f61de7d63878547542e086259646d2e7eac4 Mon Sep 17 00:00:00 2001
From: Andrew Jackson
Date: Wed, 29 Sep 2021 13:11:23 +0100
Subject: [PATCH] Initial launcher from ukwa-manage, for #3.

---
 README.md                | 54 ++++++++++
 crawlstreams/launcher.py | 223 ++++++++++++++++++++++++++++++++-------
 2 files changed, 241 insertions(+), 36 deletions(-)

diff --git a/README.md b/README.md
index ded5137..0661621 100644
--- a/README.md
+++ b/README.md
@@ -3,6 +3,52 @@ Crawl Streams
 
 Tools for operating on the event streams relating to our crawler activity.
 
+## Commands
+
+When installed, the following commands become available.
+
+### crawlstreams.launcher
+
+The `launcher` command can be used to launch crawls according to a crawl specification. Each specification contains the crawl schedule as well as the seeds and other configuration. Currently we only support a proprietary JSON spec, but will support [`crawlspec`](https://github.com/ato/crawlspec) in the future.
+
+The current spec looks like:
+
+```
+{
+    "id": 57875,
+    "title": "Bae Colwyn Mentre Treftadaeth Treflun | Colwyn Bay Townscape Heritage Initiative",
+    "seeds": [
+        "https://www.colwynbaythi.co.uk/"
+    ],
+    "depth": "CAPPED",
+    "scope": "subdomains",
+    "ignoreRobotsTxt": true,
+    "schedules": [
+        {
+            "startDate": "2017-10-17 09:00:00",
+            "endDate": "",
+            "frequency": "QUARTERLY"
+        }
+    ],
+    "watched": false,
+    "documentUrlScheme": null,
+    "loginPageUrl": "",
+    "logoutUrl": "",
+    "secretId": ""
+}
+```
+
+- [ ] TBA - the newer fields around parallel queues, etc. need to be added in. See https://github.com/ukwa/crawl-streams/issues/3
+
+The `launcher` is designed to be run hourly, and will enqueue all URLs that are due that hour. Finer-grained launching of requests is not yet supported. The crawler itself uses the embedded launch timestamp to determine whether a request has already been satisfied, which makes the requests idempotent. This means it's okay if the launcher accidentally runs multiple times per hour.
+
+As an example, launching crawls for the current hour for NPLD looks like this:
+
+```
+$ launcher -k crawler06.n45.bl.uk:9094 fc.tocrawl.npld /shared/crawl_feed_npld.jsonl
+```
+
+The command will log what happens and report the outcome in terms of the number of crawls launched.
 
 ## Development Setup
 
@@ -71,9 +117,17 @@ When performing read operations, it's fine to run against the live system. e.g.
 $ python -m crawlstreams.report -k crawler05.n45.bl.uk:9094 -t -1 -q fc.crawled -r | head
 ```
 
+#### Running on a test crawler
+
+```
+$ python -m crawlstreams.launcher -k crawler05.n45.bl.uk:9094 fc.tocrawl.npld ~/crawl_feed_npld.jsonl
+```
+
 ### Running inside Docker
 
 TBA
 
     $ docker-compose build
     $ docker-compose run crawlstreams -k kafka:9092 ...
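+
+To check what has actually been enqueued on a topic, you can also consume it directly, e.g. using the `kafka-console-consumer.sh` tool from the `wurstmeister/kafka` image:
+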
+docker run --net host -ti wurstmeister/kafka:2.12-2.1.0 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server crawler06.n45.bl.uk:9094 --from-beginning --topic fc.tocrawl.npld --max-messages 100
+
diff --git a/crawlstreams/launcher.py b/crawlstreams/launcher.py
index 27bd115..3603258 100755
--- a/crawlstreams/launcher.py
+++ b/crawlstreams/launcher.py
@@ -9,10 +9,11 @@
 import os
 import sys
 import time
+import json
 import logging
 import argparse
 from crawlstreams.enqueue import KafkaLauncher
-
+from datetime import datetime, timezone, timedelta
 
 # Set up a logging handler:
 handler = logging.StreamHandler()
@@ -30,54 +31,204 @@
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.INFO)
 
-
-def sender(launcher, args, uri):
-    # Ensure a http:// or https:// at the front:
-    if not (uri.startswith("http://") or uri.startswith("https://")):
-        uri = "http://%s" % uri
-
-    # Add the main URL
-    launcher.launch(uri, args.source, isSeed=args.seed, forceFetch=args.forceFetch,
-                    recrawl_interval=args.recrawl_interval, sheets=args.sheets, reset_quotas=args.reset_quotas,
-                    webrender_this=args.webrender_this, launch_ts=args.launch_ts, parallel_queues=args.parallel_queues)
+class Launcher():
+
+    def __init__(self, args):
+        self.input_file = args.crawl_feed_file
+        self.kafka_server = args.kafka_server
+        self.queue = args.queue
+
+    def run(self, now=None):
+        # Set up launcher:
+        self.launcher = KafkaLauncher(kafka_server=self.kafka_server, topic=self.queue)
+
+        # Get current time
+        if not now or now == 'now':
+            #now = datetime.now(tz=timezone.utc)
+            now = datetime.now()
+        logger.debug("Now timestamp: %s" % str(now))
+
+        # Process the feed, looking for targets that are due to launch:
+        self.i_launches = 0
+        self.target_errors = 0
+        for t in self._all_targets():
+            logger.debug("----------")
+            logger.debug("Looking at %s (tid:%d)" % (t['title'], t['id']))
+
+            # Look out for problems:
+            if len(t['seeds']) == 0:
+                logger.error("This target has no seeds! tid: %d" % t['id'])
+                self.target_errors += 1
+                continue
+
+            # Build a source tag from the target ID and the first seed:
+            source = "tid:%d:%s" % (t['id'], t['seeds'][0])
+
+            # Check the scheduling:
+            for schedule in t['schedules']:
+                # Skip if target schedule outside of start/end range
+                if schedule['startDate']:
+                    startDate = datetime.strptime(schedule['startDate'], "%Y-%m-%d %H:%M:%S")
+                    logger.debug("Target schedule start date: %s" % str(startDate))
+                    if (now < startDate):
+                        logger.debug("Start date %s not yet reached" % startDate)
+                        continue
+                else:
+                    logger.debug("Skipping target schedule with no start date: '%s'" % schedule['startDate'])
+                    continue
+                endDate = 'N/S'
+                if schedule['endDate']:
+                    endDate = datetime.strptime(schedule['endDate'], "%Y-%m-%d %H:%M:%S")
+                    if now > endDate:
+                        logger.debug("End date %s passed" % endDate)
+                        continue
+                logger.debug("Target schedule end date: %s" % str(endDate))
+                logger.debug("Target frequency: %s" % schedule['frequency'])
+
+                # Check if the frequency and date match up:
+                if schedule['frequency'] == "DAILY":
+                    self.launch_by_hour(now, startDate, endDate, t, source, 'DAILY')
+
+                elif schedule['frequency'] == "WEEKLY":
+                    if now.isoweekday() == startDate.isoweekday():
+                        self.launch_by_hour(now, startDate, endDate, t, source, 'WEEKLY')
+                    else:
+                        logger.debug("WEEKLY: isoweekday %s differs from schedule %s" % (
+                            now.isoweekday(), startDate.isoweekday()))
+
+                elif schedule['frequency'] == "MONTHLY":
+                    if now.day == startDate.day:
+                        self.launch_by_hour(now, startDate, endDate, t, source, 'MONTHLY')
+                    else:
+                        logger.debug("MONTHLY: date %s does not match schedule %s" % (
+                            now, startDate))
+                        logger.debug("MONTHLY: day %s differs from schedule %s" % (now.day, startDate.day))
+
+                elif schedule['frequency'] == "QUARTERLY":
+                    if now.day == startDate.day and now.month % 3 == startDate.month % 3:
+                        self.launch_by_hour(now, startDate, endDate, t, source, 'QUARTERLY')
+                    else:
+                        logger.debug("QUARTERLY: date %s does not match schedule %s" % (
+                            now, startDate))
+                        logger.debug(
+                            "QUARTERLY: month3 %s versus schedule %s" % (now.month % 3, startDate.month % 3))
+
+                elif schedule['frequency'] == "SIXMONTHLY":
+                    if now.day == startDate.day and now.month % 6 == startDate.month % 6:
+                        self.launch_by_hour(now, startDate, endDate, t, source, 'SIXMONTHLY')
+                    else:
+                        logger.debug("SIXMONTHLY: date %s does not match schedule %s" % (
+                            now, startDate))
+                        logger.debug(
+                            "SIXMONTHLY: month6 %s versus schedule %s" % (now.month % 6, startDate.month % 6))
+
+                elif schedule['frequency'] == "ANNUAL":
+                    if now.day == startDate.day and now.month == startDate.month:
+                        self.launch_by_hour(now, startDate, endDate, t, source, 'ANNUAL')
+                    else:
+                        logger.debug("ANNUAL: date %s does not match schedule %s" % (
+                            now, startDate))
+                        logger.debug("ANNUAL: month %s versus schedule %s" % (now.month, startDate.month))
+                elif schedule['frequency'] == "DOMAINCRAWL":
+                    logger.debug("Skipping crawl frequency " + schedule['frequency'])
+                else:
+                    logger.error("Don't understand crawl frequency " + schedule['frequency'])
+
+        logger.info("Flushing the launcher to ensure everything is pushed to Kafka...")
+        self.launcher.flush()
+        #self.launcher.close()
+
+        logger.info("Completed. Launches this hour: %s" % self.i_launches)
+
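+    # Note: the crawl feed file is expected to be JSON Lines, i.e. one JSON
+    # target definition per line (like the example spec in the README).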
+    def _all_targets(self):
+        with open(self.input_file, 'r') as fin:
+            for line in fin:
+                item = json.loads(line)
+                yield item
+
+    def get_metrics(self, registry):
+        # type: (CollectorRegistry) -> None
+        from prometheus_client import Gauge
+
+        # Label the metrics by destination queue, as this launcher covers all frequencies:
+        g = Gauge('ukwa_seeds_launched',
+                  'Total number of seeds launched.',
+                  labelnames=['stream'], registry=registry)
+        g.labels(stream=self.queue).set(self.i_launches)
+
+        g = Gauge('ukwa_target_errors',
+                  'Total number of targets that appear malformed.',
+                  labelnames=['stream'], registry=registry)
+        g.labels(stream=self.queue).set(self.target_errors)
+
+    def launch_by_hour(self, now, startDate, endDate, t, source, freq):
+        # Is it the current hour?
+        if now.hour == startDate.hour:
+            logger.info(
+                "%s target %s (tid: %s) scheduled to crawl (now: %s, start: %s, end: %s), sending to FC-3-uris-to-crawl" % (
+                    freq, t['title'], t['id'], now, startDate, endDate))
+            counter = 0
+            for seed in t['seeds']:
+                # Assume all are true seeds, which will over-crawl where sites have aliases that are being treated as seeds.
+                # TODO Add handling of aliases
+                isSeed = True
+
+                # Set-up sheets
+                sheets = []
+
+                # Robots.txt
+                if t['ignoreRobotsTxt']:
+                    sheets.append('ignoreRobots')
+                # Scope
+                if t['scope'] == 'subdomains':
+                    sheets.append('subdomainsScope')
+                elif t['scope'] == 'plus1Scope':
+                    sheets.append('plus1Scope')
+                # Limits
+                if t['depth'] == 'CAPPED_LARGE':
+                    sheets.append('higherLimit')
+                elif t['depth'] == 'DEEP':
+                    sheets.append('noLimit')
+
+                # Set up the launch_ts: (Should be startDate but if that happens to be in the future this will all break)
+                launch_timestamp = time.strftime("%Y%m%d%H%M%S", time.gmtime(time.mktime(now.timetuple())))
+
+                # How many parallel queues:
+                parallel_queues = 1
+                if 'twitter.com' in seed:
+                    parallel_queues = 2
+
+                # And send launch message, always resetting any crawl quotas:
+                self.launcher.launch(seed, source, isSeed, forceFetch=True, sheets=sheets,
+                                     reset_quotas=True, launch_ts=launch_timestamp,
+                                     inherit_launch_ts=False, parallel_queues=parallel_queues)
+                counter = counter + 1
+                self.i_launches = self.i_launches + 1
+
+        else:
+            logger.debug("The hour (%s) is not current." % startDate.hour)
 
 
 def main(argv=None):
     parser = argparse.ArgumentParser('(Re)Launch URIs into crawl queues.')
     parser.add_argument('-k', '--kafka-bootstrap-server', dest='kafka_server', type=str, default="localhost:9092",
                         help="Kafka bootstrap server(s) to use [default: %(default)s]")
-    parser.add_argument("-L", "--launch-datetime", dest="launch_dt", default=None, required=False, type=str,
-                        help="Launch request timestamp as 14-character datetime e.g. '20190301120000' or use 'now' to use the current time. [default: %(default)s]")
+    parser.add_argument("-L", "--launch-datetime", dest="launch_dt", default='now', required=False, type=str,
+                        help="Launch request datetime e.g. '2019-03-01T12:00:00' or use 'now' to use the current time. [default: %(default)s]")
 
     parser.add_argument('queue', help="Name of queue to send URIs to, e.g. 'fc.tocrawl.npld'.")
     parser.add_argument('crawl_feed_file', help="Crawl feed file, containing crawl job definitions.")
 
     args = parser.parse_args()
 
-    # Set up launcher:
-    launcher = KafkaLauncher(kafka_server=args.kafka_server, topic=args.queue)
-
-    # Read from a file, if the input is a file:
-    if os.path.isfile(args.uri_or_filename):
-        with open(args.uri_or_filename,'r') as f:
-            for line in f:
-                sent = False
-                while not sent:
-                    try:
-                        uri = line.strip()
-                        sender(launcher, args, uri)
-                        sent = True
-                    except Exception as e:
-                        logger.error("Exception while submitting: %s" % line)
-                        logger.exception(e)
-                        logger.info("Sleeping for ten seconds...")
-                        time.sleep(10)
-    else:
-        # Or send one URI
-        sender(launcher, args, args.uri_or_filename)
+    # Configure the launcher based on the args:
+    cl = Launcher(args)
 
-    # Wait for send to complete:
-    launcher.flush()
+    # Set the launch timestamp:
+    if args.launch_dt != 'now':
+        launch_datetime = datetime.fromisoformat(args.launch_dt)
+    else:
+        launch_datetime = 'now'
+    # Run the launcher:
+    cl.run(launch_datetime)
 
 
 if __name__ == "__main__":
     sys.exit(main())
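
For reference (this sketch is not part of the patch above), the new `Launcher` class can also be driven directly from Python, which is handy for replaying a specific hour. Assuming the example feed path and Kafka broker from the README, this is roughly equivalent to running `launcher -k crawler06.n45.bl.uk:9094 -L '2021-09-29T09:00:00' fc.tocrawl.npld /shared/crawl_feed_npld.jsonl`:

```
# Sketch: drive the Launcher for a fixed hour instead of 'now'.
# The feed path and broker are the example values used in the README.
from argparse import Namespace
from datetime import datetime

from crawlstreams.launcher import Launcher

# Namespace simply mimics the parsed command-line arguments:
args = Namespace(
    crawl_feed_file='/shared/crawl_feed_npld.jsonl',  # JSON Lines crawl feed
    kafka_server='crawler06.n45.bl.uk:9094',          # Kafka bootstrap server
    queue='fc.tocrawl.npld',                          # topic to send launch requests to
)

cl = Launcher(args)
# Launch everything due in the 09:00 hour of 2021-09-29 (naive local time,
# matching how the launcher parses the schedule dates):
cl.run(datetime(2021, 9, 29, 9, 0))
```

As the README notes, the crawler uses the embedded launch timestamp to decide whether a request has already been satisfied, so re-running the same hour should not cause duplicate crawling.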