[COST-4445] simplify OCPReportDBAccessor (#4786)
maskarb authored Nov 20, 2023
1 parent 237f5ae commit a990d16
Showing 3 changed files with 252 additions and 378 deletions.
233 changes: 82 additions & 151 deletions in koku/masu/database/ocp_report_db_accessor.py
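Most of this file's diff is one recurring change: methods that previously wrapped their ORM queries in an explicit with schema_context(self.schema): block now issue the query directly; the unused AWS_CUR_TABLE_MAP mapping and the django_tenants schema_context import drop out along the way. A minimal before/after sketch of the pattern, taken from one of the methods in this file and assuming the tenant schema is already routed by whatever calls the accessor (that routing is not visible in this diff):

    # Before: each lookup entered the tenant schema itself.
    def get_cluster_for_provider(self, provider_uuid):
        with schema_context(self.schema):
            return OCPCluster.objects.filter(provider_id=provider_uuid).first()

    # After: the query runs as-is; schema selection is assumed to be
    # handled outside the accessor rather than re-entered per method.
    def get_cluster_for_provider(self, provider_uuid):
        return OCPCluster.objects.filter(provider_id=provider_uuid).first()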
@@ -17,15 +17,13 @@
from django.db.models import F
from django.db.models import Value
from django.db.models.functions import Coalesce
from django_tenants.utils import schema_context
from trino.exceptions import TrinoExternalError

from api.common import log_json
from api.metrics.constants import DEFAULT_DISTRIBUTION_TYPE
from api.provider.models import Provider
from koku.database import SQLScriptAtomicExecutorMixin
from masu.config import Config
from masu.database import AWS_CUR_TABLE_MAP
from masu.database import OCP_REPORT_TABLE_MAP
from masu.database.report_db_accessor_base import ReportDBAccessorBase
from masu.util.common import filter_dictionary
@@ -74,46 +72,24 @@ def __init__(self, schema):
super().__init__(schema)
self._datetime_format = Config.OCP_DATETIME_STR_FORMAT
self._table_map = OCP_REPORT_TABLE_MAP
self._aws_table_map = AWS_CUR_TABLE_MAP

@property
def line_item_daily_summary_table(self):
return OCPUsageLineItemDailySummary

def get_current_usage_period(self, provider_uuid):
"""Get the most recent usage report period object."""
with schema_context(self.schema):
return (
OCPUsageReportPeriod.objects.filter(provider_id=provider_uuid).order_by("-report_period_start").first()
)

def get_usage_period_by_dates_and_cluster(self, start_date, end_date, cluster_id):
"""Return all report period entries for the specified start date."""
table_name = self._table_map["report_period"]
with schema_context(self.schema):
return (
self._get_db_obj_query(table_name)
.filter(report_period_start=start_date, report_period_end=end_date, cluster_id=cluster_id)
.first()
)

def get_usage_period_query_by_provider(self, provider_uuid):
"""Return all report periods for the specified provider."""
table_name = self._table_map["report_period"]
with schema_context(self.schema):
return self._get_db_obj_query(table_name).filter(provider_id=provider_uuid)
return OCPUsageReportPeriod.objects.filter(provider_id=provider_uuid)

def report_periods_for_provider_uuid(self, provider_uuid, start_date=None):
"""Return all report periods for provider_uuid on date."""
report_periods = self.get_usage_period_query_by_provider(provider_uuid)
with schema_context(self.schema):
if start_date:
if isinstance(start_date, str):
start_date = parse(start_date)
report_date = start_date.replace(day=1)
report_periods = report_periods.filter(report_period_start=report_date).first()

return report_periods
if start_date:
if isinstance(start_date, str):
start_date = parse(start_date)
report_date = start_date.replace(day=1)
report_periods = report_periods.filter(report_period_start=report_date).first()
return report_periods
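With the wrapper gone, report_periods_for_provider_uuid is a plain queryset lookup, so any tenant routing has to happen before the accessor is used. A hypothetical call-site sketch, assuming django_tenants' schema_context is applied by the caller (actual call sites are outside this diff, and the schema and UUID values here are placeholders):

    from django_tenants.utils import schema_context

    provider_uuid = "00000000-0000-0000-0000-000000000000"  # placeholder UUID
    accessor = OCPReportDBAccessor(schema="org1234567")     # placeholder schema name
    with schema_context(accessor.schema):
        # With start_date given, the method returns the single period whose
        # report_period_start matches the first day of that month.
        period = accessor.report_periods_for_provider_uuid(provider_uuid, start_date="2023-11-01")
        if period:
            print(period.report_period_start, period.report_period_end)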

def populate_ui_summary_tables(self, start_date, end_date, source_uuid, tables=UI_SUMMARY_TABLES):
"""Populate our UI summary tables (formerly materialized views)."""
@@ -438,45 +414,16 @@ def populate_volume_label_summary_table(self, report_period_ids, start_date, end

def populate_markup_cost(self, markup, start_date, end_date, cluster_id):
"""Set markup cost for OCP including infrastructure cost markup."""
with schema_context(self.schema):
OCPUsageLineItemDailySummary.objects.filter(
cluster_id=cluster_id, usage_start__gte=start_date, usage_start__lte=end_date
).update(
infrastructure_markup_cost=(
(Coalesce(F("infrastructure_raw_cost"), Value(0, output_field=DecimalField()))) * markup
),
infrastructure_project_markup_cost=(
(Coalesce(F("infrastructure_project_raw_cost"), Value(0, output_field=DecimalField()))) * markup
),
)

def get_distinct_nodes(self, start_date, end_date, cluster_id):
"""Return a list of nodes for a cluster between given dates."""
with schema_context(self.schema):
unique_nodes = (
OCPUsageLineItemDailySummary.objects.filter(
usage_start__gte=start_date, usage_start__lt=end_date, cluster_id=cluster_id, node__isnull=False
)
.values_list("node")
.distinct()
)
return [node[0] for node in unique_nodes]

def get_distinct_pvcs(self, start_date, end_date, cluster_id):
"""Return a list of tuples of (PVC, node) for a cluster between given dates."""
with schema_context(self.schema):
unique_pvcs = (
OCPUsageLineItemDailySummary.objects.filter(
usage_start__gte=start_date,
usage_start__lt=end_date,
cluster_id=cluster_id,
persistentvolumeclaim__isnull=False,
namespace__isnull=False,
)
.values_list("persistentvolumeclaim", "node", "namespace")
.distinct()
)
return [(pvc[0], pvc[1], pvc[2]) for pvc in unique_pvcs]
OCPUsageLineItemDailySummary.objects.filter(
cluster_id=cluster_id, usage_start__gte=start_date, usage_start__lte=end_date
).update(
infrastructure_markup_cost=(
(Coalesce(F("infrastructure_raw_cost"), Value(0, output_field=DecimalField()))) * markup
),
infrastructure_project_markup_cost=(
(Coalesce(F("infrastructure_project_raw_cost"), Value(0, output_field=DecimalField()))) * markup
),
)
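For context on the Coalesce calls just above: they make NULL raw costs behave as zero instead of propagating NULL into the markup columns. For example, with markup = 0.1, a row whose infrastructure_raw_cost is 100 gets infrastructure_markup_cost = 10, while a row whose raw cost is NULL is coalesced to 0 and ends up with a markup cost of 0 rather than NULL.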

def populate_platform_and_worker_distributed_cost_sql(
self, start_date, end_date, provider_uuid, distribution_info
@@ -499,9 +446,8 @@ def populate_platform_and_worker_distributed_cost_sql(
context = {"schema": self.schema, "provider_uuid": provider_uuid, "start_date": start_date}
LOG.info(log_json(msg=msg, context=context))
return
with schema_context(self.schema):
report_period_id = report_period.id

report_period_id = report_period.id
distribute_mapping = {
"platform_cost": {
"sql_file": "distribute_platform_cost.sql",
@@ -569,9 +515,7 @@ def populate_monthly_cost_sql(self, cost_type, rate_type, rate, start_date, end_
)
)
return
with schema_context(self.schema):
report_period_id = report_period.id

report_period_id = report_period.id
if not rate:
LOG.info(log_json(msg="removing monthly costs", context=ctx))
self.delete_line_item_daily_summary_entries_for_date_range_raw(
@@ -631,8 +575,7 @@ def populate_monthly_tag_cost_sql( # noqa: C901
)
)
return
with schema_context(self.schema):
report_period_id = report_period.id
report_period_id = report_period.id

cpu_case, memory_case, volume_case = case_dict.get("cost")
labels = case_dict.get("labels")
@@ -721,8 +664,7 @@ def populate_usage_costs(self, rate_type, rates, start_date, end_date, provider_
)
)
return
with schema_context(self.schema):
report_period_id = report_period.id
report_period_id = report_period.id

if not rates:
LOG.info(log_json(msg="removing usage costs", context=ctx))
@@ -947,76 +889,72 @@ def populate_openshift_cluster_information_tables(self, provider, cluster_id, cl

def populate_cluster_table(self, provider, cluster_id, cluster_alias):
"""Get or create an entry in the OCP cluster table."""
with schema_context(self.schema):
LOG.info(log_json(msg="fetching entry in reporting_ocp_cluster", provider_uuid=provider.uuid))
clusters = OCPCluster.objects.filter(provider_id=provider.uuid)
if clusters.count() > 1:
clusters_to_delete = clusters.exclude(cluster_alias=cluster_alias)
LOG.info(
log_json(
msg="attempting to delete duplicate entries in reporting_ocp_cluster",
provider_uuid=provider.uuid,
)
)
clusters_to_delete.delete()
cluster = clusters.first()
msg = "fetched entry in reporting_ocp_cluster"
if not cluster:
cluster, created = OCPCluster.objects.get_or_create(
cluster_id=cluster_id, cluster_alias=cluster_alias, provider_id=provider.uuid
)
msg = f"created entry in reporting_ocp_clusters: {created}"

# if the cluster entry already exists and cluster alias does not match, update the cluster alias
elif cluster.cluster_alias != cluster_alias:
cluster.cluster_alias = cluster_alias
cluster.save()
msg = "updated cluster entry with new cluster alias in reporting_ocp_clusters"

LOG.info(log_json(msg="fetching entry in reporting_ocp_cluster", provider_uuid=provider.uuid))
clusters = OCPCluster.objects.filter(provider_id=provider.uuid)
if clusters.count() > 1:
clusters_to_delete = clusters.exclude(cluster_alias=cluster_alias)
LOG.info(
log_json(
msg=msg,
cluster_id=cluster_id,
cluster_alias=cluster_alias,
msg="attempting to delete duplicate entries in reporting_ocp_cluster",
provider_uuid=provider.uuid,
)
)
clusters_to_delete.delete()
cluster = clusters.first()
msg = "fetched entry in reporting_ocp_cluster"
if not cluster:
cluster, created = OCPCluster.objects.get_or_create(
cluster_id=cluster_id, cluster_alias=cluster_alias, provider_id=provider.uuid
)
msg = f"created entry in reporting_ocp_clusters: {created}"

# if the cluster entry already exists and cluster alias does not match, update the cluster alias
elif cluster.cluster_alias != cluster_alias:
cluster.cluster_alias = cluster_alias
cluster.save()
msg = "updated cluster entry with new cluster alias in reporting_ocp_clusters"

LOG.info(
log_json(
msg=msg,
cluster_id=cluster_id,
cluster_alias=cluster_alias,
provider_uuid=provider.uuid,
)
)
return cluster

def populate_node_table(self, cluster, nodes):
"""Get or create an entry in the OCP node table."""
LOG.info(log_json(msg="populating reporting_ocp_nodes table", schema=self.schema, cluster=cluster))
with schema_context(self.schema):
for node in nodes:
tmp_node = OCPNode.objects.filter(
node=node[0], resource_id=node[1], node_capacity_cpu_cores=node[2], cluster=cluster
).first()
if not tmp_node:
OCPNode.objects.create(
node=node[0],
resource_id=node[1],
node_capacity_cpu_cores=node[2],
node_role=node[3],
cluster=cluster,
)
# if the node entry already exists but does not have a role assigned, update the node role
elif not tmp_node.node_role:
tmp_node.node_role = node[3]
tmp_node.save()
for node in nodes:
tmp_node = OCPNode.objects.filter(
node=node[0], resource_id=node[1], node_capacity_cpu_cores=node[2], cluster=cluster
).first()
if not tmp_node:
OCPNode.objects.create(
node=node[0],
resource_id=node[1],
node_capacity_cpu_cores=node[2],
node_role=node[3],
cluster=cluster,
)
# if the node entry already exists but does not have a role assigned, update the node role
elif not tmp_node.node_role:
tmp_node.node_role = node[3]
tmp_node.save()

def populate_pvc_table(self, cluster, pvcs):
"""Get or create an entry in the OCP cluster table."""
LOG.info(log_json(msg="populating reporting_ocp_pvcs table", schema=self.schema, cluster=cluster))
with schema_context(self.schema):
for pvc in pvcs:
OCPPVC.objects.get_or_create(persistent_volume=pvc[0], persistent_volume_claim=pvc[1], cluster=cluster)
for pvc in pvcs:
OCPPVC.objects.get_or_create(persistent_volume=pvc[0], persistent_volume_claim=pvc[1], cluster=cluster)

def populate_project_table(self, cluster, projects):
"""Get or create an entry in the OCP cluster table."""
LOG.info(log_json(msg="populating reporting_ocp_projects table", schema=self.schema, cluster=cluster))
with schema_context(self.schema):
for project in projects:
OCPProject.objects.get_or_create(project=project, cluster=cluster)
for project in projects:
OCPProject.objects.get_or_create(project=project, cluster=cluster)

def get_nodes_trino(self, source_uuid, start_date, end_date):
"""Get the nodes from an OpenShift cluster."""
@@ -1081,37 +1019,30 @@ def get_projects_trino(self, source_uuid, start_date, end_date):

def get_cluster_for_provider(self, provider_uuid):
"""Return the cluster entry for a provider UUID."""
with schema_context(self.schema):
cluster = OCPCluster.objects.filter(provider_id=provider_uuid).first()
return cluster
return OCPCluster.objects.filter(provider_id=provider_uuid).first()

def get_nodes_for_cluster(self, cluster_id):
"""Get all nodes for an OCP cluster."""
with schema_context(self.schema):
nodes = (
OCPNode.objects.filter(cluster_id=cluster_id)
.exclude(node__exact="")
.values_list("node", "resource_id")
)
nodes = [(node[0], node[1]) for node in nodes]
nodes = (
OCPNode.objects.filter(cluster_id=cluster_id).exclude(node__exact="").values_list("node", "resource_id")
)
nodes = [(node[0], node[1]) for node in nodes]
return nodes

def get_pvcs_for_cluster(self, cluster_id):
"""Get all nodes for an OCP cluster."""
with schema_context(self.schema):
pvcs = (
OCPPVC.objects.filter(cluster_id=cluster_id)
.exclude(persistent_volume__exact="")
.values_list("persistent_volume", "persistent_volume_claim")
)
pvcs = [(pvc[0], pvc[1]) for pvc in pvcs]
pvcs = (
OCPPVC.objects.filter(cluster_id=cluster_id)
.exclude(persistent_volume__exact="")
.values_list("persistent_volume", "persistent_volume_claim")
)
pvcs = [(pvc[0], pvc[1]) for pvc in pvcs]
return pvcs

def get_projects_for_cluster(self, cluster_id):
"""Get all nodes for an OCP cluster."""
with schema_context(self.schema):
projects = OCPProject.objects.filter(cluster_id=cluster_id).values_list("project")
projects = [project[0] for project in projects]
projects = OCPProject.objects.filter(cluster_id=cluster_id).values_list("project")
projects = [project[0] for project in projects]
return projects

def get_openshift_topology_for_multiple_providers(self, provider_uuids):