diff --git a/metadata_validation_conversion/ontology_improver/tasks.py b/metadata_validation_conversion/ontology_improver/tasks.py index 368b5d2..741c414 100644 --- a/metadata_validation_conversion/ontology_improver/tasks.py +++ b/metadata_validation_conversion/ontology_improver/tasks.py @@ -1,84 +1,134 @@ import json -import requests -# import pandas as pd -from collections import Counter from django.conf import settings from elasticsearch import Elasticsearch, RequestsHttpConnection from metadata_validation_conversion.celery import app -from metadata_validation_conversion.constants import BE_SVC, PROJECTS - -es = Elasticsearch([settings.NODE], connection_class=RequestsHttpConnection, \ - http_auth=(settings.ES_USER, settings.ES_PASSWORD), \ - use_ssl=True, verify_certs=True) - -def type_count(data): - data = data.split(', ') - data = dict(Counter(data)) - return data - -def convertToListOfDict(data): - l = [] - for k in data: - obj = { - 'type': k, - 'count': data[k] - } - l.append(obj) - return l - -def comma_separated_combine(data): - values = [] - for i in data: - i_list = i.split(', ') - values = values + i_list - return ', '.join(values) - -def get_species_for_project(project): - project_filter = json.dumps({ - 'secondaryProject': [project] - }) - url = f'{BE_SVC}/data/organism/_search/?size=10000&filters={project_filter}' - data = requests.get(url).json()['hits']['hits'] - species = filter(lambda y: y, set(map(lambda x: x['_source']['organism']['text'], data))) - species = ', '.join(species) - return species +import itertools + +es = Elasticsearch([settings.NODE], connection_class=RequestsHttpConnection, + http_auth=(settings.ES_USER, settings.ES_PASSWORD), + use_ssl=True, verify_certs=True) + + +# generate field 'type_counts' list of entries based on ontology['type'] +# params: ontology_type: "type": [ "organismPart", "cellType"] +# returns: "type_counts": [{"type": "organismPart", "count": 34}, ...] +def generate_type_counts(ontology_type, type_counts): + for type in ontology_type: + # check if type is found in type_counts list + if any(obj['type'] == type for obj in type_counts): + index = -1 + for i, obj in enumerate(type_counts): + if obj['type'] == type: + index = i + break + type_counts[index]['count'] = type_counts[index]['count'] + 1 + else: + type_counts.append({'type': type, + 'count': 1}) + + +# update created_edited_count +def generate_created_edited_count(activity): + activity['created_edited_count'] = activity['created_edited_count'] + 1 + + +# fetch ES records based on query and index provided +def es_fetch_records(index, filters): + count = 0 + recordset = [] + + while True: + res = es.search(index=index, size=50000, from_=count, + track_total_hits=True, body=json.loads(filters)) + count += 50000 + records = list(map(lambda rec: rec['_source'], res['hits']['hits'])) + recordset += records + + if count > res['hits']['total']['value']: + break + return recordset + + +# fetch organisms (species) associated with project +def fetch_project_species(project): + query = {'query': {'bool': {'filter': [{'terms': {'secondaryProject': [project]}}]}}} + records = es_fetch_records("organism", json.dumps(query)) + species = filter(lambda x: x is not None, + set(map(lambda rec: rec['organism']['text'], records))) + species_str = ', '.join(str(s) for s in species) + return species_str + + +# fetch latest activity for each user form list provided +def get_latest_status_activity(status_activity): + latest_activity_list = [] + sorted_status_activity = sorted(status_activity, key=lambda x: x['timestamp'], reverse=True) + activity_users = set(map(lambda d: d['user'], sorted_status_activity)) + for user in activity_users: + gen = ( + activity for activity in sorted_status_activity + if activity["user"] == user + ) + latest_activity_list.append(next(gen)) + + return latest_activity_list + + +# generate users latest activity for each project +def generate_activity_counts(project): + query = {'query': {'bool': {'filter': [{'terms': {'projects': [project]}}]}}} + records = es_fetch_records("ontologies", json.dumps(query)) + status_activity_list = list( + map(lambda rec: get_latest_status_activity(rec['status_activity']), records)) + + # merge list of lists into one list + status_activity_list = list(itertools.chain.from_iterable(status_activity_list)) + + verified_records = list(filter(lambda d: 'status' in d and d['status'].lower() == 'verified', + status_activity_list)) + needs_improvement_records = list(filter(lambda d: 'status' in d and d['status'].lower() == 'needs improvement', + status_activity_list)) + + return verified_records, needs_improvement_records + @app.task +# Update the summary_ontologies index based on data from the ontologies index def update_ontology_summary(): - # url = f'{BE_SVC}/data/ontologies/_search/?size=10000' - # resultset = requests.get(url).json()['hits']['hits'] - # ontologies = map(lambda ontology: ontology['_source'], resultset) - # df = pd.DataFrame.from_dict(ontologies)[['projects', 'type', 'term']] - # df['type'] = [', '.join(map(str, l)) for l in df['type']] - # df = df.explode('projects') - # df = df.loc[df['projects'].isin(PROJECTS)] - # df = df.groupby(['projects']).agg({ - # 'type': comma_separated_combine, - # 'term': 'count' - # }).reset_index() - # df['type'] = df['type'].apply(type_count) - # # get project-specific species from organisms index - # species = {} - # for project in PROJECTS: - # if project in df['projects'].values: - # species[get_species_for_project(project)] = project - # df['species'] = species - # # get existing summary statistics - # url = f"{BE_SVC}/data/summary_ontologies/_search/?size=10" - # res_data = requests.get(url).json()['hits']['hits'] - # validated_counts = {} - # for record in res_data: - # validated_counts[record['_id']] = record['_source']['activity']['validated_count'] - # # generate update payload - # for index, row in df.iterrows(): - # updated_project_stats = { - # 'project': row['projects'], - # 'species': row['species'], - # 'type_counts': convertToListOfDict(row['type']), - # 'activity': { - # 'created_edited_count': row['term'], - # 'validated_count': validated_counts[row['projects']] if row['projects'] in validated_counts else 0 - # } - # } - # es.index(index='summary_ontologies', id=updated_project_stats['project'], body=updated_project_stats) - return "Success" \ No newline at end of file + query = {"query": + { + "regexp": { + "projects": ".+" + } + } + } + records = es_fetch_records("ontologies", json.dumps(query)) + project_dict = {} + + for rec in records: + projects_list = rec['projects'] + for proj in projects_list: + if proj in project_dict: + # update dict + generate_type_counts(rec['type'], project_dict[proj]['type_counts']) + generate_created_edited_count(project_dict[proj]['activity']) + else: + project_dict[proj] = { + "project": proj, + "species": "", + "type_counts": [], + "activity": {'created_edited_count': 0, 'validated_count': 0, 'downvoted_count': 0} + } + generate_type_counts(rec['type'], project_dict[proj]['type_counts']) + generate_created_edited_count(project_dict[proj]['activity']) + project_dict[proj]['species'] = fetch_project_species(proj) + activity_counts_verified, activity_counts_needs_improvement = generate_activity_counts(proj) + project_dict[proj]['activity']['validated_count'] = len(activity_counts_verified) + project_dict[proj]['activity']['downvoted_count'] = len(activity_counts_needs_improvement) + + # update index + for project in project_dict: + print("updated_project_stats: ", project_dict[project]) + es.index(index='summary_ontologies', id=project, body=project_dict[project]) + + return "Success" diff --git a/metadata_validation_conversion/ontology_improver/views.py b/metadata_validation_conversion/ontology_improver/views.py index fa94da2..d80fd87 100644 --- a/metadata_validation_conversion/ontology_improver/views.py +++ b/metadata_validation_conversion/ontology_improver/views.py @@ -166,6 +166,9 @@ def validate_ontology(request, room_id): if len(res['hits']['hits']) == 0: es.index(index='ontologies', id=new_ontology['key'], body=new_ontology) + # task = update_ontology_summary.s().set(queue='submission') + # task_chain = chain(task) + # task_chain.apply_async() return HttpResponse(status=200) diff --git a/metadata_validation_conversion/ontology_improver_workshop/tasks.py b/metadata_validation_conversion/ontology_improver_workshop/tasks.py index 594f3e6..afeb4f0 100644 --- a/metadata_validation_conversion/ontology_improver_workshop/tasks.py +++ b/metadata_validation_conversion/ontology_improver_workshop/tasks.py @@ -1,84 +1,135 @@ import json -import requests -# import pandas as pd -from collections import Counter from django.conf import settings from elasticsearch import Elasticsearch, RequestsHttpConnection from metadata_validation_conversion.celery import app -from metadata_validation_conversion.constants import BE_SVC, PROJECTS +import itertools es = Elasticsearch([settings.NODE], connection_class=RequestsHttpConnection, \ http_auth=(settings.ES_USER, settings.ES_PASSWORD), \ use_ssl=True, verify_certs=True) -def type_count(data): - data = data.split(', ') - data = dict(Counter(data)) - return data - -def convertToListOfDict(data): - l = [] - for k in data: - obj = { - 'type': k, - 'count': data[k] - } - l.append(obj) - return l - -def comma_separated_combine(data): - values = [] - for i in data: - i_list = i.split(', ') - values = values + i_list - return ', '.join(values) - -def get_species_for_project(project): - project_filter = json.dumps({ - 'secondaryProject': [project] - }) - url = f'{BE_SVC}/data/organism/_search/?size=10000&filters={project_filter}' - data = requests.get(url).json()['hits']['hits'] - species = filter(lambda y: y, set(map(lambda x: x['_source']['organism']['text'], data))) - species = ', '.join(species) - return species + +# generate field 'type_counts' list of entries based on ontology['type'] +# params: ontology_type: "type": [ "organismPart", "cellType"] +# returns: "type_counts": [{"type": "organismPart", "count": 34}, ...] +def generate_type_counts(ontology_type, type_counts): + for type in ontology_type: + # check if type is found in type_counts list + if any(obj['type'] == type for obj in type_counts): + index = -1 + for i, obj in enumerate(type_counts): + if obj['type'] == type: + index = i + break + type_counts[index]['count'] = type_counts[index]['count'] + 1 + else: + type_counts.append({'type': type, + 'count': 1}) + + +# update created_edited_count +def generate_created_edited_count(activity): + activity['created_edited_count'] = activity['created_edited_count'] + 1 + + +# fetch ES records based on query and index provided +def es_fetch_records(index, filters): + count = 0 + recordset = [] + + while True: + res = es.search(index=index, size=50000, from_=count, + track_total_hits=True, body=json.loads(filters)) + count += 50000 + records = list(map(lambda rec: rec['_source'], res['hits']['hits'])) + recordset += records + + if count > res['hits']['total']['value']: + break + return recordset + + +# fetch organisms (species) associated with project +def fetch_project_species(project): + query = {'query': {'bool': {'filter': [{'terms': {'secondaryProject': [project]}}]}}} + records = es_fetch_records("organism", json.dumps(query)) + species = filter(lambda x: x is not None, + set(map(lambda rec: rec['organism']['text'], records))) + species_str = ', '.join(str(s) for s in species) + return species_str + + +# fetch latest activity for each user form list provided +def get_latest_status_activity(status_activity): + latest_activity_list = [] + sorted_status_activity = sorted(status_activity, key=lambda x: x['timestamp'], reverse=True) + activity_users = set(map(lambda d: d['user'], sorted_status_activity)) + for user in activity_users: + gen = ( + activity for activity in sorted_status_activity + if activity["user"] == user + ) + latest_activity_list.append(next(gen)) + + return latest_activity_list + + +# generate users latest activity for each project +def generate_activity_counts(project): + query = {'query': {'bool': {'filter': [{'terms': {'projects': [project]}}]}}} + records = es_fetch_records("ontologies_test", json.dumps(query)) + status_activity_list = list( + map(lambda rec: get_latest_status_activity(rec['status_activity']), records)) + + # merge list of lists into one list + status_activity_list = list(itertools.chain.from_iterable(status_activity_list)) + + verified_records = list(filter(lambda d: 'status' in d and d['status'].lower() == 'verified', + status_activity_list)) + needs_improvement_records = list(filter(lambda d: 'status' in d and d['status'].lower() == 'needs improvement', + status_activity_list)) + + return verified_records, needs_improvement_records + @app.task +# Update the summary_ontologies index based on data from the ontologies index def update_ontology_summary(): - url = f'{BE_SVC}/data/ontologies_test/_search/?size=10000' - # resultset = requests.get(url).json()['hits']['hits'] - # ontologies = map(lambda ontology: ontology['_source'], resultset) - # df = pd.DataFrame.from_dict(ontologies)[['projects', 'type', 'term']] - # df['type'] = [', '.join(map(str, l)) for l in df['type']] - # df = df.explode('projects') - # df = df.loc[df['projects'].isin(PROJECTS)] - # df = df.groupby(['projects']).agg({ - # 'type': comma_separated_combine, - # 'term': 'count' - # }).reset_index() - # df['type'] = df['type'].apply(type_count) - # # get project-specific species from organisms index - # species = {} - # for project in PROJECTS: - # if project in df['projects'].values: - # species[get_species_for_project(project)] = project - # df['species'] = species - # # get existing summary statistics - # url = f"{BE_SVC}/data/summary_ontologies_test/_search/?size=10" - # res_data = requests.get(url).json()['hits']['hits'] - # validated_counts = {} - # for record in res_data: - # validated_counts[record['_id']] = record['_source']['activity']['validated_count'] - # # generate update payload - # for index, row in df.iterrows(): - # updated_project_stats = { - # 'project': row['projects'], - # 'species': row['species'], - # 'type_counts': convertToListOfDict(row['type']), - # 'activity': { - # 'created_edited_count': row['term'], - # 'validated_count': validated_counts[row['projects']] if row['projects'] in validated_counts else 0 - # } - # } - # es.index(index='summary_ontologies_test', id=updated_project_stats['project'], body=updated_project_stats) - return "Success" \ No newline at end of file + query = {"query": + { + "regexp": { + "projects": ".+" + } + } + } + records = es_fetch_records("ontologies_test", json.dumps(query)) + project_dict = {} + + for rec in records: + projects_list = rec['projects'] + for proj in projects_list: + if proj in project_dict: + # update dict + generate_type_counts(rec['type'], project_dict[proj]['type_counts']) + generate_created_edited_count(project_dict[proj]['activity']) + else: + project_dict[proj] = { + "project": proj, + "species": "", + "type_counts": [], + "activity": {'created_edited_count': 0, 'validated_count': 0, 'downvoted_count': 0} + } + generate_type_counts(rec['type'], project_dict[proj]['type_counts']) + generate_created_edited_count(project_dict[proj]['activity']) + project_dict[proj]['species'] = fetch_project_species(proj) + activity_counts_verified, activity_counts_needs_improvement = generate_activity_counts(proj) + project_dict[proj]['activity']['validated_count'] = len(activity_counts_verified) + project_dict[proj]['activity']['downvoted_count'] = len(activity_counts_needs_improvement) + + # update index + for project in project_dict: + print("updated_project_stats: ", project_dict[project]) + print("updated_project_stats: ", project_dict[project]) + es.index(index='summary_ontologies_test', id=project, body=project_dict[project]) + + return "Success" diff --git a/metadata_validation_conversion/submission/tasks.py b/metadata_validation_conversion/submission/tasks.py index e89b8a8..28583f2 100644 --- a/metadata_validation_conversion/submission/tasks.py +++ b/metadata_validation_conversion/submission/tasks.py @@ -22,7 +22,7 @@ from .helpers import get_credentials from celery import Task from django.conf import settings -# from deepdiff import DeepDiff +from deepdiff import DeepDiff from django.core import mail from django.template.loader import render_to_string from django.utils.html import strip_tags @@ -471,11 +471,11 @@ def save_submission_data(root, submission_type, room_id, action): study_obj['subscribers'] = existing_doc['subscribers'] # email subscribers - # deepdiff_obj = DeepDiff(existing_doc, study_obj) - # if deepdiff_obj: - # subscriber_emails = [ele['email'] for ele in existing_doc['subscribers']] - # for email in subscriber_emails: - # send_user_email(study_obj['study_id'], email) + deepdiff_obj = DeepDiff(existing_doc, study_obj) + if deepdiff_obj: + subscriber_emails = [ele['email'] for ele in existing_doc['subscribers']] + for email in subscriber_emails: + send_user_email(study_obj['study_id'], email) es.index(index='submissions', id=study_obj['study_id'], body=study_obj)