Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adds helper functions for migrations #31303

Merged
merged 5 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 204 additions & 10 deletions superset/migrations/shared/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,23 @@

import sqlalchemy as sa
from alembic import op
from sqlalchemy import inspect
from sqlalchemy import Column, inspect
from sqlalchemy.dialects.mysql.base import MySQLDialect
from sqlalchemy.dialects.postgresql.base import PGDialect
from sqlalchemy.dialects.sqlite.base import SQLiteDialect # noqa: E402
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.exc import NoSuchTableError
from sqlalchemy.orm import Query, Session
from sqlalchemy.sql.schema import SchemaItem

from superset.utils import json

# ANSI escape sequences used to colorize the log messages emitted by the helpers
# in this module; RESET switches the terminal back to its default color.
GREEN = "\033[32m"
RESET = "\033[0m"
YELLOW = "\033[33m"
RED = "\033[31m"
LRED = "\033[91m"  # bright ("light") red

# NOTE: this name is rebound to the "alembic" logger further down in this module.
logger = logging.getLogger(__name__)

# Default batch size; can be overridden through the BATCH_SIZE environment variable.
DEFAULT_BATCH_SIZE = int(os.environ.get("BATCH_SIZE", 1000))
Expand Down Expand Up @@ -185,15 +194,200 @@
return table_exists


def add_column_if_not_exists(table_name: str, column: sa.Column) -> None:
def drop_fks_for_table(table_name: str) -> None:
    """
    Drop every foreign key constraint declared on a table.

    Nothing is done when the database is SQLite or when the table does not exist.

    :param table_name: The table name to drop foreign key constraints for
    """
    connection = op.get_bind()

    if isinstance(connection.dialect, SQLiteDialect):
        return  # sqlite doesn't like constraints

    if not has_table(table_name):
        return

    # ``Inspector.from_engine`` is deprecated since SQLAlchemy 1.4; ``inspect()``
    # is the supported way to get an Inspector. It is only built once both guards
    # above have passed, so the early exits do no reflection work.
    foreign_keys = inspect(connection).get_foreign_keys(table_name)
    for fk in foreign_keys:
        op.drop_constraint(fk["name"], table_name, type_="foreignkey")

Check warning on line 214 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L213-L214

Added lines #L213 - L214 were not covered by tests


# Rebinds the module-level ``logger`` assigned earlier in this file. Functions look
# the global name up at call time, so every helper in this module that uses
# ``logger`` ends up logging through the "alembic" logger.
# NOTE(review): presumably done so messages appear in Alembic's migration output —
# confirm against the logging configuration.
logger = logging.getLogger("alembic")
michael-s-molina marked this conversation as resolved.
Show resolved Hide resolved


def create_table(table_name: str, *columns: SchemaItem) -> None:
    """
    Create ``table_name`` unless a table with that name is already present.

    When ``has_table`` reports that the table exists, an informational message is
    logged and nothing else happens. Otherwise the table is created through
    alembic's ``op.create_table`` and the outcome is logged.

    :param table_name: The name of the table to be created.
    :param columns: Schema items (columns, constraints, ...) forwarded unchanged to
        alembic's ``create_table()``.
    """
    table_already_present = has_table(table_name=table_name)

    if not table_already_present:
        logger.info(f"Creating table {GREEN}{table_name}{RESET}...")
        op.create_table(table_name, *columns)
        logger.info(f"Table {GREEN}{table_name}{RESET} created")
    else:
        logger.info(f"Table {LRED}{table_name}{RESET} already exists Skipping...")

Check warning on line 238 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L236-L238

Added lines #L236 - L238 were not covered by tests


def drop_table(table_name: str) -> None:
    """
    Drops a database table with the specified name.

    This function checks if a table with the given name exists in the database.
    If the table does not exist, it logs an informational message and skips the
    dropping process. If the table exists, it first drops the foreign key
    constraints declared on the table (delegated to `drop_fks_for_table`, which
    does nothing on SQLite) and then drops the table itself.

    :param table_name: The name of the table to be dropped.
    """
    if not has_table(table_name=table_name):
        logger.info(f"Table {GREEN}{table_name}{RESET} doesn't exist Skipping...")
        return

    logger.info(f"Dropping table {GREEN}{table_name}{RESET}...")
    # Constraints go first so the table can be dropped cleanly afterwards.
    drop_fks_for_table(table_name)
    op.drop_table(table_name=table_name)
    logger.info(f"Table {GREEN}{table_name}{RESET} dropped")

