Skip to content

Commit

Permalink
RHINENG-13972 download and extract archive (#559)
Browse files Browse the repository at this point in the history
* RHINENG-13972 download and extract archive

* separate download and extract logic, remove delete section

* add tests for download_and_extract logic
  • Loading branch information
r14chandra authored Jan 13, 2025
1 parent a4522da commit a61683a
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 12 deletions.
2 changes: 1 addition & 1 deletion ros/lib/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def build_endpoint_url(ep):
DAYS_UNTIL_STALE = int(os.getenv("DAYS_UNTIL_STALE", '45'))
CW_LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(funcName)s - %(message)s'
ROS_PROCESSOR_PORT = int(os.getenv("ROS_PROCESSOR_PORT", "8000"))
ROS_SUGGESTIONS_ENGINE_PORT = int(os.getenv("ROS_SUGGESTIONS_ENGINE_PORT", "8000"))
ROS_SUGGESTIONS_ENGINE_PORT = int(os.getenv("ROS_SUGGESTIONS_ENGINE_PORT", "8003"))
ROS_API_PORT = int(os.getenv("ROS_API_PORT", "8000"))
# Timeout in seconds to set against keys of deleted systems in a cache
CACHE_TIMEOUT_FOR_DELETED_SYSTEM = int(
Expand Down
6 changes: 3 additions & 3 deletions ros/processor/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ def events_processor():
processor.run()


def suggestions_engine_processor():
def suggestions_engine():
processor = SuggestionsEngine()
processor.processor_name = 'suggestions-engine-processor'
processor.processor_name = 'suggestions-engine'
PROCESSOR_INSTANCES.append(processor)
processor.run()

Expand All @@ -50,7 +50,7 @@ def thread_monitor():
events = threading.Thread(name='events-processor', target=events_processor)
collector = threading.Thread(name='garbage-collector', target=garbage_collector)
threadmonitor = threading.Thread(name='thread-monitor', target=thread_monitor)
suggestions_engine = threading.Thread(name='suggestions-engine', target=suggestions_engine_processor)
suggestions_engine = threading.Thread(name='suggestions-engine', target=suggestions_engine)
events.start()
engine_results.start()
suggestions_engine.start()
Expand Down
88 changes: 80 additions & 8 deletions ros/processor/suggestions_engine.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,98 @@
import time
import json
import requests
from http import HTTPStatus
from ros.lib import consume
from insights import extract
from ros.lib.config import get_logger
from tempfile import NamedTemporaryFile
from prometheus_client import start_http_server
from ros.lib.config import ROS_SUGGESTIONS_ENGINE_PORT
from ros.lib.config import (
INVENTORY_EVENTS_TOPIC,
METRICS_PORT,
GROUP_ID_SUGGESTIONS_ENGINE
)


logging = get_logger(__name__)


class SuggestionsEngine:
def __init__(self):
pass
self.consumer = consume.init_consumer(INVENTORY_EVENTS_TOPIC, GROUP_ID_SUGGESTIONS_ENGINE)
self.service = 'SUGGESTIONS_ENGINE'
self.event = None

def handle_create_update(self, payload):
self.event = "Update event" if payload.get('type') == 'updated' else "Create event"

platform_metadata = payload.get('platform_metadata')
host = payload.get('host')

if platform_metadata is None or host is None:
logging.info(f"{self.service} - {self.event} - Missing host or/and platform_metadata field(s).")
return

if not is_pcp_collected(platform_metadata):
return

archive_URL = platform_metadata.get('url')
download_and_extract(self.service, self.event, archive_URL, host, org_id=host.get('org_id'))

def run(self):
logging.info(f"{self.service} - Engine is running. Awaiting msgs.")
try:
logging.info("Flask server running on port %s", ROS_SUGGESTIONS_ENGINE_PORT)
while True:
time.sleep(1)
except Exception as err:
logging.error(err)
message = self.consumer.poll(timeout=1.0)
if message is None:
continue

try:
payload = json.loads(message.value().decode('utf-8'))
event_type = payload['type']

if 'created' == event_type or 'updated' == event_type:
self.handle_create_update(payload)
self.consumer.commit()

except json.JSONDecodeError as error:
logging.error(f"{self.service} - {self.event} - Failed to decode message: {error}")
except Exception as error:
logging.error(f"{self.service} - {self.event} - Error processing message: {error}")
except Exception as error:
logging.error(f"{self.service} - {self.event} - error: {error}")
finally:
self.consumer.close()


def download_and_extract(service, event, archive_URL, host, org_id):
logging.info(f"{service} - {event} - Downloading the report for system {host.get('id')}.")

response = requests.get(archive_URL, timeout=10)

if response.status_code != HTTPStatus.OK:
logging.error(
f"{service} - {event} - Unable to download the report for system {host.get('id')}. "
f"ERROR - {response.reason}"
)
else:
with NamedTemporaryFile(delete=True) as tempfile:
tempfile.write(response.content)
logging.info(
f"{service} - {event} - Downloaded the report successfully for system {host.get('id')}"
)
tempfile.flush()
with extract(tempfile.name) as extract_dir:
return extract_dir.tmp_dir


def is_pcp_collected(platform_metadata):
return (
platform_metadata.get('is_ros_v2') and
platform_metadata.get('is_pcp_raw_data_collected')
)


if __name__ == "__main__":
start_http_server(ROS_SUGGESTIONS_ENGINE_PORT)
start_http_server(int(METRICS_PORT))
processor = SuggestionsEngine()
processor.run()
Binary file added sample-files/with-ros-collect-pcp-rhel9.tar.gz
Binary file not shown.
95 changes: 95 additions & 0 deletions tests/test_suggestions_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import unittest
import logging
from unittest.mock import patch
from ros.processor.suggestions_engine import (
SuggestionsEngine,
is_pcp_collected,
download_and_extract
)


class TestSuggestionsEngine(unittest.TestCase):
def setUp(self):
self.engine = SuggestionsEngine()

def test_handle_create_update_missing_data(self):
payload_create = {'type': 'create'}
payload_update = {'type': 'updated', 'platform_metadata': {}}

with self.assertLogs(logging.getLogger(), level='INFO') as log:
self.engine.handle_create_update(payload_create)
expected_log_message = (
"INFO:ros.processor.suggestions_engine:SUGGESTIONS_ENGINE - Create event - "
"Missing host or/and platform_metadata field(s)."
)
self.assertIn(expected_log_message, log.output)

self.engine.handle_create_update(payload_update)
expected_log_message = (
"INFO:ros.processor.suggestions_engine:SUGGESTIONS_ENGINE - Update event - "
"Missing host or/and platform_metadata field(s)."
)
self.assertIn(expected_log_message, log.output)

def test_is_pcp_collected(self):
valid_metadata = {'is_ros_v2': True, 'is_pcp_raw_data_collected': True}
self.assertTrue(is_pcp_collected(valid_metadata))

invalid_metadata = {'is_ros_v2': True, 'is_pcp_raw_data_collected': False}
self.assertFalse(is_pcp_collected(invalid_metadata))

invalid_metadata = {'is_ros_v2': False, 'is_pcp_raw_data_collected': True}
self.assertFalse(is_pcp_collected(invalid_metadata))

invalid_metadata = {'is_ros_v2': False, 'is_pcp_raw_data_collected': False}
self.assertFalse(is_pcp_collected(invalid_metadata))


class TestDownloadAndExtract(unittest.TestCase):

@patch("ros.processor.suggestions_engine.extract")
@patch("ros.processor.suggestions_engine.NamedTemporaryFile")
@patch("ros.processor.suggestions_engine.requests.get")
def test_download_and_extract_successful(self, mock_get, mock_tempfile, mock_extract):
mock_get.return_value.status_code = 200
mock_get.return_value.content = b"dummy data"

mock_tempfile.return_value.__enter__.return_value.name = "tempfile.tar.gz"

mock_extract.return_value.__enter__.return_value.tmp_dir = "extracted_dir"

extract_dir = download_and_extract(
service="TestService",
event="TestEvent",
archive_URL="http://example.com/archive.tar.gz",
host={"id": "test_host"},
org_id="123"
)

mock_get.assert_called_once_with("http://example.com/archive.tar.gz", timeout=10)
mock_tempfile.return_value.__enter__.assert_called_once()
mock_extract.assert_called_once()
self.assertEqual(extract_dir, "extracted_dir")

@patch('ros.processor.suggestions_engine.extract')
@patch('ros.processor.suggestions_engine.requests.get')
def test_download_and_extract_failure(self, mock_get, mock_extract):
mock_get.return_value.status_code = 404
mock_get.return_value.reason = "Not Found"

mock_extract.return_value.__enter__.return_value.tmp_dir = "extracted_dir"

extract_dir = download_and_extract(
service="TestService",
event="TestEvent",
archive_URL="http://example.com/archive.tar.gz",
host={"id": "test_host"},
org_id="123"
)

mock_get.assert_called_once_with("http://example.com/archive.tar.gz", timeout=10)
self.assertIsNone(extract_dir)


if __name__ == '__main__':
unittest.main()

0 comments on commit a61683a

Please sign in to comment.