Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Data Collector API #314

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ Please do not update the unreleased notes.

<!-- Content should be placed here -->

## [v11.2.0](https://github.com/eduNEXT/eox-core/compare/v11.1.0...v11.2.0) - (2025-01-20)

### Added

- API to collect data and generate reports.

## [v11.1.0](https://github.com/eduNEXT/eox-core/compare/v11.0.0...v11.1.0) - (2024-11-21)

### Changed
Expand Down
2 changes: 1 addition & 1 deletion eox_core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""
Init for main eox-core app
"""
__version__ = '11.1.0'
__version__ = '11.2.0'
Empty file.
73 changes: 73 additions & 0 deletions eox_core/api/data/data_collector/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""
Async task for generating reports by executing database queries
and posting the results to the Shipyard API.
"""

from celery import shared_task, Task
from eox_core.api.data.data_collector.utils import execute_query, post_data_to_api, serialize_data, process_query_results
import yaml
import logging

logger = logging.getLogger(__name__)


class ReportTask(Task):
"""
Custom task class to handle report generation with an on_failure hook.
"""
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""
Called when the task has exhausted all retries.

Args:
exc (Exception): The exception raised.
task_id (str): The ID of the failed task.
args (tuple): The positional arguments for the task.
kwargs (dict): The keyword arguments for the task.
einfo (ExceptionInfo): Exception information.
"""
logger.error(f"Task {task_id} failed after retries. Exception: {exc}. Could not collect data.")


@shared_task(bind=True)
def generate_report(self, destination_url, query_file_content, token_generation_url, current_host):
"""
Async task to generate a report:
1. Reads queries from the provided query file content.
2. Executes each query against the database.
3. Sends the results to the Shipyard API.

Args:
self (Task): The Celery task instance.
query_file_content (str): The content of the query file in YAML format.

Raises:
Retry: If an error occurs, the task retries up to 3 times with a 60-second delay.
"""
try:
queries = yaml.safe_load(query_file_content).get("queries", [])
if not queries:
logger.warning("No queries found in the provided file. Task will exit.")
return

report_data = {}
for query in queries:
query_name = query.get("name")
query_sql = query.get("query")
logger.info(f"Executing query: {query_name}")
try:
result = execute_query(query_sql)

serialized_result = serialize_data(result)
processed_result = process_query_results(serialized_result)
report_data[query_name] = processed_result
except Exception as e:
logger.error(f"Failed to execute query '{query_name}': {e}")
continue

post_data_to_api(destination_url, report_data, token_generation_url, current_host)

logger.info("Report generation task completed successfully.")
except Exception as e:
logger.error(f"An error occurred in the report generation task: {e}. Retrying")
raise self.retry(exc=e, countdown=60, max_retries=3)
11 changes: 11 additions & 0 deletions eox_core/api/data/data_collector/urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
"""
URLs for the Microsite API
"""
from django.urls import include, re_path

app_name = 'eox_core' # pylint: disable=invalid-name


urlpatterns = [ # pylint: disable=invalid-name
re_path(r'^v1/', include('eox_core.api.data.data_collector.v1.urls', namespace='eox-data-api-collector-v1')),
]
98 changes: 98 additions & 0 deletions eox_core/api/data/data_collector/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""
Utility functions for report generation, including query execution
and integration with the Shipyard API.
"""

import yaml
from django.db import connection
import requests
from django.conf import settings
from datetime import datetime
import logging

from eox_core.utils import get_access_token

logger = logging.getLogger(__name__)


def execute_query(sql_query):
"""
Execute a raw SQL query and return the results in a structured format.

Args:
sql_query (str): The raw SQL query to execute.

Returns:
list or dict: Structured query results.
"""
with connection.cursor() as cursor:
cursor.execute(sql_query)
rows = cursor.fetchall()
# If the query returns more than one column, return rows as is.
if cursor.description:
columns = [col[0] for col in cursor.description]
if len(columns) == 1:
return [row[0] for row in rows] # Return single-column results as a list
return [dict(zip(columns, row)) for row in rows] # Multi-column results as a list of dicts
return rows


def serialize_data(data):
"""
Recursively serialize data, converting datetime objects to strings.

Args:
data (dict or list): The data to serialize.

Returns:
dict or list: The serialized data with datetime objects as strings.
"""
if isinstance(data, dict):
return {key: serialize_data(value) for key, value in data.items()}
elif isinstance(data, list):
return [serialize_data(item) for item in data]
elif isinstance(data, datetime):
return data.isoformat()
return data


def process_query_results(raw_result):
"""
Process the raw result of a query.

Args:
raw_result: The result from the SQL query (list, scalar, or dictionary).

Returns:
The processed result, extracting scalar values from single-item lists,
or returning the original value for more complex data structures.
"""
if isinstance(raw_result, list) and len(raw_result) == 1:
return raw_result[0]
return raw_result


def post_data_to_api(api_url, report_data, token_generation_url, current_host):
"""
Sends the generated report data to the Shipyard API.

Args:
report_data (dict): The data to be sent to the Shipyard API.

Raises:
Exception: If the API request fails.
"""
token = get_access_token(
token_generation_url,
settings.EOX_CORE_SAVE_DATA_API_CLIENT_ID,
settings.EOX_CORE_SAVE_DATA_API_CLIENT_SECRET,
)
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
payload = {"instance_domain":current_host, "data": report_data}
response = requests.post(api_url, json=payload, headers=headers)

