Skip to content

Commit

Permalink
refactor: Support updating and dropping multiple materialized columns at once (#26707)
Browse files: Browse the repository at this point in the history
  • Loading branch information
tkaemming authored Dec 6, 2024
1 parent dcb77c3 commit c565f10
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 65 deletions.
57 changes: 34 additions & 23 deletions ee/clickhouse/materialized_columns/columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import logging
import re
from collections.abc import Callable, Iterator
from collections.abc import Callable, Iterable, Iterator
from copy import copy
from dataclasses import dataclass, replace
from datetime import timedelta
Expand Down Expand Up @@ -289,26 +289,35 @@ def materialize(
@dataclass
class UpdateColumnCommentTask:
table: str
column: MaterializedColumn
columns: list[MaterializedColumn]

def execute(self, client: Client) -> None:
actions = []
parameters = {}
for i, column in enumerate(self.columns):
parameter_name = f"comment_{i}"
actions.append(f"COMMENT COLUMN {column.name} %({parameter_name})s")
parameters[parameter_name] = column.details.as_column_comment()

client.execute(
f"ALTER TABLE {self.table} COMMENT COLUMN {self.column.name} %(comment)s",
{"comment": self.column.details.as_column_comment()},
f"ALTER TABLE {self.table} " + ", ".join(actions),
parameters,
settings={"alter_sync": 2 if TEST else 1},
)


def update_column_is_disabled(table: TablesWithMaterializedColumns, column_name: str, is_disabled: bool) -> None:
def update_column_is_disabled(
table: TablesWithMaterializedColumns, column_names: Iterable[str], is_disabled: bool
) -> None:
cluster = get_cluster()
table_info = tables[table]

column = MaterializedColumn.get(table, column_name)
columns = [MaterializedColumn.get(table, column_name) for column_name in column_names]

cluster.map_all_hosts(
UpdateColumnCommentTask(
table_info.read_table,
replace(column, details=replace(column.details, is_disabled=is_disabled)),
[replace(column, details=replace(column.details, is_disabled=is_disabled)) for column in columns],
).execute
).result()

Expand Down Expand Up @@ -342,25 +351,26 @@ def check_column_exists(client: Client, table: str, column: str) -> bool:
@dataclass
class DropColumnTask:
table: str
column_name: str
column_names: list[str]
try_drop_index: bool

def execute(self, client: Client) -> None:
actions = []

if self.try_drop_index:
index_name = get_minmax_index_name(self.column_name)
drop_index_action = f"DROP INDEX IF EXISTS {index_name}"
if check_index_exists(client, self.table, index_name):
actions.append(drop_index_action)
for column_name in self.column_names:
if self.try_drop_index:
index_name = get_minmax_index_name(column_name)
drop_index_action = f"DROP INDEX IF EXISTS {index_name}"
if check_index_exists(client, self.table, index_name):
actions.append(drop_index_action)
else:
logger.info("Skipping %r, nothing to do...", drop_index_action)

drop_column_action = f"DROP COLUMN IF EXISTS {column_name}"
if check_column_exists(client, self.table, column_name):
actions.append(drop_column_action)
else:
logger.info("Skipping %r, nothing to do...", drop_index_action)

drop_column_action = f"DROP COLUMN IF EXISTS {self.column_name}"
if check_column_exists(client, self.table, self.column_name):
actions.append(drop_column_action)
else:
logger.info("Skipping %r, nothing to do...", drop_column_action)
logger.info("Skipping %r, nothing to do...", drop_column_action)

if actions:
client.execute(
Expand All @@ -369,15 +379,16 @@ def execute(self, client: Client) -> None:
)


def drop_column(table: TablesWithMaterializedColumns, column_name: str) -> None:
def drop_column(table: TablesWithMaterializedColumns, column_names: Iterable[str]) -> None:
cluster = get_cluster()
table_info = tables[table]
column_names = [*column_names]

if isinstance(table_info, ShardedTableInfo):
cluster.map_all_hosts(
DropColumnTask(
table_info.dist_table,
column_name,
column_names,
try_drop_index=False, # no indexes on distributed tables
).execute
).result()
Expand All @@ -386,7 +397,7 @@ def drop_column(table: TablesWithMaterializedColumns, column_name: str) -> None:
cluster,
DropColumnTask(
table_info.data_table,
column_name,
column_names,
try_drop_index=True,
).execute,
).result()
Expand Down
80 changes: 46 additions & 34 deletions ee/clickhouse/materialized_columns/test/test_columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,45 +306,57 @@ def _get_column_types(self, column: str):

def test_lifecycle(self):
table: TablesWithMaterializedColumns = "events"
property: PropertyName = "myprop"
property_names = ["foo", "bar"]
source_column: TableColumn = "properties"

# create the materialized column
destination_column = materialize(table, property, table_column=source_column, create_minmax_index=True)
assert destination_column is not None

# ensure it exists everywhere
key = (property, source_column)
assert get_materialized_columns(table)[key].name == destination_column
assert MaterializedColumn.get(table, destination_column) == MaterializedColumn(
destination_column,
MaterializedColumnDetails(source_column, property, is_disabled=False),
is_nullable=False,
)
# create materialized columns
materialized_columns = {}
for property_name in property_names:
destination_column = materialize(table, property_name, table_column=source_column, create_minmax_index=True)
if destination_column is not None:
materialized_columns[property_name] = destination_column

assert set(property_names) == materialized_columns.keys()

# ensure they exist everywhere
for property_name, destination_column in materialized_columns.items():
key = (property_name, source_column)
assert get_materialized_columns(table)[key].name == destination_column
assert MaterializedColumn.get(table, destination_column) == MaterializedColumn(
destination_column,
MaterializedColumnDetails(source_column, property_name, is_disabled=False),
is_nullable=False,
)

# disable it and ensure updates apply as needed
update_column_is_disabled(table, destination_column, is_disabled=True)
assert get_materialized_columns(table)[key].name == destination_column
assert MaterializedColumn.get(table, destination_column) == MaterializedColumn(
destination_column,
MaterializedColumnDetails(source_column, property, is_disabled=True),
is_nullable=False,
)
# disable them and ensure updates apply as needed
update_column_is_disabled(table, materialized_columns.values(), is_disabled=True)
for property_name, destination_column in materialized_columns.items():
key = (property_name, source_column)
assert get_materialized_columns(table)[key].name == destination_column
assert MaterializedColumn.get(table, destination_column) == MaterializedColumn(
destination_column,
MaterializedColumnDetails(source_column, property_name, is_disabled=True),
is_nullable=False,
)

# re-enable it and ensure updates apply as needed
update_column_is_disabled(table, destination_column, is_disabled=False)
assert get_materialized_columns(table)[key].name == destination_column
assert MaterializedColumn.get(table, destination_column) == MaterializedColumn(
destination_column,
MaterializedColumnDetails(source_column, property, is_disabled=False),
is_nullable=False,
)
# re-enable them and ensure updates apply as needed
update_column_is_disabled(table, materialized_columns.values(), is_disabled=False)
for property_name, destination_column in materialized_columns.items():
key = (property_name, source_column)
assert get_materialized_columns(table)[key].name == destination_column
assert MaterializedColumn.get(table, destination_column) == MaterializedColumn(
destination_column,
MaterializedColumnDetails(source_column, property_name, is_disabled=False),
is_nullable=False,
)

# drop it and ensure updates apply as needed
drop_column(table, destination_column)
assert key not in get_materialized_columns(table)
with self.assertRaises(ValueError):
MaterializedColumn.get(table, destination_column)
# drop them and ensure updates apply as needed
drop_column(table, materialized_columns.values())
for property_name, destination_column in materialized_columns.items():
key = (property_name, source_column)
assert key not in get_materialized_columns(table)
with self.assertRaises(ValueError):
MaterializedColumn.get(table, destination_column)

def _get_latest_mutation_id(self, table: str) -> str:
[(mutation_id,)] = sync_execute(
Expand Down
18 changes: 10 additions & 8 deletions ee/management/commands/update_materialized_column.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import logging

from typing import Any
from collections.abc import Callable
from collections.abc import Callable, Iterable
from django.core.management.base import BaseCommand, CommandParser

from posthog.clickhouse.materialized_columns import ColumnName, TablesWithMaterializedColumns
from ee.clickhouse.materialized_columns.columns import update_column_is_disabled, drop_column

logger = logging.getLogger(__name__)

COLUMN_OPERATIONS: dict[str, Callable[[TablesWithMaterializedColumns, ColumnName], Any]] = {
"enable": lambda table, column_name: update_column_is_disabled(table, column_name, is_disabled=False),
"disable": lambda table, column_name: update_column_is_disabled(table, column_name, is_disabled=True),
COLUMN_OPERATIONS: dict[str, Callable[[TablesWithMaterializedColumns, Iterable[ColumnName]], Any]] = {
"enable": lambda table, column_names: update_column_is_disabled(table, column_names, is_disabled=False),
"disable": lambda table, column_names: update_column_is_disabled(table, column_names, is_disabled=True),
"drop": drop_column,
}

Expand All @@ -20,10 +20,12 @@ class Command(BaseCommand):
def add_arguments(self, parser: CommandParser) -> None:
parser.add_argument("operation", choices=COLUMN_OPERATIONS.keys())
parser.add_argument("table")
parser.add_argument("column_name")
parser.add_argument("column_names", nargs="+", metavar="column")

def handle(self, operation: str, table: TablesWithMaterializedColumns, column_name: ColumnName, **options):
logger.info("Running %r for %r.%r...", operation, table, column_name)
def handle(
self, operation: str, table: TablesWithMaterializedColumns, column_names: Iterable[ColumnName], **options
):
logger.info("Running %r on %r for %r...", operation, table, column_names)
fn = COLUMN_OPERATIONS[operation]
fn(table, column_name)
fn(table, column_names)
logger.info("Success!")

0 comments on commit c565f10

Please sign in to comment.