diff --git a/wbia/cli/migrate_sqlite_to_postgres.py b/wbia/cli/migrate_sqlite_to_postgres.py
index 1774146bd8..9e43f80e2c 100644
--- a/wbia/cli/migrate_sqlite_to_postgres.py
+++ b/wbia/cli/migrate_sqlite_to_postgres.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 import logging
 import re
+import subprocess
 import sys
 from pathlib import Path
 
@@ -98,8 +99,31 @@ def main(db_dir, db_uri, force, verbose):
             db_infos[1].engine.execute(f'DROP TABLE {table_name} CASCADE')
 
     # Migrate
+    problems = {}
     with click.progressbar(length=100000, show_eta=True) as bar:
-        copy_sqlite_to_postgres(Path(db_dir), db_uri, progress_update=bar.update)
+        for path, completed_future, db_size, total_size in copy_sqlite_to_postgres(
+            Path(db_dir), db_uri
+        ):
+            try:
+                completed_future.result()
+            except Exception as exc:
+                logger.info(
+                    f'\nfailed while processing {str(path)}\n{completed_future.exception()}'
+                )
+                problems[path] = exc
+            else:
+                logger.info(f'\nfinished processing {str(path)}')
+            finally:
+                bar.update(int(db_size / total_size * bar.length))
+
+    # Report problems
+    for path, exc in problems.items():
+        logger.info('*' * 60)
+        logger.info(f'There was a problem migrating {str(path)}')
+        logger.exception(exc)
+        if isinstance(exc, subprocess.CalledProcessError):
+            logger.info('-' * 30)
+            logger.info(exc.stdout.decode())
 
     # Verify the migration
     differences = compare_databases(*db_infos)
diff --git a/wbia/dtool/copy_sqlite_to_postgres.py b/wbia/dtool/copy_sqlite_to_postgres.py
index 516550341e..34bfbe37cb 100644
--- a/wbia/dtool/copy_sqlite_to_postgres.py
+++ b/wbia/dtool/copy_sqlite_to_postgres.py
@@ -9,6 +9,8 @@
 import subprocess
 import tempfile
 import typing
+from concurrent.futures import as_completed, Future, ProcessPoolExecutor
+from functools import wraps
 from pathlib import Path
 
 import numpy as np
@@ -424,7 +426,7 @@
 """
 
 
-def run_pgloader(sqlite_uri: str, postgres_uri: str) -> typing.NoReturn:
+def run_pgloader(sqlite_uri: str, postgres_uri: str) -> subprocess.CompletedProcess:
     """Configure and run ``pgloader``. If there is a problem this will
     raise a ``CalledProcessError`` from ``Process.check_returncode``.
 
@@ -448,8 +450,8 @@ def run_pgloader(sqlite_uri: str, postgres_uri: str) -> typing.NoReturn:
         stdout=subprocess.PIPE,
         stderr=subprocess.STDOUT,
     )
-    logger.debug(proc.stdout.decode())
-    proc.check_returncode()
+    proc.check_returncode()  # raises subprocess.CalledProcessError
+    return proc
 
 
 def after_pgloader(sqlite_engine, pg_engine, schema):
@@ -494,54 +496,67 @@ def drop_schema(engine, schema_name):
         connection.execute(f'DROP SCHEMA {schema_name} CASCADE')
 
 
+def _use_copy_of_sqlite_database(f):
+    """Makes a copy of the sqlite database given as the first argument in URI form"""
+
+    @wraps(f)
+    def wrapper(*args):
+        uri = args[0]
+        db = Path(uri.split(':')[-1]).resolve()
+        with tempfile.TemporaryDirectory() as tempdir:
+            temp_db = Path(tempdir) / db.name
+            shutil.copy(db, temp_db)
+            new_uri = f'sqlite:///{str(temp_db)}'
+            return f(new_uri, *args[1:])
+
+    return wrapper
+
+
+@_use_copy_of_sqlite_database
+def migrate(sqlite_uri: str, postgres_uri: str):
+    logger.info(f'\nworking on {sqlite_uri} ...')
+    schema_name = get_schema_name_from_uri(sqlite_uri)
+    sl_info = SqliteDatabaseInfo(sqlite_uri)
+    pg_info = PostgresDatabaseInfo(postgres_uri)
+    sl_engine = create_engine(sqlite_uri)
+    pg_engine = create_engine(postgres_uri)
+
+    if not compare_databases(sl_info, pg_info, exact=False):
+        logger.info(f'{sl_info} already migrated to {pg_info}')
+        return
+
+    if schema_name in pg_info.get_schema():
+        logger.warning(f'Dropping schema "{schema_name}"')
+        drop_schema(pg_engine, schema_name)
+
+    # Add sqlite built-in rowid column to tables
+    add_rowids(sl_engine)
+    before_pgloader(pg_engine, schema_name)
+    run_pgloader(sqlite_uri, postgres_uri)
+    after_pgloader(sl_engine, pg_engine, schema_name)
+
+
 def copy_sqlite_to_postgres(
     db_dir: Path,
     postgres_uri: str,
-    progress_update: typing.Optional[typing.Callable[[int], None]] = None,
-) -> None:
+    num_procs: int = 6,
+) -> typing.Generator[typing.Tuple[Path, Future, int, int], None, None]:
     """Copies all the sqlite databases into a single postgres database
 
     Args:
         db_dir: the colloquial dbdir (i.e. directory containing '_ibsdb', 'smart_patrol', etc.)
         postgres_uri: a postgres connection uri without the database name
+        num_procs: number of concurrent processes to use
 
     """
-    # Done within a temporary directory for writing pgloader configuration files
-    pg_info = PostgresDatabaseInfo(postgres_uri)
-    pg_schema = pg_info.get_schema()
-    with tempfile.TemporaryDirectory() as tempdir:
-        db_paths = get_sqlite_db_paths(db_dir)
-        total_size = sum(a[1] for a in db_paths)
-        for sqlite_db_path, db_size in db_paths:
-            try:
-                temp_db_path = Path(tempdir) / sqlite_db_path.name
-                shutil.copy(sqlite_db_path, temp_db_path)
-
-                logger.debug('\n' + '*' * 60)  # b/c pgloader debug output can be lengthy
-                logger.info(f'\nworking on {sqlite_db_path} as {temp_db_path} ...')
-
-                sqlite_uri = f'sqlite:///{temp_db_path}'
-                sl_info = SqliteDatabaseInfo(sqlite_uri)
-                if not compare_databases(sl_info, pg_info, exact=False):
-                    logger.info(f'{sl_info} already migrated to {pg_info}')
-                    if progress_update:
-                        progress_update(int(db_size / total_size * 100000))
-                    continue
-
-                sqlite_engine = create_engine(sqlite_uri)
-                schema_name = get_schema_name_from_uri(sqlite_uri)
-                engine = create_engine(postgres_uri)
-                if schema_name in pg_schema:
-                    logger.warning(f'Dropping schema "{schema_name}"')
-                    drop_schema(engine, schema_name)
-                # Add sqlite built-in rowid column to tables
-                add_rowids(sqlite_engine)
-                before_pgloader(engine, schema_name)
-                run_pgloader(sqlite_uri, postgres_uri)
-                after_pgloader(sqlite_engine, engine, schema_name)
-                if progress_update:
-                    progress_update(int(db_size / total_size * 100000))
-            except KeyboardInterrupt:
-                raise
-            except Exception:
-                logger.exception(f'Exception when migrating {sqlite_db_path.name}')
+    executor = ProcessPoolExecutor(max_workers=num_procs)
+    sqlite_dbs = dict(get_sqlite_db_paths(db_dir))
+    total_size = sum(sqlite_dbs.values())
+    migration_futures_to_paths = {
+        executor.submit(migrate, f'sqlite://{str(p)}', postgres_uri): p
+        for p in sqlite_dbs
+    }
+    for future in as_completed(migration_futures_to_paths):
+        path = migration_futures_to_paths[future]
+        db_size = sqlite_dbs[path]
+        yield (path, future, db_size, total_size)
diff --git a/wbia/init/sysres.py b/wbia/init/sysres.py
index d6c1900513..c485836ff1 100644
--- a/wbia/init/sysres.py
+++ b/wbia/init/sysres.py
@@ -909,7 +909,8 @@ def ensure_db_from_url(zipped_db_url):
     uri = get_wbia_db_uri(dbdir)
    if uri:
         logger.info(f"Copying '{dbdir}' databases to the database at: {uri}")
-        copy_sqlite_to_postgres(Path(dbdir), uri)
+        for _, future, _, _ in copy_sqlite_to_postgres(Path(dbdir), uri):
+            future.result()  # will raise if there is a problem
 
     logger.info('have %s=%r' % (zipped_db_url, dbdir))
     return dbdir
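
Note on consuming the new API: ``copy_sqlite_to_postgres`` now yields
``(path, future, db_size, total_size)`` tuples instead of driving a progress
callback itself, so callers own both error handling and progress reporting
(as the CLI hunk above does). A minimal consumer sketch under that
assumption follows; ``migrate_all`` and its logging are illustrative, not
part of the change.

    # Hypothetical consumer of the generator API introduced above.
    import logging
    from pathlib import Path

    from wbia.dtool.copy_sqlite_to_postgres import copy_sqlite_to_postgres

    logger = logging.getLogger(__name__)


    def migrate_all(db_dir, postgres_uri):
        """Run all migrations; returns {path: exception} for any failures."""
        problems = {}
        done = 0
        for path, future, db_size, total_size in copy_sqlite_to_postgres(
            Path(db_dir), postgres_uri
        ):
            try:
                future.result()  # re-raises any exception from the worker process
            except Exception as exc:
                problems[path] = exc  # collect failures; keep processing the rest
            done += db_size
            logger.info('progress: %.0f%%', done / total_size * 100)
        return problems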
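
For reference, the worker-dispatch pattern used by ``copy_sqlite_to_postgres``
is the standard ``concurrent.futures`` submit/``as_completed`` idiom: one
future per database, yielded in completion order rather than submission
order. A standalone sketch of that idiom follows; ``work`` is a hypothetical
stand-in for ``migrate``, which must live at module level so it can be
pickled for the worker processes.

    # Standalone sketch of the submit/as_completed idiom; ``work`` is a
    # hypothetical stand-in for ``migrate``.
    from concurrent.futures import ProcessPoolExecutor, as_completed


    def work(name):
        # Must be a module-level function: ProcessPoolExecutor sends the
        # callable to worker processes by pickling a reference to it.
        return name.upper()


    if __name__ == '__main__':
        with ProcessPoolExecutor(max_workers=2) as executor:
            futures = {executor.submit(work, n): n for n in ('a', 'b', 'c')}
            for future in as_completed(futures):
                # Results arrive as workers finish, not in submission order.
                print(futures[future], future.result())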