if not response.ok:
raise Exception(f"Failed to post data to Shipyard API: {response.content}")
Empty file.
50 changes: 50 additions & 0 deletions eox_core/api/data/data_collector/v1/serializers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from rest_framework import serializers

class DataCollectorSerializer(serializers.Serializer):
"""
Serializer for the DataCollectorView API.

Validates the incoming payload for the data collection endpoint.

Fields:
query_file_content (str): The content of the query file in YAML format.
query_file_url (str): A public URL pointing to the query file.
destination_url (str): The URL where the results should be sent.
"""
query_file_content = serializers.CharField(
required=False,
allow_blank=True,
help_text="Content of the query file in YAML format."
)
query_file_url = serializers.URLField(
required=False,
allow_blank=True,
help_text="Public URL pointing to the query file."
)
destination_url = serializers.URLField(
required=True,
help_text="The API endpoint where the results will be sent."
)
token_generation_url = serializers.URLField(
required=True,
help_text="The API endpoint where the results will be sent."
)

def validate(self, data):
"""
Custom validation to ensure either 'query_file_content' or 'query_file_url' is provided.

Args:
data (dict): The validated data.

Returns:
dict: The validated data if valid.

Raises:
serializers.ValidationError: If neither 'query_file_content' nor 'query_file_url' is provided.
"""
if not data.get("query_file_content") and not data.get("query_file_url"):
raise serializers.ValidationError(
"Either 'query_file_content' or 'query_file_url' must be provided."
)
return data
9 changes: 9 additions & 0 deletions eox_core/api/data/data_collector/v1/urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""_"""
from django.urls import path
from eox_core.api.data.data_collector.v1.views import DataCollectorView

app_name = "data_collector"

urlpatterns = [
path("collect-data/", DataCollectorView.as_view(), name="collect_data"),
]
84 changes: 84 additions & 0 deletions eox_core/api/data/data_collector/v1/views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import logging
import requests
import yaml
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status, permissions
from django.conf import settings
from eox_core.api.data.data_collector.tasks import generate_report

from rest_framework.permissions import BasePermission
from rest_framework.authentication import get_authorization_header
from django.conf import settings
from eox_core.api.data.data_collector.v1.serializers import DataCollectorSerializer

logger = logging.getLogger(__name__)


class IsGitHubAction(BasePermission):
"""
Permission class to allow access only if the request contains a valid GitHub Action token.
"""
def has_permission(self, request, view):
auth_header = get_authorization_header(request).decode('utf-8')
auth_token = settings.EOX_CORE_DATA_COLLECT_AUTH_TOKEN
if auth_header and auth_header == f"Bearer {auth_token}":
return True
return False


class DataCollectorView(APIView):
"""
API view to handle data collection requests.

This view:
- Validates input using DataCollectorSerializer.
- Triggers an async task to execute queries and send results to a specified destination.
"""
# Allow JWT Auth
permission_classes = [IsGitHubAction]

def post(self, request):
"""
Handles POST requests to collect data.

Args:
request (HttpRequest): The incoming request.

Returns:
Response: A success or error message.
"""
serializer = DataCollectorSerializer(data=request.data)

if serializer.is_valid():
validated_data = serializer.validated_data
query_file_content = validated_data.get("query_file_content")
query_file_url = validated_data.get("query_file_url")
destination_url = validated_data.get("destination_url")
token_generation_url = validated_data.get("token_generation_url")
current_host = request.get_host() #Remove trailing slash and http

# If the query file content is not provided, fetch it from the URL
if not query_file_content and query_file_url:
try:
response = requests.get(query_file_url)
if response.status_code == 200:
query_file_content = response.text
else:
return Response(
{"error": "Failed to fetch query file from the provided URL."},
status=status.HTTP_400_BAD_REQUEST
)
except Exception as e:
return Response(
{"error": f"An error occurred while fetching the query file: {str(e)}"},
status=status.HTTP_400_BAD_REQUEST
)

generate_report.delay(destination_url, query_file_content, token_generation_url, current_host)
return Response(
{"message": "Data collection task has been initiated successfully."},
status=status.HTTP_202_ACCEPTED
)

return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
1 change: 1 addition & 0 deletions eox_core/api/data/v1/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@
urlpatterns = [ # pylint: disable=invalid-name
re_path(r'^v1/', include((ROUTER.urls, 'eox_core'), namespace='eox-data-api-v1')),
re_path(r'^v1/tasks/(?P<task_id>.*)$', CeleryTasksStatus.as_view(), name="celery-data-api-tasks"),
re_path(r'^', include('eox_core.api.data.data_collector.urls', namespace='eox-data-api-collector')),
]
28 changes: 28 additions & 0 deletions eox_core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import datetime
import hashlib
import re
import requests

from django.conf import settings
from django.contrib.sites.models import Site
Expand Down Expand Up @@ -165,3 +166,30 @@ def get_or_create_site_from_oauth_app_uris(redirect_uris):
return sites_qs.first()

return Site.objects.create(domain=domain, name=domain)


def get_access_token(
token_url,
client_id,
client_secret,
grant_type="client_credentials"
):
"""
Fetch an access token from a service OAuth2 API.

Returns:
str: The access token.
Raises:
Exception: If the token request fails.
"""
response = requests.post(
token_url,
data={
"grant_type": grant_type,
"client_id": client_id,
"client_secret": client_secret,
},
)
if response.ok:
return response.json().get("access_token")
raise Exception("Failed to obtain access token for API.")
Loading
Loading