diff --git a/docker-compose.yaml b/docker-compose.yaml
index 28d2a227..00227c2f 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -2,9 +2,6 @@ version: '3'
 
 services:
   database:
-    # at time of writing this, ARM64 is not supported so we make sure to use
-    # a supported platform: https://github.com/postgis/docker-postgis/issues/216
-    # Could possibly switch to https://github.com/vincentsarago/containers
     platform: linux/amd64
     image: postgis/postgis:15-3.4
     environment:
@@ -13,6 +10,23 @@ services:
       - POSTGRES_DB=postgis
     ports:
       - 5439:5432
-    command: postgres -N 500
+    command: >
+      postgres -N 500
+      -c checkpoint_timeout=30min
+      -c synchronous_commit=off
+      -c max_wal_senders=0
+      -c max_connections=8
+      -c shared_buffers=2GB
+      -c effective_cache_size=6GB
+      -c maintenance_work_mem=512MB
+      -c checkpoint_completion_target=0.9
+      -c wal_buffers=16MB
+      -c default_statistics_target=100
+      -c random_page_cost=1.1
+      -c effective_io_concurrency=200
+      -c work_mem=256MB
+      -c huge_pages=off
+      -c min_wal_size=1GB
+      -c max_wal_size=4GB
     volumes:
-      - ./.pgdata:/var/lib/postgresql/data
+      - ./.pgdata:/var/lib/postgresql/data
\ No newline at end of file
diff --git a/docs/acceptance/db.md b/docs/acceptance/db.md
index d05d2e52..51c9da4d 100644
--- a/docs/acceptance/db.md
+++ b/docs/acceptance/db.md
@@ -54,32 +54,15 @@ You can use the CLI tool for data ingestion. First, ensure you have the required
 poetry install
 ```
 
-To download the Parquet file from S3 and load it into the database, run the following command:
+To load a Parquet file into the database, run the following command:
 
 ```bash
-poetry run space2stats-ingest download-and-load \
-    "s3:///space2stats.parquet" \
+poetry run space2stats-ingest load \
     "postgresql://username:password@localhost:5439/postgres" \
-    "/space2stats.json" \
-    --parquet-file "local.parquet"
+    "" \
+    "local.parquet"
 ```
 
-Alternatively, you can run the `download` and `load` commands separately:
-
-1. **Download the Parquet file**:
-   ```bash
-   poetry run space2stats-ingest download "s3:///space2stats.parquet" --local-path "local.parquet"
-   ```
-
-2. **Load the Parquet file into the database**:
-   ```bash
-   poetry run space2stats-ingest download-and-load \
-       "s3:///space2stats.parquet" \
-       "postgresql://username:password@localhost:5439/postgres" \
-       "/space2stats.json" \
-       --parquet-file "local.parquet"
-   ```
-
 ### Database Configuration
 
 Once connected to the database via `psql` or a PostgreSQL client (e.g., `pgAdmin`), execute the following SQL command to create an index on the `space2stats` table:
@@ -110,3 +93,28 @@ SELECT sum_pop_2020 FROM space2stats WHERE hex_id IN ('86beabd8fffffff', '86beab
 ### Conclusion
 
 Ensure all steps are followed to verify the ETL process, database setup, and data ingestion pipeline. Reach out to the development team for any further assistance or troubleshooting.
+
+
+#### Testing dataset updates
+
+- Spin up database with docker:
+```
+docker-compose up
+```
+- Download initial dataset:
+```
+aws s3 cp s3://wbg-geography01/Space2Stats/parquet/GLOBAL/space2stats.parquet .
+download: s3://wbg-geography01/Space2Stats/parquet/GLOBAL/space2stats.parquet to ./space2stats.parquet +``` +- Upload initial dataset: +``` +space2stats-ingest ./space2stats_ingest/METADATA/stac/space2stats/space2stats_population_2020/space2stats_population_2020.json space2stats.parquet +``` +- Generate second dataset: +``` +python space2stats_ingest/METADATA/generate_test_data.py +``` +- Upload second dataset: +``` +space2stats-ingest ./space2stats_ingest/METADATA/stac/space2stats/space2stats_population_2020/space2stats_reupload_test.json space2stats_test.parquet +``` \ No newline at end of file diff --git a/space2stats_api/src/poetry.lock b/space2stats_api/src/poetry.lock index d144f105..4bfda86e 100644 --- a/space2stats_api/src/poetry.lock +++ b/space2stats_api/src/poetry.lock @@ -3152,6 +3152,26 @@ files = [ [package.dependencies] certifi = "*" +[[package]] +name = "pystac" +version = "1.11.0" +description = "Python library for working with the SpatioTemporal Asset Catalog (STAC) specification" +optional = false +python-versions = ">=3.10" +files = [ + {file = "pystac-1.11.0-py3-none-any.whl", hash = "sha256:10ac7c7b4ea6c5ec8333829a09ec1a33b596f02d1a97ffbbd72cd1b6c10598c1"}, + {file = "pystac-1.11.0.tar.gz", hash = "sha256:acb1e04be398a0cda2d8870ab5e90457783a8014a206590233171d8b2ae0d9e7"}, +] + +[package.dependencies] +python-dateutil = ">=2.7.0" + +[package.extras] +jinja2 = ["jinja2 (<4.0)"] +orjson = ["orjson (>=3.5)"] +urllib3 = ["urllib3 (>=1.26)"] +validation = ["jsonschema (>=4.18,<5.0)"] + [[package]] name = "pytest" version = "8.3.3" @@ -4261,4 +4281,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = ">=3.10,<3.13" -content-hash = "8d487ee99c98da57d1321c394fcbe7731416d449382e34472d70e9ccc6990c7c" +content-hash = "43616f559aa97cf647e903a8c1f9d74f1920328dd56853dfe99e811b017b62d1" diff --git a/space2stats_api/src/pyproject.toml b/space2stats_api/src/pyproject.toml index a86b837a..925a1568 100644 --- a/space2stats_api/src/pyproject.toml +++ b/space2stats_api/src/pyproject.toml @@ -34,6 +34,8 @@ boto3 = "^1.35.25" pyarrow = "^17.0.0" adbc-driver-postgresql = "^1.2.0" tqdm = "^4.66.5" +pystac = "^1.11.0" +jsonschema = "^4.23.0" [tool.poetry.group.notebook.dependencies] pandas = "*" diff --git a/space2stats_api/src/space2stats_ingest/METADATA/generate_test_data.py b/space2stats_api/src/space2stats_ingest/METADATA/generate_test_data.py new file mode 100644 index 00000000..f723a4cb --- /dev/null +++ b/space2stats_api/src/space2stats_ingest/METADATA/generate_test_data.py @@ -0,0 +1,23 @@ +import numpy as np +import pyarrow as pa +import pyarrow.parquet as pq + +# Load the original Parquet file +input_file = "space2stats.parquet" +table = pq.read_table(input_file) + +# Select only the 'hex_id' column +table = table.select(["hex_id"]) + +# Create the new 'test_column' with random values +num_rows = table.num_rows +test_column = pa.array(np.random.random(size=num_rows), type=pa.float64()) + +# Add 'test_column' to the table +table = table.append_column("test_column", test_column) + +# Save the modified table to a new Parquet file +output_file = "space2stats_test.parquet" +pq.write_table(table, output_file) + +print(f"Modified Parquet file saved as {output_file}") diff --git a/space2stats_api/src/space2stats_ingest/METADATA/stac/space2stats/space2stats_population_2020/space2stats_population_2020.json b/space2stats_api/src/space2stats_ingest/METADATA/stac/space2stats/space2stats_population_2020/space2stats_population_2020.json new file mode 100644 index 00000000..76a66214 --- 
/dev/null +++ b/space2stats_api/src/space2stats_ingest/METADATA/stac/space2stats/space2stats_population_2020/space2stats_population_2020.json @@ -0,0 +1,336 @@ +{ + "type": "Feature", + "stac_version": "1.0.0", + "stac_extensions": [ + "https://stac-extensions.github.io/table/v1.2.0/schema.json", + "https://stac-extensions.github.io/scientific/v1.0.0/schema.json" + ], + "id": "space2stats_population_2020", + "geometry": { + "type": "Polygon", + "coordinates": [ + [ + [ + -179.99999561620714, + -89.98750455101016 + ], + [ + -179.99999561620714, + 89.98750455101016 + ], + [ + 179.99999096313272, + 89.98750455101016 + ], + [ + 179.99999096313272, + -89.98750455101016 + ], + [ + -179.99999561620714, + -89.98750455101016 + ] + ] + ] + }, + "bbox": [ + -179.99999561620714, + -89.98750455101016, + 179.99999096313272, + 89.98750455101016 + ], + "properties": { + "name": "Population Data", + "description": "Gridded population disaggregated by gender for the year 2020, with data available for different age groups.", + "methodological_notes": "Global raster files are processed for each hexagonal grid using zonal statistics.", + "source_data": "WorldPop gridded population, 2020, Unconstrained, UN-Adjusted", + "sci:citation": "Stevens FR, Gaughan AE, Linard C, Tatem AJ (2015) Disaggregating Census Data for Population Mapping Using Random Forests with Remotely-Sensed and Ancillary Data.", + "organization": "WorldPop, https://www.worldpop.org", + "method": "sum", + "resolution": "100 meters", + "table:primary_geometry": "geometry", + "table:columns": [ + { + "name": "hex_id", + "description": "H3 unique identifier", + "type": "object" + }, + { + "name": "sum_pop_f_0_2020", + "description": "Total population female, ages 0 to 1, 2020", + "type": "float64" + }, + { + "name": "sum_pop_f_10_2020", + "description": "Total population female, ages 10 to 15, 2020", + "type": "float64" + }, + { + "name": "sum_pop_f_15_2020", + "description": "Total population female, ages 15 to 20, 2020", + "type": "float64" + }, + { + "name": "sum_pop_f_1_2020", + "description": "Total population female, ages 1 to 10, 2020", + "type": "float64" + }, + { + "name": "sum_pop_f_20_2020", + "description": "Total population female, ages 20 to 25, 2020", + "type": "float64" + }, + { + "name": "sum_pop_f_25_2020", + "description": "Total population female, ages 25 to 30, 2020", + "type": "float64" + }, + { + "name": "sum_pop_f_30_2020", + "description": "Total population female, ages 30 to 35, 2020", + "type": "float64" + }, + { + "name": "sum_pop_f_35_2020", + "description": "Total population female, ages 35 to 40, 2020", + "type": "float64" + }, + { + "name": "sum_pop_f_40_2020", + "description": "Total population female, ages 40 to 45, 2020", + "type": "float64" + }, + { + "name": "sum_pop_f_45_2020", + "description": "Total population female, ages 45 to 50, 2020", + "type": "float64" + }, + { + "name": "sum_pop_f_50_2020", + "description": "Total population female, ages 50 to 55, 2020", + "type": "float64" + }, + { + "name": "sum_pop_f_55_2020", + "description": "Total population female, ages 55 to 60, 2020", + "type": "float64" + }, + { + "name": "sum_pop_f_5_2020", + "description": "Total population female, ages 5 to 10, 2020", + "type": "float64" + }, + { + "name": "sum_pop_f_60_2020", + "description": "Total population female, ages 60 to 65, 2020", + "type": "float64" + }, + { + "name": "sum_pop_f_65_2020", + "description": "Total population female, ages 65 to 70, 2020", + "type": "float64" + }, + { + "name": 
"sum_pop_f_70_2020", + "description": "Total population female, ages 70 to 75, 2020", + "type": "float64" + }, + { + "name": "sum_pop_f_75_2020", + "description": "Total population female, ages 75 to 80, 2020", + "type": "float64" + }, + { + "name": "sum_pop_f_80_2020", + "description": "Total population female, ages 80 and above, 2020", + "type": "float64" + }, + { + "name": "sum_pop_m_0_2020", + "description": "Total population male, ages 0 to 1, 2020", + "type": "float64" + }, + { + "name": "sum_pop_m_10_2020", + "description": "Total population male, ages 10 to 15, 2020", + "type": "float64" + }, + { + "name": "sum_pop_m_15_2020", + "description": "Total population male, ages 15 to 20, 2020", + "type": "float64" + }, + { + "name": "sum_pop_m_1_2020", + "description": "Total population male, ages 1 to 10, 2020", + "type": "float64" + }, + { + "name": "sum_pop_m_20_2020", + "description": "Total population male, ages 20 to 25, 2020", + "type": "float64" + }, + { + "name": "sum_pop_m_25_2020", + "description": "Total population male, ages 25 to 30, 2020", + "type": "float64" + }, + { + "name": "sum_pop_m_30_2020", + "description": "Total population male, ages 30 to 35, 2020", + "type": "float64" + }, + { + "name": "sum_pop_m_35_2020", + "description": "Total population male, ages 35 to 40, 2020", + "type": "float64" + }, + { + "name": "sum_pop_m_40_2020", + "description": "Total population male, ages 40 to 45, 2020", + "type": "float64" + }, + { + "name": "sum_pop_m_45_2020", + "description": "Total population male, ages 45 to 50, 2020", + "type": "float64" + }, + { + "name": "sum_pop_m_50_2020", + "description": "Total population male, ages 50 to 55, 2020", + "type": "float64" + }, + { + "name": "sum_pop_m_55_2020", + "description": "Total population male, ages 55 to 60, 2020", + "type": "float64" + }, + { + "name": "sum_pop_m_5_2020", + "description": "Total population male, ages 5 to 10, 2020", + "type": "float64" + }, + { + "name": "sum_pop_m_60_2020", + "description": "Total population male, ages 60 to 65, 2020", + "type": "float64" + }, + { + "name": "sum_pop_m_65_2020", + "description": "Total population male, ages 65 to 70, 2020", + "type": "float64" + }, + { + "name": "sum_pop_m_70_2020", + "description": "Total population male, ages 70 to 75, 2020", + "type": "float64" + }, + { + "name": "sum_pop_m_75_2020", + "description": "Total population male, ages 75 to 80, 2020", + "type": "float64" + }, + { + "name": "sum_pop_m_80_2020", + "description": "Total population male, ages 80 and above, 2020", + "type": "float64" + }, + { + "name": "sum_pop_f_2020", + "description": "Total population female, 2020", + "type": "float64" + }, + { + "name": "sum_pop_m_2020", + "description": "Total population male, 2020", + "type": "float64" + }, + { + "name": "sum_pop_2020", + "description": "Total population, 2020", + "type": "float64" + } + ], + "vector:layers": { + "space2stats": { + "hex_id": "object", + "sum_pop_f_0_2020": "float64", + "sum_pop_f_10_2020": "float64", + "sum_pop_f_15_2020": "float64", + "sum_pop_f_1_2020": "float64", + "sum_pop_f_20_2020": "float64", + "sum_pop_f_25_2020": "float64", + "sum_pop_f_30_2020": "float64", + "sum_pop_f_35_2020": "float64", + "sum_pop_f_40_2020": "float64", + "sum_pop_f_45_2020": "float64", + "sum_pop_f_50_2020": "float64", + "sum_pop_f_55_2020": "float64", + "sum_pop_f_5_2020": "float64", + "sum_pop_f_60_2020": "float64", + "sum_pop_f_65_2020": "float64", + "sum_pop_f_70_2020": "float64", + "sum_pop_f_75_2020": "float64", + "sum_pop_f_80_2020": 
"float64", + "sum_pop_m_0_2020": "float64", + "sum_pop_m_10_2020": "float64", + "sum_pop_m_15_2020": "float64", + "sum_pop_m_1_2020": "float64", + "sum_pop_m_20_2020": "float64", + "sum_pop_m_25_2020": "float64", + "sum_pop_m_30_2020": "float64", + "sum_pop_m_35_2020": "float64", + "sum_pop_m_40_2020": "float64", + "sum_pop_m_45_2020": "float64", + "sum_pop_m_50_2020": "float64", + "sum_pop_m_55_2020": "float64", + "sum_pop_m_5_2020": "float64", + "sum_pop_m_60_2020": "float64", + "sum_pop_m_65_2020": "float64", + "sum_pop_m_70_2020": "float64", + "sum_pop_m_75_2020": "float64", + "sum_pop_m_80_2020": "float64", + "sum_pop_f_2020": "float64", + "sum_pop_m_2020": "float64", + "sum_pop_2020": "float64", + "geometry": "geometry" + } + }, + "themes": [ + "Demographics", + "Population" + ], + "datetime": "2024-10-24T14:54:26.131129Z" + }, + "links": [ + { + "rel": "root", + "href": "../../catalog.json", + "type": "application/json", + "title": "Space2Stats Database" + }, + { + "rel": "parent", + "href": "../collection.json", + "type": "application/json", + "title": "Space2Stats Collection" + }, + { + "rel": "collection", + "href": "../collection.json", + "type": "application/json", + "title": "Space2Stats Collection" + } + ], + "assets": { + "api-docs": { + "href": "https://space2stats.ds.io/docs", + "type": "text/html", + "title": "API Documentation", + "roles": [ + "metadata" + ] + } + }, + "collection": "space2stats-collection" + } \ No newline at end of file diff --git a/space2stats_api/src/space2stats_ingest/METADATA/stac/space2stats/space2stats_population_2020/space2stats_reupload_test.json b/space2stats_api/src/space2stats_ingest/METADATA/stac/space2stats/space2stats_population_2020/space2stats_reupload_test.json new file mode 100644 index 00000000..a527395e --- /dev/null +++ b/space2stats_api/src/space2stats_ingest/METADATA/stac/space2stats/space2stats_population_2020/space2stats_reupload_test.json @@ -0,0 +1,30 @@ +{ + "type": "Feature", + "stac_version": "1.0.0", + "id": "space2stats_test", + "properties": { + "table:columns": [ + { + "name": "hex_id", + "type": "string", + "description": "Unique identifier for hexagonal regions." + }, + { + "name": "test_column", + "type": "float", + "description": "Random test values for validation of re-upload capabilities." + } + ], + "datetime": "2024-11-05T00:00:00Z" + }, + "geometry": null, + "bbox": [-180, -90, 180, 90], + "links": [], + "assets": { + "data": { + "href": "s3://your-bucket/path/to/space2stats.parquet", + "title": "Space2Stats Parquet File", + "type": "application/x-parquet" + } + } + } \ No newline at end of file diff --git a/space2stats_api/src/space2stats_ingest/cli.py b/space2stats_api/src/space2stats_ingest/cli.py index de23b9c1..85934a7a 100644 --- a/space2stats_api/src/space2stats_ingest/cli.py +++ b/space2stats_api/src/space2stats_ingest/cli.py @@ -2,7 +2,7 @@ import typer -from .main import download_parquet_from_s3, load_parquet_to_db +from .main import load_parquet_to_db app = typer.Typer() @@ -22,52 +22,17 @@ def wrapper(*args, **kwargs): return wrapper -@app.command() -@handle_errors -def download(s3_path: str, local_path: str = typer.Option("local.parquet")): - """ - Download a Parquet file from an S3 bucket. 
- """ - typer.echo(f"Starting download from S3: {s3_path}") - download_parquet_from_s3(s3_path, local_path) - typer.echo(f"Download complete: {local_path}") - - @app.command() @handle_errors def load( connection_string: str, - stac_metadata_file: str, # Add the STAC metadata file path as an argument - parquet_file: str = typer.Option("local.parquet"), + stac_item_path: str, # Add the STAC metadata file path as an argument + parquet_file: str, chunksize: int = 64_000, ): """ Load a Parquet file into a PostgreSQL database after verifying columns with the STAC metadata. """ typer.echo(f"Loading data into PostgreSQL database from {parquet_file}") - load_parquet_to_db(parquet_file, connection_string, stac_metadata_file, chunksize) + load_parquet_to_db(parquet_file, connection_string, stac_item_path, chunksize) typer.echo("Data loaded successfully to PostgreSQL!") - - -@app.command() -@handle_errors -def download_and_load( - s3_path: str, - connection_string: str, - stac_metadata_file: str, # Add the STAC metadata file path as an argument - parquet_file: str = typer.Option("local.parquet"), - chunksize: int = 64_000, -): - """ - Download a Parquet file from S3, verify columns with the STAC metadata, and load it into a PostgreSQL database. - """ - download( - s3_path=s3_path, - local_path=parquet_file, - ) - load( - parquet_file=parquet_file, - connection_string=connection_string, - stac_metadata_file=stac_metadata_file, # Ensure this is passed along - chunksize=chunksize, - ) diff --git a/space2stats_api/src/space2stats_ingest/main.py b/space2stats_api/src/space2stats_ingest/main.py index 2c8fe4fe..e8632b27 100644 --- a/space2stats_api/src/space2stats_ingest/main.py +++ b/space2stats_api/src/space2stats_ingest/main.py @@ -1,124 +1,200 @@ -import json import tempfile +from typing import Set import adbc_driver_postgresql.dbapi as pg import boto3 +import pyarrow as pa import pyarrow.parquet as pq +from pystac import Item, STACValidationError from tqdm import tqdm TABLE_NAME = "space2stats" -def read_parquet_file(file_path: str): - """ - Reads a Parquet file either from a local path or an S3 path. - - Args: - file_path (str): Path to the Parquet file, either local or S3. - - Returns: - pyarrow.Table: Parquet table object. - """ +def read_parquet_file(file_path: str) -> pa.Table: + """Reads a Parquet file either from a local path or an S3 path.""" if file_path.startswith("s3://"): - # Read from S3 s3 = boto3.client("s3") bucket, key = file_path[5:].split("/", 1) with tempfile.NamedTemporaryFile() as tmp_file: s3.download_file(bucket, key, tmp_file.name) table = pq.read_table(tmp_file.name) else: - # Read from local path table = pq.read_table(file_path) - return table -def read_stac_metadata_file(file_path: str): - """ - Reads a STAC metadata file either from a local path or an S3 path. +def get_stac_fields_from_item(stac_item_path: str) -> Set[str]: + item = Item.from_file(stac_item_path) + columns = [c["name"] for c in item.properties.get("table:columns")] + return set(columns) - Args: - file_path (str): Path to the STAC metadata file, either local or S3. - Returns: - dict: Parsed JSON content of the STAC metadata. 
- """ - if file_path.startswith("s3://"): - s3 = boto3.client("s3") - bucket, key = file_path[5:].split("/", 1) - with tempfile.NamedTemporaryFile() as tmp_file: - s3.download_file(bucket, key, tmp_file.name) - with open(tmp_file.name, "r") as f: - stac_metadata = json.load(f) - else: - with open(file_path, "r") as f: - stac_metadata = json.load(f) +def validate_stac_item(stac_item_path: str) -> bool: + item = Item.from_file(stac_item_path) + try: + item.validate() + return True + except STACValidationError as e: + raise STACValidationError(f"Expected valid STAC item, error: {e}") - return stac_metadata +def verify_columns( + parquet_file: str, stac_item_path: str, connection_string: str +) -> bool: + """Verifies that the Parquet file columns match the STAC item metadata columns, + ensures that 'hex_id' column is present, and checks that new columns don't already exist in the database.""" -def verify_columns(parquet_file: str, stac_metadata_file: str) -> bool: - """ - Verifies that the Parquet file columns match the STAC item metadata columns. - - Args: - parquet_file (str): Path to the Parquet file. - stac_metadata_file (str): Path to the STAC item metadata JSON file. - - Returns: - bool: True if the columns match, False otherwise. - """ + # Read Parquet columns and STAC fields parquet_table = read_parquet_file(parquet_file) parquet_columns = set(parquet_table.column_names) + stac_fields = get_stac_fields_from_item(stac_item_path) - stac_metadata = read_stac_metadata_file(stac_metadata_file) - stac_columns = { - column["name"] for column in stac_metadata["properties"]["table:columns"] - } + # Check if 'hex_id' is present in the Parquet columns + if "hex_id" not in parquet_columns: + raise ValueError("The 'hex_id' column is missing from the Parquet file.") - if parquet_columns != stac_columns: - extra_in_parquet = parquet_columns - stac_columns - extra_in_stac = stac_columns - parquet_columns + # Verify Parquet columns match the STAC fields + if parquet_columns != stac_fields: + extra_in_parquet = parquet_columns - stac_fields + extra_in_stac = stac_fields - parquet_columns raise ValueError( f"Column mismatch: Extra in Parquet: {extra_in_parquet}, Extra in STAC: {extra_in_stac}" ) - return True - + # Retrieve columns already present in the main table in the database + with pg.connect(connection_string) as conn: + with conn.cursor() as cur: + cur.execute(f""" + SELECT column_name + FROM information_schema.columns + WHERE table_name = '{TABLE_NAME}' + """) + existing_columns = set(row[0] for row in cur.fetchall()) + + # Check for overlap in columns (excluding 'hex_id') + overlapping_columns = parquet_columns.intersection(existing_columns) - {"hex_id"} + if overlapping_columns: + raise ValueError( + f"Columns already exist in the database: {overlapping_columns}" + ) -def download_parquet_from_s3(s3_path: str, local_path: str): - """ - Downloads a Parquet file from an S3 bucket and saves it locally. 
- """ - s3 = boto3.client("s3") + return True - # Split the S3 path into bucket and key - if s3_path.startswith("s3://"): - s3_path = s3_path[5:] - bucket, key = s3_path.split("/", 1) - s3.download_file(bucket, key, local_path) +def merge_tables(db_table: pa.Table, parquet_table: pa.Table) -> pa.Table: + """Adds columns from the Parquet table to the database table in memory.""" + for column in parquet_table.column_names: + if column != "hex_id": # Exclude hex_id to prevent duplicates + db_table = db_table.append_column(column, parquet_table[column]) + return db_table def load_parquet_to_db( parquet_file: str, connection_string: str, - stac_metadata_file: str, + stac_item_path: str, chunksize: int = 64_000, ): - # Verify column consistency between Parquet file and STAC metadata - if not verify_columns(parquet_file, stac_metadata_file): - raise ValueError("Column mismatch between Parquet file and STAC metadata") - - table = pq.read_table(parquet_file) - with ( - pg.connect(connection_string) as conn, - conn.cursor() as cur, - tqdm(total=table.num_rows, desc="Loading to PostgreSQL", unit="rows") as pbar, - ): - cur.adbc_ingest(TABLE_NAME, table.slice(0, 0), mode="replace") - for batch in table.to_batches(max_chunksize=chunksize): - count = cur.adbc_ingest(TABLE_NAME, batch, mode="append") - pbar.update(count) - cur.execute("CREATE INDEX ON space2stats (hex_id);") + """Main function to load and update data in PostgreSQL using Arrow in replace mode.""" + validate_stac_item(stac_item_path) + verify_columns(parquet_file, stac_item_path, connection_string) + + # Check if the table already exists in the database + with pg.connect(connection_string) as conn: + with conn.cursor() as cur: + cur.execute(f"SELECT to_regclass('{TABLE_NAME}');") + table_exists = cur.fetchone()[0] is not None + + if not table_exists: + # If the table does not exist, directly ingest the Parquet file in batches + parquet_table = read_parquet_file(parquet_file) + + with pg.connect(connection_string) as conn, tqdm( + total=parquet_table.num_rows, desc="Ingesting Data", unit="rows" + ) as pbar: + with conn.cursor() as cur: + # Create an empty table with the same schema + cur.adbc_ingest(TABLE_NAME, parquet_table.slice(0, 0), mode="replace") + + for batch in parquet_table.to_batches(max_chunksize=chunksize): + cur.adbc_ingest(TABLE_NAME, batch, mode="append") + pbar.update(batch.num_rows) + + # Create an index on hex_id for future joins + print("Creating index") + cur.execute( + f"CREATE INDEX idx_{TABLE_NAME}_hex_id ON {TABLE_NAME} (hex_id)" + ) + conn.commit() + return + + # Load Parquet file into a temporary table + parquet_table = read_parquet_file(parquet_file) + temp_table = f"{TABLE_NAME}_temp" + with pg.connect(connection_string) as conn, tqdm( + total=parquet_table.num_rows, desc="Ingesting Temporary Table", unit="rows" + ) as pbar: + with conn.cursor() as cur: + cur.adbc_ingest(temp_table, parquet_table.slice(0, 0), mode="replace") + + for batch in parquet_table.to_batches(max_chunksize=chunksize): + cur.adbc_ingest(temp_table, batch, mode="append") + pbar.update(batch.num_rows) + + conn.commit() + + # Fetch columns to add to the main table + with pg.connect(connection_string) as conn: + with conn.cursor() as cur: + cur.execute(f""" + SELECT column_name, data_type + FROM information_schema.columns + WHERE table_name = '{temp_table}' + AND column_name NOT IN ( + SELECT column_name FROM information_schema.columns WHERE table_name = '{TABLE_NAME}' + ) + """) + new_columns = cur.fetchall() + + # Add new columns and attempt 
to update in a transaction + try: + with pg.connect(connection_string) as conn: + with conn.cursor() as cur: + # Add new columns to the main table + for column, column_type in new_columns: + cur.execute( + f"ALTER TABLE {TABLE_NAME} ADD COLUMN IF NOT EXISTS {column} {column_type}" + ) + + print(f"Adding new columns: {[c[0] for c in new_columns]}...") + + # Construct the SET clause for the update query + update_columns = [ + f"{column} = temp.{column}" for column, _ in new_columns + ] + set_clause = ", ".join(update_columns) + + # Update TABLE_NAME with data from temp_table based on matching hex_id + print( + "Adding columns to dataset... All or nothing operation may take some time." + ) + cur.execute(f""" + UPDATE {TABLE_NAME} AS main + SET {set_clause} + FROM {temp_table} AS temp + WHERE main.hex_id = temp.hex_id + """) + + conn.commit() # Commit transaction if all operations succeed + except Exception as e: + # Rollback if any error occurs during the update + print("An error occurred during update. Rolling back changes.") + conn.rollback() + raise e # Re-raise the exception to alert calling code + + # Drop the temporary table + with pg.connect(connection_string) as conn: + with conn.cursor() as cur: + cur.execute(f"DROP TABLE IF EXISTS {temp_table}") conn.commit() diff --git a/space2stats_api/src/tests/conftest.py b/space2stats_api/src/tests/conftest.py index 9c02a1c1..cd1c5c11 100644 --- a/space2stats_api/src/tests/conftest.py +++ b/space2stats_api/src/tests/conftest.py @@ -29,6 +29,19 @@ def aws_credentials(): os.environ["AWS_DEFAULT_REGION"] = "us-east-1" +@pytest.fixture(scope="function") +def clean_database(postgresql_proc): + with DatabaseJanitor( + user=postgresql_proc.user, + host=postgresql_proc.host, + port=postgresql_proc.port, + dbname="cleantestdb", + version=postgresql_proc.version, + password="password", + ) as jan: + yield jan + + @pytest.fixture(scope="function") def database(postgresql_proc): """Set up a PostgreSQL database for testing and clean up afterwards.""" @@ -111,3 +124,19 @@ def aoi_example(): }, properties={}, ) + + +@pytest.fixture +def stac_catalog_path(): + return "./space2stats_ingest/METADATA/stac/catalog.json" + + +@pytest.fixture +def stac_item_path(): + current_dir = os.path.dirname(os.path.abspath(__file__)) + root_dir = os.path.abspath(os.path.join(current_dir, "../../..")) + json_file_path = os.path.join( + root_dir, + "space2stats_api/src/space2stats_ingest/METADATA/stac/space2stats-collection/space2stats_population_2020/space2stats_population_2020.json", + ) + return json_file_path diff --git a/space2stats_api/src/tests/test_ingest.py b/space2stats_api/src/tests/test_ingest.py index fc91e2fd..4b99f2d5 100644 --- a/space2stats_api/src/tests/test_ingest.py +++ b/space2stats_api/src/tests/test_ingest.py @@ -1,59 +1,296 @@ -import os +import json import psycopg import pyarrow as pa import pyarrow.parquet as pq -from space2stats_ingest.main import download_parquet_from_s3, load_parquet_to_db +from space2stats_ingest.main import load_parquet_to_db -def test_download_parquet_from_s3(s3_mock): - s3_path = "s3://mybucket/myfile.parquet" - parquet_file = "local.parquet" +def test_load_parquet_to_db(clean_database, tmpdir): + connection_string = f"postgresql://{clean_database.user}:{clean_database.password}@{clean_database.host}:{clean_database.port}/{clean_database.dbname}" - s3_mock.put_object( - Bucket="mybucket", Key="myfile.parquet", Body=b"mock_parquet_data" - ) + parquet_file = tmpdir.join("local.parquet") + item_file = 
tmpdir.join("space2stats_population_2020.json") - download_parquet_from_s3(s3_path, parquet_file) + data = { + "hex_id": ["hex_1", "hex_2"], + "sum_pop_f_10_2020": [100, 200], + "sum_pop_m_10_2020": [150, 250], + } - assert os.path.exists(parquet_file) + table = pa.table(data) + pq.write_table(table, parquet_file) + + stac_item = { + "type": "Feature", + "stac_version": "1.0.0", + "id": "space2stats_population_2020", + "properties": { + "table:columns": [ + {"name": "hex_id", "type": "string"}, + {"name": "sum_pop_f_10_2020", "type": "int64"}, + {"name": "sum_pop_m_10_2020", "type": "int64"}, + ], + "datetime": "2024-10-07T11:21:25.944150Z", + }, + "geometry": None, + "bbox": [-180, -90, 180, 90], + "links": [], + "assets": {}, + } + with open(item_file, "w") as f: + json.dump(stac_item, f) + + load_parquet_to_db(str(parquet_file), connection_string, str(item_file)) + + with psycopg.connect(connection_string) as conn: + with conn.cursor() as cur: + cur.execute("SELECT * FROM space2stats WHERE hex_id = 'hex_1'") + result = cur.fetchone() + assert result == ("hex_1", 100, 150) + + cur.execute("SELECT * FROM space2stats WHERE hex_id = 'hex_2'") + result = cur.fetchone() + assert result == ("hex_2", 200, 250) + + +def test_updating_table(clean_database, tmpdir): + connection_string = f"postgresql://{clean_database.user}:{clean_database.password}@{clean_database.host}:{clean_database.port}/{clean_database.dbname}" -def test_load_parquet_to_db(database, tmpdir): - connection_string = f"postgresql://{database.user}:{database.password}@{database.host}:{database.port}/{database.dbname}" parquet_file = tmpdir.join("local.parquet") - stac_metadata_file = tmpdir.join("stac_metadata.json") + item_file = tmpdir.join("space2stats_population_2020.json") data = { "hex_id": ["hex_1", "hex_2"], - "sum_pop_2020": [100, 200], - "sum_pop_f_10_2020": [300, 400], + "sum_pop_f_10_2020": [100, 200], + "sum_pop_m_10_2020": [150, 250], } + table = pa.table(data) pq.write_table(table, parquet_file) - with open(stac_metadata_file, "w") as f: - f.write(""" - { - "type": "Feature", - "properties": { - "table:columns": [ - {"name": "hex_id", "type": "string"}, - {"name": "sum_pop_2020", "type": "int64"}, - {"name": "sum_pop_f_10_2020", "type": "int64"} - ] - } - } - """) - - load_parquet_to_db(str(parquet_file), connection_string, str(stac_metadata_file)) + stac_item = { + "type": "Feature", + "stac_version": "1.0.0", + "id": "space2stats_population_2020", + "properties": { + "table:columns": [ + {"name": "hex_id", "type": "string"}, + {"name": "sum_pop_f_10_2020", "type": "int64"}, + {"name": "sum_pop_m_10_2020", "type": "int64"}, + ], + "datetime": "2024-10-07T11:21:25.944150Z", + }, + "geometry": None, + "bbox": [-180, -90, 180, 90], + "links": [], + "assets": {}, + } + + with open(item_file, "w") as f: + json.dump(stac_item, f) + + load_parquet_to_db(str(parquet_file), connection_string, str(item_file)) + + update_item_file = tmpdir.join("space2stats_population_2020.json") + update_parquet_file = tmpdir.join("update_local_parquet.json") + update_data = { + "hex_id": ["hex_1", "hex_2"], + "nighttime_lights": [10_000, 20_000], + } + update_table = pa.table(update_data) + pq.write_table(update_table, update_parquet_file) + + update_stac_item = { + "type": "Feature", + "stac_version": "1.0.0", + "id": "space2stats_nighttime_lights_2020", + "properties": { + "table:columns": [ + {"name": "hex_id", "type": "string"}, + {"name": "nighttime_lights", "type": "int64"}, + ], + "datetime": "2024-10-07T11:21:25.944150Z", + }, + 
"geometry": None, + "bbox": [-180, -90, 180, 90], + "links": [], + "assets": {}, + } + + with open(update_item_file, "w") as f: + json.dump(update_stac_item, f) + + load_parquet_to_db( + str(update_parquet_file), connection_string, str(update_item_file) + ) with psycopg.connect(connection_string) as conn: with conn.cursor() as cur: cur.execute("SELECT * FROM space2stats WHERE hex_id = 'hex_1'") result = cur.fetchone() - assert result == ("hex_1", 100, 300) + assert result == ("hex_1", 100, 150, 10_000) cur.execute("SELECT * FROM space2stats WHERE hex_id = 'hex_2'") result = cur.fetchone() - assert result == ("hex_2", 200, 400) + assert result == ("hex_2", 200, 250, 20_000) + + +def test_columns_already_exist_in_db(clean_database, tmpdir): + connection_string = f"postgresql://{clean_database.user}:{clean_database.password}@{clean_database.host}:{clean_database.port}/{clean_database.dbname}" + + parquet_file = tmpdir.join("local.parquet") + data = { + "hex_id": ["hex_1", "hex_2"], + "existing_column": [123, 456], # Simulates an existing column in DB + "new_column": [789, 1011], + } + table = pa.table(data) + pq.write_table(table, parquet_file) + + stac_item = { + "type": "Feature", + "stac_version": "1.0.0", + "id": "space2stats_population_2020", + "properties": { + "table:columns": [ + {"name": "hex_id", "type": "string"}, + {"name": "existing_column", "type": "int64"}, + {"name": "new_column", "type": "int64"}, + ], + "datetime": "2024-10-07T11:21:25.944150Z", + }, + "geometry": None, + "bbox": [-180, -90, 180, 90], + "links": [], + "assets": {}, + } + + item_file = tmpdir.join("space2stats_population_2020.json") + with open(item_file, "w") as f: + json.dump(stac_item, f) + + load_parquet_to_db(str(parquet_file), connection_string, str(item_file)) + + with psycopg.connect(connection_string) as conn: + with conn.cursor() as cur: + cur.execute("SELECT * FROM space2stats WHERE hex_id = 'hex_1'") + result = cur.fetchone() + assert result == ("hex_1", 123, 789) # Verify no duplicates + + +def test_rollback_on_update_failure(clean_database, tmpdir): + connection_string = f"postgresql://{clean_database.user}:{clean_database.password}@{clean_database.host}:{clean_database.port}/{clean_database.dbname}" + + parquet_file = tmpdir.join("local.parquet") + data = { + "hex_id": ["hex_1", "hex_2"], + "sum_pop_f_10_2020": [100, 200], + "sum_pop_m_10_2020": [150, 250], + } + table = pa.table(data) + pq.write_table(table, parquet_file) + + stac_item = { + "type": "Feature", + "stac_version": "1.0.0", + "id": "space2stats_population_2020", + "properties": { + "table:columns": [ + {"name": "hex_id", "type": "string"}, + {"name": "sum_pop_f_10_2020", "type": "int64"}, + {"name": "sum_pop_m_10_2020", "type": "int64"}, + ], + "datetime": "2024-10-07T11:21:25.944150Z", + }, + "geometry": None, + "bbox": [-180, -90, 180, 90], + "links": [], + "assets": {}, + } + + item_file = tmpdir.join("space2stats_population_2020.json") + with open(item_file, "w") as f: + json.dump(stac_item, f) + + load_parquet_to_db(str(parquet_file), connection_string, str(item_file)) + + # Invalid Parquet without `hex_id` + update_parquet_file = tmpdir.join("update_local.parquet") + update_data = { + "new_column": [1000, 2000], + } + update_table = pa.table(update_data) + pq.write_table(update_table, update_parquet_file) + + update_item_file = tmpdir.join("update_item.json") + update_stac_item = { + "type": "Feature", + "stac_version": "1.0.0", + "id": "space2stats_population_2021", + "properties": { + "table:columns": [{"name": 
"new_column", "type": "int64"}], + "datetime": "2024-10-07T11:21:25.944150Z", + }, + "geometry": None, + "bbox": [-180, -90, 180, 90], + "links": [], + "assets": {}, + } + + with open(update_item_file, "w") as f: + json.dump(update_stac_item, f) + + try: + load_parquet_to_db( + str(update_parquet_file), connection_string, str(update_item_file) + ) + except ValueError: + pass + + with psycopg.connect(connection_string) as conn: + with conn.cursor() as cur: + cur.execute( + "SELECT column_name FROM information_schema.columns WHERE table_name = 'space2stats'" + ) + columns = [row[0] for row in cur.fetchall()] + assert "new_column" not in columns # Verify no unwanted columns were added + + +def test_hex_id_column_mandatory(clean_database, tmpdir): + connection_string = f"postgresql://{clean_database.user}:{clean_database.password}@{clean_database.host}:{clean_database.port}/{clean_database.dbname}" + + parquet_file = tmpdir.join("missing_hex_id.parquet") + data = { + "sum_pop_f_10_2020": [100, 200], + "sum_pop_m_10_2020": [150, 250], + } + table = pa.table(data) + pq.write_table(table, parquet_file) + + stac_item = { + "type": "Feature", + "stac_version": "1.0.0", + "id": "space2stats_population_2020", + "properties": { + "table:columns": [ + {"name": "sum_pop_f_10_2020", "type": "int64"}, + {"name": "sum_pop_m_10_2020", "type": "int64"}, + ], + "datetime": "2024-10-07T11:21:25.944150Z", + }, + "geometry": None, + "bbox": [-180, -90, 180, 90], + "links": [], + "assets": {}, + } + + item_file = tmpdir.join("space2stats_population_2020.json") + with open(item_file, "w") as f: + json.dump(stac_item, f) + + try: + load_parquet_to_db(str(parquet_file), connection_string, str(item_file)) + except ValueError as e: + assert "The 'hex_id' column is missing from the Parquet file." 
in str(e) diff --git a/space2stats_api/src/tests/test_ingest_cli.py b/space2stats_api/src/tests/test_ingest_cli.py index f05f6846..2e146e35 100644 --- a/space2stats_api/src/tests/test_ingest_cli.py +++ b/space2stats_api/src/tests/test_ingest_cli.py @@ -1,4 +1,4 @@ -import os +import json import pyarrow as pa import pyarrow.parquet as pq @@ -13,54 +13,77 @@ def create_mock_parquet_file(parquet_file, columns): pq.write_table(table, parquet_file) -def test_download_command(tmpdir, s3_mock): - s3_path = "s3://mybucket/myfile.parquet" - parquet_file = tmpdir.join("local.parquet") - - s3_mock.put_object( - Bucket="mybucket", Key="myfile.parquet", Body=b"mock_parquet_data" - ) - - result = runner.invoke( - app, ["download", s3_path, "--local-path", str(parquet_file)] - ) - print(result.output) - - assert result.exit_code == 0 - assert "Starting download from S3" in result.stdout - assert "Download complete" in result.stdout - assert os.path.exists(parquet_file) +def create_stac_item(item_file, columns, item_id="space2stats_population_2020"): + stac_item = { + "type": "Feature", + "stac_version": "1.0.0", + "id": item_id, + "properties": { + "table:columns": [{"name": col[0], "type": col[1]} for col in columns], + "datetime": "2024-10-07T11:21:25.944150Z", + }, + "geometry": None, + "bbox": [-180, -90, 180, 90], + "links": [], + "assets": {}, + } + with open(item_file, "w") as f: + json.dump(stac_item, f) + + +def create_stac_collection(collection_file, item_file): + stac_collection = { + "type": "Collection", + "stac_version": "1.0.0", + "id": "space2stats-collection", + "description": "Test collection for Space2Stats.", + "license": "CC-BY-4.0", + "extent": { + "spatial": {"bbox": [[-180, -90, 180, 90]]}, + "temporal": {"interval": [["2020-01-01T00:00:00Z", None]]}, + }, + "links": [{"rel": "item", "href": str(item_file), "type": "application/json"}], + } + with open(collection_file, "w") as f: + json.dump(stac_collection, f) + + +def create_stac_catalog(catalog_file, collection_file): + stac_catalog = { + "type": "Catalog", + "stac_version": "1.0.0", + "id": "space2stats-catalog", + "description": "Test catalog for Space2Stats.", + "license": "CC-BY-4.0", + "links": [ + {"rel": "child", "href": str(collection_file), "type": "application/json"} + ], + } + with open(catalog_file, "w") as f: + json.dump(stac_catalog, f) -def test_load_command(tmpdir, database): - connection_string = f"postgresql://{database.user}:{database.password}@{database.host}:{database.port}/{database.dbname}" +def test_load_command(tmpdir, clean_database): + connection_string = f"postgresql://{clean_database.user}:{clean_database.password}@{clean_database.host}:{clean_database.port}/{clean_database.dbname}" parquet_file = tmpdir.join("local.parquet") - stac_metadata_file = tmpdir.join("stac_metadata.json") + catalog_file = tmpdir.join("catalog.json") + collection_file = tmpdir.join("collection.json") + item_file = tmpdir.join("space2stats_population_2020.json") create_mock_parquet_file( parquet_file, [("hex_id", pa.string()), ("mock_column", pa.float64())] ) - with open(stac_metadata_file, "w") as f: - f.write(""" - { - "type": "Feature", - "properties": { - "table:columns": [ - {"name": "hex_id", "type": "string"}, - {"name": "mock_column", "type": "float64"} - ] - } - } - """) + create_stac_item(item_file, [("hex_id", "string"), ("mock_column", "float64")]) + + create_stac_collection(collection_file, item_file) + create_stac_catalog(catalog_file, collection_file) result = runner.invoke( app, [ - "load", connection_string, 
- str(stac_metadata_file), - "--parquet-file", + str(item_file), str(parquet_file), ], ) @@ -70,80 +93,30 @@ def test_load_command(tmpdir, database): assert "Loading data into PostgreSQL" in result.stdout -def test_load_command_column_mismatch(tmpdir, database): - connection_string = f"postgresql://{database.user}:{database.password}@{database.host}:{database.port}/{database.dbname}" +def test_load_command_column_mismatch(tmpdir, clean_database): + connection_string = f"postgresql://{clean_database.user}:{clean_database.password}@{clean_database.host}:{clean_database.port}/{clean_database.dbname}" parquet_file = tmpdir.join("local.parquet") - stac_metadata_file = tmpdir.join("stac_metadata.json") - - create_mock_parquet_file(parquet_file, [("different_column", pa.float64())]) - - with open(stac_metadata_file, "w") as f: - f.write(""" - { - "type": "Feature", - "properties": { - "table:columns": [ - {"name": "mock_column", "type": "float64"} - ] - } - } - """) - - result = runner.invoke( - app, - [ - "load", - connection_string, - str(stac_metadata_file), - "--parquet-file", - str(parquet_file), - ], - ) - print(result.output) - - assert result.exit_code != 0 - assert "Column mismatch" in result.stdout - - -def test_download_and_load_command(tmpdir, database, s3_mock): - s3_path = "s3://mybucket/myfile.parquet" - parquet_file = tmpdir.join("local.parquet") - stac_metadata_file = tmpdir.join("stac_metadata.json") - connection_string = f"postgresql://{database.user}:{database.password}@{database.host}:{database.port}/{database.dbname}" + catalog_file = tmpdir.join("catalog.json") + collection_file = tmpdir.join("collection.json") + item_file = tmpdir.join("space2stats_population_2020.json") create_mock_parquet_file( - parquet_file, [("hex_id", pa.string()), ("mock_column", pa.float64())] + parquet_file, [("hex_id", pa.string()), ("different_column", pa.float64())] ) + create_stac_item(item_file, [("hex_id", "string"), ("mock_column", "float64")]) - with open(parquet_file, "rb") as f: - s3_mock.put_object(Bucket="mybucket", Key="myfile.parquet", Body=f.read()) - - with open(stac_metadata_file, "w") as f: - f.write(""" - { - "type": "Feature", - "properties": { - "table:columns": [ - {"name": "hex_id", "type": "string"}, - {"name": "mock_column", "type": "float64"} - ] - } - } - """) + create_stac_collection(collection_file, item_file) + create_stac_catalog(catalog_file, collection_file) result = runner.invoke( app, [ - "download-and-load", - s3_path, connection_string, - str(stac_metadata_file), - "--parquet-file", + str(item_file), str(parquet_file), ], ) print(result.output) - assert result.exit_code == 0 - assert "Starting download from S3" in result.stdout - assert "Loading data into PostgreSQL" in result.stdout + assert result.exit_code != 0 + assert "Column mismatch" in result.stdout
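Note on the new merge path in `load_parquet_to_db`: when the `space2stats` table already exists, the incoming Parquet file is first staged into a temporary table and its non-`hex_id` columns are then attached to the main table. The SQL below is only an illustrative sketch of the statements the function builds dynamically; the `nighttime_lights` column and `bigint` type stand in for whatever columns the staged file actually contains.

```sql
-- Add each new column to the wide table (names/types come from the staging table's schema).
ALTER TABLE space2stats ADD COLUMN IF NOT EXISTS nighttime_lights bigint;

-- Copy values across by joining on hex_id.
UPDATE space2stats AS main
SET nighttime_lights = temp.nighttime_lights
FROM space2stats_temp AS temp
WHERE main.hex_id = temp.hex_id;

-- The staging table is dropped after the update commits.
DROP TABLE IF EXISTS space2stats_temp;
```

Both the ALTER and the UPDATE are issued on the same connection and committed together, so a failure during the update also rolls back the newly added columns.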