Commit

Implement solution to write mapped data to storage

committed Apr 3, 2024
1 parent ae18705 commit 50c15d6

Showing 127 changed files with 706 additions and 319 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -182,3 +182,6 @@ node_modules/

# Mac Desktop Services Store
*.DS_Store

# Landing zone data storage
# /data/landing_zone/*
2 changes: 2 additions & 0 deletions README.md
@@ -58,6 +58,8 @@ docker exec nad-ch-dev-local poetry run alembic downgrade <enter down_revision id>

## Testing

Some tests in the test suite depend on Minio operations, so an access key is required. To create a Minio access key, visit the Minio web UI at [minio-webui](http://localhost:9001) and, under User/Access Keys, click Create access key. Save the credentials to your `.env` file as `S3_ACCESS_KEY` and `S3_SECRET_ACCESS_KEY`.
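
For reference, the resulting `.env` entries look like this (placeholder values, not real credentials):

```bash
S3_ACCESS_KEY=<your-minio-access-key>
S3_SECRET_ACCESS_KEY=<your-minio-secret-key>
```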

Run the test suite as follows:

```bash
```

@@ -5,6 +5,7 @@
Create Date: 2024-02-14 08:40:16.897824
"""

from typing import Sequence, Union

from alembic import op
1 change: 1 addition & 0 deletions alembic/versions/68982ccf2c7c_add_column_map_model.py
@@ -5,6 +5,7 @@
Create Date: 2024-03-07 16:33:09.710215
"""

from typing import Sequence, Union

from alembic import op
1 change: 1 addition & 0 deletions alembic/versions/851709d3a162_add_report_column.py
@@ -5,6 +5,7 @@
Create Date: 2024-02-06 13:22:25.266733
"""

from typing import Sequence, Union

from alembic import op
1 change: 1 addition & 0 deletions alembic/versions/945ca77479d1_create_users_table.py
@@ -5,6 +5,7 @@
Create Date: 2024-02-16 16:16:13.902219
"""

from typing import Sequence, Union

from alembic import op
@@ -5,6 +5,7 @@
Create Date: 2024-01-26 12:20:03.153358
"""

from typing import Sequence, Union

from alembic import op
@@ -5,6 +5,7 @@
Create Date: 2024-01-26 12:25:47.404511
"""

from typing import Sequence, Union

from alembic import op
@@ -5,6 +5,7 @@
Create Date: 2024-03-05 13:21:29.812837
"""

from typing import Sequence, Union

from alembic import op
7 changes: 7 additions & 0 deletions data/landing_zone/README.md
@@ -0,0 +1,7 @@
# 10x National Address Database Collaboration Hub (NAD-CH)

## Data Landing Zone

The purpose of the landing zone is to provide a location to store intermediate mapped data that is generated during the validation step and needs to be written to remote cloud storage such as AWS S3.

NOTE: This folder will likely not be deployed to production environments because it is only needed for local development. Instead, a folder should be created outside of the repository, in the file system of the server hosting the application in production. The path to that folder will be configured within the application so that the app can recognize the location and write data to the correct path.
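
For reference, the local development configuration added elsewhere in this commit resolves the landing zone path relative to the repository root; the production path below is purely illustrative:

```python
import os

# Local development: the landing zone lives inside the repository
LANDING_ZONE = os.path.join(os.getcwd(), "data/landing_zone")

# Production (hypothetical path): a folder outside the repository
# LANDING_ZONE = "/srv/nad-ch/landing_zone"
```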
113 changes: 113 additions & 0 deletions nad_ch/application/data_handler.py
@@ -0,0 +1,113 @@
import os
import shutil
from typing import Dict, Iterator, Optional
from zipfile import ZipFile

import fiona
from geopandas import GeoDataFrame, read_file


class DataHandler(object):
def __init__(
self, column_map: Dict[str, str], mapped_data_dir: Optional[str] = None
) -> None:
self.column_map = column_map
self.mapped_data_dir = mapped_data_dir
self.mapped_data_path = (
os.path.join(
self.mapped_data_dir,
self.mapped_data_dir.split("/")[-1] + ".shp",
)
if self.mapped_data_dir
else None
)
self.zip_file_path = (
os.path.join(
self.mapped_data_dir,
self.mapped_data_dir.split("/")[-1] + ".zip",
)
if self.mapped_data_dir
else None
)
self.valid_renames = {}
self.__validate_column_map()

def __validate_column_map(self):
column_map_reverse = {}

for key, value in self.column_map.items():
if value:
value_lcase = value.lower()
if value_lcase in column_map_reverse:
column_map_reverse[value_lcase].append(key)
else:
column_map_reverse[value_lcase] = [key]
duplicates = {k: v for k, v in column_map_reverse.items() if len(v) > 1}
if duplicates:
duplicate_nad_fields = ", ".join(
[" & ".join(nad_fields) for nad_fields in list(duplicates.values())]
)
raise Exception(
f"Duplicate inputs found for destination fields: {duplicate_nad_fields}"
)

def __rename_columns(self, gdf: GeoDataFrame) -> GeoDataFrame:
        # Copy so the instance's column map is not mutated
        column_map = dict(self.column_map)
        column_map["geometry"] = "geometry"
original_names = {col.lower(): col for col in gdf.columns}
for nad_column, raw_field in column_map.items():
orig_matched_name = original_names.get(nad_column.lower())
if orig_matched_name:
self.valid_renames[orig_matched_name] = nad_column
continue
if raw_field:
orig_matched_name = original_names.get(raw_field.lower())
if orig_matched_name:
self.valid_renames[orig_matched_name] = nad_column
        gdf = gdf.rename(columns=self.valid_renames)
        # Keep only the columns that were successfully mapped
        return gdf[list(self.valid_renames.values())]

def read_file_in_batches(
self, path: str, table_name: Optional[str] = None, batch_size: int = 100000
) -> Iterator[GeoDataFrame]:
# TODO: Modify to return a joined table; for cases where 1 or more tables
# are needed to get all fields from source file.
if table_name and table_name not in fiona.listlayers(path):
raise Exception(f"Table name {table_name} does not exist")
i = 0
while True:
gdf = read_file(path, rows=slice(i, i + batch_size))
if gdf.shape[0] == 0:
if self.mapped_data_dir:
# No more batches to process, create zip file
self.__zip_mapped_data()
break
gdf = self.__rename_columns(gdf)
if self.mapped_data_dir:
self.__write_mapped_batch(gdf, i == 0)
yield gdf
i += batch_size

    def __write_mapped_batch(self, gdf: GeoDataFrame, first_batch: bool):
        # Overwrite on the first batch, append on every subsequent batch
        write_mode = "w" if first_batch else "a"
os.makedirs(self.mapped_data_dir, exist_ok=True)
try:
gdf.to_file(
filename=self.mapped_data_path,
index=False,
mode=write_mode,
engine="fiona",
)
except Exception:
shutil.rmtree(self.mapped_data_dir)
raise

def __zip_mapped_data(self):
with ZipFile(self.zip_file_path, "w") as zipf:
# Walk through all the files and subdirectories in the given directory
            for root, _, files in os.walk(self.mapped_data_dir):
for file in files:
file_path = os.path.join(root, file)
relative_path = os.path.relpath(file_path, self.mapped_data_dir)
zipf.write(file_path, arcname=relative_path)
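
A minimal usage sketch of `DataHandler`, assuming a shapefile source and a hypothetical column map (field and path names are illustrative):

```python
from nad_ch.application.data_handler import DataHandler

# Hypothetical mapping of NAD destination fields to source column names
column_map = {"AddNum_Pre": "addr_prefix", "St_Name": "street_name"}

handler = DataHandler(column_map, mapped_data_dir="data/landing_zone/submission_1")
for batch in handler.read_file_in_batches("path/to/source.shp", batch_size=50000):
    print(f"Mapped a batch of {batch.shape[0]} features")
# After the final batch, the mapped shapefile is zipped at
# data/landing_zone/submission_1/submission_1.zip
```

Note that assigning the same source column to more than one destination field (case-insensitively) raises an exception at construction time, before any data is read.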
67 changes: 0 additions & 67 deletions nad_ch/application/data_reader.py

This file was deleted.

38 changes: 27 additions & 11 deletions nad_ch/application/use_cases/data_submissions.py
@@ -7,6 +7,7 @@
DataSubmissionViewModel,
)
from nad_ch.core.entities import DataSubmission, ColumnMap
from nad_ch.config import LANDING_ZONE


def ingest_data_submission(
@@ -85,14 +86,29 @@ def validate_data_submission(

# Using version 1 for column maps for now, may add feature for user to select
# version later
column_map = ctx.column_maps.get_by_name_and_version(column_map_name, 1)
report = ctx.task_queue.run_load_and_validate(
ctx.submissions,
submission.id,
download_result.extracted_dir,
column_map.mapping,
)

ctx.logger.info(f"Total number of features: {report.overview.feature_count}")

ctx.storage.cleanup_temp_dir(download_result.temp_dir)
    column_map = ctx.column_maps.get_by_name_and_version(column_map_name, 1)
    # Resolve these paths before the try block so the cleanup in finally
    # never references an unbound name if an early step fails
    mapped_data_local_dir = submission.get_mapped_data_dir(
        download_result.extracted_dir, LANDING_ZONE
    )
    mapped_data_remote_dir = submission.get_mapped_data_dir(
        download_result.extracted_dir, LANDING_ZONE, True
    )
    try:
        report = ctx.task_queue.run_load_and_validate(
            ctx.submissions,
            submission.id,
            download_result.extracted_dir,
            column_map.mapping,
            mapped_data_local_dir,
        )
        ctx.task_queue.run_copy_mapped_data_to_remote(
            mapped_data_local_dir,
            mapped_data_remote_dir,
        )

        ctx.logger.info(f"Total number of features: {report.overview.feature_count}")
    finally:
        # Clean up both temp directories whether or not validation succeeded
        ctx.storage.cleanup_temp_dir(download_result.temp_dir)
        ctx.storage.cleanup_temp_dir(mapped_data_local_dir)
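
This diff does not include `DataSubmission.get_mapped_data_dir`; a plausible shape for it, stated purely as an assumption, would derive a per-submission directory name and only prefix the landing zone for local paths:

```python
import os

# Assumed sketch — the real method lives in nad_ch/core/entities.py,
# which is not shown in this diff.
def get_mapped_data_dir(
    self, extracted_dir: str, landing_zone: str, remote: bool = False
) -> str:
    dataset_name = os.path.splitext(os.path.basename(extracted_dir))[0]
    # Remote: a key-style path for cloud storage; local: a folder in the landing zone
    return dataset_name if remote else os.path.join(landing_zone, dataset_name)
```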
9 changes: 4 additions & 5 deletions nad_ch/config/development_local.py
@@ -28,17 +28,14 @@
f"postgresql+psycopg2://{postgres_user}:{postgres_password}"
f"@{postgres_host}:{postgres_port}/test_database"
)
# TEST_DATABASE_URL = (
# f"postgresql://{postgres_user}"
# f"@localhost:/var/run/postgresql/.s.PGSQL.5432/test_database"
# )
QUEUE_BROKER_URL = os.getenv("QUEUE_BROKER_URL")
QUEUE_BACKEND_URL = os.getenv("QUEUE_BACKEND_URL")
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME")
S3_ENDPOINT = os.getenv("S3_ENDPOINT")
S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY")
S3_SECRET_ACCESS_KEY = os.getenv("S3_SECRET_ACCESS_KEY")
S3_REGION = os.getenv("S3_REGION")
LANDING_ZONE = os.path.join(os.getcwd(), "data/landing_zone")


class DevLocalApplicationContext(ApplicationContext):
@@ -68,11 +65,13 @@ def create_column_map_repository(self):
def create_logger(self):
return BasicLogger(__name__, logging.DEBUG)

def create_storage(self):
@staticmethod
def create_storage():
return MinioStorage(
S3_ENDPOINT,
S3_ACCESS_KEY,
S3_SECRET_ACCESS_KEY,
S3_REGION,
S3_BUCKET_NAME,
)

6 changes: 5 additions & 1 deletion nad_ch/config/development_remote.py
@@ -14,6 +14,9 @@
from nad_ch.infrastructure.storage import S3Storage


LANDING_ZONE = os.path.join(os.getcwd(), "data/landing_zone")


def get_credentials(service_name, default={}):
service = vcap_services.get(service_name, [default])
return service[0].get("credentials", default) if service else default
@@ -67,7 +70,8 @@ def create_column_map_repository(self):
def create_logger(self):
return BasicLogger(__name__)

def create_storage(self):
@staticmethod
def create_storage():
return S3Storage(
S3_ACCESS_KEY,
S3_SECRET_ACCESS_KEY,
4 changes: 3 additions & 1 deletion nad_ch/config/test.py
@@ -16,6 +16,7 @@
DATABASE_URL = os.getenv("DATABASE_URL")
QUEUE_BROKER_URL = os.getenv("QUEUE_BROKER_URL")
QUEUE_BACKEND_URL = os.getenv("QUEUE_BACKEND_URL")
LANDING_ZONE = os.path.join(os.getcwd(), "data/landing_zone")


class TestApplicationContext(ApplicationContext):
@@ -45,7 +46,8 @@ def create_column_map_repository(self):
def create_logger(self):
return BasicLogger(__name__, logging.DEBUG)

def create_storage(self):
@staticmethod
def create_storage():
return FakeStorage()

def create_task_queue(self):