diff --git a/.github/workflows/deploy-app.yaml b/.github/workflows/deploy-app.yaml index 0cc6eb4..267b199 100644 --- a/.github/workflows/deploy-app.yaml +++ b/.github/workflows/deploy-app.yaml @@ -34,6 +34,6 @@ jobs: - name: Add network policies run: | - cf add-network-policy gdrive outbound-proxy -s ${{ steps.cf-setup.outputs.target-environment }}-public --protocol tcp --port 8080 - cf add-network-policy gdrive es-proxy -s ${{ steps.cf-setup.outputs.target-environment }} --protocol tcp --port 8080 - cf add-network-policy gdrive qualtrix -s ${{ steps.cf-setup.outputs.target-environment }} --protocol tcp --port 8080 \ No newline at end of file + cf add-network-policy util outbound-proxy -s prod-public --protocol tcp --port 8080 + cf add-network-policy util es-proxy -s prod --protocol tcp --port 8080 + cf add-network-policy util qualtrix -s prod --protocol tcp --port 8080 \ No newline at end of file diff --git a/gdrive/api.py b/gdrive/api.py index 2e19aa8..9518651 100644 --- a/gdrive/api.py +++ b/gdrive/api.py @@ -53,13 +53,18 @@ async def upload_file( parent = drive_client.create_folder(id, settings.ROOT_DIRECTORY) if zip: - with zipfile.ZipFile(stream) as archive: - files = archive.filelist - for file in files: - image = io.BytesIO(archive.read(file)) - drive_client.upload_basic( - f"{filename}_{file.filename}", parent, image - ) + try: + with zipfile.ZipFile(stream) as archive: + files = archive.filelist + for file in files: + image = io.BytesIO(archive.read(file)) + client.upload_basic( + f"{filename}_{file.filename}", parent, image + ) + except zipfile.BadZipFile as error: + client.upload_basic(filename, parent, stream) + log.error(f"An error occurred: {error}") + response.status_code = status.HTTP_400_BAD_REQUEST else: drive_client.upload_basic(filename, parent, stream) @@ -85,3 +90,27 @@ async def delete_file(filename, response: Response): except HttpError as error: log.error(f"An error occurred: {error}") response.status_code = error.status_code + + +@router.get("/list") +async def list(): + """ """ + s = client.list(count=200, parent="0AFrX3czp_UwZUk9PVA") + print(s) + + +@router.get("/count") +async def list(): + """ """ + s = drive_client.count(parent="1dz8UklyVsBDLP0wC5HPVSOuKpYGqrNEI") + print(s) + + +@router.post("/move") +async def move_file(file: str, source: str, dest: str): + return drive_client.move(file, source, dest) + + +@router.post("/create_folder") +async def move_file(name: str, dest: str): + return drive_client.create_folder(name, dest) diff --git a/gdrive/drive_client.py b/gdrive/drive_client.py index c5e59c8..937938b 100644 --- a/gdrive/drive_client.py +++ b/gdrive/drive_client.py @@ -2,11 +2,13 @@ import logging import json import mimetypes +import time from typing import List from google.oauth2 import service_account from googleapiclient.discovery import build from googleapiclient.http import MediaIoBaseUpload +from googleapiclient.errors import HttpError from gdrive import settings, error @@ -30,14 +32,17 @@ def init(): log.info(f"Connected to Root Directory {driveId}") -def list(count: int = 10, shared: bool = True) -> None: +def list(count: int = 10, shared: bool = True, parent: str | None = None) -> None: """ Prints the names and ids of the first files the user has access to. """ + mq = f"'{parent}' in parents" if parent else "" + results = ( service.files() .list( + q=mq, pageSize=count, fields="*", supportsAllDrives=shared, @@ -63,6 +68,37 @@ def list(count: int = 10, shared: bool = True) -> None: log.info(f"No such key: {error} in {item}") +def count(shared: bool = True, parent: str | None = None) -> None: + """ + Prints the names and ids of the first files the user has access to. + """ + + mq = f"'{parent}' in parents" if parent else "" + + results = __count(mq, "", shared) + count = len(results["files"]) + + while results.get("nextPageToken", None): + results = __count(mq, results["nextPageToken"], shared) + count += len(results["files"]) + return count + + +def __count(query, nextPageToken, shared): + results = ( + service.files() + .list( + q=query, + pageSize=1000, + supportsAllDrives=shared, + includeItemsFromAllDrives=shared, + pageToken=nextPageToken, + ) + .execute() + ) + return results + + def create_empty_spreadsheet(filename: str, parent_id: str) -> str: file_metadata = { "name": filename, @@ -180,17 +216,31 @@ def get_files_by_drive_id(filename: str, drive_id: str): Get list of files by filename """ - results = ( - service.files() - .list( - q=f"name = '{filename}'", - corpora="drive", - driveId=drive_id, - includeTeamDriveItems=True, - supportsTeamDrives=True, + try: + results = ( + service.files() + .list( + q=f"name = '{filename}'", + corpora="drive", + driveId=drive_id, + includeTeamDriveItems=True, + supportsTeamDrives=True, + ) + .execute() + ) + except HttpError: # manual retry hack + time.sleep(2) + results = ( + service.files() + .list( + q=f"name = '{filename}'", + corpora="drive", + driveId=drive_id, + includeTeamDriveItems=True, + supportsTeamDrives=True, + ) + .execute() ) - .execute() - ) return results["files"] @@ -202,17 +252,35 @@ def get_files_in_folder(id: str) -> List: files = [] page_token = None while True: - results = ( - service.files() - .list( - q=f"'{id}' in parents and trashed=false", - supportsAllDrives=True, - includeItemsFromAllDrives=True, - fields="nextPageToken, files(*)", - pageToken=page_token, + try: + results = ( + service.files() + .list( + q=f"'{id}' in parents and trashed=false", + pageSize=1000, + supportsAllDrives=True, + includeItemsFromAllDrives=True, + fields="nextPageToken, files(*)", + pageToken=page_token, + orderBy="createdTime", + ) + .execute() + ) + except HttpError: # manual retry hack + time.sleep(2) + results = ( + service.files() + .list( + q=f"'{id}' in parents and trashed=false", + pageSize=1000, + supportsAllDrives=True, + includeItemsFromAllDrives=True, + fields="nextPageToken, files(*)", + pageToken=page_token, + orderBy="createdTime", + ) + .execute() ) - .execute() - ) files.extend(results.get("files", [])) page_token = results.get("nextPageToken") if not page_token: @@ -230,3 +298,73 @@ def delete_file(id: str) -> None: def export(id: str) -> any: return service.files().get_media(fileId=id).execute() + + +def copy(fileId: str, new_location: str): + new_file = {"parents": [new_location]} + + result = ( + service.files() + .copy(body=new_file, fileId=fileId, supportsAllDrives=True) + .execute() + ) + + return result + + +def search(responseId: str, source_drive: str): + """ + Prints the names and ids of the first files the user has access to. + """ + + mq = f"fullText contains '{responseId}' and name = 'analytics.json'" + + results = ( + service.files() + .list( + q=mq, + driveId=source_drive, + corpora="drive", + fields="*", + supportsAllDrives=True, + includeItemsFromAllDrives=True, + ) + .execute() + ) + items = results.get("files", []) + return items + + +def exists(filename: str, parent: str): + # , source_drive: str + """ + does file exist? + """ + + results = ( + service.files() + .list( + q=f"'{parent}' in parents and name = '{filename}'", + # driveId=source_drive, + # corpora='drive', + fields="*", + supportsAllDrives=True, + includeItemsFromAllDrives=True, + ) + .execute() + ) + # return results["files"] + items = results.get("files", []) + return items + + +def move(fileId: str, source: str, dest: str): + result = ( + service.files() + .update( + addParents=dest, removeParents=source, fileId=fileId, supportsAllDrives=True + ) + .execute() + ) + + return result diff --git a/gdrive/export_api.py b/gdrive/export_api.py index 4de53f6..d26d0ff 100644 --- a/gdrive/export_api.py +++ b/gdrive/export_api.py @@ -2,14 +2,17 @@ gdrive rest api """ +from datetime import datetime import io import json import logging import sys +import time import fastapi from pydantic import BaseModel, Field from fastapi import BackgroundTasks, responses +import requests from gdrive import export_client, drive_client, sheets_client, settings, error from gdrive.database import database, crud, models @@ -19,16 +22,28 @@ router = fastapi.APIRouter() +@router.get("/dead") +async def dead(interationId): + export_client.export_dead() + + @router.post("/export") async def upload_file(interactionId): - log.info(f"Export interaction {interactionId}") - export_data = export_client.export(interactionId) - export_bytes = io.BytesIO( - export_client.codename(json.dumps(export_data, indent=2)).encode() - ) parent = drive_client.create_folder(interactionId, settings.ROOT_DIRECTORY) - drive_client.upload_basic("analytics.json", parent, export_bytes) - log.info(f"Uploading {sys.getsizeof(export_bytes)} bytes to drive folder {parent}") + files = drive_client.exists("analytics.json", parent) + if files != []: + log.warn(f"analytics.json for {interactionId} already exists in {parent}") + else: + log.info(f"Export interaction {interactionId}") + export_data = export_client.export(interactionId) + export_bytes = io.BytesIO( + export_client.codename(json.dumps(export_data, indent=2)).encode() + ) + + drive_client.upload_basic("analytics.json", parent, export_bytes) + log.info( + f"Uploading {sys.getsizeof(export_bytes)} bytes to drive folder {parent}" + ) class ParticipantModel(BaseModel): @@ -66,7 +81,53 @@ async def survey_upload_response( ) -async def survey_upload_response_task(request): +@router.post("/manual-survey-export") +async def manual_survey_upload_response( + responseId: str, surveyId: str, background_tasks: BackgroundTasks +): + request = SurveyParticipantModel(surveyId=surveyId, responseId=responseId) + await survey_upload_response_task(request, fetchParticipantInfo=True) + + +@router.post("/bulk-citr-export") +async def bulk_citer_export(): + i = 0 + j = len(ids) + for id in ids: + i += 1 + log.info(f" {i} / {j} {id}") + + # survey export + # request = SurveyParticipantModel(surveyId="SV_9H7s2QQiAWFpQIS", responseId=id) + # await survey_upload_response_task(request) + + # citer export + await citer_export(id) + + +@router.post("/citr-export/{responseId}") +async def citer_export(responseId): + citer_folder = "1Zk7xFackMpN1-4Nl8x1M9GuI4peC5Ymv" + source_drive = "0AFrX3czp_UwZUk9PVA" + + files = drive_client.search(responseId, source_drive) + + # folder_name = datetime.strftime( datetime.now(),'%m/%d') + folder_name = "incompletes" + + parent = "1dz8UklyVsBDLP0wC5HPVSOuKpYGqrNEI" # drive_client.create_folder(folder_name, citer_folder) + + if len(files) == 0: + log.warning(f"no analytics for {responseId}") + else: + responseId_folder = drive_client.create_folder(responseId, parent) + + for file in files: + new_file = drive_client.copy(file["id"], responseId_folder) + # TimeoutError + + +async def survey_upload_response_task(request, fetchParticipantInfo=False): """ Background task that handles qualtrics response fetching and exporting """ @@ -88,30 +149,42 @@ async def survey_upload_response_task(request): # throws exception in get_qualtrics_response survey_resp = response["response"] - if request.participant: - participant = request.participant - upload_result = sheets_client.upload_participant( - participant.first, - participant.last, - participant.email, - request.responseId, - participant.time, - participant.date, - survey_resp["ethnicity"], - ", ".join( - survey_resp["race"] - ), # Can have more than one value in a list - survey_resp["gender"], - survey_resp["age"], - survey_resp["income"], - survey_resp["skin_tone"], - ) - - result_sheet_id = upload_result["spreadsheetId"] - if upload_result: - log.info( - f"Uploaded response: {request.responseId} to completions spreadsheet {result_sheet_id}" + if request.participant or fetchParticipantInfo: + if fetchParticipantInfo: + pdict = export_client.get_qualtrics_contact(survey_resp["contactId"])[ + "result" + ] + participant = ParticipantModel( + first=pdict["firstName"], + last=pdict["lastName"], + email=pdict["email"], + time=pdict["embeddedData"]["time"], + date=pdict["embeddedData"]["Date"], ) + else: + participant = request.participant + upload_result = sheets_client.upload_participant( + participant.first, + participant.last, + participant.email, + request.responseId, + participant.time, + participant.date, + survey_resp["ethnicity"], + ", ".join( + survey_resp["race"] + ), # Can have more than one value in a list + survey_resp["gender"], + survey_resp["age"], + survey_resp["income"], + survey_resp["skin_tone"], + ) + + result_sheet_id = upload_result["spreadsheetId"] + if upload_result: + log.info( + f"Uploaded response: {request.responseId} to completions spreadsheet {result_sheet_id}" + ) crud.create_participant( models.ParticipantModel( diff --git a/gdrive/export_client.py b/gdrive/export_client.py index 03df509..ed97b2b 100644 --- a/gdrive/export_client.py +++ b/gdrive/export_client.py @@ -1,6 +1,7 @@ import logging import json import re +from types import SimpleNamespace import requests from opensearchpy import OpenSearch @@ -151,7 +152,7 @@ def export_response(responseId, survey_response): ) results_update = es.update_by_query( - index="_all", body=query_response_data, refresh=True + index="_all", body=query_response_data, refresh=True, scroll="18s" ) return list(map(lambda id: id["match"]["interactionId"], interactionIds_match)) @@ -174,6 +175,91 @@ def get_qualtrics_response(surveyId: str, responseId: str): return r.json() +def get_qualtrics_contact(contactId: str): + url = f"http://{settings.QUALTRICS_APP_URL}:{settings.QUALTRICS_APP_PORT}/contact/{contactId}" + + r = requests.post( + url, + timeout=30, # qualtrics microservice retries as it waits for response to become available + ) + if r.status_code != 200: + raise error.ExportError(f"No contact response found for contactId: {contactId}") + return r.json() + + +def export_dead(): + es = OpenSearch( + hosts=[{"host": settings.ES_HOST, "port": settings.ES_PORT}], timeout=300 + ) + + query_interactionId = { + # "size": 1000, + "query": { + "bool": { + "must": [ + { + "match_phrase": { + "properties.outcomeType.value": "survey_response" + } + }, + {"match": {"properties.outcomeDescription.value": "N/A"}}, + ] + } + }, + "_source": ["interactionId"], + } + + results_interacitonId = es.search( + body=json.dumps(query_interactionId), index="_all" + ) + + interactionIds_match = list( + map( + lambda res: { + "match": {"interactionId": f'{res["_source"]["interactionId"]}'} + }, + results_interacitonId["hits"]["hits"], + ) + ) + + query_interactionId = { + # "size": 1000, + "query": { + "bool": { + "must": [ + {"match_phrase": {"properties.outcomeType.value": "survey_data"}} + ], + "should": interactionIds_match, + } + } + } + + responseIds_result = es.search(body=json.dumps(query_interactionId), index="_all") + + for item in responseIds_result["hits"]["hits"]: + survey_data = json.loads( + item["_source"]["properties"]["outcomeDescription"]["value"] + ) + responseId = survey_data["responseId"] + + # query qaultrix for suvery result + # if present and or > 24 hours + # run exoport on the data + + print("hello") + + # if len(interactionIds_match) == 0: + # raise error.ExportError( + # f"No flow interactionId match for responseId: {responseId}" + # ) + + # results_update = es.update_by_query( + # index="_all", body=query_response_data, refresh=True + # ) + + # return list(map(lambda id: id["match"]["interactionId"], interactionIds_match)) + + def get_all_InteractionIds(responseId): es = OpenSearch( hosts=[{"host": settings.ES_HOST, "port": settings.ES_PORT}], timeout=300 diff --git a/gdrive/main.py b/gdrive/main.py index b116022..095eddc 100644 --- a/gdrive/main.py +++ b/gdrive/main.py @@ -4,9 +4,13 @@ import fastapi import starlette_prometheus +import logging +from . import api, export_api, analytics_api, debug_api, settings -from . import api, export_api, analytics_api, settings + +log = logging.getLogger() +log.addHandler(logging.FileHandler("errors.log")) app = fastapi.FastAPI() @@ -16,3 +20,4 @@ app.include_router(api.router) app.include_router(export_api.router) app.include_router(analytics_api.router) +app.include_router(debug_api.router) diff --git a/gdrive/settings.py b/gdrive/settings.py index e45d42e..16ed995 100644 --- a/gdrive/settings.py +++ b/gdrive/settings.py @@ -21,9 +21,7 @@ "https://www.googleapis.com/auth/drive", "https://www.googleapis.com/auth/spreadsheets", ] - -SERVICE_ACCOUNT_FILE = "credentials.json" - +SERVICE_ACCOUNT_FILE = "../identity-idva-utils/secrets/gdrive/credentials-prod.json" ROOT_DIRECTORY = "" CODE_NAMES = None CREDENTIALS = None @@ -31,11 +29,11 @@ ANALYTICS_PROPERTY_ID = None ANALYTICS_CREDENTIALS = None -ES_HOST = os.getenv("ES_HOST") -ES_PORT = os.getenv("ES_PORT") +ES_HOST = os.getenv("ES_HOST", "localhost") +ES_PORT = os.getenv("ES_PORT", "8090") -QUALTRICS_APP_URL = os.getenv("QUALTRICS_APP_URL") -QUALTRICS_APP_PORT = os.getenv("QUALTRICS_APP_PORT") +QUALTRICS_APP_URL = os.getenv("QUALTRICS_APP_URL", "localhost") +QUALTRICS_APP_PORT = os.getenv("QUALTRICS_APP_PORT", "8060") RAW_COMPLETIONS_SHEET_NAME = os.getenv("GDRIVE_RAW_COMPLETIONS_SHEET_NAME", "Sheet1") @@ -48,15 +46,15 @@ config = {} if vcap_services: user_services = json.loads(vcap_services)["user-provided"] - DB_URI = json.loads(vcap_services)["aws-rds"][0]["credentials"]["uri"] - # Sqlalchemy requires 'postgresql' as the protocol - DB_URI = DB_URI.replace("postgres://", "postgresql://", 1) for service in user_services: if service["name"] == "gdrive": log.info("Loading credentials from env var") config = service["credentials"] break + db = json.loads(vcap_services)["aws-rds"][0]["credentials"]["uri"] + # Sqlalchemy requires 'postgresql' as the protocol + DB_URI = db.replace("postgres://", "postgresql://", 1) else: with open(SERVICE_ACCOUNT_FILE) as file: log.info("Loading credentials from creds file") @@ -72,4 +70,4 @@ except (json.JSONDecodeError, KeyError, FileNotFoundError) as err: log.warning("Unable to load credentials from VCAP_SERVICES") - log.debug("Error: %s", str(err)) + log.error("Error: %s", str(err)) diff --git a/requirements.txt b/requirements.txt index 54279f9..1e52f61 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,3 +14,6 @@ pandas==2.2.2 sqlalchemy==1.4.* psycopg2==2.9.9 alembic==1.13.1 +wheel +gspread +jinja2 \ No newline at end of file