Check warning on line 260 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L257-L260

Added lines #L257 - L260 were not covered by tests


def batch_operation(
    callable: Callable[[int, int], None], count: int, batch_size: int
) -> None:
    """
    Run ``callable`` over the range ``[0, count)`` in consecutive slices.

    ``callable`` is invoked as ``callable(start, end)`` for each slice, where
    ``end - start`` is at most ``batch_size`` and the last slice is clipped to
    ``count``. Progress is logged before every slice and once more at the end.

    :param callable: Function receiving the start (inclusive) and end (exclusive)
        offsets of the slice to process.
    :param count: Total number of items to cover; nothing is invoked when it is
        zero or negative.
    :param batch_size: Maximum size of each slice; must be positive.
    :raises ValueError: If ``batch_size`` is not a positive integer.
    """
    # A negative step would make ``range`` empty, i.e. silently skip all the work
    # and still report success; a zero step makes ``range`` raise ValueError.
    if batch_size <= 0:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    for offset in range(0, count, batch_size):
        # The loop body only runs when count > 0, so the division is safe.
        percentage = (offset / count) * 100
        logger.info(f"Progress: {offset:,}/{count:,} ({percentage:.2f}%)")
        callable(offset, min(offset + batch_size, count))

    logger.info(f"Progress: {count:,}/{count:,} (100%)")
    # ``functools.partial`` objects and callable instances have no ``__name__``;
    # fall back to their repr instead of failing after all batches already ran.
    operation_name = getattr(callable, "__name__", repr(callable))
    logger.info(
        f"End: {operation_name} batch operation {GREEN}successfully{RESET} executed"
    )


def add_columns(table_name: str, columns: list[Column]) -> None:
    """
    Add the given columns to an existing table, ignoring the ones already there.

    Every entry of ``columns`` is checked with ``table_has_column`` first: columns
    that already exist are reported with an informational log message and left
    alone, the remaining ones are added inside a single ``op.batch_alter_table``
    context.

    :param table_name: The name of the table to which the columns will be added.
    :param columns: SQLAlchemy Column objects describing the columns to be added.
    """
    pending = []
    for col in columns:
        if not table_has_column(table_name=table_name, column_name=col.name):
            pending.append(col)
            continue
        logger.info(
            f"Column {LRED}{col.name}{RESET} already present on table {LRED}{table_name}{RESET} Skipping..."
        )

    with op.batch_alter_table(table_name) as batch_op:
        for col in pending:
            logger.info(
                f"Adding column {GREEN}{col.name}{RESET} to table {GREEN}{table_name}{RESET}"
            )
            batch_op.add_column(col)

Check warning on line 304 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L304

Added line #L304 was not covered by tests


def drop_columns(table_name: str, columns: list[str]) -> None:
    """
    Drop the named columns from an existing table, ignoring the missing ones.

    Every name in ``columns`` is checked with ``table_has_column`` first: names
    that are not present are reported with an informational log message and
    skipped, the remaining ones are dropped inside a single
    ``op.batch_alter_table`` context.

    :param table_name: The name of the table from which the columns will be removed.
    :param columns: A list of column names to be dropped.
    """
    pending = []
    for col in columns:
        if table_has_column(table_name=table_name, column_name=col):
            pending.append(col)
            continue
        logger.info(
            f"Column {LRED}{col}{RESET} is not present on table {LRED}{table_name}{RESET} Skipping..."
        )

    with op.batch_alter_table(table_name) as batch_op:
        for col in pending:
            logger.info(
                f"Dropping column {GREEN}{col}{RESET} from table {GREEN}{table_name}{RESET}"
            )
            batch_op.drop_column(col)

Check warning on line 334 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L334

Added line #L334 was not covered by tests


def create_index(table_name: str, index_name: str, columns: list[str]) -> None:
    """
    Creates an index on specified columns of an existing database table.

    This function checks if an index with the given name already exists on the
    specified table. If so, it logs an informational message and skips the index
    creation process. Otherwise, it creates the index inside an
    ``op.batch_alter_table`` context.

    :param table_name: The name of the table on which the index will be created.
    :param index_name: The name of the index to be created.
    :param columns: A list of column names (as strings) that the index will cover.
        This list should contain the names of existing columns in the table.
    """
    if table_has_index(table=table_name, index=index_name):
        logger.info(
            f"Table {LRED}{table_name}{RESET} already has index {LRED}{index_name}{RESET} Skipping..."
        )
        return

    logger.info(
        f"Creating index {GREEN}{index_name}{RESET} on table {GREEN}{table_name}{RESET}"
    )

    with op.batch_alter_table(table_name) as batch_op:
        # Alembic expects column *names* here, hence the ``list[str]`` annotation.
        batch_op.create_index(index_name=index_name, columns=columns)

