Skip to content

Commit

Permalink
feat: Default --sort-columns to source primary key for BigQuery target (
Browse files Browse the repository at this point in the history
#224)

* feat: BigQuery cluster columns default to frontend primary key

* feat: Default --sort-columns to source primary key for BigQuery target
  • Loading branch information
nj1973 authored Aug 23, 2024
1 parent d481f3b commit fd43124
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 45 deletions.
19 changes: 9 additions & 10 deletions src/goe/goe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2009,17 +2009,17 @@ def validate_offload_by_subpartition(

def validate_sort_columns(
self,
rdbms_column_names,
messages,
offload_options,
offload_source_table: OffloadSourceTableInterface,
messages: OffloadMessages,
offload_options: "OrchestrationConfig",
backend_cols,
hybrid_metadata,
backend_api=None,
metadata_refresh=False,
):
"""Default sort_columns for storage index benefit if not specified by the user.
sort_columns_csv: The incoming option string which can be a CSV list of column names or the special token
SORT_COLUMNS_NO_CHANGE which identifies the user has not asked for a change.
sort_columns_csv: The incoming option string which can be a CSV list of column names or:
- the special token SORT_COLUMNS_NO_CHANGE which identifies the user has not asked for a change.
- the special token SORT_COLUMNS_NONE which identifies the user has asked for no sort columns.
sort_columns: A Python list of column names defined by the user.
backend_cols: A standalone parameter because this function may be used on tables that do not yet exist.
"""
Expand Down Expand Up @@ -2057,10 +2057,9 @@ def validate_sort_columns(
self.sort_columns = sort_columns_csv_to_sort_columns(
self.sort_columns_csv,
hybrid_metadata,
rdbms_column_names,
offload_source_table,
backend_cols,
backend_api,
metadata_refresh,
messages,
)
finally:
Expand Down Expand Up @@ -2621,7 +2620,7 @@ def offload_operation_logic(
)

offload_operation.validate_sort_columns(
offload_source_table.get_column_names(),
offload_source_table,
messages,
offload_options,
offload_target_table.get_columns(),
Expand Down Expand Up @@ -3363,7 +3362,7 @@ def get_options(usage=None, operation_name=None):
"--sort-columns",
dest="sort_columns_csv",
default=orchestration_defaults.sort_columns_default(),
help="CSV list of sort/cluster columns to use when storing data in a backend table",
help=f'CSV list of sort/cluster columns to use when storing data in a backend table or "{offload_constants.SORT_COLUMNS_NONE}" to force no sort columns',
)
opt.add_option(
"--offload-distribute-enabled",
Expand Down
4 changes: 4 additions & 0 deletions src/goe/offload/backend_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1973,6 +1973,10 @@ def column_stats_set_supported(self):
def create_database_supported(self):
    """Return True if this backend is capable of creating databases/datasets."""
    supported = self.is_capability_supported(CAPABILITY_CREATE_DB)
    return supported

def default_sort_columns_to_primary_key(self) -> bool:
    """Whether sort/cluster columns should default to the source table's primary key
    when the user has not supplied an alternative.

    Base implementation says no; individual backends override where clustering on
    the primary key is a sensible default.
    """
    return False

def drop_column_supported(self):
    """Return True if this backend can drop columns (Impala overrides this method)."""
    supported = self.is_capability_supported(CAPABILITY_DROP_COLUMN)
    return supported
Expand Down
4 changes: 4 additions & 0 deletions src/goe/offload/bigquery/bigquery_backend_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,10 @@ def db_name_label(self, initcap=False):
def default_date_based_partition_granularity(self):
    """Default granularity for date based partitioning on this backend: monthly."""
    granularity = PART_COL_GRANULARITY_MONTH
    return granularity

def default_sort_columns_to_primary_key(self) -> bool:
    """BigQuery rule of thumb: cluster by the primary key when the user has
    given no better alternative.
    """
    return True

def default_storage_compression(self, user_requested_codec, user_requested_format):
if user_requested_codec in ["HIGH", "MED", "NONE", None]:
return None
Expand Down
3 changes: 2 additions & 1 deletion src/goe/offload/offload_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@

RDBMS_MAX_DESCRIPTOR_LENGTH = 30

SORT_COLUMNS_NO_CHANGE = "NONE"
SORT_COLUMNS_NO_CHANGE = "GOE_SORT_NOT_SET"
SORT_COLUMNS_NONE = "NONE"

PRESENT_OP_NAME = "present"

Expand Down
1 change: 0 additions & 1 deletion src/goe/offload/offload_metadata_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
from goe.config.orchestration_config import OrchestrationConfig
from goe.offload.backend_table import BackendTableInterface
from goe.offload.offload_messages import OffloadMessages
from goe.offload.offload_source_table import OffloadSourceTableInterface
from goe.persistence.orchestration_repo_client import (
OrchestrationRepoClientInterface,
)
Expand Down
62 changes: 36 additions & 26 deletions src/goe/offload/operation/sort_columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,21 @@
""" sort_columns: Library of functions used in GOE to process sort column controls
"""

from typing import Optional, TYPE_CHECKING

from goe.offload.column_metadata import match_table_column
from goe.offload.offload_constants import SORT_COLUMNS_NO_CHANGE
from goe.offload import offload_constants
from goe.offload.offload_functions import expand_columns_csv
from goe.offload.offload_messages import VVERBOSE
from goe.offload.offload_metadata_functions import offload_sort_columns_to_csv
from goe.util.misc_functions import csv_split

if TYPE_CHECKING:
from goe.persistence.orchestration_metadata import OrchestrationMetadata
from goe.offload.backend_api import BackendApiInterface
from goe.offload.offload_messages import OffloadMessages
from goe.offload.offload_source_table import OffloadSourceTableInterface


class OffloadSortColumnsException(Exception):
pass
Expand All @@ -47,19 +55,15 @@ class OffloadSortColumnsException(Exception):


def default_sort_columns_from_metadata(
hybrid_metadata, rdbms_column_names, revalidate_columns=False
hybrid_metadata: "OrchestrationMetadata",
):
sort_columns = None
if hybrid_metadata and hybrid_metadata.offload_sort_columns:
sort_columns = csv_split(hybrid_metadata.offload_sort_columns)
if revalidate_columns:
# If this is a re-present then ensure the sort columns still exist (schema evolution)
if sort_columns:
validate_sort_columns_exist(sort_columns, rdbms_column_names)
return sort_columns


def validate_sort_columns_exist(sort_columns, rdbms_column_names):
def validate_sort_columns_exist(sort_columns: list, rdbms_column_names: list):
assert isinstance(sort_columns, list)
assert isinstance(rdbms_column_names, list)
bad_cols = list(set(sort_columns) - set(rdbms_column_names))
Expand All @@ -69,7 +73,9 @@ def validate_sort_columns_exist(sort_columns, rdbms_column_names):
)


def validate_sort_column_types(sort_columns, backend_cols, backend_api):
def validate_sort_column_types(
sort_columns: list, backend_cols: list, backend_api: "BackendApiInterface"
):
assert isinstance(sort_columns, list)
assert isinstance(backend_cols, list)
for sort_col in sort_columns:
Expand All @@ -86,32 +92,34 @@ def validate_sort_column_types(sort_columns, backend_cols, backend_api):


def sort_columns_csv_to_sort_columns(
sort_columns_csv,
hybrid_metadata,
rdbms_column_names,
backend_cols,
backend_api,
metadata_refresh,
messages,
):
sort_columns_csv: str,
hybrid_metadata: Optional["OrchestrationMetadata"],
offload_source_table: "OffloadSourceTableInterface",
backend_cols: list,
backend_api: "BackendApiInterface",
messages: "OffloadMessages",
) -> Optional[list]:
assert isinstance(sort_columns_csv, (str, type(None)))
sort_columns = None
if metadata_refresh:
sort_columns = default_sort_columns_from_metadata(
hybrid_metadata, rdbms_column_names, revalidate_columns=True
)
elif sort_columns_csv == SORT_COLUMNS_NO_CHANGE:
rdbms_column_names = offload_source_table.get_column_names()
if sort_columns_csv == offload_constants.SORT_COLUMNS_NO_CHANGE:
if hybrid_metadata and hybrid_metadata.offload_sort_columns:
# Use existing metadata to continue with existing configuration
sort_columns = default_sort_columns_from_metadata(
hybrid_metadata, rdbms_column_names
)
sort_columns = default_sort_columns_from_metadata(hybrid_metadata)
messages.log(
"Retaining SORT BY columns from previous offload: %s" % sort_columns,
detail=VVERBOSE,
)
elif not hybrid_metadata:
# First time Offload with default options, we might want to set a default
# based on the frontend/backend combination.
if backend_api.default_sort_columns_to_primary_key():
sort_columns = offload_source_table.get_primary_key_columns()
elif sort_columns_csv == offload_constants.SORT_COLUMNS_NONE:
# The user requested no sorting.
sort_columns = None
elif sort_columns_csv:
# The user gave us a list so use that and ensure all stated SORT BY columns exist
# The user gave us a list so use that and ensure all stated SORT BY columns exist.
sort_columns = expand_columns_csv(
sort_columns_csv, rdbms_column_names, retain_non_matching_names=True
)
Expand All @@ -134,7 +142,9 @@ def sort_columns_csv_to_sort_columns(
return sort_columns


def sort_columns_have_changed(offload_source_table, offload_operation):
def sort_columns_have_changed(
offload_source_table: "OffloadSourceTableInterface", offload_operation
):
if not offload_source_table.sorted_table_supported():
return False
existing_metadata = offload_operation.pre_offload_hybrid_metadata
Expand Down
56 changes: 51 additions & 5 deletions tests/integration/scenarios/test_offload_sorting.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def test_offload_sorting_dim(config, schema, data_db):
config,
messages,
frontend_sqls=frontend_api.standard_dimension_frontend_ddl(
schema, OFFLOAD_DIM
schema, OFFLOAD_DIM, pk_col_name="ID"
),
python_fns=[
lambda: drop_backend_test_table(
Expand All @@ -177,7 +177,6 @@ def test_offload_sorting_dim(config, schema, data_db):
)

# Default Offload Of Dimension.
# No column defaults for a non-partitioned table.
options = {
"owner_table": schema + "." + OFFLOAD_DIM,
"sort_columns_csv": offload_constants.SORT_COLUMNS_NO_CHANGE,
Expand All @@ -186,6 +185,9 @@ def test_offload_sorting_dim(config, schema, data_db):
"execute": True,
}
run_offload(options, config, messages)
expected_sort_cols = (
"ID" if backend_api.default_sort_columns_to_primary_key() else "NULL"
)
assert sort_story_assertion(
schema,
OFFLOAD_DIM,
Expand All @@ -194,12 +196,10 @@ def test_offload_sorting_dim(config, schema, data_db):
backend_api,
messages,
repo_client,
offload_sort_columns="NULL",
offload_sort_columns=expected_sort_cols,
)

if not dim_sorted_table_supported(backend_api):
# Connections are being left open, explicitly close them.
frontend_api.close()
return

# Offload table with 2 sort columns.
Expand Down Expand Up @@ -270,6 +270,25 @@ def test_offload_sorting_dim(config, schema, data_db):
offload_sort_columns="TXN_RATE",
)

# Offload table explicitly with no sort columns.
options = {
"owner_table": schema + "." + OFFLOAD_DIM,
"sort_columns_csv": offload_constants.SORT_COLUMNS_NONE,
"reset_backend_table": True,
"execute": True,
}
run_offload(options, config, messages)
assert sort_story_assertion(
schema,
OFFLOAD_DIM,
backend_name,
data_db,
backend_api,
messages,
repo_client,
offload_sort_columns="NULL",
)


def test_offload_sorting_fact(config, schema, data_db):
id = "test_offload_sorting_fact"
Expand Down Expand Up @@ -301,6 +320,33 @@ def test_offload_sorting_fact(config, schema, data_db):
),
)

if backend_api.default_sort_columns_to_primary_key():
    # Initial offload with no sorting; no primary key therefore no cluster columns.
options = {
"owner_table": schema + "." + OFFLOAD_FACT,
"older_than_date": test_constants.SALES_BASED_FACT_HV_1,
"offload_partition_columns": offload_sorting_fact_offload1_partition_columns(
backend_api
),
"offload_partition_granularity": offload_sorting_fact_offload1_granularity(
backend_api
),
"reset_backend_table": True,
"create_backend_db": True,
"execute": True,
}
run_offload(options, config, messages)
assert sort_story_assertion(
schema,
OFFLOAD_FACT,
backend_name,
data_db,
backend_api,
messages,
repo_client,
offload_sort_columns="NULL",
)

# Initial offload with custom sorting.
options = {
"owner_table": schema + "." + OFFLOAD_FACT,
Expand Down
3 changes: 3 additions & 0 deletions tests/testlib/test_framework/backend_testing_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,9 @@ def database_exists(self, db_name):
def default_date_based_partition_granularity(self):
    """Delegate to the underlying backend API for the default date partition granularity."""
    api = self._db_api
    return api.default_date_based_partition_granularity()

def default_sort_columns_to_primary_key(self):
    """Delegate to the underlying backend API's primary-key sort-column default."""
    api = self._db_api
    return api.default_sort_columns_to_primary_key()

def default_storage_compression(self, storage_compression, storage_format):
return self._db_api.default_storage_compression(
storage_compression, storage_format
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2244,6 +2244,7 @@ def standard_dimension_frontend_ddl(
table_name: str,
extra_col_tuples: Optional[list] = None,
empty: bool = False,
pk_col_name: str = None,
) -> list:
extra_cols = ""
if extra_col_tuples:
Expand Down Expand Up @@ -2304,7 +2305,7 @@ def standard_dimension_frontend_ddl(
"""
)
return self.gen_ctas_from_subquery(
schema, table_name, subquery, with_stats_collection=True
schema, table_name, subquery, pk_col_name=pk_col_name, with_stats_collection=True
)

def table_row_count_from_stats(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2068,6 +2068,7 @@ def standard_dimension_frontend_ddl(
table_name: str,
extra_col_tuples: Optional[list] = None,
empty: bool = False,
pk_col_name: str = None,
) -> list:
extra_cols = ""
if extra_col_tuples:
Expand Down Expand Up @@ -2124,7 +2125,7 @@ def standard_dimension_frontend_ddl(
"""
)
return self.gen_ctas_from_subquery(
schema, table_name, subquery, with_stats_collection=True
schema, table_name, subquery, pk_col_name=pk_col_name, with_stats_collection=True
)

def table_row_count_from_stats(
Expand Down

0 comments on commit fd43124

Please sign in to comment.