Skip to content

Commit

Permalink
added dry run mode for GIS
Browse files Browse the repository at this point in the history
  • Loading branch information
AKST committed Jan 13, 2025
1 parent 8c67e8d commit 9a30a0d
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 13 deletions.
3 changes: 3 additions & 0 deletions lib/pipeline/gis/feature_server_client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from dataclasses import dataclass
from logging import getLogger
from pprint import pformat
Expand Down Expand Up @@ -155,6 +156,8 @@ async def get_json(self: Self,
self._logger.error(response)
raise GisNetworkError(response.status, response)
return await response.json()
except asyncio.CancelledError:
raise
except:
self._logger.error(f'failed on {url}')
raise
Expand Down
8 changes: 7 additions & 1 deletion lib/pipeline/gis/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from typing import Any, Dict, List, Self, Set, Tuple, Optional

from lib.service.database import DatabaseService, PgClientException, log_exception_info_df
from lib.utility.df import prepare_postgis_insert, FieldFormat
from lib.utility.df import prepare_postgis_insert, FieldFormat, fmt_head

from .config import (
GisProjection,
Expand All @@ -25,6 +25,7 @@
class GisIngestionConfig:
api_workers: int
api_worker_backpressure: int
dry_run: bool
db_workers: int
chunk_size: Optional[int]

Expand Down Expand Up @@ -131,6 +132,11 @@ async def _save(self: Self, t_desc: IngestionTaskDescriptor.Save):
pass

df_copy, query = prepare_query(db_relation, proj, df)
if self.config.dry_run:
self._logger.info(fmt_head(df))
self.stop()
raise asyncio.CancelledError('dryrun')

async with self._db.async_connect() as conn:
async with conn.cursor() as cur:
slice, rows = [], df_copy.to_records(index=False).tolist()
Expand Down
5 changes: 3 additions & 2 deletions lib/service/database/util/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
OperationalError,
)

from lib.utility.df import fmt_head

def is_verbose_error(error: Error) -> bool:
match error:
case CancelledError():
Expand Down Expand Up @@ -40,8 +42,7 @@ def log_exception_info(logger: Logger, error: Error):

def log_exception_info_df(df: pd.DataFrame, logger: Logger, error: Error):
if is_verbose_error(error):
with pd.option_context('display.max_columns', None):
logger.error(str(df.head()))
logger.error(fmt_head(df))
logger.error(df.info())
logger.error(f"Columns: {df.columns}")
logger.error(f"Rows: {len(df)}")
Expand Down
32 changes: 22 additions & 10 deletions lib/tasks/ingest_gis.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
class IngestGisRunConfig:
db_workers: int
skip_save: bool
dry_run: bool
gis_params: List[DateRangeParam]
exp_backoff_attempts: int

Expand Down Expand Up @@ -102,11 +103,19 @@ async def run(
FeatureExpBackoff(cfg.exp_backoff_attempts),
clock, session, http_file_cache)
telemetry = GisPipelineTelemetry.create(clock)

if conf.dry_run:
api_workers, db_workers = 1, 1
else:
api_workers = http_limits_of(HOST_SEMAPHORE_CONFIG)
db_workers = conf.db_workers

ingestion = GisIngestion.create(
GisIngestionConfig(
api_workers=http_limits_of(HOST_SEMAPHORE_CONFIG),
api_worker_backpressure=conf.db_workers * 4,
db_workers=conf.db_workers,
api_workers=api_workers,
api_worker_backpressure=db_workers * 4,
dry_run=conf.dry_run,
db_workers=db_workers,
chunk_size=None),
feature_client,
db,
Expand All @@ -128,9 +137,10 @@ async def run_in_console(
db = DatabaseService.create(db_config, config.db_workers)
clock = ClockService()
controller = SchemaController(io, db, SchemaDiscovery.create(io))
await controller.command(Command.Drop(ns='nsw_spatial'))
await controller.command(Command.Create(ns='nsw_spatial'))
await run(io, db, clock, cfg)
if not config.dry_run:
await controller.command(Command.Drop(ns='nsw_spatial'))
await controller.command(Command.Create(ns='nsw_spatial'))
await run(io, db, clock, config)

if __name__ == '__main__':
import asyncio
Expand All @@ -142,6 +152,7 @@ async def run_in_console(

parser = argparse.ArgumentParser(description="db schema tool")
parser.add_argument("--debug", action='store_true', default=False)
parser.add_argument("--dry-run", action='store_true', required=False)
parser.add_argument("--gis-range", type=str)
parser.add_argument("--instance", type=int, required=True)
parser.add_argument("--skip-save", action='store_true', required=False)
Expand Down Expand Up @@ -170,10 +181,11 @@ async def run_in_console(
case None:
params = []
cfg = IngestGisRunConfig(
args.db_connections,
args.skip_save,
params,
args.exp_backoff_attempts,
db_workers=args.db_connections,
skip_save=args.skip_save,
dry_run=args.dry_run,
gis_params=params,
exp_backoff_attempts=args.exp_backoff_attempts,
)
asyncio.run(
run_in_console(
Expand Down
1 change: 1 addition & 0 deletions lib/utility/df/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .fmt import fmt_head
from .prepare_for_sql import FieldFormat, prepare_postgis_insert
5 changes: 5 additions & 0 deletions lib/utility/df/fmt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import pandas as pd

def fmt_head(df: pd.DataFrame) -> str:
with pd.option_context('display.max_columns', None):
return str(df.head())

0 comments on commit 9a30a0d

Please sign in to comment.