Skip to content

Commit

Permalink
Add concurrency to the sqlite->postgres migration
Browse files Browse the repository at this point in the history
This runs the migration of single sqlite database files in separate
processes. As such the migration script interface has changed to
use the procedure as a generator rather than a simple function.
  • Loading branch information
mmulich authored and karenc committed Jan 28, 2021
1 parent acdcb87 commit cb2ec2a
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 46 deletions.
26 changes: 25 additions & 1 deletion wbia/cli/migrate_sqlite_to_postgres.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
import logging
import re
import subprocess
import sys
from pathlib import Path

Expand Down Expand Up @@ -98,8 +99,31 @@ def main(db_dir, db_uri, force, verbose):
db_infos[1].engine.execute(f'DROP TABLE {table_name} CASCADE')

# Migrate
problems = {}
with click.progressbar(length=100000, show_eta=True) as bar:
copy_sqlite_to_postgres(Path(db_dir), db_uri, progress_update=bar.update)
for path, completed_future, db_size, total_size in copy_sqlite_to_postgres(
Path(db_dir), db_uri
):
try:
completed_future.result()
except Exception as exc:
logger.info(
f'\nfailed while processing {str(path)}\n{completed_future.exception()}'
)
problems[path] = exc
else:
logger.info(f'\nfinished processing {str(path)}')
finally:
bar.update(int(db_size / total_size * bar.length))

# Report problems
for path, exc in problems.items():
logger.info('*' * 60)
logger.info(f'There was a problem migrating {str(path)}')
logger.exception(exc)
if isinstance(exc, subprocess.CalledProcessError):
logger.info('-' * 30)
logger.info(exc.stdout.decode())

# Verify the migration
differences = compare_databases(*db_infos)
Expand Down
103 changes: 59 additions & 44 deletions wbia/dtool/copy_sqlite_to_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import subprocess
import tempfile
import typing
from concurrent.futures import as_completed, Future, ProcessPoolExecutor
from functools import wraps
from pathlib import Path

import numpy as np
Expand Down Expand Up @@ -424,7 +426,7 @@ def before_pgloader(engine, schema):
"""


def run_pgloader(sqlite_uri: str, postgres_uri: str) -> typing.NoReturn:
def run_pgloader(sqlite_uri: str, postgres_uri: str) -> subprocess.CompletedProcess:
"""Configure and run ``pgloader``.
If there is a problem this will raise a ``CalledProcessError``
from ``Process.check_returncode``.
Expand All @@ -448,8 +450,8 @@ def run_pgloader(sqlite_uri: str, postgres_uri: str) -> typing.NoReturn:
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
logger.debug(proc.stdout.decode())
proc.check_returncode()
proc.check_returncode() # raises subprocess.CalledProcessError
return proc


def after_pgloader(sqlite_engine, pg_engine, schema):
Expand Down Expand Up @@ -494,54 +496,67 @@ def drop_schema(engine, schema_name):
connection.execute(f'DROP SCHEMA {schema_name} CASCADE')


def _use_copy_of_sqlite_database(f):
    """Decorator that copies the sqlite database file named by the wrapped
    function's first argument (a ``sqlite:`` URI) into a temporary directory
    and invokes the function with a URI pointing at the copy.

    The copy exists only for the duration of the call, so the migration
    never touches the original database file.
    """

    @wraps(f)
    def wrapper(uri, *args, **kwargs):
        # The path portion of the URI is everything after the scheme;
        # Path() collapses the leading slashes left over from 'sqlite:///'.
        db = Path(uri.split(':')[-1]).resolve()
        with tempfile.TemporaryDirectory() as tempdir:
            temp_db = Path(tempdir) / db.name
            shutil.copy(db, temp_db)
            new_uri = f'sqlite:///{str(temp_db)}'
            # Pass through remaining args unchanged (the original wrapper
            # silently dropped keyword arguments).
            return f(new_uri, *args, **kwargs)

    return wrapper


@_use_copy_of_sqlite_database
def migrate(sqlite_uri: str, postgres_uri: str):
    """Migrate one sqlite database into the postgres database at
    ``postgres_uri`` under a schema derived from the sqlite file name.

    ``sqlite_uri`` is rewritten by the decorator to point at a temporary
    copy of the database, so the original file is left untouched.

    Raises:
        subprocess.CalledProcessError: if ``pgloader`` fails
    """
    logger.info(f'\nworking on {sqlite_uri} ...')
    schema_name = get_schema_name_from_uri(sqlite_uri)
    sl_info = SqliteDatabaseInfo(sqlite_uri)
    pg_info = PostgresDatabaseInfo(postgres_uri)
    sl_engine = create_engine(sqlite_uri)
    pg_engine = create_engine(postgres_uri)

    if not compare_databases(sl_info, pg_info, exact=False):
        # No differences found: this database is already migrated.
        # Skip it instead of dropping and re-loading the schema
        # (a leftover dead `pass` previously let execution fall through
        # and re-migrate every time).
        logger.info(f'{sl_info} already migrated to {pg_info}')
        return

    if schema_name in pg_info.get_schema():
        logger.warning(f'Dropping schema "{schema_name}"')
        drop_schema(pg_engine, schema_name)

    # Add sqlite built-in rowid column to tables
    add_rowids(sl_engine)
    before_pgloader(pg_engine, schema_name)
    run_pgloader(sqlite_uri, postgres_uri)
    after_pgloader(sl_engine, pg_engine, schema_name)


def copy_sqlite_to_postgres(
    db_dir: Path,
    postgres_uri: str,
    num_procs: int = 6,
) -> typing.Generator[typing.Tuple[Path, Future, int, int], None, None]:
    """Copies all the sqlite databases into a single postgres database

    Each sqlite database is migrated in its own worker process; results
    are yielded as the migrations complete (not in submission order).

    Args:
        db_dir: the colloquial dbdir (i.e. directory containing '_ibsdb', 'smart_patrol', etc.)
        postgres_uri: a postgres connection uri without the database name
        num_procs: number of concurrent processes to use

    Yields:
        4-tuple of (sqlite db path, Future for that db's migration,
        size in bytes of that sqlite db, total size of all sqlite dbs).
        Call ``future.result()`` to surface any migration error.
    """
    executor = ProcessPoolExecutor(max_workers=num_procs)
    try:
        sqlite_dbs = dict(get_sqlite_db_paths(db_dir))
        total_size = sum(sqlite_dbs.values())
        # NOTE: two slashes + absolute path; `migrate`'s decorator re-parses
        # this URI by splitting on ':' rather than with sqlalchemy.
        migration_futures_to_paths = {
            executor.submit(migrate, f'sqlite://{str(p)}', postgres_uri): p
            for p in sqlite_dbs
        }
        for future in as_completed(migration_futures_to_paths):
            path = migration_futures_to_paths[future]
            yield (path, future, sqlite_dbs[path], total_size)
    finally:
        # Reap the worker processes even if the consumer abandons the
        # generator early (the original never shut the executor down).
        executor.shutdown(wait=True)
3 changes: 2 additions & 1 deletion wbia/init/sysres.py
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,8 @@ def ensure_db_from_url(zipped_db_url):
uri = get_wbia_db_uri(dbdir)
if uri:
logger.info(f"Copying '{dbdir}' databases to the database at: {uri}")
copy_sqlite_to_postgres(Path(dbdir), uri)
for _, future, _, _ in copy_sqlite_to_postgres(Path(dbdir), uri):
future.result() # will raise if there is a problem

logger.info('have %s=%r' % (zipped_db_url, dbdir))
return dbdir
Expand Down

0 comments on commit cb2ec2a

Please sign in to comment.