Skip to content

Commit

Permalink
avoid waiting on futures
Browse files Browse the repository at this point in the history
Motivation:

Waiting for a large number of futures is inefficient.

Modification:

Add counters to keep track of progress.

Result:

Hopefully code that scales better.
  • Loading branch information
paulmillar committed Oct 31, 2022
1 parent 67ca957 commit 6269bf0
Showing 1 changed file with 23 additions and 25 deletions.
48 changes: 23 additions & 25 deletions bin/validate
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ from urllib3.exceptions import (MaxRetryError, SSLError)
import argparse
import validators
import logging
import time

# Controlled vocabularies. The values are taken from:
#
Expand All @@ -38,7 +39,8 @@ VALID_ORG_RELATIONSHIP_TYPES = [ "Related",
"Child" ]

# Global list of pending URL-checking futures.
futures_not_done = []
futures_not_done_count = 0
futures_done_count = 0

# Whether the HTTP HEAD request result callback is active.
callback_active = 0
Expand Down Expand Up @@ -241,7 +243,7 @@ def is_certificate_SAN_problem(connection_error):


def check_result(future):
global callback_active
global callback_active, futures_done_count, futures_not_done_count
if future.cancelled():
return
problems = future.problems
Expand Down Expand Up @@ -360,9 +362,13 @@ def check_result(future):
"Something went wrong when contacting the web-server: " + str(re))
finally:
callback_active -= 1
futures_done_count += 1
futures_not_done_count -= 1
if not request_submitted:
finished_with_label(id, label)
else:
futures_done_count += 1
futures_not_done_count -= 1
logging.warning("Weird future state %s", future)
append_problem(problems, label, r.url,
"INTERNAL",
Expand Down Expand Up @@ -410,7 +416,7 @@ def finished_with_label(id, label):
def start_head_req(id, problems, label, url, log_success=None,
try_alternative_fqdn=False,
verify=True):
global futures_not_done
global futures_not_done_count
if is_loop(id, label, url, verify):
append_problem(problems, label, url, "LOOP",
"Vetoing requesting this URL: we have already tested it.")
Expand All @@ -425,7 +431,7 @@ def start_head_req(id, problems, label, url, log_success=None,
future.id = id
future.try_alternative_fqdn = try_alternative_fqdn
future.add_done_callback(check_result)
futures_not_done.append(future)
futures_not_done_count += 1
return True


Expand Down Expand Up @@ -475,8 +481,13 @@ def urls_to_verify(org):
def describe_delta(previous, current):
    """Format *current* as a string, annotated with its change since *previous*.

    If *previous* is falsy (no earlier sample recorded), only the current
    value is shown; otherwise the delta is appended, e.g. "42 (+5)".
    """
    if not previous:
        return str(current)
    delta = current - previous
    return f"{current} (+{delta})"

def print_progress(total, done):
def print_progress():
global futures_done_count, futures_not_done_count
global prev_done, prev_total

done = futures_done_count
total = done + futures_not_done_count

percent_done = 100.0*done/total
done_description = describe_delta(prev_done, done)
total_description = describe_delta(prev_total, total)
Expand Down Expand Up @@ -559,16 +570,8 @@ def validate_org(org_problems, org, name_by_id):
org_problems["relationships"] = build_missing_value_problem()


def update_progress(timeout):
    """Wait up to *timeout* seconds for pending URL-check futures to finish.

    Completed futures are pruned from the module-level ``futures_not_done``
    list.  Returns the number of futures observed to complete in this call.
    """
    global futures_not_done
    snapshot = futures_not_done.copy()
    completed, _still_pending = concurrent.futures.wait(snapshot, timeout=timeout)
    # Rebuild the pending list without the futures that just finished.
    futures_not_done = [f for f in futures_not_done if f not in completed]
    return len(completed)


def validate_json(data):
global futures_not_done
global futures_done_count, futures_not_done_count
if not args.offline:
logging.info("Scheduling URL checks for %d organisations.", len(data))

Expand All @@ -580,32 +583,27 @@ def validate_json(data):

problems = {}
try:
completed_checks_count=0
for org in data:
id = org["id"]
org_problems = {}
problems[id] = org_problems
validate_urls(org_problems, org)
validate_org(org_problems, org, name_by_id)

logging.info("%d URL checks have been queued.", len(futures_not_done))
logging.info("%d URL checks have been queued.", futures_not_done_count)
while True:
done_count = update_progress(2)
if len(futures_not_done) == 0 and callback_active == 0:
if futures_not_done_count == 0 and callback_active == 0:
break
completed_checks_count += done_count
total_checks = completed_checks_count + len(futures_not_done)
print_progress(total_checks, completed_checks_count)
print_progress()
time.sleep(2)
return problems;

except KeyboardInterrupt as e:
logging.warning("Shutting down...")
done_count = update_progress(0)
completed_checks_count += done_count
total_checks = completed_checks_count + len(futures_not_done)
executor.shutdown(wait=True, cancel_futures=True)
total = futures_done_count + futures_not_done_count
logging.info("Interrupted when %d%% URL checks have completed.",
100.0*(total_checks-len(futures_not_done))/total_checks)
100.0*(total-futures_not_done_count)/total)
return problems;


Expand Down

0 comments on commit 6269bf0

Please sign in to comment.