diff --git a/dsi/plugins/file_writer.py b/dsi/plugins/file_writer.py index 7f433a0..d222622 100644 --- a/dsi/plugins/file_writer.py +++ b/dsi/plugins/file_writer.py @@ -31,10 +31,14 @@ def __init__(self, filenames, **kwargs): class ER_Diagram(FileWriter): - def __init__(self, filenames, **kwargs): - super().__init__(filenames, **kwargs) + def __init__(self, filename, dbname = None, target_table_prefix = None, **kwargs): + super().__init__(filename, **kwargs) + self.output_filename = filename + self.target_table_prefix = target_table_prefix + #COMMENT OUT DBNAME VARIABLE ONCE DELETING export_to_erd + self.dbname = dbname - def export_erd(self, dbname, fname): + def get_rows(self, collection) -> None: """ Function that outputs a ER diagram for the given database. @@ -44,60 +48,50 @@ def export_erd(self, dbname, fname): `return`: none """ - - db = sqlite3.connect(dbname) + # if self.dbname is not None: + # self.export_erd(self.dbname, self.output_filename) + # return + # else: file_type = ".png" - if fname[-4:] == ".png" or fname[-4:] == ".pdf" or fname[-4:] == ".jpg": - file_type = fname[-4:] - fname = fname[:-4] - elif fname[-5:] == ".jpeg": - file_type = fname[-5:] - fname = fname[:-5] - - dot_file = open(fname + ".dot", "w") - - numColsERD = 1 - - dot_file.write("digraph sqliteschema { ") + if self.output_filename[-4:] == ".png" or self.output_filename[-4:] == ".pdf" or self.output_filename[-4:] == ".jpg": + file_type = self.output_filename[-4:] + self.output_filename = self.output_filename[:-4] + elif self.output_filename[-5:] == ".jpeg": + file_type = self.output_filename[-5:] + self.output_filename = self.output_filename[:-5] + + if self.target_table_prefix is not None and not any(self.target_table_prefix in element for element in collection.keys()): + raise ValueError("Your input for target_table_prefix does not exist in the database. Please enter a valid prefix for table names.") + + dot_file = open(self.output_filename + ".dot", "w") + + num_tbl_cols = 1 + dot_file.write("digraph workflow_schema { ") + if self.target_table_prefix is not None: + dot_file.write(f'label="ER Diagram for {self.target_table_prefix} tables"; ') + dot_file.write('labelloc="t"; ') dot_file.write("node [shape=plaintext]; ") dot_file.write("rankdir=LR ") dot_file.write("splines=true ") dot_file.write("overlap=false ") - list_db_tbls = "SELECT tbl_name, NULL AS label, NULL AS color, NULL AS clusterid FROM sqlite_master WHERE type='table'" - try: - tbl_list_stmt = db.execute(list_db_tbls) - except sqlite3.Error as er: - dot_file.write(er.sqlite_errorname) - dot_file.write("Can't prepare table list statement") - db.close() - dot_file.close() - - for row in tbl_list_stmt: - tbl_name = row[0] - - tbl_info_sql = f"PRAGMA table_info({tbl_name})" - try: - tbl_info_stmt = db.execute(tbl_info_sql) - except sqlite3.Error as er: - dot_file.write(er.sqlite_errorname) - dot_file.write(f"Can't prepare table info statement on table {tbl_name}") - db.close() - dot_file.close() - - dot_file.write(f"{tbl_name} [label=<") + for tableName, tableData in collection.items(): + if tableName == "dsi_relations" or (self.target_table_prefix is not None and self.target_table_prefix not in tableName): + continue + + dot_file.write(f"{tableName} [label=<
{tbl_name}
") curr_row = 0 inner_brace = 0 - for info_row in tbl_info_stmt: - if curr_row % numColsERD == 0: + for col_name in tableData.keys(): + if curr_row % num_tbl_cols == 0: inner_brace = 1 dot_file.write("") - dot_file.write(f"") + dot_file.write(f"") curr_row += 1 - if curr_row % numColsERD == 0: + if curr_row % num_tbl_cols == 0: inner_brace = 0 dot_file.write("") @@ -105,28 +99,112 @@ def export_erd(self, dbname, fname): dot_file.write("") dot_file.write("
{tableName}
{info_row[1]}{col_name}
>]; ") - tbl_list_stmt = db.execute(list_db_tbls) - for row in tbl_list_stmt: - tbl_name = row[0] - - fkey_info_sql = f"PRAGMA foreign_key_list({tbl_name})" - try: - fkey_info_stmt = db.execute(fkey_info_sql) - except sqlite3.Error as er: - dot_file.write(er.sqlite_errorname) - dot_file.write(f"Can't prepare foreign key statement on table {tbl_name}") - db.close() - dot_file.close() - - for fkey_row in fkey_info_stmt: - dot_file.write(f"{tbl_name}:{fkey_row[3]} -> {fkey_row[2]}:{fkey_row[4]}; ") + for f_table, f_col in collection["dsi_relations"]["foreign_key"]: + if self.target_table_prefix is not None and self.target_table_prefix not in f_table: + continue + if f_table != "NULL": + foreignIndex = collection["dsi_relations"]["foreign_key"].index((f_table, f_col)) + dot_file.write(f"{f_table}:{f_col} -> {collection["dsi_relations"]['primary_key'][foreignIndex][0]}: {collection["dsi_relations"]['primary_key'][foreignIndex][1]}; ") dot_file.write("}") - db.close() dot_file.close() - subprocess.run(["dot", "-T", file_type[1:], "-o", fname + file_type, fname + ".dot"]) - os.remove(fname + ".dot") + subprocess.run(["dot", "-T", file_type[1:], "-o", self.output_filename + file_type, self.output_filename + ".dot"]) + os.remove(self.output_filename + ".dot") + + # def export_erd(self, dbname, fname): + # """ + # Function that outputs a ER diagram for the given database. + + # `dbname`: database to create an ER diagram for + + # `fname`: name (including path) of the image file that contains the generated ER diagram - default png if not specified + + # `return`: none + # """ + + # db = sqlite3.connect(dbname) + + # file_type = ".png" + # if fname[-4:] == ".png" or fname[-4:] == ".pdf" or fname[-4:] == ".jpg": + # file_type = fname[-4:] + # fname = fname[:-4] + # elif fname[-5:] == ".jpeg": + # file_type = fname[-5:] + # fname = fname[:-5] + + # dot_file = open(fname + ".dot", "w") + + # numColsERD = 1 + + # dot_file.write("digraph sqliteschema { ") + # dot_file.write("node [shape=plaintext]; ") + # dot_file.write("rankdir=LR ") + # dot_file.write("splines=true ") + # dot_file.write("overlap=false ") + + # list_db_tbls = "SELECT tbl_name, NULL AS label, NULL AS color, NULL AS clusterid FROM sqlite_master WHERE type='table'" + # try: + # tbl_list_stmt = db.execute(list_db_tbls) + # except sqlite3.Error as er: + # dot_file.write(er.sqlite_errorname) + # dot_file.write("Can't prepare table list statement") + # db.close() + # dot_file.close() + + # for row in tbl_list_stmt: + # tbl_name = row[0] + + # tbl_info_sql = f"PRAGMA table_info({tbl_name})" + # try: + # tbl_info_stmt = db.execute(tbl_info_sql) + # except sqlite3.Error as er: + # dot_file.write(er.sqlite_errorname) + # dot_file.write(f"Can't prepare table info statement on table {tbl_name}") + # db.close() + # dot_file.close() + + # dot_file.write(f"{tbl_name} [label=<") + + # curr_row = 0 + # inner_brace = 0 + # for info_row in tbl_info_stmt: + # if curr_row % numColsERD == 0: + # inner_brace = 1 + # dot_file.write("") + + # dot_file.write(f"") + # curr_row += 1 + # if curr_row % numColsERD == 0: + # inner_brace = 0 + # dot_file.write("") + + # if inner_brace: + # dot_file.write("") + # dot_file.write("
{tbl_name}
{info_row[1]}
>]; ") + + # tbl_list_stmt = db.execute(list_db_tbls) + # for row in tbl_list_stmt: + # tbl_name = row[0] + + # fkey_info_sql = f"PRAGMA foreign_key_list({tbl_name})" + # try: + # fkey_info_stmt = db.execute(fkey_info_sql) + # except sqlite3.Error as er: + # dot_file.write(er.sqlite_errorname) + # dot_file.write(f"Can't prepare foreign key statement on table {tbl_name}") + # db.close() + # dot_file.close() + + # for fkey_row in fkey_info_stmt: + # dot_file.write(f"{tbl_name}:{fkey_row[3]} -> {fkey_row[2]}:{fkey_row[4]}; ") + + # dot_file.write("}") + # db.close() + # dot_file.close() + + # subprocess.run(["dot", "-T", file_type[1:], "-o", fname + file_type, fname + ".dot"]) + # os.remove(fname + ".dot") class Csv_Writer(FileWriter): """ diff --git a/examples/coreterminal.py b/examples/coreterminal.py index 5035fa9..7d326a6 100644 --- a/examples/coreterminal.py +++ b/examples/coreterminal.py @@ -50,7 +50,7 @@ #Example use 2 -# a.load_module('backend','SqliteReader','back-read', filename='data/code-trans-dm0801.db') +# a.load_module('backend','SqliteReader','back-read', filename='data/data.db') # a.artifact_handler(interaction_type="read") # a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'er_diagram.pdf')#, target_table_prefix = "physics") # a.transload() \ No newline at end of file