chore: Remove Hadoop sequence table and UDF DB (#236)
* chore: Remove unused Impala sequence table code

* chore: Remove test table gen code insert_literal_values

* chore: Remove legacy unused UDF installation code

* chore: Remove more unused test code

* chore: Remove unused GOE UDF code; UDFs were used to support hybrid queries

* chore: Remove udf db config item

* chore: Remove legacy capability constant and update UDF_DB tests
nj1973 authored Jan 6, 2025
1 parent b59c1bf commit 446b29e
Showing 30 changed files with 51 additions and 1,515 deletions.
2 changes: 0 additions & 2 deletions src/goe/config/orchestration_config.py
@@ -199,7 +199,6 @@
     "teradata_app_pass",
     "teradata_server",
     "teradata_repo_user",
-    "udf_db",
     "use_ssl",
     "use_oracle_wallet",
     "webhdfs_host",
@@ -825,7 +824,6 @@ def from_dict(config_dict: dict, do_not_connect=False):
                 "teradata_repo_user",
                 orchestration_defaults.teradata_repo_user_default(),
             ),
-            udf_db=config_dict.get("udf_db", orchestration_defaults.udf_db_default()),
             use_ssl=config_dict.get(
                 "use_ssl", orchestration_defaults.use_ssl_default()
             ),
4 changes: 0 additions & 4 deletions src/goe/config/orchestration_defaults.py
@@ -476,10 +476,6 @@ def ldap_password_file_default():
     return os.environ.get("HIVE_SERVER_LDAP_PASSWORD_FILE")


-def udf_db_default():
-    return os.environ.get("OFFLOAD_UDF_DB")
-
-
 def use_ssl_default():
     return bool(os.environ.get("SSL_ACTIVE", "").lower() == "true" or ca_cert_default())

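Note: the removed setting followed the same two-part pattern as every other item in these two config files: a *_default() function that reads a single environment variable, used by from_dict() as the fallback when the config dictionary has no explicit value. A minimal standalone sketch of that pattern (illustrative only):

    import os

    def udf_db_default():
        # One function per setting, each reading one environment variable.
        return os.environ.get("OFFLOAD_UDF_DB")

    config_dict = {}  # an explicit "udf_db" entry here would win over the env var
    udf_db = config_dict.get("udf_db", udf_db_default())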
14 changes: 0 additions & 14 deletions src/goe/connect/connect.py
@@ -486,20 +486,6 @@ def get_connect_opts():
         help="Adds missing configuration variables from the environment file template to the environment file",
     )
     # Hidden options to keep TeamCity testing functioning
-    opt.add_option(
-        "--validate-udfs",
-        dest="validate_udfs",
-        default=False,
-        action="store_true",
-        help=SUPPRESS_HELP,
-    )
-    opt.add_option(
-        "--install-udfs",
-        dest="install_udfs",
-        default=False,
-        action="store_true",
-        help=SUPPRESS_HELP,
-    )
     opt.add_option(
         "--create-backend-db",
         dest="create_backend_db",
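Note: the two removed flags relied on optparse's SUPPRESS_HELP sentinel, which keeps an option parseable while omitting it from --help output. A minimal standalone reproduction of the mechanism (option name taken from the removed code):

    from optparse import OptionParser, SUPPRESS_HELP

    opt = OptionParser()
    opt.add_option(
        "--validate-udfs",
        dest="validate_udfs",
        default=False,
        action="store_true",
        help=SUPPRESS_HELP,  # parsed normally, but hidden from --help
    )
    options, _ = opt.parse_args(["--validate-udfs"])
    print(options.validate_udfs)  # True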
77 changes: 0 additions & 77 deletions src/goe/offload/backend_api.py
@@ -78,11 +78,7 @@
     CAPABILITY_FS_SCHEME_INHERIT,
     CAPABILITY_FS_SCHEME_WASB,
     CAPABILITY_GOE_COLUMN_TRANSFORMATIONS,
-    CAPABILITY_GOE_JOIN_PUSHDOWN,
-    CAPABILITY_GOE_MATERIALIZED_JOIN,
     CAPABILITY_GOE_PARTITION_FUNCTIONS,
-    CAPABILITY_GOE_SEQ_TABLE,
-    CAPABILITY_GOE_UDFS,
     CAPABILITY_LOAD_DB_TRANSPORT,
     CAPABILITY_NAN,
     CAPABILITY_NANOSECONDS,
@@ -121,10 +117,6 @@ class BackendStatsException(Exception):
     pass


-class MissingSequenceTableException(Exception):
-    pass
-
-
 class BackendApiConnectionException(Exception):
     pass

@@ -1595,25 +1587,6 @@ def get_table_stats_partitions(self, db_name, table_name):
     def get_user_name(self):
         pass

-    @abstractmethod
-    def insert_literal_values(
-        self,
-        db_name,
-        table_name,
-        literal_list,
-        column_list=None,
-        max_rows_per_insert=250,
-        split_by_cr=True,
-    ):
-        """Used to insert specific data into a table. The table should already exist.
-        literal_list: A list of rows to insert (a list of lists).
-            The row level lists should contain the exact right number of columns.
-        column_list: The column specs for the columns to be inserted, if left blank this will default to all
-            columns in the table
-        Disclaimer: This code is used in testing and generating the sequence table. It is not robust enough to
-            be a part of any Offload Transport
-        """
-
     @abstractmethod
     def is_nan_sql_expression(self, column_expr):
         """Return a SQL expression testing if an expression is NaN (not-a-number)."""
@@ -1738,18 +1711,6 @@ def partition_column_requires_synthetic_column(self, backend_column, granularity
         """Returns True if the backend column requires a synthetic column if it is used for partitioning."""
         pass

-    @abstractmethod
-    def populate_sequence_table(
-        self, db_name, table_name, starting_seq, target_seq, split_by_cr=False
-    ):
-        """Populate options.sequence_table_name up to target_seq.
-        split_by_cr: This option is a bit of a fudge but allows a calling program to parse the output.
-            The idea being that, on some backends, this method may produce lengthy output
-            which we may not want on screen. With split_by_cr the calling program has the
-            opportunity to split the executed text by cr and only log some lines.
-        """
-        pass
-
     @abstractmethod
     def refresh_table_files(self, db_name, table_name, sync=None):
         pass
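Note: the split_by_cr flag described in the removed docstring existed only so a caller could trim lengthy output; hypothetically, a caller might have done:

    executed_text = "INSERT 1\nINSERT 2\nINSERT 3\nINSERT 4"
    for line in executed_text.split("\n")[:2]:
        print(line)  # log only the first couple of lines, not everything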
@@ -1770,10 +1731,6 @@ def role_exists(self, role_name):
         """Check the role exists, returns True/False"""
         pass

-    @abstractmethod
-    def sequence_table_max(self, db_name, table_name):
-        pass
-
     @abstractmethod
     def set_column_stats(
         self, db_name, table_name, new_column_stats, ndv_cap, num_null_factor
@@ -1886,28 +1843,6 @@ def udf_details(self, db_name, udf_name):
         """
         pass

-    @abstractmethod
-    def udf_installation_os(self, user_udf_version):
-        """Copies any libraries required to support UDFs from our software package to wherever they should be.
-        Returns a list of commands executed.
-        """
-        pass
-
-    @abstractmethod
-    def udf_installation_sql(self, create_udf_db, udf_db=None):
-        """Executes any SQL commands required for UDF installation.
-        Returns a list of commands executed, in dry_run mode that's all it does.
-        udf_db can be empty which means we won't specify one and pickup a default.
-        """
-        pass
-
-    @abstractmethod
-    def udf_installation_test(self, udf_db=None):
-        """Executes any SQL commands to test each UDF.
-        udf_db can be empty which means we won't specify one and pickup a default.
-        """
-        pass
-
     @abstractmethod
     def valid_canonical_override(self, column, canonical_override):
         """Present has a number of options for overriding the default canonical mapping in to_canonical_column().
@@ -2008,21 +1943,9 @@ def filesystem_scheme_wasb_supported(self):
     def goe_column_transformations_supported(self):
         return self.is_capability_supported(CAPABILITY_GOE_COLUMN_TRANSFORMATIONS)

-    def goe_join_pushdown_supported(self):
-        return self.is_capability_supported(CAPABILITY_GOE_JOIN_PUSHDOWN)
-
-    def goe_materialized_join_supported(self):
-        return self.is_capability_supported(CAPABILITY_GOE_MATERIALIZED_JOIN)
-
     def goe_partition_functions_supported(self):
         return self.is_capability_supported(CAPABILITY_GOE_PARTITION_FUNCTIONS)

-    def goe_sequence_table_supported(self):
-        return self.is_capability_supported(CAPABILITY_GOE_SEQ_TABLE)
-
-    def goe_udfs_supported(self):
-        return self.is_capability_supported(CAPABILITY_GOE_UDFS)
-
     def load_db_transport_supported(self):
         return self.is_capability_supported(CAPABILITY_LOAD_DB_TRANSPORT)

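Note: every *_supported() method here, removed or retained, is a thin wrapper over a single capability lookup, so orchestration code never branches on backend type directly. An illustrative sketch of the pattern with simplified names and values:

    CAPABILITY_GOE_PARTITION_FUNCTIONS = "goe_partition_functions"
    CAPABILITY_NAN = "nan"

    class ExampleBackendApi:
        # Each concrete backend declares the capabilities it supports.
        _capabilities = {
            CAPABILITY_GOE_PARTITION_FUNCTIONS: True,
            CAPABILITY_NAN: False,
        }

        def is_capability_supported(self, capability_constant):
            return bool(self._capabilities.get(capability_constant, False))

        def goe_partition_functions_supported(self):
            return self.is_capability_supported(CAPABILITY_GOE_PARTITION_FUNCTIONS)

    api = ExampleBackendApi()
    print(api.goe_partition_functions_supported())  # True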
32 changes: 3 additions & 29 deletions src/goe/offload/backend_table.py
@@ -209,7 +209,6 @@ def __init__(
         self._offload_staging_format = getattr(
             self._orchestration_config, "offload_staging_format", None
         )
-        self._udf_db = getattr(self._orchestration_config, "udf_db", None)
         # If orchestration_operation is not set then we are not doing anything significant by way of offload/present
         self._ipa_predicate_type = None
         self._offload_distribute_enabled = None
@@ -1044,12 +1043,12 @@ def _partition_column_requires_synthetic_column(

     def _partition_function_sql_expression(self, partition_info, sql_input_expression):
         """Return a string containing a call to a partition function UDF.
-        e.g. UDF_DB.UDF_NAME(sql_input_expression)
+        e.g. SCHEMA.UDF_NAME(sql_input_expression)
         """
         assert partition_info
-        udf_db, udf_name = partition_info.function.split(".")
+        udf_schema, udf_name = partition_info.function.split(".")
         return "{}({})".format(
-            self._db_api.enclose_object_reference(udf_db, udf_name),
+            self._db_api.enclose_object_reference(udf_schema, udf_name),
             sql_input_expression,
         )

@@ -1807,10 +1806,6 @@ def db_name_label(self, initcap=False):
     def default_date_based_partition_granularity(self):
         return self._db_api.default_date_based_partition_granularity()

-    def default_udf_db_name(self):
-        """By default we support UDF_DB but individual backends may have their own override"""
-        return self._udf_db
-
     def delta_table_exists(self):
         return False

@@ -2375,18 +2370,6 @@ def result_cache_area_exists(self):
     def staging_area_exists(self):
         pass

-    @abstractmethod
-    def synthetic_bucket_data_type(self):
-        pass
-
-    @abstractmethod
-    def synthetic_bucket_filter_capable_column(self, backend_column):
-        """Returns the metadata method and SQL expression pattern used to generate the bucket id
-        Currently only enabling bucket filtering for integral data
-        When we consider opening this up to other types we should consider excluding TZ aware data because
-        a backend upgrade could change the result of CASTs
-        """
-
     ###########################################################################
     # PUBLIC METHODS - HIGH LEVEL STEP METHODS AND SUPPORTING ABSTRACT METHODS
     ###########################################################################
@@ -2545,18 +2528,9 @@ def create_database_supported(self):
     def goe_column_transformations_supported(self):
         return self._db_api.goe_column_transformations_supported()

-    def goe_join_pushdown_supported(self):
-        return self._db_api.goe_join_pushdown_supported()
-
-    def goe_materialized_join_supported(self):
-        return self._db_api.goe_materialized_join_supported()
-
     def goe_partition_functions_supported(self):
         return self._db_api.goe_partition_functions_supported()

-    def goe_udfs_supported(self):
-        return self._db_api.goe_udfs_supported()
-
     def nan_supported(self):
         return self._db_api.nan_supported()

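Note: for illustration, _partition_function_sql_expression() (renamed parameter above) turns a schema-qualified UDF name into a SQL call. Assuming a backend whose enclose_object_reference() applies backtick quoting (as BigQuery does) and a hypothetical UDF MY_DATASET.MY_UDF:

    udf_schema, udf_name = "MY_DATASET.MY_UDF".split(".")
    # Stand-in for self._db_api.enclose_object_reference(udf_schema, udf_name):
    enclosed = "`{}`.`{}`".format(udf_schema, udf_name)
    print("{}({})".format(enclosed, "PRODUCT_ID"))  # `MY_DATASET`.`MY_UDF`(PRODUCT_ID)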
84 changes: 2 additions & 82 deletions src/goe/offload/bigquery/bigquery_backend_api.py
@@ -23,7 +23,6 @@
 from datetime import datetime
 import logging
 from math import ceil
-import pprint
 import re
 from textwrap import dedent
 import traceback
@@ -255,9 +254,9 @@ def _bq_table_id(self, db_name: str, table_name: str) -> str:
     def _check_kms_key_name(self, kms_key_name: str, key_type="job"):
         """Use startswith() to verify custom key for key name.
         Example of custom key name:
-            projects/goe-teamcity/locations/us-west3/keyRings/krname/cryptoKeys/etl5
+            projects/goe-test/locations/us-west3/keyRings/krname/cryptoKeys/etl5
         Example of what we get from a query_job object:
-            projects/goe-teamcity/locations/us-west3/keyRings/krname/cryptoKeys/etl5/cryptoKeyVersions/1
+            projects/goe-test/locations/us-west3/keyRings/krname/cryptoKeys/etl5/cryptoKeyVersions/1
         It's worth noting that this has not always been the case, BigQuery behaviour has changed in the past.
         """
         if self._kms_key_name and not (kms_key_name or "").startswith(
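Note: startswith() rather than equality because the key name reported on a finished query_job carries a /cryptoKeyVersions/N suffix that the configured key name lacks. A tiny illustration with made-up values:

    configured = "projects/p/locations/l/keyRings/kr/cryptoKeys/k"
    reported = configured + "/cryptoKeyVersions/1"  # as seen on a query_job
    assert reported.startswith(configured)  # strict equality would wrongly fail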
@@ -1814,68 +1813,6 @@ def get_user_name(self):
         row = self.execute_query_fetch_one("SELECT SESSION_USER()", log_level=None)
         return row[0] if row else None

-    def insert_literal_values(
-        self,
-        db_name,
-        table_name,
-        literal_list,
-        column_list=None,
-        max_rows_per_insert=250,
-        split_by_cr=True,
-    ):
-        """Insert an array of literals into a table using INSERT...VALUES(...),(...),(...).
-        We've done this for simplicity due to two issues:
-        1) BigQuery issue: https://github.com/googleapis/google-cloud-python/issues/5539
-           "The issue is eventual consistency with the backend. Replacing the table, while it has the
-           same table_id, represents a new table in terms of it's internal UUID and thus backends may
-           deliver to the "old" table for a short period (typically a few minutes)."
-           This means we can have problems with this method of inserting rows, it is likely only suitable for testing.
-        2) Python does not support encoding Decimal to JSON until Python 3.10:
-           https://bugs.python.org/issue16535
-           Which gives us issue with this API:
-               site-packages/google/cloud/bigquery/client.py", line 3421, in insert_rows
-                   return self.insert_rows_json(table, json_rows, **kwargs)
-               ...
-               json/encoder.py", line 179, in default
-                   raise TypeError(f'Object of type {o.__class__.__name__} '
-               TypeError: Object of type Decimal is not JSON serializable
-        """
-
-        def gen_literal(py_val, data_type):
-            return str(self.to_backend_literal(py_val, data_type))
-
-        assert db_name
-        assert table_name
-        assert literal_list and isinstance(literal_list, list)
-        assert isinstance(literal_list[0], list)
-
-        column_list = column_list or self.get_columns(db_name, table_name)
-        column_names = [_.name for _ in column_list]
-        data_type_strs = [_.format_data_type() for _ in column_list]
-
-        cmds = []
-        remaining_rows = literal_list[:]
-        while remaining_rows:
-            this_chunk = remaining_rows[:max_rows_per_insert]
-            remaining_rows = remaining_rows[max_rows_per_insert:]
-            formatted_rows = []
-            for row in this_chunk:
-                formatted_rows.append(
-                    ",".join(
-                        gen_literal(py_val, data_type)
-                        for py_val, data_type in zip(row, data_type_strs)
-                    )
-                )
-            sql = self._insert_literals_using_insert_values_sql_text(
-                db_name,
-                table_name,
-                column_names,
-                formatted_rows,
-                split_by_cr=split_by_cr,
-            )
-            cmds.extend(self.execute_dml(sql, log_level=VVERBOSE))
-        return cmds
-
     def is_nan_sql_expression(self, column_expr):
         """is_nan for BigQuery"""
         return "is_nan(%s)" % column_expr
@@ -2086,11 +2023,6 @@ def partition_range_min(self):
         """
         return -(2**63)

-    def populate_sequence_table(
-        self, db_name, table_name, starting_seq, target_seq, split_by_cr=False
-    ):
-        raise NotImplementedError("Sequence table does not apply for BigQuery")
-
     def refresh_table_files(self, db_name, table_name, sync=None):
         """No requirement to re-scan files for a table on BigQuery but drop from cache because that will be stale"""
         self.drop_state()
@@ -2128,9 +2060,6 @@ def role_exists(self, role_name):
         """No roles in BigQuery"""
         pass

-    def sequence_table_max(self, db_name, table_name):
-        raise NotImplementedError("Sequence table does not apply for BigQuery")
-
     def set_column_stats(
         self, db_name, table_name, new_column_stats, ndv_cap, num_null_factor
     ):
@@ -2287,15 +2216,6 @@ def udf_exists(self, db_name, udf_name):
         except NotFound:
             return False

-    def udf_installation_os(self, user_udf_version):
-        raise NotImplementedError("GOE UDFs are not supported on BigQuery")
-
-    def udf_installation_sql(self, create_udf_db, udf_db=None):
-        raise NotImplementedError("GOE UDFs are not supported on BigQuery")
-
-    def udf_installation_test(self, udf_db=None):
-        raise NotImplementedError("GOE UDFs are not supported on BigQuery")
-
     def valid_canonical_override(self, column, canonical_override):
         assert isinstance(column, BigQueryColumn)
         if isinstance(canonical_override, CanonicalColumn):
(24 more changed files not shown.)