Check warning on line 364 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L363-L364

Added lines #L363 - L364 were not covered by tests
michael-s-molina marked this conversation as resolved.
Show resolved Hide resolved


def drop_index(table_name: str, index_name: str) -> None:
    """
    Remove ``index_name`` from ``table_name`` when such an index exists.

    ``table_has_index`` is consulted first. If the index is missing, an
    informational message is logged and nothing else happens; otherwise the index
    is dropped inside an ``op.batch_alter_table`` context.

    :param table_name: The name of the table from which the index will be dropped.
    :param index_name: The name of the index to be dropped.
    """
    index_present = table_has_index(table=table_name, index=index_name)

    if index_present:
        logger.info(
            f"Dropping index {GREEN}{index_name}{RESET} from table {GREEN}{table_name}{RESET}"
        )
        with op.batch_alter_table(table_name) as batch_op:
            batch_op.drop_index(index_name=index_name)
    else:
        logger.info(
            f"Table {LRED}{table_name}{RESET} doesn't have index {LRED}{index_name}{RESET} Skipping..."
        )

Check warning on line 393 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L392-L393

Added lines #L392 - L393 were not covered by tests
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@
import sqlalchemy as sa
from alembic import op

from superset.migrations.shared.utils import add_column_if_not_exists
from superset.migrations.shared.utils import add_columns, drop_columns

# revision identifiers, used by Alembic.
revision = "c22cb5c2e546"
down_revision = "678eefb4ab44"


def upgrade():
    """Add the nullable ``avatar_url`` column to ``user_attribute``.

    ``add_columns`` skips the column when the table already has it.
    """
    add_columns(
        "user_attribute",
        [sa.Column("avatar_url", sa.String(length=100), nullable=True)],
    )


def downgrade():
    """Drop ``avatar_url`` from ``user_attribute``.

    ``drop_columns`` skips the column when the table does not have it.
    """
    drop_columns("user_attribute", ["avatar_url"])
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import sqlalchemy as sa
from alembic import op

from superset.migrations.shared.utils import add_column_if_not_exists
from superset.migrations.shared.utils import add_columns, drop_columns

# revision identifiers, used by Alembic.
revision = "5f57af97bc3f"
Expand All @@ -36,12 +36,9 @@

def upgrade():
    """Add a nullable ``catalog`` column to every table listed in ``tables``.

    ``add_columns`` skips tables that already have the column.
    """
    for table in tables:
        add_columns(table, [sa.Column("catalog", sa.String(length=256), nullable=True)])


def downgrade():
    """Drop the ``catalog`` column from every table in ``tables``, in reverse order.

    ``drop_columns`` skips tables that do not have the column.
    """
    for table in reversed(tables):
        drop_columns(table, ["catalog"])
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,24 @@
downgrade_catalog_perms,
upgrade_catalog_perms,
)
from superset.migrations.shared.utils import add_column_if_not_exists
from superset.migrations.shared.utils import add_columns, drop_columns

# revision identifiers, used by Alembic.
revision = "58d051681a3b"
down_revision = "4a33124c18ad"


def upgrade():
    """Add a nullable ``catalog_perm`` column to ``tables`` and ``slices``.

    ``upgrade_catalog_perms`` then runs, restricted to the ``postgresql`` engine.
    """
    add_columns(
        "tables", [sa.Column("catalog_perm", sa.String(length=1000), nullable=True)]
    )
    add_columns(
        "slices", [sa.Column("catalog_perm", sa.String(length=1000), nullable=True)]
    )
    upgrade_catalog_perms(engines={"postgresql"})


def downgrade():
    """Run ``downgrade_catalog_perms`` (``postgresql`` engine only).

    The ``catalog_perm`` column is then dropped from ``slices`` and ``tables``.
    """
    downgrade_catalog_perms(engines={"postgresql"})
    drop_columns("slices", ["catalog_perm"])
    drop_columns("tables", ["catalog_perm"])
Loading