Commit d21adc7
move hcs copy to bucket
maskarb committed Jan 16, 2025
1 parent bcf7091 commit d21adc7
Showing 7 changed files with 64 additions and 64 deletions.
41 changes: 34 additions & 7 deletions koku/hcs/csv_file_handler.py
@@ -2,10 +2,13 @@
 import os

 import pandas as pd
+from botocore.exceptions import ClientError
+from botocore.exceptions import EndpointConnectionError
 from dateutil import relativedelta
+from django.conf import settings

 from api.common import log_json
-from masu.util.aws.common import copy_local_hcs_report_file_to_s3_bucket
+from masu.util.aws.common import get_s3_resource

 LOG = logging.getLogger(__name__)

@@ -18,6 +21,12 @@ def __init__(self, schema_name, provider, provider_uuid):
         self._schema_name = str(schema_name).strip("acct")
         self._provider = provider
         self._provider_uuid = provider_uuid
+        self._s3_resource = get_s3_resource(
+            access_key=settings.S3_HCS_ACCESS_KEY,
+            secret_key=settings.S3_HCS_SECRET,
+            region=settings.S3_HCS_REGION,
+            endpoint_url=settings.S3_HCS_ENDPOINT,
+        )

     def write_csv_to_s3(self, date, data, cols, finalize=False, tracing_id=None):
         """
@@ -30,7 +39,6 @@ def write_csv_to_s3(self, date, data, cols, finalize=False, tracing_id=None):
         :return none
         """
-        my_df = pd.DataFrame(data)
         filename = f"hcs_{date}.csv"
         month = date.strftime("%m")
         year = date.strftime("%Y")
@@ -44,13 +52,32 @@
         if finalize:
             # reports are finalized on the 15th of the month following the report date
             finalize_date = (date.replace(day=15) + relativedelta.relativedelta(months=1)).strftime("%Y-%m-%d")
-        s3_csv_path = (
+        s3_path = (
             f"hcs/csv/{self._schema_name}/{self._provider}/source={self._provider_uuid}/year={year}/month={month}"
         )

-        LOG.info(log_json(tracing_id, msg="preparing to write file to object storage"))
+        LOG.info(log_json(tracing_id, msg="preparing to write file to object storage", context=context))
+        self.copy_data_to_s3_bucket(tracing_id, data, cols, filename, s3_path, finalize, finalize_date, context)
+        LOG.info(log_json(tracing_id, msg="wrote file to object storage", context=context))
+
+    def copy_data_to_s3_bucket(
+        self, tracing_id, data, cols, filename, s3_path, finalize, finalize_date, context
+    ):  # pragma: no cover
+        """Copy hcs data to S3 bucket."""
+        my_df = pd.DataFrame(data)
         my_df.to_csv(filename, header=cols, index=False)
-        copy_local_hcs_report_file_to_s3_bucket(
-            tracing_id, s3_csv_path, filename, filename, finalize, finalize_date, context
-        )
+        metadata = {"finalized": str(finalize)}
+        if finalize and finalize_date:
+            metadata["finalized-date"] = finalize_date
+        extra_args = {"Metadata": metadata}
+        with open(filename, "rb") as fin:
+            try:
+                upload_key = f"{s3_path}/{filename}"
+                s3_obj = {"bucket_name": settings.S3_HCS_BUCKET_NAME, "key": upload_key}
+                upload = self._s3_resource.Object(**s3_obj)
+                upload.upload_fileobj(fin, ExtraArgs=extra_args)
+            except (EndpointConnectionError, ClientError) as err:
+                msg = f"unable to copy data to {upload_key}, bucket {settings.S3_HCS_BUCKET_NAME}. Reason: {str(err)}"
+                LOG.warning(log_json(tracing_id, msg=msg, context=context))
+                return
         os.remove(filename)
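
Note: the new copy_data_to_s3_bucket uploads through a boto3 S3 resource directly, rather than through the removed masu.util.aws.common helper. Below is a minimal, self-contained sketch of the same pattern, using stand-in bucket, key, and endpoint values, not the real koku settings:

import boto3

# Stand-in resource; koku builds this via get_s3_resource() with the S3_HCS_* settings.
s3_resource = boto3.resource("s3", endpoint_url="https://s3.amazonaws.com")

# The finalization flags travel as S3 object metadata, as in the diff above.
metadata = {"finalized": "True", "finalized-date": "2023-09-15"}

with open("hcs_2023-08-01.csv", "rb") as fin:
    obj = s3_resource.Object(
        bucket_name="hcs-report",  # stand-in for settings.S3_HCS_BUCKET_NAME
        key="hcs/csv/org1234567/AWS/source=cabfdddb-4ed5-421e-a041-311b75daf235/year=2023/month=08/hcs_2023-08-01.csv",
    )
    # ExtraArgs={"Metadata": ...} attaches the flags to the uploaded object
    obj.upload_fileobj(fin, ExtraArgs={"Metadata": metadata})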
15 changes: 7 additions & 8 deletions koku/hcs/test/test_csv_file_handler.py
@@ -13,6 +13,7 @@
 from hcs.test import HCSTestCase


+@patch("hcs.csv_file_handler.get_s3_resource")
 class TestHCSCSVFileHandler(HCSTestCase):
     """Test cases for HCS CSV File Handler"""

@@ -24,14 +25,13 @@ def setUpClass(cls):
         cls.provider = Provider.PROVIDER_AWS
         cls.provider_uuid = "cabfdddb-4ed5-421e-a041-311b75daf235"

-    def test_init(self):
+    def test_init(self, *args):
         """Test the initializer."""
         fh = CSVFileHandler(self.schema, self.provider, self.provider_uuid)
         self.assertEqual(fh._schema_name, "org1234567")
         self.assertEqual(fh._provider, "AWS")
         self.assertEqual(fh._provider_uuid, "cabfdddb-4ed5-421e-a041-311b75daf235")

-    @patch("masu.util.aws.common.get_s3_resource")
     def test_write_df_to_csv(self, *args):
         data = {"x": "123", "y": "456", "z": "456"}
         with self.assertLogs("hcs.csv_file_handler", "INFO") as _logs:
@@ -40,10 +40,8 @@ def test_write_df_to_csv(self, *args):

         self.assertIn("preparing to write file to object storage", _logs.output[0])

-    @patch("subs.subs_data_messenger.os.remove")
-    @patch("pandas.DataFrame")
-    @patch("hcs.csv_file_handler.copy_local_hcs_report_file_to_s3_bucket")
-    def test_write_csv_to_s3_finalize(self, mock_copy, mock_df, mock_remove):
+    @patch("hcs.csv_file_handler.CSVFileHandler.copy_data_to_s3_bucket")
+    def test_write_csv_to_s3_finalize(self, mock_copy, *args):
         """Test the right finalized date is determine when finalizing a report"""
         data = {"x": "123", "y": "456", "z": "456"}
         date = "2023-08-01"
@@ -67,9 +65,10 @@ def test_write_csv_to_s3_finalize(self, mock_copy, mock_df, mock_remove):
             fh.write_csv_to_s3(datetime_date, data.items(), ["x", "y", "z"], finalize, mock_trace)
             mock_copy.assert_called_once_with(
                 mock_trace,
-                expected_s3_path,
-                expected_filename,
+                data.items(),
+                ["x", "y", "z"],
                 expected_filename,
+                expected_s3_path,
                 finalize,
                 expected_finalize_date,
                 mock_context,
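
The expected finalize date asserted above follows the rule noted in csv_file_handler.py: reports are finalized on the 15th of the month following the report date. A quick worked check of that arithmetic for the 2023-08-01 report date used in the test:

import datetime

from dateutil import relativedelta

date = datetime.date(2023, 8, 1)
# 15th of the month following the report date: 2023-08-15 plus one month
finalize_date = (date.replace(day=15) + relativedelta.relativedelta(months=1)).strftime("%Y-%m-%d")
print(finalize_date)  # 2023-09-15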
2 changes: 1 addition & 1 deletion koku/koku/settings.py
@@ -495,7 +495,7 @@
 REQUESTED_ROS_BUCKET = ENVIRONMENT.get_value("REQUESTED_ROS_BUCKET", default="ros-report")
 REQUESTED_SUBS_BUCKET = ENVIRONMENT.get_value("REQUESTED_SUBS_BUCKET", default="subs-report")
 S3_TIMEOUT = ENVIRONMENT.int("S3_CONNECTION_TIMEOUT", default=60)
-S3_DEFAULT_ENDPOINT = ENVIRONMENT.get_value("S3_ENDPOINT", default="https://s3.amazonaws.com")
+S3_ENDPOINT = ENVIRONMENT.get_value("S3_ENDPOINT", default="https://s3.amazonaws.com")
 S3_REGION = ENVIRONMENT.get_value("S3_REGION", default="us-east-1")
 S3_BUCKET_NAME = CONFIGURATOR.get_object_store_bucket(REQUESTED_BUCKET)
 S3_ACCESS_KEY = CONFIGURATOR.get_object_store_access_key(REQUESTED_BUCKET)
3 changes: 1 addition & 2 deletions koku/masu/external/ros_report_shipper.py
@@ -22,7 +22,6 @@
 from masu.prometheus_stats import KAFKA_CONNECTION_ERRORS_COUNTER
 from masu.util.ocp import common as utils

-
 LOG = logging.getLogger(__name__)


@@ -34,7 +33,7 @@ def get_ros_s3_client():  # pragma: no cover
         aws_secret_access_key=settings.S3_ROS_SECRET,
         region_name=settings.S3_ROS_REGION,
     )
-    return s3_session.client("s3", endpoint_url=settings.S3_ENDPOINT, config=config)
+    return s3_session.client("s3", endpoint_url=settings.S3_ROS_ENDPOINT, config=config)


 def generate_s3_object_url(client, upload_key):  # pragma: no cover
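
Together with the S3_DEFAULT_ENDPOINT to S3_ENDPOINT rename in settings.py, each service client now reads its own endpoint setting (S3_ROS_ENDPOINT here, S3_HCS_ENDPOINT in the HCS handler) instead of one shared value. A rough sketch of that per-service client pattern, with hypothetical endpoint and credential values:

import boto3
from botocore.client import Config

# Hypothetical values; koku reads these from the S3_ROS_* settings.
session = boto3.Session(
    aws_access_key_id="ros-access-key",
    aws_secret_access_key="ros-secret",
    region_name="us-east-1",
)
config = Config(connect_timeout=60)
# Point the client at a service-specific object store endpoint
client = session.client("s3", endpoint_url="https://ros-objectstore.example.com", config=config)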
49 changes: 18 additions & 31 deletions koku/masu/test/util/aws/test_common.py
@@ -9,7 +9,6 @@
 from unittest import TestCase
 from unittest.mock import call
 from unittest.mock import Mock
-from unittest.mock import mock_open
 from unittest.mock import patch
 from unittest.mock import PropertyMock
 from uuid import uuid4
@@ -262,8 +261,9 @@ def test_update_account_aliases_no_aliases(self):

         mock_account_id = self.account_id

-        with patch("masu.util.aws.common.get_account_alias_from_role_arn") as mock_get, patch(
-            "masu.util.aws.common.get_account_names_by_organization"
+        with (
+            patch("masu.util.aws.common.get_account_alias_from_role_arn") as mock_get,
+            patch("masu.util.aws.common.get_account_names_by_organization"),
         ):
             mock_get.return_value = (mock_account_id, mock_account_id)
             utils.update_account_aliases(self.aws_provider)
@@ -279,8 +279,9 @@ def test_update_account_aliases_with_aliases(self):
         mock_account_id = self.account_id
         mock_alias = "mock_alias"

-        with patch("masu.util.aws.common.get_account_alias_from_role_arn") as mock_get, patch(
-            "masu.util.aws.common.get_account_names_by_organization"
+        with (
+            patch("masu.util.aws.common.get_account_alias_from_role_arn") as mock_get,
+            patch("masu.util.aws.common.get_account_names_by_organization"),
         ):
             mock_get.return_value = (mock_account_id, mock_alias)
             utils.update_account_aliases(self.aws_provider)
@@ -298,9 +299,10 @@ def test_update_account_aliases_with_aliases_and_orgs(self):
         mock_alias = "mock_alias"
         mock_alias2 = "mock_alias2"

-        with patch("masu.util.aws.common.get_account_alias_from_role_arn") as mock_get, patch(
-            "masu.util.aws.common.get_account_names_by_organization"
-        ) as mock_get_orgs:
+        with (
+            patch("masu.util.aws.common.get_account_alias_from_role_arn") as mock_get,
+            patch("masu.util.aws.common.get_account_names_by_organization") as mock_get_orgs,
+        ):
             mock_get.return_value = (mock_account_id, mock_alias)
             mock_get_orgs.return_value = [{"id": mock_account_id2, "name": mock_alias2}]
             utils.update_account_aliases(self.aws_provider)
@@ -536,9 +538,10 @@ def test_remove_s3_objects_not_matching_metadata(self):
         )
         self.assertListEqual(removed, [])

-        with patch("masu.util.aws.common.get_s3_objects_not_matching_metadata") as mock_get_objects, patch(
-            "masu.util.aws.common.get_s3_resource"
-        ) as mock_s3:
+        with (
+            patch("masu.util.aws.common.get_s3_objects_not_matching_metadata") as mock_get_objects,
+            patch("masu.util.aws.common.get_s3_resource") as mock_s3,
+        ):
             mock_s3.return_value.Object.return_value.delete.side_effect = ClientError({}, "Error")
             mock_get_objects.return_value = []
             removed = utils.delete_s3_objects_not_matching_metadata(
Expand Down Expand Up @@ -644,9 +647,10 @@ def test_remove_s3_objects_matching_metadata(self):
)
self.assertListEqual(removed, [])

with patch("masu.util.aws.common.get_s3_objects_matching_metadata") as mock_get_objects, patch(
"masu.util.aws.common.get_s3_resource"
) as mock_s3:
with (
patch("masu.util.aws.common.get_s3_objects_matching_metadata") as mock_get_objects,
patch("masu.util.aws.common.get_s3_resource") as mock_s3,
):
mock_s3.return_value.Object.return_value.delete.side_effect = ClientError({}, "Error")
mock_get_objects.return_value = []
removed = utils.delete_s3_objects_matching_metadata(
@@ -665,23 +669,6 @@ def test_copy_data_to_s3_bucket(self):
         with self.assertRaises(utils.UploadError):
             utils.copy_data_to_s3_bucket("request_id", "path", "filename", "data", "manifest_id")

-    @patch("masu.util.aws.common.copy_data_to_s3_bucket")
-    def test_copy_local_hcs_report_file_to_s3_bucket_with_finalize(self, mock_copy):
-        """Test that the proper metadata is used when a finalized date is passed in with the finalize option"""
-        fake_request_id = "fake_id"
-        fake_s3_path = "fake_path"
-        fake_filename = "fake_filename"
-        expected_metadata = {"finalized": "True", "finalized-date": "2023-08-15"}
-        expected_context = {}
-        mock_op = mock_open(read_data="x,y,z")
-        with patch("builtins.open", mock_op):
-            utils.copy_local_hcs_report_file_to_s3_bucket(
-                fake_request_id, fake_s3_path, fake_filename, fake_filename, True, "2023-08-15", expected_context
-            )
-            mock_copy.assert_called_once_with(
-                fake_request_id, fake_s3_path, fake_filename, mock_op(), expected_metadata, expected_context
-            )
-
     def test_match_openshift_resources_and_labels(self):
         """Test OCP on AWS data matching."""
         cluster_topology = [
15 changes: 0 additions & 15 deletions koku/masu/util/aws/common.py
@@ -561,21 +561,6 @@ def copy_local_report_file_to_s3_bucket(
         copy_data_to_s3_bucket(request_id, s3_path, local_filename, fin, metadata, context)


-def copy_local_hcs_report_file_to_s3_bucket(
-    request_id, s3_path, full_file_path, local_filename, finalize=False, finalize_date=None, context={}
-):
-    """
-    Copies local report file to s3 bucket
-    """
-    if s3_path:
-        LOG.info(f"copy_local_HCS_report_file_to_s3_bucket: {s3_path} {full_file_path}")
-        with open(full_file_path, "rb") as fin:
-            metadata = {"finalized": str(finalize)}
-            if finalize and finalize_date:
-                metadata["finalized-date"] = finalize_date
-            copy_data_to_s3_bucket(request_id, s3_path, local_filename, fin, metadata, context)
-
-
 def _get_s3_objects(s3_path):
     s3_resource = get_s3_resource(settings.S3_ACCESS_KEY, settings.S3_SECRET, settings.S3_REGION)
     return s3_resource.Bucket(settings.S3_BUCKET_NAME).objects.filter(Prefix=s3_path)
3 changes: 3 additions & 0 deletions tox.ini
@@ -2,6 +2,8 @@
 envlist = py311
 skipsdist = True

+
+
 [flake8]
 ; E203 = Whitespace before ':', conflicts with black
 ; W503 = Line break before binary operator
@@ -22,6 +24,7 @@ import-order-style = pycharm
 application-import-names = koku, api, providers, reporting, reporting_common, cost_models, masu

 [testenv]
+allowlist_externals=/bin/sh
 setenv =
     DATABASE_SERVICE_NAME={env:DATABASE_SERVICE_NAME:POSTGRES_SQL}
     DATABASE_ENGINE={env:DATABASE_ENGINE:postgresql}
