diff --git a/dev/scripts/load_test_customer_data.sh b/dev/scripts/load_test_customer_data.sh index 58dee32c43..b22c8bbe08 100755 --- a/dev/scripts/load_test_customer_data.sh +++ b/dev/scripts/load_test_customer_data.sh @@ -215,7 +215,7 @@ trigger_ocp_ingest() { check_has_data() { local source_name=$1 - response=$(curl -s "${KOKU_URL_PREFIX}/v1/sources/") + response=$(curl -s "${KOKU_URL_PREFIX}/v1/sources/?type=OCP") has_data=$(echo "$response" | jq -r --arg source_name "$source_name" '.data[] | select(.name == $source_name) | .has_data') } diff --git a/koku/masu/database/aws_report_db_accessor.py b/koku/masu/database/aws_report_db_accessor.py index d2ed2d4e93..f89825b577 100644 --- a/koku/masu/database/aws_report_db_accessor.py +++ b/koku/masu/database/aws_report_db_accessor.py @@ -484,7 +484,7 @@ def verify_populate_ocp_on_cloud_daily_trino(self, verification_tags: List[str], """ Verify the managed trino table population went successfully. """ - params = sql_metadata.build_params(["schema", "cloud_source_uuid", "year", "month"]) + params = sql_metadata.build_params(["schema", "cloud_provider_uuid", "year", "month"]) params["matched_tag_array"] = verification_tags verification_sql = pkgutil.get_data("masu.database", "trino_sql/verify/managed_ocp_on_aws_verification.sql") verification_sql = verification_sql.decode("utf-8") diff --git a/koku/masu/database/azure_report_db_accessor.py b/koku/masu/database/azure_report_db_accessor.py index 6921342a4e..41144d1faf 100644 --- a/koku/masu/database/azure_report_db_accessor.py +++ b/koku/masu/database/azure_report_db_accessor.py @@ -433,7 +433,7 @@ def verify_populate_ocp_on_cloud_daily_trino(self, verification_tags: List[str], """ Verify the managed trino table population went successfully. """ - params = sql_metadata.build_params(["schema", "cloud_source_uuid", "year", "month"]) + params = sql_metadata.build_params(["schema", "cloud_provider_uuid", "year", "month"]) params["matched_tag_array"] = verification_tags verification_sql = pkgutil.get_data("masu.database", "trino_sql/verify/managed_ocp_on_azure_verification.sql") verification_sql = verification_sql.decode("utf-8") diff --git a/koku/masu/database/gcp_report_db_accessor.py b/koku/masu/database/gcp_report_db_accessor.py index 929adf26cd..16867930f7 100644 --- a/koku/masu/database/gcp_report_db_accessor.py +++ b/koku/masu/database/gcp_report_db_accessor.py @@ -580,7 +580,7 @@ def verify_populate_ocp_on_cloud_daily_trino( Args: verification_tags: List of all cluster's matchable kv pairs """ - params = sql_metadata.build_params(["schema", "cloud_source_uuid", "year", "month"]) + params = sql_metadata.build_params(["schema", "cloud_provider_uuid", "year", "month"]) params["matched_tag_array"] = verification_tags verify_path = "trino_sql/verify/gcp/" cost_total_file = verify_path + "managed_ocp_on_gcp_verification.sql" @@ -601,21 +601,6 @@ def verify_populate_ocp_on_cloud_daily_trino( return LOG.info(log_json(msg="Verification successful", **params)) - def _clean_up_managed_temp_tables(self, sql_metadata: ManagedSqlMetadata) -> Any: - """ - Trino doesn't allow large deletes on non-transactioanl tables. - We usually work around this by deleting the data a day at a time. - However, that many deletes can be slow, since this data is - temporary, we can just drop the table to speed up the delete. - """ - temp_tables = [ - f"hive.{sql_metadata.schema}.managed_gcp_uuid_temp_{sql_metadata.tmp_id}", - f"hive.{sql_metadata.schema}.managed_gcp_openshift_daily_temp_{sql_metadata.tmp_id}", - ] - for temp_table in temp_tables: - sql = f"""DROP TABLE IF EXISTS {temp_table}""" - self._execute_trino_raw_sql_query(sql, log_ref=f"drop temporary table {temp_table}") - def _create_tables_and_generate_unique_id(self, sql_metadata: ManagedSqlMetadata) -> Any: """ The parquet generated for the gcp line item table does not @@ -623,7 +608,7 @@ def _create_tables_and_generate_unique_id(self, sql_metadata: ManagedSqlMetadata temporary tables to prevent cost duplication. """ params = sql_metadata.build_params( - ["tmp_id", "schema", "cloud_provider_uuid", "year", "month", "start_date", "end_date"] + ["schema", "cloud_provider_uuid", "year", "month", "start_date", "end_date"] ) populate_uuid_sql = pkgutil.get_data( "masu.database", "trino_sql/gcp/openshift/managed_flow/0_populate_uuid_tmp_table.sql" @@ -643,7 +628,7 @@ def _populate_gcp_filtered_by_ocp_tmp_table( (None) """ params = sql_metadata.build_params( - ["tmp_id", "schema", "cloud_provider_uuid", "start_date", "end_date", "days_tup", "year", "month"] + ["schema", "cloud_provider_uuid", "start_date", "end_date", "days_tup", "year", "month"] ) params["ocp_source_uuid"] = ocp_provider_uuid params["matched_tag_array"] = matched_tags_result @@ -659,7 +644,6 @@ def _populate_final_managed_table(self, sql_metadata: ManagedSqlMetadata) -> Any """Populates the managed openshift on gcp table""" params = sql_metadata.build_params( [ - "tmp_id", "schema", "start_date", "year", @@ -679,32 +663,20 @@ def _populate_final_managed_table(self, sql_metadata: ManagedSqlMetadata) -> Any def populate_ocp_on_cloud_daily_trino(self, sql_metadata: ManagedSqlMetadata) -> Any: """Populate the managed_gcp_openshift_daily trino table for OCP on GCP""" - try: - self._clean_up_managed_temp_tables(sql_metadata) - self._create_tables_and_generate_unique_id(sql_metadata) - verification_tags = [] - for ocp_provider_uuid in sql_metadata.ocp_source_uuids: - matched_tags_result = self.find_openshift_keys_expected_values(ocp_provider_uuid, sql_metadata) - verification_tags.extend(matched_tags_result) - self._populate_gcp_filtered_by_ocp_tmp_table(ocp_provider_uuid, matched_tags_result, sql_metadata) - self.delete_ocp_on_gcp_hive_partition_by_day( - sql_metadata.days_tup, - sql_metadata.cloud_provider_uuid, - ocp_provider_uuid, - sql_metadata.year, - sql_metadata.month, - TRINO_MANAGED_OCP_GCP_DAILY_TABLE, - ) - - self._populate_final_managed_table(sql_metadata) - self._clean_up_managed_temp_tables(sql_metadata) - verification_tags = list(dict.fromkeys(verification_tags)) - self.verify_populate_ocp_on_cloud_daily_trino(verification_tags, sql_metadata) - except Exception as error: - message = "Unexpected error during ocp on gcp managed table population" - log_context = sql_metadata.build_params( - ["schema", "start_date", "end_date", "ocp_source_uuid", "cloud_source_uuid"] + self._create_tables_and_generate_unique_id(sql_metadata) + verification_tags = [] + for ocp_provider_uuid in sql_metadata.ocp_source_uuids: + matched_tags_result = self.find_openshift_keys_expected_values(ocp_provider_uuid, sql_metadata) + verification_tags.extend(matched_tags_result) + self._populate_gcp_filtered_by_ocp_tmp_table(ocp_provider_uuid, matched_tags_result, sql_metadata) + self.delete_ocp_on_gcp_hive_partition_by_day( + sql_metadata.days_tup, + sql_metadata.cloud_provider_uuid, + ocp_provider_uuid, + sql_metadata.year, + sql_metadata.month, + TRINO_MANAGED_OCP_GCP_DAILY_TABLE, ) - LOG.error(log_json(msg=message, context=log_context), exc_info=error) - finally: - self._clean_up_managed_temp_tables(sql_metadata) + self._populate_final_managed_table(sql_metadata) + verification_tags = list(dict.fromkeys(verification_tags)) + self.verify_populate_ocp_on_cloud_daily_trino(verification_tags, sql_metadata) diff --git a/koku/masu/database/trino_sql/gcp/openshift/managed_flow/0_populate_uuid_tmp_table.sql b/koku/masu/database/trino_sql/gcp/openshift/managed_flow/0_populate_uuid_tmp_table.sql index 9865c6e4f1..6b967bfe99 100644 --- a/koku/masu/database/trino_sql/gcp/openshift/managed_flow/0_populate_uuid_tmp_table.sql +++ b/koku/masu/database/trino_sql/gcp/openshift/managed_flow/0_populate_uuid_tmp_table.sql @@ -31,7 +31,7 @@ CREATE TABLE IF NOT EXISTS hive.{{schema | sqlsafe}}.managed_gcp_openshift_daily ; -- Note: We can remove the need for this table if we add in uuid during parquet creation -CREATE TABLE IF NOT EXISTS hive.{{schema | sqlsafe}}.managed_gcp_uuid_temp_{{tmp_id | sqlsafe}} +CREATE TABLE IF NOT EXISTS hive.{{schema | sqlsafe}}.managed_gcp_uuid_temp ( row_uuid varchar, invoice_month varchar, @@ -54,9 +54,6 @@ CREATE TABLE IF NOT EXISTS hive.{{schema | sqlsafe}}.managed_gcp_uuid_temp_{{tmp cost double, daily_credits double, resource_global_name varchar, - resource_id_matched boolean, - matched_tag varchar, - ocp_source varchar, source varchar, year varchar, month varchar, @@ -66,7 +63,7 @@ CREATE TABLE IF NOT EXISTS hive.{{schema | sqlsafe}}.managed_gcp_uuid_temp_{{tmp partitioned_by=ARRAY['source', 'year', 'month', 'day'] ); -CREATE TABLE IF NOT EXISTS hive.{{schema | sqlsafe}}.managed_gcp_openshift_daily_temp_{{tmp_id | sqlsafe}} +CREATE TABLE IF NOT EXISTS hive.{{schema | sqlsafe}}.managed_gcp_openshift_daily_temp ( row_uuid varchar, invoice_month varchar, @@ -98,11 +95,17 @@ CREATE TABLE IF NOT EXISTS hive.{{schema | sqlsafe}}.managed_gcp_openshift_daily day varchar ) WITH( format = 'PARQUET', - partitioned_by=ARRAY['source', 'year', 'month', 'day'] + partitioned_by=ARRAY['ocp_source', 'source', 'year', 'month', 'day'] ); +DELETE FROM hive.{{schema | sqlsafe}}.managed_gcp_uuid_temp +WHERE source = {{cloud_provider_uuid}} + AND year = {{year}} + AND month = {{month}} +; + -- Populate the possible rows of GCP data assigning a uuid to each row -INSERT INTO hive.{{schema | sqlsafe}}.managed_gcp_uuid_temp_{{tmp_id | sqlsafe}} ( +INSERT INTO hive.{{schema | sqlsafe}}.managed_gcp_uuid_temp ( row_uuid, invoice_month, billing_account_id, @@ -124,10 +127,7 @@ INSERT INTO hive.{{schema | sqlsafe}}.managed_gcp_uuid_temp_{{tmp_id | sqlsafe}} cost, daily_credits, resource_global_name, - resource_id_matched, - matched_tag, source, - ocp_source, year, month, day @@ -153,10 +153,7 @@ SELECT cast(uuid() as varchar) as row_uuid, gcp.cost, gcp.daily_credits, gcp.resource_global_name, - NULL as resource_id_matched, - NULL as matched_tag, gcp.source as source, - NULL as ocp_source, gcp.year, gcp.month, cast(day(gcp.usage_start_time) as varchar) as day diff --git a/koku/masu/database/trino_sql/gcp/openshift/managed_flow/1_populate_managed_tmp_table.sql b/koku/masu/database/trino_sql/gcp/openshift/managed_flow/1_populate_managed_tmp_table.sql index 09896bda96..57703b3f61 100644 --- a/koku/masu/database/trino_sql/gcp/openshift/managed_flow/1_populate_managed_tmp_table.sql +++ b/koku/masu/database/trino_sql/gcp/openshift/managed_flow/1_populate_managed_tmp_table.sql @@ -1,5 +1,11 @@ +DELETE FROM hive.{{schema | sqlsafe}}.managed_gcp_openshift_daily_temp +WHERE ocp_source = {{ocp_source_uuid}} +AND source = {{cloud_provider_uuid}} +AND year = {{year}} +AND month = {{month}}; + -- Direct resource matching -INSERT INTO hive.{{schema | sqlsafe}}.managed_gcp_openshift_daily_temp_{{tmp_id | sqlsafe}} ( +INSERT INTO hive.{{schema | sqlsafe}}.managed_gcp_openshift_daily_temp ( row_uuid, invoice_month, billing_account_id, @@ -31,7 +37,7 @@ INSERT INTO hive.{{schema | sqlsafe}}.managed_gcp_openshift_daily_temp_{{tmp_id ) WITH cte_gcp_resource_names AS ( SELECT DISTINCT resource_name - FROM hive.{{schema | sqlsafe}}.managed_gcp_uuid_temp_{{tmp_id | sqlsafe}} + FROM hive.{{schema | sqlsafe}}.managed_gcp_uuid_temp WHERE source = {{cloud_provider_uuid}} AND year = {{year}} AND month = {{month}} @@ -109,7 +115,7 @@ SELECT gcp.row_uuid, gcp.year, gcp.month, cast(day(gcp.usage_start_time) as varchar) as day -FROM hive.{{schema | sqlsafe}}.managed_gcp_uuid_temp_{{tmp_id | sqlsafe}} AS gcp +FROM hive.{{schema | sqlsafe}}.managed_gcp_uuid_temp AS gcp LEFT JOIN cte_matchable_resource_names AS resource_names ON gcp.resource_name = resource_names.resource_name LEFT JOIN cte_agg_tags AS tag_matches diff --git a/koku/masu/database/trino_sql/gcp/openshift/managed_flow/2_managed_gcp_openshift_daily.sql b/koku/masu/database/trino_sql/gcp/openshift/managed_flow/2_managed_gcp_openshift_daily.sql index c58f67f7bf..8780090bd6 100644 --- a/koku/masu/database/trino_sql/gcp/openshift/managed_flow/2_managed_gcp_openshift_daily.sql +++ b/koku/masu/database/trino_sql/gcp/openshift/managed_flow/2_managed_gcp_openshift_daily.sql @@ -58,7 +58,7 @@ FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY row_uuid ORDER BY usage_amount_in_pricing_units) AS row_number - FROM hive.{{schema | sqlsafe}}.managed_gcp_openshift_daily_temp_{{tmp_id | sqlsafe}} AS gcp + FROM hive.{{schema | sqlsafe}}.managed_gcp_openshift_daily_temp AS gcp WHERE gcp.source = {{cloud_provider_uuid}} AND gcp.year = {{year}} AND gcp.month = {{month}} diff --git a/koku/masu/database/trino_sql/verify/gcp/managed_ocp_on_gcp_verification.sql b/koku/masu/database/trino_sql/verify/gcp/managed_ocp_on_gcp_verification.sql index b842d1d94c..fb5cf560e9 100644 --- a/koku/masu/database/trino_sql/verify/gcp/managed_ocp_on_gcp_verification.sql +++ b/koku/masu/database/trino_sql/verify/gcp/managed_ocp_on_gcp_verification.sql @@ -10,7 +10,7 @@ FROM ( SELECT sum(cost) AS managed_total_cost FROM hive.{{schema | sqlsafe}}.managed_gcp_openshift_daily as managed_ocpcloud - WHERE managed_ocpcloud.source = {{cloud_source_uuid}} + WHERE managed_ocpcloud.source = {{cloud_provider_uuid}} AND managed_ocpcloud.year = {{year}} AND managed_ocpcloud.month = {{month}} AND (resource_id_matched = True or matched_tag != '') @@ -21,7 +21,7 @@ FROM LEFT JOIN cte_agg_tags AS tag_matches ON any_match(tag_matches.matched_tags, x->strpos(parquet_table.labels, x) != 0) AND parquet_table.ocp_matched = False - WHERE parquet_table.source = {{cloud_source_uuid}} + WHERE parquet_table.source = {{cloud_provider_uuid}} AND parquet_table.year = {{year}} AND parquet_table.month = {{month}} AND (ocp_matched = True or tag_matches.matched_tags IS NOT NULL) diff --git a/koku/masu/database/trino_sql/verify/gcp/managed_resources.sql b/koku/masu/database/trino_sql/verify/gcp/managed_resources.sql index 46f3520b4e..bfd2744016 100644 --- a/koku/masu/database/trino_sql/verify/gcp/managed_resources.sql +++ b/koku/masu/database/trino_sql/verify/gcp/managed_resources.sql @@ -10,7 +10,7 @@ cte_resource_breakdown AS ( FROM ( SELECT 'parquet' AS source_type, resource_name, usage_start_time, cost FROM hive.{{schema | sqlsafe}}.gcp_openshift_daily parquet_table - WHERE source = {{cloud_source_uuid}} + WHERE source = {{cloud_provider_uuid}} AND year = {{year}} AND month = {{month}} AND (ocp_matched = TRUE OR EXISTS ( SELECT 1 @@ -20,7 +20,7 @@ cte_resource_breakdown AS ( UNION ALL SELECT 'managed' AS source_type, resource_name, usage_start_time, cost FROM hive.{{schema | sqlsafe}}.managed_gcp_openshift_daily - WHERE source = {{cloud_source_uuid}} + WHERE source = {{cloud_provider_uuid}} AND year = {{year}} AND month = {{month}} AND (resource_id_matched = TRUE OR matched_tag != '') ) aggregated_data @@ -55,7 +55,7 @@ cte_initial_cost_check AS ( JOIN cte_discrepancies d ON gcp.resource_name = d.resource_name AND gcp.usage_start_time = d.usage_start_time - WHERE gcp.source = {{cloud_source_uuid}} + WHERE gcp.source = {{cloud_provider_uuid}} AND gcp.year = {{year}} AND gcp.month = {{month}} GROUP BY gcp.resource_name, gcp.usage_start_time ) diff --git a/koku/masu/database/trino_sql/verify/managed_ocp_on_aws_verification.sql b/koku/masu/database/trino_sql/verify/managed_ocp_on_aws_verification.sql index 4806b0bc87..4c01317689 100644 --- a/koku/masu/database/trino_sql/verify/managed_ocp_on_aws_verification.sql +++ b/koku/masu/database/trino_sql/verify/managed_ocp_on_aws_verification.sql @@ -12,7 +12,7 @@ FROM ( SELECT sum(lineitem_unblendedcost) AS managed_total_cost FROM hive.{{schema | sqlsafe}}.managed_aws_openshift_daily as managed_ocpcloud - WHERE managed_ocpcloud.source = {{cloud_source_uuid}} + WHERE managed_ocpcloud.source = {{cloud_provider_uuid}} AND managed_ocpcloud.year = {{year}} AND managed_ocpcloud.month = {{month}} AND (resource_id_matched = True or matched_tag != '') @@ -25,7 +25,7 @@ FROM LEFT JOIN cte_agg_tags AS tag_matches ON any_match(tag_matches.matched_tags, x->strpos(parquet_table.resourcetags, x) != 0) AND parquet_table.resource_id_matched = False - WHERE parquet_table.source = {{cloud_source_uuid}} + WHERE parquet_table.source = {{cloud_provider_uuid}} AND parquet_table.year = {{year}} AND lpad(parquet_table.month, 2, '0') = {{month}} AND lineitem_lineitemtype != 'SavingsPlanCoveredUsage' diff --git a/koku/masu/database/trino_sql/verify/managed_ocp_on_azure_verification.sql b/koku/masu/database/trino_sql/verify/managed_ocp_on_azure_verification.sql index 7f0d68abc6..e192b0455d 100644 --- a/koku/masu/database/trino_sql/verify/managed_ocp_on_azure_verification.sql +++ b/koku/masu/database/trino_sql/verify/managed_ocp_on_azure_verification.sql @@ -12,7 +12,7 @@ FROM ( SELECT sum(costinbillingcurrency) AS managed_total_cost FROM hive.{{schema | sqlsafe}}.managed_azure_openshift_daily as managed_ocpcloud - WHERE managed_ocpcloud.source = {{cloud_source_uuid}} + WHERE managed_ocpcloud.source = {{cloud_provider_uuid}} AND managed_ocpcloud.year = {{year}} AND managed_ocpcloud.month = {{month}} AND (resource_id_matched = True or matched_tag != '') @@ -23,7 +23,7 @@ FROM LEFT JOIN cte_agg_tags AS tag_matches ON any_match(tag_matches.matched_tags, x->strpos(parquet_table.tags, x) != 0) AND parquet_table.resource_id_matched = False - WHERE parquet_table.source = {{cloud_source_uuid}} + WHERE parquet_table.source = {{cloud_provider_uuid}} AND parquet_table.year = {{year}} AND parquet_table.month = {{month}} AND (resource_id_matched = True or tag_matches.matched_tags IS NOT NULL) diff --git a/koku/masu/processor/parquet/managed_flow_params.py b/koku/masu/processor/parquet/managed_flow_params.py index 86f9017e26..4ab9e6f716 100644 --- a/koku/masu/processor/parquet/managed_flow_params.py +++ b/koku/masu/processor/parquet/managed_flow_params.py @@ -22,7 +22,6 @@ class ManagedSqlMetadata: days_tup: tuple = field(init=False) year: str = field(init=False) month: str = field(init=False) - tmp_id: str = field(init=False) def __post_init__(self): self._check_date_parameters_format() @@ -47,7 +46,6 @@ def _generate_sql_params(self): self.days_tup = tuple(str(day.day) for day in days) self.year = self.start_date.strftime("%Y") self.month = self.start_date.strftime("%m") - self.tmp_id = self.cloud_provider_uuid.replace("-", "_") + f"{self.year}_{self.month}" def build_params(self, requested_keys: List[str]) -> Dict[str, Any]: """ @@ -69,8 +67,6 @@ def build_params(self, requested_keys: List[str]) -> Dict[str, Any]: "ocp_source_uuids": self.ocp_source_uuids, "cloud_provider_uuid": self.cloud_provider_uuid, "days_tup": self.days_tup, - "tmp_id": self.tmp_id, "days": self.days_tup, - "cloud_source_uuid": self.cloud_provider_uuid, # FIXME } return {key: base_params[key] for key in requested_keys}