DM-44620: Add schema migration script for APDB schema 2.0.0 #4

Merged (4 commits) on Jun 6, 2024
14 changes: 14 additions & 0 deletions doc/lsst.dax.apdb_migrate/migrations/sql/schema.rst
@@ -50,3 +50,17 @@ No additional parameters or packages are needed for this script.
An example of migration::

    $ apdb-migrate-sql upgrade -s SCHEMA_NAME $APDB_URL schema_1.1.0


Upgrade from 1.1.0 to 2.0.0
===========================

Migration script: `schema_2.0.0.py <https://github.com/lsst-dm/dax_apdb_migrate/blob/main/migrations/sql/schema/schema_2.0.0.py>`_

This migration drops the ``x`` and ``y`` columns from the ``DiaForcedSource`` table and adds ``ra`` and ``dec`` columns.
The new columns are populated from the same-named columns of the matching ``DiaObject`` records.
No additional parameters or packages are needed for this script.

An example of migration::

    $ apdb-migrate-sql upgrade -s SCHEMA_NAME $APDB_URL schema_2.0.0
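
To verify the result, the stored version can be read back from the ``metadata``
table, which holds string key/value pairs. A minimal sketch in Python, assuming
the schema tree version is stored under the key ``version:schema`` (pass
``schema=`` to ``Table`` if a named schema is used)::

    import os

    import sqlalchemy

    engine = sqlalchemy.create_engine(os.environ["APDB_URL"])
    with engine.connect() as connection:
        metadata_table = sqlalchemy.schema.Table(
            "metadata", sqlalchemy.schema.MetaData(), autoload_with=connection
        )
        query = sqlalchemy.select(metadata_table.columns.value).where(
            metadata_table.columns.name == "version:schema"
        )
        print(connection.execute(query).scalar())  # expect "2.0.0"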
140 changes: 140 additions & 0 deletions migrations/sql/schema/schema_2.0.0.py
@@ -0,0 +1,140 @@
"""Migration script for schema 2.0.0.

Revision ID: schema_2.0.0
Revises: schema_1.1.0
Create Date: 2024-06-05 11:26:16.103693
"""

import logging

import alembic
import sqlalchemy
from lsst.dax.apdb_migrate.sql.context import Context

# revision identifiers, used by Alembic.
revision = "schema_2.0.0"
down_revision = "schema_1.1.0"
branch_labels = None
depends_on = None

_LOG = logging.getLogger(__name__)


def upgrade() -> None:
"""Upgrade 'schema' tree from 1.1.0 to 2.0.0 (ticket DM-44620).

Summary of changes:

- Drop x/y columns from DiaForcedSource table
- Add ra/dec columns to DiaForcedSource table
- Populate new ra/dec columns from their matching DiaObject values.
"""
ctx = Context()

# Alter table schema.
_LOG.info("Dropping and adding columns to DiaForcedSource table.")
fsources = ctx.get_table("DiaForcedSource")
with ctx.batch_alter_table("DiaForcedSource", copy_from=fsources) as batch_op:
batch_op.drop_column("x")
batch_op.drop_column("y")
# ra/dec are initially nullable, will make them not-null after filling.
batch_op.add_column(sqlalchemy.Column("ra", sqlalchemy.types.Double, nullable=True))
batch_op.add_column(sqlalchemy.Column("dec", sqlalchemy.types.Double, nullable=True))

    # To populate ra/dec we need to find the matching DiaObject and use its
    # ra/dec. The matching DiaObject is the one with the latest validityStart
    # that is still earlier than the source processing time. A complication is
    # that for some sources the processing time happens a few milliseconds
    # earlier than the earliest validityStart of any matching DiaObject; in
    # that case we take the matching DiaObject with the earliest validityStart.

    # The query to do that is rather complicated, so split it into CTEs.
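    # For reference, the statement built below renders roughly as the
    # following SQL (a dialect-neutral sketch; quoting and UPDATE..FROM
    # syntax vary per backend):
    #
    #   WITH f2o AS (
    #     SELECT f1."diaForcedSourceId", f1."diaObjectId",
    #            COALESCE(
    #              (SELECT MAX(o1."validityStart") FROM "DiaObject" o1
    #               WHERE o1."diaObjectId" = f1."diaObjectId"
    #                 AND o1."validityStart" <= f1."time_processed"),
    #              (SELECT MIN(o1."validityStart") FROM "DiaObject" o1
    #               WHERE o1."diaObjectId" = f1."diaObjectId")
    #            ) AS "validityStart"
    #     FROM "DiaForcedSource" f1
    #   ), f2radec AS (
    #     SELECT f2o."diaForcedSourceId", o2."ra", o2."dec"
    #     FROM "DiaObject" o2
    #     JOIN f2o ON o2."diaObjectId" = f2o."diaObjectId"
    #             AND o2."validityStart" = f2o."validityStart"
    #   )
    #   UPDATE "DiaForcedSource"
    #     SET "ra" = f2radec."ra", "dec" = f2radec."dec"
    #    FROM f2radec
    #   WHERE f2radec."diaForcedSourceId" = "DiaForcedSource"."diaForcedSourceId"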
    objects = ctx.get_table("DiaObject")
    fsources = ctx.get_table("DiaForcedSource", reload=True)
    # In offline mode the ALTERs above are only emitted as SQL, so the
    # reflected table does not have the new columns yet; add them manually.
    if alembic.context.is_offline_mode():
        fsources.append_column(sqlalchemy.Column("ra", sqlalchemy.types.Double, nullable=True))
        fsources.append_column(sqlalchemy.Column("dec", sqlalchemy.types.Double, nullable=True))

    # Scalar subquery for the matching object.
    o1 = objects.alias("o1")
    f1 = fsources.alias("f1")
    # Sub-query to calculate the latest validityStart earlier than the source
    # processing time.
    max_validity = (
        sqlalchemy.select(sqlalchemy.func.max(o1.columns.validityStart))
        .where(
            sqlalchemy.and_(
                o1.columns.diaObjectId == f1.columns.diaObjectId,
                o1.columns.validityStart <= f1.columns.time_processed,
            )
        )
        .scalar_subquery()
        .correlate(f1)
    )
    # Sub-query to calculate the earliest validityStart.
    min_validity = (
        sqlalchemy.select(sqlalchemy.func.min(o1.columns.validityStart))
        .where(o1.columns.diaObjectId == f1.columns.diaObjectId)
        .scalar_subquery()
        .correlate(f1)
    )
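    # COALESCE implements the fallback described above: the latest
    # validityStart not after the source processing time if one exists,
    # otherwise the earliest validityStart for the object.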
    f2o = sqlalchemy.select(
        f1.columns.diaForcedSourceId,
        f1.columns.diaObjectId,
        sqlalchemy.func.coalesce(max_validity, min_validity).label("validityStart"),
    ).cte("f2o")

    # Join each source's selected validityStart back to DiaObject to pick up
    # the matching ra/dec values.
    o2 = objects.alias("o2")
    f2radec = (
        sqlalchemy.select(f2o.columns.diaForcedSourceId, o2.columns.ra, o2.columns.dec)
        .select_from(
            o2.join(
                f2o,
                sqlalchemy.and_(
                    o2.columns.diaObjectId == f2o.columns.diaObjectId,
                    o2.columns.validityStart == f2o.columns.validityStart,
                ),
            )
        )
        .cte("f2radec")
    )

    # Correlated UPDATE: copy ra/dec from the matching f2radec row into each
    # DiaForcedSource row.
    update = (
        fsources.update()
        .values(ra=f2radec.columns.ra, dec=f2radec.columns.dec)
        .where(f2radec.columns.diaForcedSourceId == fsources.columns.diaForcedSourceId)
    )
    _LOG.info("Filling ra/dec columns in DiaForcedSource table.")
    _LOG.debug("update: %s", update)
    result = ctx.bind.execute(update)
    if not alembic.context.is_offline_mode():
        _LOG.info("Updated %s rows in DiaForcedSource table.", result.rowcount)

    # Check that all ra/dec values are filled ("== None" renders as IS NULL).
    query = sqlalchemy.select(fsources.columns.diaForcedSourceId).where(
        fsources.columns.ra == None  # noqa: E711
    )
    result = ctx.bind.execute(query)
    ids = list(result.scalars())
    if ids:
        _LOG.error("Some ra/dec values are not filled, count: %s", len(ids))
        _LOG.error("ids: %s", ids[:10])
        raise RuntimeError("cannot continue")

    # Make ra/dec columns not null.
    _LOG.info("Making ra/dec columns non-nullable.")
    with ctx.batch_alter_table("DiaForcedSource", copy_from=fsources) as batch_op:
        batch_op.alter_column("ra", nullable=False)
        batch_op.alter_column("dec", nullable=False)

    # Update metadata version.
    tree, _, version = revision.partition("_")
    ctx.apdb_meta.update_tree_version(tree, version)


def downgrade() -> None:
    """Downgrade is not implemented as it is impossible to recover x/y
    values after the upgrade.
    """
    raise NotImplementedError()
12 changes: 10 additions & 2 deletions python/lsst/dax/apdb_migrate/sql/apdb_metadata.py
@@ -40,7 +40,13 @@ class ApdbMetadata:
    def __init__(self, connection: sqlalchemy.engine.Connection, schema: str | None = None):
        self._connection = connection
        metadata = sqlalchemy.schema.MetaData(schema=schema)
        self._table = sqlalchemy.schema.Table("metadata", metadata, autoload_with=connection, schema=schema)
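        # Declare the table columns explicitly rather than reflecting them,
        # likely so the metadata table also works in offline mode where no
        # live connection is available for reflection.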
        self._table = sqlalchemy.schema.Table(
            "metadata",
            metadata,
            sqlalchemy.schema.Column("name", sqlalchemy.Text, primary_key=True),
            sqlalchemy.schema.Column("value", sqlalchemy.Text, nullable=False),
            schema=schema,
        )

    def get(self, key: str) -> str | None:
        """Retrieve values of the specified key.
@@ -91,7 +97,9 @@ def update(self, key: str, value: str) -> int:
"""
# update version
sql = self._table.update().where(self._table.columns.name == key).values(value=value)
return self._connection.execute(sql).rowcount
result = self._connection.execute(sql)
# result may be None in offline mode, assume that we updated something
return 1 if result is None else result.rowcount

    def update_tree_version(self, tree: str, version: str, *, insert: bool = False) -> None:
        """Update version for the specified tree.
30 changes: 27 additions & 3 deletions python/lsst/dax/apdb_migrate/sql/cli/cli.py
@@ -20,6 +20,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import logging
from collections.abc import Iterable
from typing import Any

import click
@@ -29,12 +30,35 @@

CONTEXT_SETTINGS = {"help_option_names": ["-h", "--help"]}

_log_format = "%(asctime)s %(levelname)s %(name)s - %(message)s"


def _init_logging(args: Iterable[str]) -> None:
    """Configure Python logging based on command line options.

    Each argument is a comma-separated list of ``LEVEL`` or ``LOGGER=LEVEL``
    specifications; a bare ``LEVEL`` sets the global level, e.g.
    ``--log-level sqlalchemy=DEBUG,INFO``.
    """
    global_level = logging.INFO
    # Silence alembic by default.
    logger_levels: dict[str, int] = {"alembic": logging.WARNING}
    for level_str in args:
        for spec in level_str.split(","):
            logger_name, sep, level_name = spec.rpartition("=")
            level = logging.getLevelNamesMapping().get(level_name.upper())
            if level is None:
                raise ValueError(f"Unknown logging level {level_name!r} in {level_str!r}")
            if logger_name:
                logger_levels[logger_name] = level
            else:
                global_level = level

    logging.basicConfig(level=global_level, format=_log_format)
    for logger_name, level in logger_levels.items():
        logging.getLogger(logger_name).setLevel(level)


@click.group(context_settings=CONTEXT_SETTINGS)
def cli() -> None:
@options.log_level
def cli(log_level: Iterable[str]) -> None:
    """APDB schema migration tools for SQL backend."""
    logging.basicConfig(level=logging.INFO)
    logging.getLogger("alembic").setLevel(logging.WARNING)
    _init_logging(log_level)


@cli.command(short_help="Create new revision tree.")
7 changes: 7 additions & 0 deletions python/lsst/dax/apdb_migrate/sql/cli/options.py
@@ -62,3 +62,10 @@
metavar="KEY=VALUE",
multiple=True,
)

log_level = click.option(
    "--log-level",
    help="Global or per-logger logging level, e.g. INFO or sqlalchemy=DEBUG;"
    " comma-separated and may be specified multiple times.",
    metavar="LEVEL|LOGGER=LEVEL[,...]",
    multiple=True,
)
38 changes: 34 additions & 4 deletions python/lsst/dax/apdb_migrate/sql/context.py
@@ -24,6 +24,8 @@
__all__ = ("Context",)

import contextlib
from collections.abc import Iterator
from typing import Any

import alembic
import alembic.operations
@@ -59,7 +61,7 @@ def __init__(self) -> None:
        # APDB metadata interface.
        self.apdb_meta = ApdbMetadata(self.bind, self.schema)

    def get_table(self, table_name: str) -> sqlalchemy.Table:
    def get_table(self, table_name: str, reload: bool = False) -> sqlalchemy.Table:
        """Return SQLAlchemy table object for the current database.

        Parameters
@@ -72,7 +74,13 @@ def get_table(self, table_name: str) -> sqlalchemy.Table:
        table : `sqlalchemy.Table`
            Table object.
        """
        return sqlalchemy.schema.Table(table_name, self.metadata, autoload_with=self.bind, schema=self.schema)
        if reload:
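            # Drop the previously reflected table so that the reflection
            # below picks up columns added by earlier migration steps.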
            for table in self.metadata.tables.values():
                if table.name == table_name:
                    self.metadata.remove(table)
                    break
        with self._reflection_bind() as bind:
            return sqlalchemy.schema.Table(table_name, self.metadata, autoload_with=bind, schema=self.schema)

    def get_mig_option(self, option: str) -> str | None:
        """Retrieve option that was passed on the command line.
@@ -95,11 +103,33 @@ def get_mig_option(self, option: str) -> str | None:
        return self.mig_context.config.get_section_option("dax_apdb_migrate_options", option)

    def batch_alter_table(
        self, table: str
        self, table: str, **kwargs: Any
    ) -> contextlib.AbstractContextManager[alembic.operations.BatchOperations]:
        """Context manager for batch operations.

        This is a shortcut for the Alembic method; its main purpose is to
        ensure that the schema name is always passed.
        """
        return alembic.op.batch_alter_table(table, schema=self.schema)
        return alembic.op.batch_alter_table(table, schema=self.schema, **kwargs)

    @contextlib.contextmanager
    def _reflection_bind(self) -> Iterator[sqlalchemy.engine.Connection]:
        """Return database connection to be used for reflection. In online
        mode this returns the connection instantiated by Alembic; in offline
        mode it creates a new engine using the configured URL.

        Yields
        ------
        connection : `sqlalchemy.engine.Connection`
            Actual connection to database to use for reflection.
        """
        if alembic.context.is_offline_mode():
            assert self.mig_context.config is not None
            url = self.mig_context.config.get_main_option("sqlalchemy.url")
            if url is None:
                raise ValueError("sqlalchemy.url is missing from config")
            engine = sqlalchemy.create_engine(url)
            with engine.connect() as connection:
                yield connection
        else:
            yield self.bind