Skip to content

Commit

Permalink
Remove tmp_id logic and clean up code
Browse files Browse the repository at this point in the history
  • Loading branch information
myersCody committed Jan 16, 2025
1 parent e2430a6 commit 9ea7d2b
Show file tree
Hide file tree
Showing 12 changed files with 51 additions and 80 deletions.
2 changes: 1 addition & 1 deletion dev/scripts/load_test_customer_data.sh
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ trigger_ocp_ingest() {

check_has_data() {
local source_name=$1
response=$(curl -s "${KOKU_URL_PREFIX}/v1/sources/")
response=$(curl -s "${KOKU_URL_PREFIX}/v1/sources/?type=OCP")
has_data=$(echo "$response" | jq -r --arg source_name "$source_name" '.data[] | select(.name == $source_name) | .has_data')
}

Expand Down
2 changes: 1 addition & 1 deletion koku/masu/database/aws_report_db_accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ def verify_populate_ocp_on_cloud_daily_trino(self, verification_tags: List[str],
"""
Verify the managed trino table population went successfully.
"""
params = sql_metadata.build_params(["schema", "cloud_source_uuid", "year", "month"])
params = sql_metadata.build_params(["schema", "cloud_provider_uuid", "year", "month"])
params["matched_tag_array"] = verification_tags
verification_sql = pkgutil.get_data("masu.database", "trino_sql/verify/managed_ocp_on_aws_verification.sql")
verification_sql = verification_sql.decode("utf-8")
Expand Down
2 changes: 1 addition & 1 deletion koku/masu/database/azure_report_db_accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ def verify_populate_ocp_on_cloud_daily_trino(self, verification_tags: List[str],
"""
Verify the managed trino table population went successfully.
"""
params = sql_metadata.build_params(["schema", "cloud_source_uuid", "year", "month"])
params = sql_metadata.build_params(["schema", "cloud_provider_uuid", "year", "month"])
params["matched_tag_array"] = verification_tags
verification_sql = pkgutil.get_data("masu.database", "trino_sql/verify/managed_ocp_on_azure_verification.sql")
verification_sql = verification_sql.decode("utf-8")
Expand Down
66 changes: 19 additions & 47 deletions koku/masu/database/gcp_report_db_accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ def verify_populate_ocp_on_cloud_daily_trino(
Args:
verification_tags: List of all cluster's matchable kv pairs
"""
params = sql_metadata.build_params(["schema", "cloud_source_uuid", "year", "month"])
params = sql_metadata.build_params(["schema", "cloud_provider_uuid", "year", "month"])
params["matched_tag_array"] = verification_tags
verify_path = "trino_sql/verify/gcp/"
cost_total_file = verify_path + "managed_ocp_on_gcp_verification.sql"
Expand All @@ -601,29 +601,14 @@ def verify_populate_ocp_on_cloud_daily_trino(
return
LOG.info(log_json(msg="Verification successful", **params))

def _clean_up_managed_temp_tables(self, sql_metadata: ManagedSqlMetadata) -> Any:
"""
Trino doesn't allow large deletes on non-transactional tables.
We usually work around this by deleting the data a day at a time.
However, that many deletes can be slow. Since this data is
temporary, we can just drop the table to speed up the delete.
"""
temp_tables = [
f"hive.{sql_metadata.schema}.managed_gcp_uuid_temp_{sql_metadata.tmp_id}",
f"hive.{sql_metadata.schema}.managed_gcp_openshift_daily_temp_{sql_metadata.tmp_id}",
]
for temp_table in temp_tables:
sql = f"""DROP TABLE IF EXISTS {temp_table}"""
self._execute_trino_raw_sql_query(sql, log_ref=f"drop temporary table {temp_table}")

def _create_tables_and_generate_unique_id(self, sql_metadata: ManagedSqlMetadata) -> Any:
"""
The parquet generated for the gcp line item table does not
contain a unique identifier. Therefore, we create & populate
temporary tables to prevent cost duplication.
"""
params = sql_metadata.build_params(
["tmp_id", "schema", "cloud_provider_uuid", "year", "month", "start_date", "end_date"]
["schema", "cloud_provider_uuid", "year", "month", "start_date", "end_date"]
)
populate_uuid_sql = pkgutil.get_data(
"masu.database", "trino_sql/gcp/openshift/managed_flow/0_populate_uuid_tmp_table.sql"
Expand All @@ -643,7 +628,7 @@ def _populate_gcp_filtered_by_ocp_tmp_table(
(None)
"""
params = sql_metadata.build_params(
["tmp_id", "schema", "cloud_provider_uuid", "start_date", "end_date", "days_tup", "year", "month"]
["schema", "cloud_provider_uuid", "start_date", "end_date", "days_tup", "year", "month"]
)
params["ocp_source_uuid"] = ocp_provider_uuid
params["matched_tag_array"] = matched_tags_result
Expand All @@ -659,7 +644,6 @@ def _populate_final_managed_table(self, sql_metadata: ManagedSqlMetadata) -> Any
"""Populates the managed openshift on gcp table"""
params = sql_metadata.build_params(
[
"tmp_id",
"schema",
"start_date",
"year",
Expand All @@ -679,32 +663,20 @@ def _populate_final_managed_table(self, sql_metadata: ManagedSqlMetadata) -> Any

def populate_ocp_on_cloud_daily_trino(self, sql_metadata: ManagedSqlMetadata) -> Any:
"""Populate the managed_gcp_openshift_daily trino table for OCP on GCP"""
try:
self._clean_up_managed_temp_tables(sql_metadata)
self._create_tables_and_generate_unique_id(sql_metadata)
verification_tags = []
for ocp_provider_uuid in sql_metadata.ocp_source_uuids:
matched_tags_result = self.find_openshift_keys_expected_values(ocp_provider_uuid, sql_metadata)
verification_tags.extend(matched_tags_result)
self._populate_gcp_filtered_by_ocp_tmp_table(ocp_provider_uuid, matched_tags_result, sql_metadata)
self.delete_ocp_on_gcp_hive_partition_by_day(
sql_metadata.days_tup,
sql_metadata.cloud_provider_uuid,
ocp_provider_uuid,
sql_metadata.year,
sql_metadata.month,
TRINO_MANAGED_OCP_GCP_DAILY_TABLE,
)

self._populate_final_managed_table(sql_metadata)
self._clean_up_managed_temp_tables(sql_metadata)
verification_tags = list(dict.fromkeys(verification_tags))
self.verify_populate_ocp_on_cloud_daily_trino(verification_tags, sql_metadata)
except Exception as error:
message = "Unexpected error during ocp on gcp managed table population"
log_context = sql_metadata.build_params(
["schema", "start_date", "end_date", "ocp_source_uuid", "cloud_source_uuid"]
self._create_tables_and_generate_unique_id(sql_metadata)
verification_tags = []
for ocp_provider_uuid in sql_metadata.ocp_source_uuids:
matched_tags_result = self.find_openshift_keys_expected_values(ocp_provider_uuid, sql_metadata)
verification_tags.extend(matched_tags_result)
self._populate_gcp_filtered_by_ocp_tmp_table(ocp_provider_uuid, matched_tags_result, sql_metadata)
self.delete_ocp_on_gcp_hive_partition_by_day(
sql_metadata.days_tup,
sql_metadata.cloud_provider_uuid,
ocp_provider_uuid,
sql_metadata.year,
sql_metadata.month,
TRINO_MANAGED_OCP_GCP_DAILY_TABLE,
)
LOG.error(log_json(msg=message, context=log_context), exc_info=error)
finally:
self._clean_up_managed_temp_tables(sql_metadata)
self._populate_final_managed_table(sql_metadata)
verification_tags = list(dict.fromkeys(verification_tags))
self.verify_populate_ocp_on_cloud_daily_trino(verification_tags, sql_metadata)
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ CREATE TABLE IF NOT EXISTS hive.{{schema | sqlsafe}}.managed_gcp_openshift_daily
;

-- Note: We can remove the need for this table if we add in uuid during parquet creation
CREATE TABLE IF NOT EXISTS hive.{{schema | sqlsafe}}.managed_gcp_uuid_temp_{{tmp_id | sqlsafe}}
CREATE TABLE IF NOT EXISTS hive.{{schema | sqlsafe}}.managed_gcp_uuid_temp
(
row_uuid varchar,
invoice_month varchar,
Expand All @@ -54,9 +54,6 @@ CREATE TABLE IF NOT EXISTS hive.{{schema | sqlsafe}}.managed_gcp_uuid_temp_{{tmp
cost double,
daily_credits double,
resource_global_name varchar,
resource_id_matched boolean,
matched_tag varchar,
ocp_source varchar,
source varchar,
year varchar,
month varchar,
Expand All @@ -66,7 +63,7 @@ CREATE TABLE IF NOT EXISTS hive.{{schema | sqlsafe}}.managed_gcp_uuid_temp_{{tmp
partitioned_by=ARRAY['source', 'year', 'month', 'day']
);

CREATE TABLE IF NOT EXISTS hive.{{schema | sqlsafe}}.managed_gcp_openshift_daily_temp_{{tmp_id | sqlsafe}}
CREATE TABLE IF NOT EXISTS hive.{{schema | sqlsafe}}.managed_gcp_openshift_daily_temp
(
row_uuid varchar,
invoice_month varchar,
Expand Down Expand Up @@ -98,11 +95,17 @@ CREATE TABLE IF NOT EXISTS hive.{{schema | sqlsafe}}.managed_gcp_openshift_daily
day varchar
) WITH(
format = 'PARQUET',
partitioned_by=ARRAY['source', 'year', 'month', 'day']
partitioned_by=ARRAY['ocp_source', 'source', 'year', 'month', 'day']
);

DELETE FROM hive.{{schema | sqlsafe}}.managed_gcp_uuid_temp
WHERE source = {{cloud_provider_uuid}}
AND year = {{year}}
AND month = {{month}}
;

-- Populate the possible rows of GCP data assigning a uuid to each row
INSERT INTO hive.{{schema | sqlsafe}}.managed_gcp_uuid_temp_{{tmp_id | sqlsafe}} (
INSERT INTO hive.{{schema | sqlsafe}}.managed_gcp_uuid_temp (
row_uuid,
invoice_month,
billing_account_id,
Expand All @@ -124,10 +127,7 @@ INSERT INTO hive.{{schema | sqlsafe}}.managed_gcp_uuid_temp_{{tmp_id | sqlsafe}}
cost,
daily_credits,
resource_global_name,
resource_id_matched,
matched_tag,
source,
ocp_source,
year,
month,
day
Expand All @@ -153,10 +153,7 @@ SELECT cast(uuid() as varchar) as row_uuid,
gcp.cost,
gcp.daily_credits,
gcp.resource_global_name,
NULL as resource_id_matched,
NULL as matched_tag,
gcp.source as source,
NULL as ocp_source,
gcp.year,
gcp.month,
cast(day(gcp.usage_start_time) as varchar) as day
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
DELETE FROM hive.{{schema | sqlsafe}}.managed_gcp_openshift_daily_temp
WHERE ocp_source = {{ocp_source_uuid}}
AND source = {{cloud_provider_uuid}}
AND year = {{year}}
AND month = {{month}};

-- Direct resource matching
INSERT INTO hive.{{schema | sqlsafe}}.managed_gcp_openshift_daily_temp_{{tmp_id | sqlsafe}} (
INSERT INTO hive.{{schema | sqlsafe}}.managed_gcp_openshift_daily_temp (
row_uuid,
invoice_month,
billing_account_id,
Expand Down Expand Up @@ -31,7 +37,7 @@ INSERT INTO hive.{{schema | sqlsafe}}.managed_gcp_openshift_daily_temp_{{tmp_id
)
WITH cte_gcp_resource_names AS (
SELECT DISTINCT resource_name
FROM hive.{{schema | sqlsafe}}.managed_gcp_uuid_temp_{{tmp_id | sqlsafe}}
FROM hive.{{schema | sqlsafe}}.managed_gcp_uuid_temp
WHERE source = {{cloud_provider_uuid}}
AND year = {{year}}
AND month = {{month}}
Expand Down Expand Up @@ -109,7 +115,7 @@ SELECT gcp.row_uuid,
gcp.year,
gcp.month,
cast(day(gcp.usage_start_time) as varchar) as day
FROM hive.{{schema | sqlsafe}}.managed_gcp_uuid_temp_{{tmp_id | sqlsafe}} AS gcp
FROM hive.{{schema | sqlsafe}}.managed_gcp_uuid_temp AS gcp
LEFT JOIN cte_matchable_resource_names AS resource_names
ON gcp.resource_name = resource_names.resource_name
LEFT JOIN cte_agg_tags AS tag_matches
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ FROM (
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY row_uuid ORDER BY usage_amount_in_pricing_units) AS row_number
FROM hive.{{schema | sqlsafe}}.managed_gcp_openshift_daily_temp_{{tmp_id | sqlsafe}} AS gcp
FROM hive.{{schema | sqlsafe}}.managed_gcp_openshift_daily_temp AS gcp
WHERE gcp.source = {{cloud_provider_uuid}}
AND gcp.year = {{year}}
AND gcp.month = {{month}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ FROM
(
SELECT sum(cost) AS managed_total_cost
FROM hive.{{schema | sqlsafe}}.managed_gcp_openshift_daily as managed_ocpcloud
WHERE managed_ocpcloud.source = {{cloud_source_uuid}}
WHERE managed_ocpcloud.source = {{cloud_provider_uuid}}
AND managed_ocpcloud.year = {{year}}
AND managed_ocpcloud.month = {{month}}
AND (resource_id_matched = True or matched_tag != '')
Expand All @@ -21,7 +21,7 @@ FROM
LEFT JOIN cte_agg_tags AS tag_matches
ON any_match(tag_matches.matched_tags, x->strpos(parquet_table.labels, x) != 0)
AND parquet_table.ocp_matched = False
WHERE parquet_table.source = {{cloud_source_uuid}}
WHERE parquet_table.source = {{cloud_provider_uuid}}
AND parquet_table.year = {{year}}
AND parquet_table.month = {{month}}
AND (ocp_matched = True or tag_matches.matched_tags IS NOT NULL)
Expand Down
6 changes: 3 additions & 3 deletions koku/masu/database/trino_sql/verify/gcp/managed_resources.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ cte_resource_breakdown AS (
FROM (
SELECT 'parquet' AS source_type, resource_name, usage_start_time, cost
FROM hive.{{schema | sqlsafe}}.gcp_openshift_daily parquet_table
WHERE source = {{cloud_source_uuid}}
WHERE source = {{cloud_provider_uuid}}
AND year = {{year}} AND month = {{month}}
AND (ocp_matched = TRUE OR EXISTS (
SELECT 1
Expand All @@ -20,7 +20,7 @@ cte_resource_breakdown AS (
UNION ALL
SELECT 'managed' AS source_type, resource_name, usage_start_time, cost
FROM hive.{{schema | sqlsafe}}.managed_gcp_openshift_daily
WHERE source = {{cloud_source_uuid}}
WHERE source = {{cloud_provider_uuid}}
AND year = {{year}} AND month = {{month}}
AND (resource_id_matched = TRUE OR matched_tag != '')
) aggregated_data
Expand Down Expand Up @@ -55,7 +55,7 @@ cte_initial_cost_check AS (
JOIN cte_discrepancies d
ON gcp.resource_name = d.resource_name
AND gcp.usage_start_time = d.usage_start_time
WHERE gcp.source = {{cloud_source_uuid}}
WHERE gcp.source = {{cloud_provider_uuid}}
AND gcp.year = {{year}} AND gcp.month = {{month}}
GROUP BY gcp.resource_name, gcp.usage_start_time
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ FROM
(
SELECT sum(lineitem_unblendedcost) AS managed_total_cost
FROM hive.{{schema | sqlsafe}}.managed_aws_openshift_daily as managed_ocpcloud
WHERE managed_ocpcloud.source = {{cloud_source_uuid}}
WHERE managed_ocpcloud.source = {{cloud_provider_uuid}}
AND managed_ocpcloud.year = {{year}}
AND managed_ocpcloud.month = {{month}}
AND (resource_id_matched = True or matched_tag != '')
Expand All @@ -25,7 +25,7 @@ FROM
LEFT JOIN cte_agg_tags AS tag_matches
ON any_match(tag_matches.matched_tags, x->strpos(parquet_table.resourcetags, x) != 0)
AND parquet_table.resource_id_matched = False
WHERE parquet_table.source = {{cloud_source_uuid}}
WHERE parquet_table.source = {{cloud_provider_uuid}}
AND parquet_table.year = {{year}}
AND lpad(parquet_table.month, 2, '0') = {{month}}
AND lineitem_lineitemtype != 'SavingsPlanCoveredUsage'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ FROM
(
SELECT sum(costinbillingcurrency) AS managed_total_cost
FROM hive.{{schema | sqlsafe}}.managed_azure_openshift_daily as managed_ocpcloud
WHERE managed_ocpcloud.source = {{cloud_source_uuid}}
WHERE managed_ocpcloud.source = {{cloud_provider_uuid}}
AND managed_ocpcloud.year = {{year}}
AND managed_ocpcloud.month = {{month}}
AND (resource_id_matched = True or matched_tag != '')
Expand All @@ -23,7 +23,7 @@ FROM
LEFT JOIN cte_agg_tags AS tag_matches
ON any_match(tag_matches.matched_tags, x->strpos(parquet_table.tags, x) != 0)
AND parquet_table.resource_id_matched = False
WHERE parquet_table.source = {{cloud_source_uuid}}
WHERE parquet_table.source = {{cloud_provider_uuid}}
AND parquet_table.year = {{year}}
AND parquet_table.month = {{month}}
AND (resource_id_matched = True or tag_matches.matched_tags IS NOT NULL)
Expand Down
4 changes: 0 additions & 4 deletions koku/masu/processor/parquet/managed_flow_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ class ManagedSqlMetadata:
days_tup: tuple = field(init=False)
year: str = field(init=False)
month: str = field(init=False)
tmp_id: str = field(init=False)

def __post_init__(self):
self._check_date_parameters_format()
Expand All @@ -47,7 +46,6 @@ def _generate_sql_params(self):
self.days_tup = tuple(str(day.day) for day in days)
self.year = self.start_date.strftime("%Y")
self.month = self.start_date.strftime("%m")
self.tmp_id = self.cloud_provider_uuid.replace("-", "_") + f"{self.year}_{self.month}"

def build_params(self, requested_keys: List[str]) -> Dict[str, Any]:
"""
Expand All @@ -69,8 +67,6 @@ def build_params(self, requested_keys: List[str]) -> Dict[str, Any]:
"ocp_source_uuids": self.ocp_source_uuids,
"cloud_provider_uuid": self.cloud_provider_uuid,
"days_tup": self.days_tup,
"tmp_id": self.tmp_id,
"days": self.days_tup,
"cloud_source_uuid": self.cloud_provider_uuid, # FIXME
}
return {key: base_params[key] for key in requested_keys}

0 comments on commit 9ea7d2b

Please sign in to comment.