Skip to content

Commit

Permalink
update boundaries command
Browse files Browse the repository at this point in the history
  • Loading branch information
drkane committed Feb 22, 2023
1 parent 83059d2 commit a6236a6
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 79 deletions.
151 changes: 72 additions & 79 deletions findthatpostcode/commands/boundaries.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
"""
Import commands for area boundaries
"""
import glob
import io
import json
import os

import click
import requests
import requests_cache
import shapely.geometry
import tqdm

from findthatpostcode import db, settings
from findthatpostcode.documents import Area, Entity
from findthatpostcode.utils import BulkImporter
from findthatpostcode.documents import Area

AREA_INDEX = Area.Index.name

Expand All @@ -19,30 +21,52 @@
@click.option("--es-index", default=AREA_INDEX)
@click.option("--code-field", default=None)
@click.option("--examine/--no-examine", default=False)
@click.option("--remove/--no-remove", default=False)
@click.argument("urls", nargs=-1)
def import_boundaries(urls, examine=False, code_field=None, es_index=AREA_INDEX):
def import_boundaries(
urls, examine=False, code_field=None, es_index=AREA_INDEX, remove=False
):

es = db.get_db()

if remove:
# update all instances of area to remove the "boundary" key
es.update_by_query(
index=es_index,
body={
"script": 'ctx._source.remove("boundary")',
"query": {"exists": {"field": "boundary"}},
},
)
return

if settings.DEBUG:
requests_cache.install_cache()

es = db.get_db()
# initialise the boto3 session
client = db.get_s3_client()

for url in urls:
import_boundary(es, url, examine, es_index, code_field)


def import_boundary(es, url, examine=False, es_index=AREA_INDEX, code_field=None):
r = requests.get(url, stream=True)
boundaries = r.json()
if url.startswith("http"):
import_boundary(client, url, examine, code_field)
else:
files = glob.glob(url, recursive=True)
for file in files:
import_boundary(client, file, examine, code_field)


def import_boundary(client, url, examine=False, code_field=None):
if url.startswith("http"):
r = requests.get(url, stream=True)
boundaries = r.json()
elif os.path.isfile(url):
with open(url, encoding="latin1") as f:
boundaries = json.load(f)
errors = []

