From a6236a6a20bcf2512e0c0954e0d7b1c588557693 Mon Sep 17 00:00:00 2001 From: David Kane Date: Wed, 22 Feb 2023 13:52:36 +0000 Subject: [PATCH] update boundaries command --- findthatpostcode/commands/boundaries.py | 151 +++++++++++------------- findthatpostcode/db.py | 16 +++ findthatpostcode/settings.py | 7 ++ 3 files changed, 95 insertions(+), 79 deletions(-) diff --git a/findthatpostcode/commands/boundaries.py b/findthatpostcode/commands/boundaries.py index 88e81f2..9d7d171 100644 --- a/findthatpostcode/commands/boundaries.py +++ b/findthatpostcode/commands/boundaries.py @@ -1,16 +1,18 @@ """ Import commands for area boundaries """ +import glob +import io +import json +import os import click import requests import requests_cache -import shapely.geometry import tqdm from findthatpostcode import db, settings -from findthatpostcode.documents import Area, Entity -from findthatpostcode.utils import BulkImporter +from findthatpostcode.documents import Area AREA_INDEX = Area.Index.name @@ -19,30 +21,52 @@ @click.option("--es-index", default=AREA_INDEX) @click.option("--code-field", default=None) @click.option("--examine/--no-examine", default=False) +@click.option("--remove/--no-remove", default=False) @click.argument("urls", nargs=-1) -def import_boundaries(urls, examine=False, code_field=None, es_index=AREA_INDEX): +def import_boundaries( + urls, examine=False, code_field=None, es_index=AREA_INDEX, remove=False +): + + es = db.get_db() + + if remove: + # update all instances of area to remove the "boundary" key + es.update_by_query( + index=es_index, + body={ + "script": 'ctx._source.remove("boundary")', + "query": {"exists": {"field": "boundary"}}, + }, + ) + return if settings.DEBUG: requests_cache.install_cache() - es = db.get_db() + # initialise the boto3 session + client = db.get_s3_client() for url in urls: - import_boundary(es, url, examine, es_index, code_field) - - -def import_boundary(es, url, examine=False, es_index=AREA_INDEX, code_field=None): - r = requests.get(url, stream=True) - boundaries = r.json() + if url.startswith("http"): + import_boundary(client, url, examine, code_field) + else: + files = glob.glob(url, recursive=True) + for file in files: + import_boundary(client, file, examine, code_field) + + +def import_boundary(client, url, examine=False, code_field=None): + if url.startswith("http"): + r = requests.get(url, stream=True) + boundaries = r.json() + elif os.path.isfile(url): + with open(url, encoding="latin1") as f: + boundaries = json.load(f) errors = [] # find the code field for a boundary if len(boundaries.get("features", [])) == 0: - errors.append( - "[ERROR][{}] Features not found in file".format( - url, - ) - ) + errors.append("[ERROR][%s] Features not found in file" % (url,)) if len(boundaries.get("features", [])) > 0 and not code_field: test_boundary = boundaries.get("features", [])[0] code_fields = [] @@ -52,20 +76,10 @@ def import_boundary(es, url, examine=False, es_index=AREA_INDEX, code_field=None if len(code_fields) == 1: code_field = code_fields[0] elif len(code_fields) == 0: - errors.append( - "[ERROR][{}] No code field found in file".format( - url, - ) - ) + errors.append("[ERROR][%s] No code field found in file" % (url,)) else: - errors.append( - "[ERROR][{}] Too many code fields found in file".format( - url, - ) - ) - errors.append( - "[ERROR][{}] Code fields: {}".format(url, "; ".join(code_fields)) - ) + errors.append("[ERROR][%s] Too many code fields found in file" % (url,)) + errors.append("[ERROR][%s] Code fields: %s" % (url, "; ".join(code_fields))) if len(errors) > 0: if examine: @@ -77,69 +91,48 @@ def import_boundary(es, url, examine=False, es_index=AREA_INDEX, code_field=None code = code_field.lower().replace("cd", "") if examine: - print("[{}] Opened file: [{}]".format(code, url)) - print("[{}] Looking for code field: [{}]".format(code, code_field)) - print("[{}] Geojson type: [{}]".format(code, boundaries["type"])) - print("[{}] Number of features [{}]".format(code, len(boundaries["features"]))) + print("[%s] Opened file: [%s]" % (code, url)) + print("[%s] Looking for code field: [%s]" % (code, code_field)) + print("[%s] Geojson type: [%s]" % (code, boundaries["type"])) + print("[%s] Number of features [%s]" % (code, len(boundaries["features"]))) for k, i in enumerate(boundaries["features"][:5]): - print("[{}] Feature {} type {}".format(code, k, i["type"])) - print( - "[{}] Feature {} properties {}".format( - code, k, list(i["properties"].keys()) - ) - ) + print("[%s] Feature %s type %s" % (code, k, i["type"])) print( - "[{}] Feature {} geometry type {}".format( - code, k, i["geometry"]["type"] - ) + "[%s] Feature %s properties %s" + % (code, k, list(i["properties"].keys())) ) + print("[%s] Feature %s geometry type %s" % (code, k, i["geometry"]["type"])) print( - "[{}] Feature {} geometry length {}".format( - code, k, len(str(i["geometry"]["coordinates"])) - ) + "[%s] Feature %s geometry length %s" + % (code, k, len(str(i["geometry"]["coordinates"]))) ) if code_field in i["properties"]: print( - "[{}] Feature {} Code {}".format( - code, k, i["properties"][code_field] - ) + "[%s] Feature %s Code %s" % (code, k, i["properties"][code_field]) ) else: print( - "[ERROR][{}] Feature {} Code field not found".format( + "[ERROR][%s] Feature %s Code field not found" + % ( code, k, ) ) else: - print("[{}] Opened file: [{}]".format(code, url)) - print("[{}] {} features to import".format(code, len(boundaries["features"]))) - - with BulkImporter(name="boundaries", es=es) as importer: - for k, i in tqdm.tqdm( - enumerate(boundaries["features"]), total=len(boundaries["features"]) - ): - shp = shapely.geometry.shape(i["geometry"]).buffer(0) - boundary = { - "_index": es_index, - "_type": "_doc", - "_op_type": "update", - "_id": i["properties"][code_field], - "doc": { - "boundary": shp.wkt, - "has_boundary": True, - }, - } - importer.add(boundary) - - for error in importer.errors: - print( - " - {} {}".format( - error.get("update", {}).get("_id", ""), - error.get("update", {}) - .get("error", {}) - .get("caused_by", {}) - .get("type", ""), - ) - ) + print("[%s] Opened file: [%s]" % (code, url)) + print("[%s] %s features to import" % (code, len(boundaries["features"]))) + boundary_count = 0 + errors = [] + for k, i in tqdm.tqdm( + enumerate(boundaries["features"]), total=len(boundaries["features"]) + ): + area_code = i["properties"][code_field] + prefix = area_code[0:3] + client.upload_fileobj( + io.BytesIO(json.dumps(i).encode("utf-8")), + settings.S3_BUCKET, + "%s/%s.json" % (prefix, area_code), + ) + boundary_count += 1 + print("[%s] %s boundaries imported" % (code, boundary_count)) diff --git a/findthatpostcode/db.py b/findthatpostcode/db.py index 2dbd001..d0a8a58 100644 --- a/findthatpostcode/db.py +++ b/findthatpostcode/db.py @@ -1,4 +1,5 @@ import click +from boto3 import session from elasticsearch import Elasticsearch from elasticsearch_dsl import Document, Index @@ -24,3 +25,18 @@ def init_db(reset=False): click.echo(f"[elasticsearch] response: '{res}'") click.echo(f"[elasticsearch] creating '{DocumentType.Index.name}' index...") DocumentIndex.create(using=es) + + +def get_s3_client(): + s3_session = session.Session() + return s3_session.client( + "s3", + region_name=settings.S3_REGION, + endpoint_url=settings.S3_ENDPOINT, + aws_access_key_id=settings.S3_ACCESS_ID, + aws_secret_access_key=settings.S3_SECRET_KEY, + ) + + +def close_s3_client(e=None): + pass diff --git a/findthatpostcode/settings.py b/findthatpostcode/settings.py index 538cdc2..0c48c33 100644 --- a/findthatpostcode/settings.py +++ b/findthatpostcode/settings.py @@ -27,6 +27,13 @@ def get_es_url(default): } DEFAULT_ENCODING = "latin1" +# S3 Storage settings +S3_REGION = os.environ.get("S3_REGION") +S3_ENDPOINT = os.environ.get("S3_ENDPOINT") +S3_ACCESS_ID = os.environ.get("S3_ACCESS_ID") +S3_SECRET_KEY = os.environ.get("S3_SECRET_KEY") +S3_BUCKET = os.environ.get("S3_BUCKET", "geo-boundaries") + # postcode data URLs NSPL_URL = "https://www.arcgis.com/sharing/rest/content/items/677cfc3ef56541999314efc795664ce9/data" ONSPD_URL = "https://www.arcgis.com/sharing/rest/content/items/a644dd04d18f4592b7d36705f93270d8/data"