-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadd_workflows.py
155 lines (112 loc) · 5.15 KB
/
add_workflows.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# -*- coding: utf-8 -*-
"""
Created on Mon Jan 15 15:36:41 2024
@author: rjovelin
"""
import os
import sqlite3
from utilities import connect_to_db
import argparse
def collect_workflows(db, project_name, table = 'Workflows'):
    '''
    (str, str, str) -> list

    Returns the list of workflow run ids (wfrun_id) recorded for project_name
    in table of the db database

    Parameters
    ----------
    - db (str): Path to the sqlite database
    - project_name (str): Name of project of interest
    - table (str): Table in db storing the workflow information. Default is Workflows
    '''

    # a table name cannot be bound as a parameter, so it is formatted into the
    # query; the project name is bound with a placeholder so that a name
    # containing quotes, or a name matching a column, cannot change the query
    query = 'SELECT wfrun_id FROM {0} WHERE {0}.project_id = ?'.format(table)

    # NOTE(review): assumes connect_to_db returns a sqlite3 connection whose
    # rows can be indexed by column name - confirm in utilities
    conn = connect_to_db(db)
    try:
        data = conn.execute(query, (project_name,)).fetchall()
    finally:
        # release the connection even when the query fails
        conn.close()

    return [i['wfrun_id'] for i in data]
def collect_projects(main_db, table = 'Projects'):
    '''
    (str, str) -> list

    Returns the value of the project_id column for every row of table
    in the main database

    Parameters
    ----------
    - main_db (str): Path to the sqlite database storing production data
    - table (str): Table from main_db listing the projects. Default is Projects
    '''

    query = 'SELECT project_id FROM {0}'.format(table)

    # fetch every row before closing the connection
    connection = connect_to_db(main_db)
    cursor = connection.execute(query)
    rows = cursor.fetchall()
    connection.close()

    # keep only the project identifier of each row
    project_names = []
    for row in rows:
        project_names.append(row['project_id'])
    return project_names
def add_missing_workflow(workflow_db, project_id, workflows, workflow_table='Workflows'):
    '''
    (str, str, list, str) -> None

    Add all workflows into workflow_table of workflow_db. Set selected status to 0.

    Parameters
    ----------
    - workflow_db (str): Path to the sqlite database storing Workflow information
    - project_id (str): Name of project of interest
    - workflows (list): List of workflow ids not already recorded in the workflow_db
    - workflow_table (str): Table in db storing the workflow information. Default is Workflows
    '''

    # the values are bound with placeholders instead of being formatted into
    # the statement, so ids containing quotes or non-string ids are inserted
    # verbatim; only the table name, which cannot be bound, is formatted
    query = 'INSERT INTO {0} (wfrun_id, project_id, selected) VALUES (?, ?, ?)'.format(workflow_table)

    # one row per workflow, all attached to the same project and unselected (0)
    rows = [(workflow_id, project_id, 0) for workflow_id in workflows]

    # NOTE(review): assumes connect_to_db returns a sqlite3 connection - confirm in utilities
    conn = connect_to_db(workflow_db)
    try:
        conn.executemany(query, rows)
        # commit once, after all the rows have been inserted
        conn.commit()
    finally:
        # release the connection even when an insert fails
        conn.close()
def update_workflow_db(workflow_db, main_db, project_table, workflow_table):
    '''
    (str, str, str, str) -> None

    Record in the workflow database the workflows of the main database that
    are not yet recorded there, one project at a time

    Parameters
    ----------
    - workflow_db (str): Path to the sqlite database storing Workflow information
    - main_db (str): Path to the sqlite database storing production data
    - project_table (str): Table from main_db listing the projects
    - workflow_table (str): Table storing the workflow information. The same
                            table name is queried in main_db and in workflow_db
    '''

    for project_name in collect_projects(main_db, project_table):
        # workflows known to the main database for this project
        in_main = collect_workflows(main_db, project_name, workflow_table)
        # workflows already present in the workflow database for this project
        already_recorded = collect_workflows(workflow_db, project_name, workflow_table)
        # keep only the workflows that the workflow database is missing
        missing = list(set(in_main) - set(already_recorded))
        add_missing_workflow(workflow_db, project_name, missing, workflow_table)
def setup_database(database, table = 'Workflows'):
    '''
    (str, str) -> None

    Create table in the sqlite database, with columns wfrun_id, project_id
    and selected. The database file is created if it does not exist.
    Raises sqlite3.OperationalError if table already exists in database

    Parameters
    ----------
    - database (str): Path to the sqlite database
    - table (str): Table in database. Default is Workflows
    '''

    # column names of the table and their corresponding sqlite types
    column_names = ['wfrun_id', 'project_id', 'selected']
    column_types = ['VARCHAR(572)', 'VARCHAR(128)', 'INT']

    # define table format: comma-separated "name type" pairs
    table_format = ', '.join('{0} {1}'.format(name, kind) for name, kind in zip(column_names, column_types))
    cmd = 'CREATE TABLE {0} ({1})'.format(table, table_format)

    # connect to database
    conn = sqlite3.connect(database)
    try:
        # create table
        conn.execute(cmd)
        conn.commit()
    finally:
        # release the connection even when the table cannot be created
        conn.close()
if __name__ == '__main__':
    parser = argparse.ArgumentParser(prog = 'add_workflows.py', description='Updates the Workflows database with workflow ids from the main database', add_help=True)
    # both paths are mandatory: without them the script would fail later with
    # an unrelated error instead of a usage message
    parser.add_argument('-w', '--workflow_db', dest='workflow_db', required=True, help='Path to the database storing workflow information')
    parser.add_argument('-m', '--main_db', dest = 'main_db', required=True, help = 'Path to the main database storing production information')

    # get arguments from the command line
    args = parser.parse_args()

    # create database if doesn't exist
    if not os.path.isfile(args.workflow_db):
        setup_database(args.workflow_db, 'Workflows')

    # add missing workflows
    update_workflow_db(args.workflow_db, args.main_db, 'Projects', 'Workflows')