-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
10 changed files
with
337 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
""" | ||
Async task for generating reports by executing database queries | ||
and posting the results to the Shipyard API. | ||
""" | ||
|
||
from celery import shared_task, Task | ||
from eox_core.api.data.data_collector.utils import execute_query, post_data_to_api, serialize_data | ||
import yaml | ||
import logging | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class ReportTask(Task): | ||
""" | ||
Custom task class to handle report generation with an on_failure hook. | ||
""" | ||
def on_failure(self, exc, task_id, args, kwargs, einfo): | ||
""" | ||
Called when the task has exhausted all retries. | ||
Args: | ||
exc (Exception): The exception raised. | ||
task_id (str): The ID of the failed task. | ||
args (tuple): The positional arguments for the task. | ||
kwargs (dict): The keyword arguments for the task. | ||
einfo (ExceptionInfo): Exception information. | ||
""" | ||
logger.error(f"Task {task_id} failed after retries. Exception: {exc}. Could not collect data.") | ||
|
||
|
||
@shared_task(bind=True) | ||
def generate_report(self, destination_url, query_file_content, token_generation_url, current_host): | ||
""" | ||
Async task to generate a report: | ||
1. Reads queries from the provided query file content. | ||
2. Executes each query against the database. | ||
3. Sends the results to the Shipyard API. | ||
Args: | ||
self (Task): The Celery task instance. | ||
query_file_content (str): The content of the query file in YAML format. | ||
Raises: | ||
Retry: If an error occurs, the task retries up to 3 times with a 60-second delay. | ||
""" | ||
try: | ||
queries = yaml.safe_load(query_file_content).get("queries", []) | ||
if not queries: | ||
logger.warning("No queries found in the provided file. Task will exit.") | ||
return | ||
|
||
report_data = {} | ||
for query in queries: | ||
query_name = query.get("name") | ||
query_sql = query.get("query") | ||
logger.info(f"Executing query: {query_name}") | ||
try: | ||
result = execute_query(query_sql) | ||
|
||
serialized_result = serialize_data(result) | ||
report_data[query_name] = serialized_result | ||
except Exception as e: | ||
logger.error(f"Failed to execute query '{query_name}': {e}") | ||
continue | ||
|
||
post_data_to_api(destination_url, report_data, token_generation_url, current_host) | ||
|
||
logger.info("Report generation task completed successfully.") | ||
except Exception as e: | ||
logger.error(f"An error occurred in the report generation task: {e}. Retrying") | ||
raise self.retry(exc=e, countdown=60, max_retries=3) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
""" | ||
URLs for the Microsite API | ||
""" | ||
from django.urls import include, re_path | ||
|
||
app_name = 'eox_core' # pylint: disable=invalid-name | ||
|
||
|
||
urlpatterns = [ # pylint: disable=invalid-name | ||
re_path(r'^v1/', include('eox_core.api.data.data_collector.v1.urls', namespace='eox-data-api-collector-v1')), | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
""" | ||
Utility functions for report generation, including query execution | ||
and integration with the Shipyard API. | ||
""" | ||
|
||
import yaml | ||
from django.db import connection | ||
import requests | ||
from django.conf import settings | ||
from datetime import datetime | ||
import logging | ||
|
||
from eox_core.utils import get_access_token | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def execute_query(sql_query): | ||
""" | ||
Execute a raw SQL query and return the results in a structured format. | ||
Args: | ||
sql_query (str): The raw SQL query to execute. | ||
Returns: | ||
list or dict: Structured query results. | ||
""" | ||
with connection.cursor() as cursor: | ||
cursor.execute(sql_query) | ||
rows = cursor.fetchall() | ||
# If the query returns more than one column, return rows as is. | ||
if cursor.description: | ||
columns = [col[0] for col in cursor.description] | ||
if len(columns) == 1: | ||
return [row[0] for row in rows] # Return single-column results as a list | ||
return [dict(zip(columns, row)) for row in rows] # Multi-column results as a list of dicts | ||
return rows | ||
|
||
|
||
def serialize_data(data): | ||
""" | ||
Recursively serialize data, converting datetime objects to strings. | ||
Args: | ||
data (dict or list): The data to serialize. | ||
Returns: | ||
dict or list: The serialized data with datetime objects as strings. | ||
""" | ||
if isinstance(data, dict): | ||
return {key: serialize_data(value) for key, value in data.items()} | ||
elif isinstance(data, list): | ||
return [serialize_data(item) for item in data] | ||
elif isinstance(data, datetime): | ||
return data.isoformat() | ||
return data | ||
|
||
|
||
def post_data_to_api(api_url, report_data, token_generation_url, current_host): | ||
""" | ||
Sends the generated report data to the Shipyard API. | ||
Args: | ||
report_data (dict): The data to be sent to the Shipyard API. | ||
Raises: | ||
Exception: If the API request fails. | ||
""" | ||
token = get_access_token( | ||
token_generation_url, | ||
settings.EOX_CORE_SAVE_DATA_API_CLIENT_ID, | ||
settings.EOX_CORE_SAVE_DATA_API_CLIENT_SECRET, | ||
) | ||
headers = { | ||
"Authorization": f"Bearer {token}", | ||
"Content-Type": "application/json", | ||
} | ||
payload = {"instance_domain":current_host, "data": report_data} | ||
response = requests.post(api_url, json=payload, headers=headers) | ||
|
||
if not response.ok: | ||
raise Exception(f"Failed to post data to Shipyard API: {response.content}") |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
from rest_framework import serializers | ||
|
||
class DataCollectorSerializer(serializers.Serializer): | ||
""" | ||
Serializer for the DataCollectorView API. | ||
Validates the incoming payload for the data collection endpoint. | ||
Fields: | ||
query_file_content (str): The content of the query file in YAML format. | ||
query_file_url (str): A public URL pointing to the query file. | ||
destination_url (str): The URL where the results should be sent. | ||
""" | ||
query_file_content = serializers.CharField( | ||
required=False, | ||
allow_blank=True, | ||
help_text="Content of the query file in YAML format." | ||
) | ||
query_file_url = serializers.URLField( | ||
required=False, | ||
allow_blank=True, | ||
help_text="Public URL pointing to the query file." | ||
) | ||
destination_url = serializers.URLField( | ||
required=True, | ||
help_text="The API endpoint where the results will be sent." | ||
) | ||
token_generation_url = serializers.URLField( | ||
required=True, | ||
help_text="The API endpoint where the results will be sent." | ||
) | ||
|
||
def validate(self, data): | ||
""" | ||
Custom validation to ensure either 'query_file_content' or 'query_file_url' is provided. | ||
Args: | ||
data (dict): The validated data. | ||
Returns: | ||
dict: The validated data if valid. | ||
Raises: | ||
serializers.ValidationError: If neither 'query_file_content' nor 'query_file_url' is provided. | ||
""" | ||
if not data.get("query_file_content") and not data.get("query_file_url"): | ||
raise serializers.ValidationError( | ||
"Either 'query_file_content' or 'query_file_url' must be provided." | ||
) | ||
return data |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
"""_""" | ||
from django.urls import path | ||
from eox_core.api.data.data_collector.v1.views import DataCollectorView | ||
|
||
app_name = "data_collector" | ||
|
||
urlpatterns = [ | ||
path("collect-data/", DataCollectorView.as_view(), name="collect_data"), | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
import logging | ||
import requests | ||
import yaml | ||
from rest_framework.views import APIView | ||
from rest_framework.response import Response | ||
from rest_framework import status, permissions | ||
from django.conf import settings | ||
from eox_core.api.data.data_collector.tasks import generate_report | ||
|
||
from rest_framework.permissions import BasePermission | ||
from rest_framework.authentication import get_authorization_header | ||
from django.conf import settings | ||
from eox_core.api.data.data_collector.v1.serializers import DataCollectorSerializer | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class IsGitHubAction(BasePermission): | ||
""" | ||
Permission class to allow access only if the request contains a valid GitHub Action token. | ||
""" | ||
def has_permission(self, request, view): | ||
auth_header = get_authorization_header(request).decode('utf-8') | ||
auth_token = settings.EOX_CORE_DATA_COLLECT_AUTH_TOKEN | ||
if auth_header and auth_header == f"Bearer {auth_token}": | ||
return True | ||
return False | ||
|
||
|
||
class DataCollectorView(APIView): | ||
""" | ||
API view to handle data collection requests. | ||
This view: | ||
- Validates input using DataCollectorSerializer. | ||
- Triggers an async task to execute queries and send results to a specified destination. | ||
""" | ||
# Allow JWT Auth | ||
permission_classes = [IsGitHubAction] | ||
|
||
def post(self, request): | ||
""" | ||
Handles POST requests to collect data. | ||
Args: | ||
request (HttpRequest): The incoming request. | ||
Returns: | ||
Response: A success or error message. | ||
""" | ||
serializer = DataCollectorSerializer(data=request.data) | ||
|
||
if serializer.is_valid(): | ||
validated_data = serializer.validated_data | ||
query_file_content = validated_data.get("query_file_content") | ||
query_file_url = validated_data.get("query_file_url") | ||
destination_url = validated_data.get("destination_url") | ||
token_generation_url = validated_data.get("token_generation_url") | ||
current_host = request.get_host() #Remove trailing slash and http | ||
|
||
# If the query file content is not provided, fetch it from the URL | ||
if not query_file_content and query_file_url: | ||
try: | ||
response = requests.get(query_file_url) | ||
if response.status_code == 200: | ||
query_file_content = response.text | ||
else: | ||
return Response( | ||
{"error": "Failed to fetch query file from the provided URL."}, | ||
status=status.HTTP_400_BAD_REQUEST | ||
) | ||
except Exception as e: | ||
return Response( | ||
{"error": f"An error occurred while fetching the query file: {str(e)}"}, | ||
status=status.HTTP_400_BAD_REQUEST | ||
) | ||
|
||
generate_report.delay(destination_url, query_file_content, token_generation_url, current_host) | ||
return Response( | ||
{"message": "Data collection task has been initiated successfully."}, | ||
status=status.HTTP_202_ACCEPTED | ||
) | ||
|
||
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters