Implement solution to write mapped data to storage #51

Merged: 2 commits, Apr 8, 2024
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -40,7 +40,7 @@ jobs:

- name: Test backend code
shell: bash
- run: poetry run pytest
+ run: poetry run pytest -k 'not test_task_queue'

- name: Read .nvmrc
run: echo "##[set-output name=NVMRC;]$(cat .nvmrc)"
4 changes: 4 additions & 0 deletions .gitignore
@@ -182,3 +182,7 @@ node_modules/

# Mac Desktop Services Store
*.DS_Store

# Landing zone data storage
data/landing_zone/*
!data/landing_zone/README.md
2 changes: 2 additions & 0 deletions README.md
@@ -58,6 +58,8 @@ docker exec nad-ch-dev-local poetry run alembic downgrade <enter down_revision i

## Testing

Some tests in the test suite depend on MinIO operations and require an access key. To create a MinIO access key, visit the MinIO web UI at [minio-webui](localhost:9001) and, under User/Access Keys, click Create access key. Save the credentials to your .env file as S3_ACCESS_KEY and S3_SECRET_ACCESS_KEY.
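For example, the resulting .env entries might look like the following (the values are placeholders; use the key pair generated in the MinIO web UI):

```bash
# .env (placeholder values)
S3_ACCESS_KEY=your-minio-access-key
S3_SECRET_ACCESS_KEY=your-minio-secret-key
```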

Run the test suite as follows:

```bash
@@ -5,6 +5,7 @@
Create Date: 2024-02-14 08:40:16.897824
"""

from typing import Sequence, Union

from alembic import op
1 change: 1 addition & 0 deletions alembic/versions/68982ccf2c7c_add_column_map_model.py
@@ -5,6 +5,7 @@
Create Date: 2024-03-07 16:33:09.710215

"""

from typing import Sequence, Union

from alembic import op
1 change: 1 addition & 0 deletions alembic/versions/851709d3a162_add_report_column.py
@@ -5,6 +5,7 @@
Create Date: 2024-02-06 13:22:25.266733

"""

from typing import Sequence, Union

from alembic import op
1 change: 1 addition & 0 deletions alembic/versions/945ca77479d1_create_users_table.py
@@ -5,6 +5,7 @@
Create Date: 2024-02-16 16:16:13.902219

"""

from typing import Sequence, Union

from alembic import op
@@ -5,6 +5,7 @@
Create Date: 2024-03-27 14:21:24.094899

"""

from typing import Sequence, Union

from alembic import op
@@ -5,6 +5,7 @@
Create Date: 2024-01-26 12:20:03.153358

"""

from typing import Sequence, Union

from alembic import op
@@ -5,6 +5,7 @@
Create Date: 2024-01-26 12:25:47.404511

"""

from typing import Sequence, Union

from alembic import op
@@ -5,6 +5,7 @@
Create Date: 2024-03-05 13:21:29.812837

"""

from typing import Sequence, Union

from alembic import op
7 changes: 7 additions & 0 deletions data/landing_zone/README.md
@@ -0,0 +1,7 @@
# 10x National Address Database Collaboration Hub (NAD-CH)

## Data Landing Zone

The purpose of the landing zone is to provide a location to store intermediate mapped data that is generated during the validation step and needs to be written to remote cloud storage such as AWS S3.

NOTE: This folder will likely not be deployed to production environments because it is only needed for local development. Instead, a folder should be created outside of the repository, in the file system used by the server hosting the application in production. The path to that folder will be configured within the application so that the app recognizes the location and writes data to the correct path.
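As a rough sketch of that configuration (the environment variable name and fallback path below are illustrative, not part of this PR), a production config module might resolve the landing zone like this:

```python
import os

# Illustrative only: point the landing zone at a directory outside the repository.
# The variable name and default path are assumptions, not this repository's config.
LANDING_ZONE = os.getenv("LANDING_ZONE_DIR", "/srv/nad_ch/landing_zone")
os.makedirs(LANDING_ZONE, exist_ok=True)
```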
113 changes: 113 additions & 0 deletions nad_ch/application/data_handler.py
@@ -0,0 +1,113 @@
import os
from geopandas import GeoDataFrame, read_file
import fiona
from typing import Optional, Dict, Iterator
import shutil
from zipfile import ZipFile


class DataHandler(object):
def __init__(
self, column_map: Dict[str, str], mapped_data_dir: Optional[str] = None
) -> None:
self.column_map = column_map
self.mapped_data_dir = mapped_data_dir
self.mapped_data_path = (
os.path.join(
self.mapped_data_dir,
self.mapped_data_dir.split("/")[-1] + ".shp",
)
if self.mapped_data_dir
else None
)
self.zip_file_path = (
os.path.join(
self.mapped_data_dir,
self.mapped_data_dir.split("/")[-1] + ".zip",
)
if self.mapped_data_dir
else None
)
self.valid_renames = {}
self.__validate_column_map()

def __validate_column_map(self):
column_map_reverse = {}

for key, value in self.column_map.items():
if value:
value_lcase = value.lower()
if value_lcase in column_map_reverse:
column_map_reverse[value_lcase].append(key)
else:
column_map_reverse[value_lcase] = [key]
duplicates = {k: v for k, v in column_map_reverse.items() if len(v) > 1}
if duplicates:
duplicate_nad_fields = ", ".join(
[" & ".join(nad_fields) for nad_fields in list(duplicates.values())]
)
raise Exception(
f"Duplicate inputs found for destination fields: {duplicate_nad_fields}"
)

def __rename_columns(self, gdf: GeoDataFrame) -> GeoDataFrame:
column_map = self.column_map
column_map["geometry"] = "geometry"
original_names = {col.lower(): col for col in gdf.columns}
for nad_column, raw_field in column_map.items():
orig_matched_name = original_names.get(nad_column.lower())
if orig_matched_name:
self.valid_renames[orig_matched_name] = nad_column
continue
if raw_field:
orig_matched_name = original_names.get(raw_field.lower())
if orig_matched_name:
self.valid_renames[orig_matched_name] = nad_column
gdf = gdf.rename(columns=self.valid_renames)
return gdf[[col for col in self.valid_renames.values()]]

def read_file_in_batches(
self, path: str, table_name: Optional[str] = None, batch_size: int = 100000
) -> Iterator[GeoDataFrame]:
# TODO: Modify to return a joined table; for cases where 1 or more tables
# are needed to get all fields from source file.
if table_name and table_name not in fiona.listlayers(path):
raise Exception(f"Table name {table_name} does not exist")
i = 0
while True:
gdf = read_file(path, rows=slice(i, i + batch_size))
if gdf.shape[0] == 0:
if self.mapped_data_dir:
# No more batches to process, create zip file
self.__zip_mapped_data()
break
gdf = self.__rename_columns(gdf)
if self.mapped_data_dir:
self.__write_mapped_batch(gdf, i == 0)
yield gdf
i += batch_size

def __write_mapped_batch(self, gdf: GeoDataFrame, first_batch: bool):
write_mode = "a"
if first_batch:
write_mode = "w"
os.makedirs(self.mapped_data_dir, exist_ok=True)
try:
gdf.to_file(
filename=self.mapped_data_path,
index=False,
mode=write_mode,
engine="fiona",
)
except Exception:
shutil.rmtree(self.mapped_data_dir)
raise

def __zip_mapped_data(self):
with ZipFile(self.zip_file_path, "w") as zipf:
# Walk through all the files and subdirectories in the given directory
for root, dirs, files in os.walk(self.mapped_data_dir):
for file in files:
file_path = os.path.join(root, file)
relative_path = os.path.relpath(file_path, self.mapped_data_dir)
zipf.write(file_path, arcname=relative_path)
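A minimal usage sketch of the handler above (the column map, paths, and batch size are illustrative; the source file just needs to be readable by GeoPandas/Fiona):

```python
from nad_ch.application.data_handler import DataHandler

# Keys are destination (NAD) fields, values are the provider's source column names;
# these particular names are placeholders.
column_map = {"AddNo_Full": "address_number", "StreetName": "street_name"}

handler = DataHandler(column_map, mapped_data_dir="data/landing_zone/my_submission")
for batch in handler.read_file_in_batches("data/my_submission.gdb", batch_size=50000):
    # Each batch is a GeoDataFrame containing only the mapped columns. Because
    # mapped_data_dir was given, the handler also appends each batch to
    # data/landing_zone/my_submission/my_submission.shp and zips the directory
    # after the final batch.
    print(batch.shape)
```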
67 changes: 0 additions & 67 deletions nad_ch/application/data_reader.py

This file was deleted.

38 changes: 27 additions & 11 deletions nad_ch/application/use_cases/data_submissions.py
@@ -7,6 +7,7 @@
DataSubmissionViewModel,
)
from nad_ch.core.entities import DataSubmission, ColumnMap
from nad_ch.config import LANDING_ZONE


def ingest_data_submission(
@@ -85,14 +86,29 @@ def validate_data_submission(

# Using version 1 for column maps for now, may add feature for user to select
# version later
- column_map = ctx.column_maps.get_by_name_and_version(column_map_name, 1)
- report = ctx.task_queue.run_load_and_validate(
- ctx.submissions,
- submission.id,
- download_result.extracted_dir,
- column_map.mapping,
- )
-
- ctx.logger.info(f"Total number of features: {report.overview.feature_count}")
-
- ctx.storage.cleanup_temp_dir(download_result.temp_dir)
+ try:
+ column_map = ctx.column_maps.get_by_name_and_version(column_map_name, 1)
+ mapped_data_local_dir = submission.get_mapped_data_dir(
+ download_result.extracted_dir, LANDING_ZONE
+ )
+ mapped_data_remote_dir = submission.get_mapped_data_dir(
+ download_result.extracted_dir, LANDING_ZONE, True
+ )
+ report = ctx.task_queue.run_load_and_validate(
+ ctx.submissions,
+ submission.id,
+ download_result.extracted_dir,
+ column_map.mapping,
+ mapped_data_local_dir,
+ )
+ _ = ctx.task_queue.run_copy_mapped_data_to_remote(
+ mapped_data_local_dir,
+ mapped_data_remote_dir,
+ )
+
+ ctx.logger.info(f"Total number of features: {report.overview.feature_count}")
+ except Exception:
+ raise
+ finally:
+ ctx.storage.cleanup_temp_dir(download_result.temp_dir)
+ ctx.storage.cleanup_temp_dir(mapped_data_local_dir)
9 changes: 4 additions & 5 deletions nad_ch/config/development_local.py
@@ -28,17 +28,14 @@
f"postgresql+psycopg2://{postgres_user}:{postgres_password}"
f"@{postgres_host}:{postgres_port}/test_database"
)
# TEST_DATABASE_URL = (
# f"postgresql://{postgres_user}"
# f"@localhost:/var/run/postgresql/.s.PGSQL.5432/test_database"
# )
QUEUE_BROKER_URL = os.getenv("QUEUE_BROKER_URL")
QUEUE_BACKEND_URL = os.getenv("QUEUE_BACKEND_URL")
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME")
S3_ENDPOINT = os.getenv("S3_ENDPOINT")
S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY")
S3_SECRET_ACCESS_KEY = os.getenv("S3_SECRET_ACCESS_KEY")
S3_REGION = os.getenv("S3_REGION")
LANDING_ZONE = os.path.join(os.getcwd(), "data/landing_zone")


class DevLocalApplicationContext(ApplicationContext):
@@ -68,11 +65,13 @@ def create_column_map_repository(self):
def create_logger(self):
return BasicLogger(__name__, logging.DEBUG)

def create_storage(self):
@staticmethod
Collaborator: Does the create_storage method here and in the other ApplicationContext classes necessarily need to be static? Just curious.

Collaborator Author: I made it static because I need to use a storage class instance in the task_queue.py file, and I don't want to create an application context just for that use case. I wasn't able to pass the application context to the Celery task like we discussed before, due to serialization, so this was a workaround.

def create_storage():
return MinioStorage(
S3_ENDPOINT,
S3_ACCESS_KEY,
S3_SECRET_ACCESS_KEY,
S3_REGION,
S3_BUCKET_NAME,
)

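To illustrate the workaround described in the review thread above, a Celery task could call the static create_storage factory directly instead of receiving a serialized application context. This is a hypothetical sketch: the task name, module layout, and decorator choice are assumptions rather than code from this PR.

```python
from celery import shared_task

from nad_ch.config.development_local import DevLocalApplicationContext


@shared_task
def copy_mapped_data_to_remote(mapped_data_local_dir: str, mapped_data_remote_dir: str) -> None:
    # No ApplicationContext instance needs to be passed to (or serialized by) the
    # task; the static factory can be called on the class itself.
    storage = DevLocalApplicationContext.create_storage()
    # ...then use `storage` to copy the mapped data from the local landing zone
    # directory to the remote path (the storage class's copy/upload API is not
    # shown in this diff, so the call is omitted here).
```

Whether the method strictly needs to be static is a style choice; a module-level factory function would give the same decoupling from the context object.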
6 changes: 5 additions & 1 deletion nad_ch/config/development_remote.py
@@ -14,6 +14,9 @@
from nad_ch.infrastructure.storage import S3Storage


LANDING_ZONE = os.path.join(os.getcwd(), "data/landing_zone")


def get_credentials(service_name, default={}):
service = vcap_services.get(service_name, [default])
return service[0].get("credentials", default) if service else default
@@ -67,7 +70,8 @@ def create_column_map_repository(self):
def create_logger(self):
return BasicLogger(__name__)

def create_storage(self):
@staticmethod
def create_storage():
return S3Storage(
S3_ACCESS_KEY,
S3_SECRET_ACCESS_KEY,