diff --git a/cylc/uiserver/schema.py b/cylc/uiserver/schema.py
index 3deb6237..848493b6 100644
--- a/cylc/uiserver/schema.py
+++ b/cylc/uiserver/schema.py
@@ -26,12 +26,16 @@
 from graphene.types.generic import GenericScalar
 
 from cylc.flow.id import Tokens
+from cylc.flow.rundb import CylcWorkflowDAO
+from cylc.flow.pathutil import get_workflow_run_dir
+from cylc.flow.workflow_files import WorkflowFiles
 from cylc.flow.network.schema import (
     CyclePoint,
     GenericResponse,
     ID,
     SortArgs,
     Task,
+    Job,
     Mutations,
     Queries,
     process_resolver_info,
@@ -281,14 +285,12 @@ class Meta:
     result = GenericScalar()
 
 
-async def get_jobs(root, info, **kwargs):
+async def get_elements(root, info, **kwargs):
     if kwargs['live']:
         return await get_nodes_all(root, info, **kwargs)
 
     _, field_ids = process_resolver_info(root, info, kwargs)
 
-    if hasattr(kwargs, 'id'):
-        kwargs['ids'] = [kwargs.get('id')]
     if field_ids:
         if isinstance(field_ids, str):
             field_ids = [field_ids]
@@ -306,16 +308,13 @@ async def get_jobs(root, info, **kwargs):
     kwargs['exworkflows'] = [
         Tokens(w_id) for w_id in kwargs['exworkflows']]
 
-    return await list_jobs(kwargs)
+    return await list_elements(kwargs)
 
 
-async def list_jobs(args):
+async def list_elements(args):
     if not args['workflows']:
         raise Exception('At least one workflow must be provided.')
-    from cylc.flow.rundb import CylcWorkflowDAO
-    from cylc.flow.pathutil import get_workflow_run_dir
-    from cylc.flow.workflow_files import WorkflowFiles
-    jobs = []
+    elements = []
     for workflow in args['workflows']:
         db_file = get_workflow_run_dir(
             workflow['workflow'],
@@ -324,11 +323,15 @@ async def list_jobs(args):
         )
         with CylcWorkflowDAO(db_file, is_public=True) as dao:
             conn = dao.connect()
-            jobs.extend(make_query(conn, workflow))
-    return jobs
+            if 'tasks' in args:
+                elements.extend(
+                    run_jobs_query(conn, workflow, args.get('tasks')))
+            else:
+                elements.extend(run_task_query(conn, workflow))
+    return elements
 
 
-def make_query(conn, workflow):
+def run_task_query(conn, workflow):
 
     # TODO: support all arguments including states
     # https://github.com/cylc/cylc-uiserver/issues/440
@@ -425,6 +428,7 @@ def make_query(conn, workflow):
             'mean_queue_time': row[10],
             'max_queue_time': row[11],
             'std_dev_queue_time': (row[12] - row[10]**2)**0.5,
+            # Prevents null entries when there are too few tasks for quartiles
             'queue_quartiles': [row[13],
                                 row[13] if row[14] is None else row[14],
                                 row[13] if row[15] is None else row[15]],
@@ -433,6 +437,7 @@ def make_query(conn, workflow):
             'mean_run_time': row[17],
             'max_run_time': row[18],
             'std_dev_run_time': (row[19] - row[17]**2)**0.5,
+            # Prevents null entries when there are too few tasks for quartiles
             'run_quartiles': [row[20],
                               row[20] if row[21] is None else row[21],
                               row[20] if row[22] is None else row[22]],
@@ -441,6 +446,7 @@ def make_query(conn, workflow):
             'mean_total_time': row[24],
             'max_total_time': row[25],
             'std_dev_total_time': (row[26] - row[24] ** 2) ** 0.5,
+            # Prevents null entries when there are too few tasks for quartiles
             'total_quartiles': [row[27],
                                 row[27] if row[28] is None else row[28],
                                 row[27] if row[29] is None else row[29]],
@@ -451,6 +457,74 @@ def make_query(conn, workflow):
     return tasks
 
 
+def run_jobs_query(conn, workflow, tasks):
+    """Return jobs with run_status = 0 from the workflow database.
+
+    Args:
+        conn: Database connection on which the query is executed.
+        workflow: Workflow tokens; duplicated to build each job's ID.
+        tasks: Task names to restrict the results to. If empty or None,
+            jobs for all tasks are returned.
+
+    Returns:
+        A list of dicts, one per matching row of the task_jobs table.
+
+    """
+    # TODO: support all arguments including states
+    # https://github.com/cylc/cylc-uiserver/issues/440
+    jobs = []
+
+    # Bind the task names as SQL parameters rather than splicing them into
+    # the statement: they arrive via the GraphQL "tasks" argument, so a name
+    # containing a quote would otherwise be able to rewrite the query.
+    tasks = list(tasks or [])
+    if tasks:
+        placeholders = ', '.join('?' * len(tasks))
+        where_clauses = f' AND name IN ({placeholders})'
+    else:
+        where_clauses = ''
+    for row in conn.execute(f'''
+SELECT
+    name,
+    cycle,
+    submit_num,
+    submit_status,
+    time_run,
+    time_run_exit,
+    job_id,
+    platform_name,
+    time_submit,
+    STRFTIME('%s', time_run_exit) - STRFTIME('%s', time_submit) AS total_time,
+    STRFTIME('%s', time_run_exit) - STRFTIME('%s', time_run) AS run_time,
+    STRFTIME('%s', time_run) - STRFTIME('%s', time_submit) AS queue_time
+FROM
+    task_jobs
+WHERE
+    run_status = 0
+    {where_clauses};
+''', tasks):
+        jobs.append({
+            'id': workflow.duplicate(
+                cycle=row[1],
+                task=row[0],
+                job=row[2]
+            ),
+            'name': row[0],
+            'cycle_point': row[1],
+            'submit_num': row[2],
+            'state': row[3],
+            'started_time': row[4],
+            'finished_time': row[5],
+            'job_ID': row[6],
+            'platform': row[7],
+            'submitted_time': row[8],
+            'total_time': row[9],
+            'run_time': row[10],
+            'queue_time': row[11]
+        })
+    return jobs
+
+
 class UISTask(Task):
 
     platform = graphene.String()
@@ -484,6 +558,13 @@ class UISTask(Task):
     count = graphene.Int()
 
 
+class UISJob(Job):
+
+    total_time = graphene.Int()
+    queue_time = graphene.Int()
+    run_time = graphene.Int()
+
+
 class UISQueries(Queries):
 
     class LogFiles(graphene.ObjectType):
@@ -511,7 +592,23 @@ class LogFiles(graphene.ObjectType):
         description=Task._meta.description,
         live=graphene.Boolean(default_value=True),
         strip_null=STRIP_NULL_DEFAULT,
-        resolver=get_jobs,
+        resolver=get_elements,
+        workflows=graphene.List(ID, default_value=[]),
+        exworkflows=graphene.List(ID, default_value=[]),
+        ids=graphene.List(ID, default_value=[]),
+        exids=graphene.List(ID, default_value=[]),
+        mindepth=graphene.Int(default_value=-1),
+        maxdepth=graphene.Int(default_value=-1),
+        sort=SortArgs(default_value=None),
+
+    )
+
+    jobs = graphene.List(
+        UISJob,
+        description=Job._meta.description,
+        live=graphene.Boolean(default_value=True),
+        strip_null=STRIP_NULL_DEFAULT,
+        resolver=get_elements,
         workflows=graphene.List(ID, default_value=[]),
         exworkflows=graphene.List(ID, default_value=[]),
         ids=graphene.List(ID, default_value=[]),
@@ -519,6 +616,7 @@ class LogFiles(graphene.ObjectType):
         mindepth=graphene.Int(default_value=-1),
         maxdepth=graphene.Int(default_value=-1),
         sort=SortArgs(default_value=None),
+        tasks=graphene.List(ID, default_value=[])
     )
 
 