# find the code field for a boundary
if len(boundaries.get("features", [])) == 0:
errors.append(
"[ERROR][{}] Features not found in file".format(
url,
)
)
errors.append("[ERROR][%s] Features not found in file" % (url,))
if len(boundaries.get("features", [])) > 0 and not code_field:
test_boundary = boundaries.get("features", [])[0]
code_fields = []
Expand All @@ -52,20 +76,10 @@ def import_boundary(es, url, examine=False, es_index=AREA_INDEX, code_field=None
if len(code_fields) == 1:
code_field = code_fields[0]
elif len(code_fields) == 0:
errors.append(
"[ERROR][{}] No code field found in file".format(
url,
)
)
errors.append("[ERROR][%s] No code field found in file" % (url,))
else:
errors.append(
"[ERROR][{}] Too many code fields found in file".format(
url,
)
)
errors.append(
"[ERROR][{}] Code fields: {}".format(url, "; ".join(code_fields))
)
errors.append("[ERROR][%s] Too many code fields found in file" % (url,))
errors.append("[ERROR][%s] Code fields: %s" % (url, "; ".join(code_fields)))

if len(errors) > 0:
if examine:
Expand All @@ -77,69 +91,48 @@ def import_boundary(es, url, examine=False, es_index=AREA_INDEX, code_field=None
code = code_field.lower().replace("cd", "")

if examine:
print("[{}] Opened file: [{}]".format(code, url))
print("[{}] Looking for code field: [{}]".format(code, code_field))
print("[{}] Geojson type: [{}]".format(code, boundaries["type"]))
print("[{}] Number of features [{}]".format(code, len(boundaries["features"])))
print("[%s] Opened file: [%s]" % (code, url))
print("[%s] Looking for code field: [%s]" % (code, code_field))
print("[%s] Geojson type: [%s]" % (code, boundaries["type"]))
print("[%s] Number of features [%s]" % (code, len(boundaries["features"])))
for k, i in enumerate(boundaries["features"][:5]):
print("[{}] Feature {} type {}".format(code, k, i["type"]))
print(
"[{}] Feature {} properties {}".format(
code, k, list(i["properties"].keys())
)
)
print("[%s] Feature %s type %s" % (code, k, i["type"]))
print(
"[{}] Feature {} geometry type {}".format(
code, k, i["geometry"]["type"]
)
"[%s] Feature %s properties %s"
% (code, k, list(i["properties"].keys()))
)
print("[%s] Feature %s geometry type %s" % (code, k, i["geometry"]["type"]))
print(
"[{}] Feature {} geometry length {}".format(
code, k, len(str(i["geometry"]["coordinates"]))
)
"[%s] Feature %s geometry length %s"
% (code, k, len(str(i["geometry"]["coordinates"])))
)
if code_field in i["properties"]:
print(
"[{}] Feature {} Code {}".format(
code, k, i["properties"][code_field]
)
"[%s] Feature %s Code %s" % (code, k, i["properties"][code_field])
)
else:
print(
"[ERROR][{}] Feature {} Code field not found".format(
"[ERROR][%s] Feature %s Code field not found"
% (
code,
k,
)
)

else:
print("[{}] Opened file: [{}]".format(code, url))
print("[{}] {} features to import".format(code, len(boundaries["features"])))

with BulkImporter(name="boundaries", es=es) as importer:
for k, i in tqdm.tqdm(
enumerate(boundaries["features"]), total=len(boundaries["features"])
):
shp = shapely.geometry.shape(i["geometry"]).buffer(0)
boundary = {
"_index": es_index,
"_type": "_doc",
"_op_type": "update",
"_id": i["properties"][code_field],
"doc": {
"boundary": shp.wkt,
"has_boundary": True,
},
}
importer.add(boundary)

for error in importer.errors:
print(
" - {} {}".format(
error.get("update", {}).get("_id", ""),
error.get("update", {})
.get("error", {})
.get("caused_by", {})
.get("type", ""),
)
)
print("[%s] Opened file: [%s]" % (code, url))
print("[%s] %s features to import" % (code, len(boundaries["features"])))
boundary_count = 0
errors = []
for k, i in tqdm.tqdm(
enumerate(boundaries["features"]), total=len(boundaries["features"])
):
area_code = i["properties"][code_field]
prefix = area_code[0:3]
client.upload_fileobj(
io.BytesIO(json.dumps(i).encode("utf-8")),
settings.S3_BUCKET,
"%s/%s.json" % (prefix, area_code),
)
boundary_count += 1
print("[%s] %s boundaries imported" % (code, boundary_count))
16 changes: 16 additions & 0 deletions findthatpostcode/db.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import click
from boto3 import session
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Document, Index

Expand All @@ -24,3 +25,18 @@ def init_db(reset=False):
click.echo(f"[elasticsearch] response: '{res}'")
click.echo(f"[elasticsearch] creating '{DocumentType.Index.name}' index...")
DocumentIndex.create(using=es)


def get_s3_client():
    """Create an S3 client configured from the project's S3 settings.

    A new boto3 session is created on every call. The region, endpoint URL
    and credentials are taken from ``settings.S3_REGION``,
    ``settings.S3_ENDPOINT``, ``settings.S3_ACCESS_ID`` and
    ``settings.S3_SECRET_KEY``.

    Returns:
        The client object produced by ``Session.client("s3", ...)``.
    """
    s3_session = session.Session()
    return s3_session.client(
        "s3",
        region_name=settings.S3_REGION,
        endpoint_url=settings.S3_ENDPOINT,
        aws_access_key_id=settings.S3_ACCESS_ID,
        aws_secret_access_key=settings.S3_SECRET_KEY,
    )


def close_s3_client(e=None):
    """Do nothing; placeholder for S3 client cleanup.

    Args:
        e: Ignored. NOTE(review): presumably an exception handed over by a
            teardown hook -- confirm against the caller.

    Returns:
        None.
    """
    # Intentionally a no-op: get_s3_client() hands out a fresh client per
    # call and nothing is stored here that would need releasing.
    return None
7 changes: 7 additions & 0 deletions findthatpostcode/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ def get_es_url(default):
}
DEFAULT_ENCODING = "latin1"

# S3 Storage settings
# Each value is read from the environment variable of the same name.
# os.environ.get returns None when a variable is unset, so every setting
# except S3_BUCKET may be None; S3_BUCKET falls back to "geo-boundaries".
S3_REGION = os.environ.get("S3_REGION")
S3_ENDPOINT = os.environ.get("S3_ENDPOINT")
S3_ACCESS_ID = os.environ.get("S3_ACCESS_ID")
S3_SECRET_KEY = os.environ.get("S3_SECRET_KEY")
S3_BUCKET = os.environ.get("S3_BUCKET", "geo-boundaries")

# postcode data URLs
NSPL_URL = "https://www.arcgis.com/sharing/rest/content/items/677cfc3ef56541999314efc795664ce9/data"
ONSPD_URL = "https://www.arcgis.com/sharing/rest/content/items/a644dd04d18f4592b7d36705f93270d8/data"
Expand Down

0 comments on commit a6236a6

Please sign in to comment.