From f6e7ebba50c88cae41b7d4eb10b55e107eacb17c Mon Sep 17 00:00:00 2001 From: Luke Couzens Date: Tue, 7 May 2024 13:59:37 +0100 Subject: [PATCH] [COST-3617] - Fix GCP end of month crossover summary (#5080) * [COST-3617] - Fix GCP end of month crossover summary --- koku/api/utils.py | 8 ++++++++ koku/masu/database/gcp_report_db_accessor.py | 13 +++++++------ ...reporting_gcpcostentrylineitem_daily_summary.sql | 1 - .../downloader/gcp/gcp_report_downloader.py | 12 +++++++----- .../gcp_local/gcp_local_report_downloader.py | 8 +++++--- .../processor/parquet/parquet_report_processor.py | 3 +++ 6 files changed, 30 insertions(+), 15 deletions(-) diff --git a/koku/api/utils.py b/koku/api/utils.py index c3ae296bbd..ebd8fdd499 100644 --- a/koku/api/utils.py +++ b/koku/api/utils.py @@ -448,6 +448,14 @@ def gcp_find_invoice_months_in_date_range(self, start, end): invoice_months.append(invoice_month) return invoice_months + def get_year_month_list_from_start_end(self, start, end): + if isinstance(start, datetime.date): + start = datetime.datetime(start.year, start.month, start.day, tzinfo=settings.UTC) + if isinstance(end, datetime.date): + end = datetime.datetime(end.year, end.month, end.day, tzinfo=settings.UTC) + dates = self.list_months(start, end) + return [{"year": date.strftime("%Y"), "month": date.strftime("%m")} for date in dates] + def materialized_view_month_start(dh=DateHelper()): """Datetime of midnight on the first of the month where materialized summary starts.""" diff --git a/koku/masu/database/gcp_report_db_accessor.py b/koku/masu/database/gcp_report_db_accessor.py index 5f6405e911..3497deece5 100644 --- a/koku/masu/database/gcp_report_db_accessor.py +++ b/koku/masu/database/gcp_report_db_accessor.py @@ -21,6 +21,7 @@ from api.common import log_json from api.provider.models import Provider +from api.utils import DateHelper from koku.database import SQLScriptAtomicExecutorMixin from masu.database import GCP_REPORT_TABLE_MAP from masu.database import 
OCP_REPORT_TABLE_MAP @@ -107,6 +108,7 @@ def populate_line_item_daily_summary_table_trino( (None) """ + date_dicts = DateHelper().get_year_month_list_from_start_end(start_date, end_date) last_month_end = datetime.date.today().replace(day=1) - datetime.timedelta(days=1) if end_date == last_month_end: @@ -134,15 +136,14 @@ def populate_line_item_daily_summary_table_trino( "schema": self.schema, "table": TRINO_LINE_ITEM_TABLE, "source_uuid": source_uuid, - "year": invoice_month_date.strftime("%Y"), - "month": invoice_month_date.strftime("%m"), "markup": markup_value or 0, "bill_id": bill_id, } - - self._execute_trino_raw_sql_query( - sql, sql_params=sql_params, log_ref="reporting_gcpcostentrylineitem_daily_summary.sql" - ) + for date_dict in date_dicts: + sql_params = sql_params | {"year": date_dict["year"], "month": date_dict["month"]} + self._execute_trino_raw_sql_query( + sql, sql_params=sql_params, log_ref="reporting_gcpcostentrylineitem_daily_summary.sql" + ) def populate_tags_summary_table(self, bill_ids, start_date, end_date): """Populate the line item aggregated totals data table.""" diff --git a/koku/masu/database/trino_sql/reporting_gcpcostentrylineitem_daily_summary.sql b/koku/masu/database/trino_sql/reporting_gcpcostentrylineitem_daily_summary.sql index 3f3c9b9ca7..8b274d1201 100644 --- a/koku/masu/database/trino_sql/reporting_gcpcostentrylineitem_daily_summary.sql +++ b/koku/masu/database/trino_sql/reporting_gcpcostentrylineitem_daily_summary.sql @@ -63,7 +63,6 @@ CROSS JOIN WHERE source = '{{source_uuid | sqlsafe}}' AND year = '{{year | sqlsafe}}' AND month = '{{month | sqlsafe}}' - AND invoice_month = '{{year | sqlsafe}}{{month | sqlsafe}}' AND usage_start_time >= TIMESTAMP '{{start_date | sqlsafe}}' AND usage_start_time < date_add('day', 1, TIMESTAMP '{{end_date | sqlsafe}}') GROUP BY billing_account_id, diff --git a/koku/masu/external/downloader/gcp/gcp_report_downloader.py b/koku/masu/external/downloader/gcp/gcp_report_downloader.py index 
c8e44d0bff..d679d3e61c 100644 --- a/koku/masu/external/downloader/gcp/gcp_report_downloader.py +++ b/koku/masu/external/downloader/gcp/gcp_report_downloader.py @@ -91,12 +91,14 @@ def create_daily_archives( data_frame = pd_read_csv(local_file_path) data_frame = add_label_columns(data_frame) # putting it in for loop handles crossover data, when we have distinct invoice_month + unique_usage_days = pd.to_datetime(data_frame["usage_start_time"]).dt.date.unique() + days = list({day.strftime("%Y-%m-%d") for day in unique_usage_days}) + date_range = {"start": min(days), "end": max(days)} for invoice_month in data_frame["invoice.month"].unique(): invoice_filter = data_frame["invoice.month"] == invoice_month invoice_month_data = data_frame[invoice_filter] - unique_usage_days = pd.to_datetime(invoice_month_data["usage_start_time"]).dt.date.unique() - days = list({day.strftime("%Y-%m-%d") for day in unique_usage_days}) - date_range = {"start": min(days), "end": max(days), "invoice_month": str(invoice_month)} + # We may be able to completely remove invoice month in the future + date_range["invoice_month"] = str(invoice_month) partition_dates = invoice_month_data.partition_date.unique() for partition_date in partition_dates: partition_date_filter = invoice_month_data["partition_date"] == partition_date @@ -129,8 +131,8 @@ def create_daily_archives( tracing_id, s3_csv_path, day_filepath, day_file, manifest_id, context ) daily_file_names.append(day_filepath) - except Exception: - msg = f"unable to create daily archives from: {local_file_paths}" + except Exception as e: + msg = f"unable to create daily archives from: {local_file_paths}. 
reason: {e}" LOG.info(log_json(tracing_id, msg=msg, context=context)) raise CreateDailyArchivesError(msg) return daily_file_names, date_range diff --git a/koku/masu/external/downloader/gcp_local/gcp_local_report_downloader.py b/koku/masu/external/downloader/gcp_local/gcp_local_report_downloader.py index 5a171ad1e5..8c30e92a55 100644 --- a/koku/masu/external/downloader/gcp_local/gcp_local_report_downloader.py +++ b/koku/masu/external/downloader/gcp_local/gcp_local_report_downloader.py @@ -54,12 +54,14 @@ def create_daily_archives(tracing_id, account, provider_uuid, filename, filepath LOG.error(f"File {filepath} could not be parsed. Reason: {str(error)}") raise GCPReportDownloaderError(error) # putting it in for loop handles crossover data, when we have distinct invoice_month + unique_usage_days = pd.to_datetime(data_frame["usage_start_time"]).dt.date.unique() + days = list({day.strftime("%Y-%m-%d") for day in unique_usage_days}) + date_range = {"start": min(days), "end": max(days)} for invoice_month in data_frame["invoice.month"].unique(): invoice_filter = data_frame["invoice.month"] == invoice_month invoice_month_data = data_frame[invoice_filter] - unique_usage_days = pd.to_datetime(invoice_month_data["usage_start_time"]).dt.date.unique() - days = list({day.strftime("%Y-%m-%d") for day in unique_usage_days}) - date_range = {"start": min(days), "end": max(days), "invoice_month": str(invoice_month)} + # We may be able to completely remove invoice month in the future + date_range["invoice_month"] = str(invoice_month) partition_dates = invoice_month_data.partition_date.unique() for partition_date in partition_dates: partition_date_filter = invoice_month_data["partition_date"] == partition_date diff --git a/koku/masu/processor/parquet/parquet_report_processor.py b/koku/masu/processor/parquet/parquet_report_processor.py index 51b5cdf474..76f0c2796a 100644 --- a/koku/masu/processor/parquet/parquet_report_processor.py +++ 
b/koku/masu/processor/parquet/parquet_report_processor.py @@ -476,6 +476,9 @@ def convert_to_parquet(self): # noqa: C901 daily_data_frames.extend(daily_frame) if self.provider_type not in (Provider.PROVIDER_AZURE): self.create_daily_parquet(parquet_base_filename, daily_frame) + if self.provider_type in [Provider.PROVIDER_GCP, Provider.PROVIDER_GCP_LOCAL]: + # Sync partitions on each file to create partitions that cross month boundaries + self.create_parquet_table(parquet_base_filename) if not success: msg = "failed to convert files to parquet" LOG.warning(