diff --git a/cylc/uiserver/tests/test_workflow_retrieval.py b/cylc/uiserver/tests/test_workflow_retrieval.py
index 0689896f..2c0593ca 100644
--- a/cylc/uiserver/tests/test_workflow_retrieval.py
+++ b/cylc/uiserver/tests/test_workflow_retrieval.py
@@ -15,15 +15,18 @@
 
 import pytest
 import sqlite3
+from typing import List
 
 from cylc.flow.id import Tokens
-from cylc.uiserver.schema import make_query
+from cylc.uiserver.schema import run_task_query, run_jobs_query, \
+    list_elements, get_elements
+
 
 '''This file tests the ability for the cylc UI to retrieve workflow information
 and perform simple statistical calculations for the analysis tab'''
 
 
-def test_make_query_1():
+def test_make_task_query_1():
     conn = sqlite3.connect(':memory:')
     conn.execute('''CREATE TABLE task_jobs(cycle TEXT, name TEXT,
     submit_num INTEGER, flow_nums TEXT, is_manual_submit INTEGER,
@@ -43,7 +46,7 @@ def test_make_query_1():
     conn.commit()
     workflow = Tokens('~user/workflow')
 
-    return_value = make_query(conn, workflow)
+    return_value = run_task_query(conn, workflow)
 
     assert return_value[0]['count'] == 1
     assert return_value[0]['cycle_point'] == '1'
@@ -79,7 +82,7 @@ def test_make_query_1():
     assert return_value[0]['total_quartiles'][2] == 600
 
 
-def test_make_query_2():
+def test_make_task_query_2():
     conn = sqlite3.connect(':memory:')
     conn.execute('''CREATE TABLE task_jobs(cycle TEXT, name TEXT,
     submit_num INTEGER, flow_nums TEXT, is_manual_submit INTEGER,
@@ -103,7 +106,7 @@ def test_make_query_2():
     conn.commit()
     workflow = Tokens('~user/workflow')
 
-    return_value = make_query(conn, workflow)
+    return_value = run_task_query(conn, workflow)
 
     assert return_value[0]['count'] == 2
     assert return_value[0]['cycle_point'] == '2'
@@ -139,7 +142,7 @@ def test_make_query_2():
     assert return_value[0]['total_quartiles'][2] == 600
 
 
-def test_make_query_3():
+def test_make_task_query_3():
     conn = sqlite3.connect(':memory:')
     conn.execute('''CREATE TABLE task_jobs(cycle TEXT, name TEXT,
     submit_num INTEGER, flow_nums TEXT, is_manual_submit INTEGER,
@@ -167,8 +170,9 @@ def test_make_query_3():
     conn.commit()
     workflow = Tokens('~user/workflow')
 
-    return_value = make_query(conn, workflow)
+    return_value = run_task_query(conn, workflow)
 
+    assert len(return_value) == 1
     assert return_value[0]['count'] == 3
     assert return_value[0]['cycle_point'] == '3'
     assert return_value[0]['finished_time'] == '2022-12-16T15:12:00Z'
@@ -201,3 +205,200 @@ def test_make_query_3():
     assert return_value[0]['queue_quartiles'][2] == 76
     assert return_value[0]['run_quartiles'][2] == 644
     assert return_value[0]['total_quartiles'][2] == 720
+
+
+def test_make_jobs_query_1():
+    conn = sqlite3.connect(':memory:')
+    conn.execute('''CREATE TABLE task_jobs(cycle TEXT, name TEXT,
+    submit_num INTEGER, flow_nums TEXT, is_manual_submit INTEGER,
+    try_num INTEGER, time_submit TEXT, time_submit_exit TEXT,
+    submit_status INTEGER, time_run TEXT, time_run_exit TEXT,
+    run_signal TEXT, run_status INTEGER, platform_name TEXT,
+    job_runner_name TEXT, job_id TEXT,
+    PRIMARY KEY(cycle, name, submit_num));''')
+
+    conn.executemany(
+        'INSERT into task_jobs VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)',
+        [('1', 'Task_1', '01', '{1}', 0, 1,
+          '2022-12-14T15:00:00Z', '2022-12-14T15:01:00Z', 0,
+          '2022-12-14T15:01:00Z', '2022-12-14T15:10:00Z', None, 0,
+          'MyPlatform', 'User', 'UsersJob'),
+         ('2', 'Task_1', '01', '{1}', 0, 1,
+          '2022-12-15T15:00:00Z', '2022-12-15T15:01:15Z', 0,
+          '2022-12-15T15:01:16Z', '2022-12-15T15:12:00Z', None, 0,
+          'MyPlatform', 'User', 'UsersJob'),
+         ('3', 'Task_1', '01', '{1}', 0, 1,
+          '2022-12-16T15:00:00Z', '2022-12-16T15:01:15Z', 0,
+          '2022-12-16T15:01:16Z', '2022-12-16T15:12:00Z', None, 0,
+          'MyPlatform', 'User', 'UsersJob')
+         ])
+    conn.commit()
+    workflow = Tokens('~user/workflow')
+    tasks = []
+
+    return_value = run_jobs_query(conn, workflow, tasks)
+
+    assert len(return_value) == 3
+
+    assert return_value[0]['cycle_point'] == '1'
+    assert return_value[0]['finished_time'] == '2022-12-14T15:10:00Z'
+    assert return_value[0]['id'].id == '~user/workflow//1/Task_1/01'
+    assert return_value[0]['job_ID'] == 'UsersJob'
+    assert return_value[0]['name'] == 'Task_1'
+    assert return_value[0]['platform'] == 'MyPlatform'
+    assert return_value[0]['started_time'] == '2022-12-14T15:01:00Z'
+    assert return_value[0]['state'] == 0
+    assert return_value[0]['submit_num'] == 1
+    assert return_value[0]['submitted_time'] == '2022-12-14T15:00:00Z'
+
+    assert return_value[1]['cycle_point'] == '2'
+    assert return_value[1]['finished_time'] == '2022-12-15T15:12:00Z'
+    assert return_value[1]['id'].id == '~user/workflow//2/Task_1/01'
+    assert return_value[1]['job_ID'] == 'UsersJob'
+    assert return_value[1]['name'] == 'Task_1'
+    assert return_value[1]['platform'] == 'MyPlatform'
+    assert return_value[1]['started_time'] == '2022-12-15T15:01:16Z'
+    assert return_value[1]['state'] == 0
+    assert return_value[1]['submit_num'] == 1
+    assert return_value[1]['submitted_time'] == '2022-12-15T15:00:00Z'
+
+
+async def test_list_elements(monkeypatch):
+
+    with pytest.raises(Exception) as e_info:
+        await list_elements({'stuff': [1, 2, 3], 'workflows': []})
+
+    exception_raised = e_info.value
+    assert exception_raised.args[0] == \
+        'At least one workflow must be provided.'
+
+
+@pytest.mark.parametrize(
+    'field_ids, kwargs, expected',
+    [
+        pytest.param(
+            [],
+            {
+                'ids': ['//1/t/01'],
+                'workflows': ['~u/w'],
+            },
+            [],
+            id="field_ids = empty list"
+        ),
+        pytest.param(
+            None,
+            {
+                'ids': ['//1/t/01'],
+                'exids': ['//1/t/01'],
+                'workflows': ['~u/w'],
+                'exworkflows': ['~u2/w']
+            },
+            {
+                'live': False,
+                'ids': [Tokens('//1/t/01')],
+                'exids': [Tokens('//1/t/01')],
+                'workflows': [Tokens('~u/w')],
+                'exworkflows': [Tokens('~u2/w')]
+            },
+            id="field_ids = None"
+        ),
+        pytest.param(
+            '//2/t/01',
+            {
+                'ids': ['//1/t/01'],
+                'exids': [],
+                'workflows': ['~u/w'],
+                'exworkflows': []
+            },
+            {
+                'live': False,
+                'ids': [Tokens('//2/t/01')],
+                'exids': [],
+                'workflows': [Tokens('~u/w')],
+                'exworkflows': []
+            },
+            id="field_ids = str"
+        ),
+        pytest.param(
+            {
+                '//2/t/01': 'something',
+                '//2/t/02': 'something else',
+            },
+            {
+                'ids': ['//1/t/01'],
+                'exids': [],
+                'workflows': ['~u/w'],
+                'exworkflows': []
+            },
+            {
+                'live': False,
+                'ids': [Tokens('//2/t/01'), Tokens('//2/t/02')],
+                'exids': [],
+                'workflows': [Tokens('~u/w')],
+                'exworkflows': []
+            },
+            id="field_ids = dict"
+        )
+    ]
+)
+async def test_get_elements(
+    monkeypatch: pytest.MonkeyPatch,
+    field_ids, kwargs, expected
+):
+
+    # get_elements takes 2 args, root and info and kwargs.
+    # Root always seems to be none
+    root = None
+    # info is a graphql object that only gets used to pass on other
+    # functions that I'm not testing
+    info = None
+
+    async def mock_return_list_elements(kwargs):
+        return kwargs
+
+    def mock_process_resolver_info(*args):
+        return None, field_ids
+
+    monkeypatch.setattr('cylc.uiserver.schema.list_elements',
+                        mock_return_list_elements)
+    monkeypatch.setattr('cylc.uiserver.schema.process_resolver_info',
+                        mock_process_resolver_info)
+
+    assert await get_elements(
+        root,
+        info,
+        live=False,
+        **kwargs
+    ) == expected
+
+
+def test_make_jobs_query_task_filter():
+    """Task names are bound as query parameters, never spliced into SQL."""
+    conn = sqlite3.connect(':memory:')
+    conn.execute('''CREATE TABLE task_jobs(cycle TEXT, name TEXT,
+    submit_num INTEGER, flow_nums TEXT, is_manual_submit INTEGER,
+    try_num INTEGER, time_submit TEXT, time_submit_exit TEXT,
+    submit_status INTEGER, time_run TEXT, time_run_exit TEXT,
+    run_signal TEXT, run_status INTEGER, platform_name TEXT,
+    job_runner_name TEXT, job_id TEXT,
+    PRIMARY KEY(cycle, name, submit_num));''')
+
+    conn.executemany(
+        'INSERT into task_jobs VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)',
+        [('1', 'Task_1', '01', '{1}', 0, 1,
+          '2022-12-14T15:00:00Z', '2022-12-14T15:01:00Z', 0,
+          '2022-12-14T15:01:00Z', '2022-12-14T15:10:00Z', None, 0,
+          'MyPlatform', 'User', 'UsersJob'),
+         ('1', 'Task_2', '01', '{1}', 0, 1,
+          '2022-12-14T15:00:00Z', '2022-12-14T15:01:00Z', 0,
+          '2022-12-14T15:01:00Z', '2022-12-14T15:10:00Z', None, 0,
+          'MyPlatform', 'User', 'UsersJob')
+         ])
+    conn.commit()
+    workflow = Tokens('~user/workflow')
+
+    jobs = run_jobs_query(conn, workflow, ['Task_2'])
+    assert [job['name'] for job in jobs] == ['Task_2']
+
+    # A quote in a task name must be matched literally, not parsed as SQL.
+    assert run_jobs_query(conn, workflow, ["x' OR '1'='1"]) == []