From 481a068b484cafa22f6be6c5d92140bfd83fb43b Mon Sep 17 00:00:00 2001 From: Annette Lien <70581832+liannette@users.noreply.github.com> Date: Wed, 4 Dec 2024 09:53:09 +0100 Subject: [PATCH] add methods to export results in tabular format (#280) * add print links method to LinkGraph, improve LinkGraph string representation * feat: add a method to print tabular results files * improve method names and docstrings, remove unused method to export gcf file * improve docstring and typing * fix a failing test * refactor a little bit the spectrum method to convert to dict * change the output format for gnps_annotations in metabolomics results file, improve docstrings * fix: convert int to str before using join * change representation of empty values in output files for improved integration to excel * refactoring the export methods * small refactor: specify staticmethod * add more tests * correct typing in docstrings * typing: changed typings to pass mypy static typing checks * refactor: change the order of methods/functions * restore the order of already existing functions and methods * make dicts json compatible * rename functions and variables * refactor: changed the place where the index is added to the link dict * use csv package to write the tabular output files * make sure all elements of the input list have the same type of data. 
* shorten too-long docstring lines, correct some docstrings * tests: adapted the test to the changes * remove a file that was committed by accident * Improve docstrings Apply suggestions from code review Co-authored-by: Cunliang Geng * Improve docstrings Apply suggestions from code review Co-authored-by: Cunliang Geng * refactor: add method to convert a value to string for tabular output * improve docstring, add a comment about key order of bgc dict representation * move to_string method to the BGC/Spectrum class, add a to_tabular method * add tests for the to_string method * change to_tabular so it returns a list and not a string * refactor: to_tabular returns dict, to_string turned into private func, tabs are replaced in to_tabular * fix typing in to_tabular methods * update docstrings and comments * ensure 0 and 0.0 are correctly converted to strings, and not to empty strings * change the order of methods * remove whitespace in blank lines * update and add tests * change variable name to fix mypy error * test: trying to fix unit test issue where the spectrum rt is a dict instead of numerical * tests: add precursor charge to the test spectra * Update src/nplinker/metabolomics/spectrum.py --------- Co-authored-by: Cunliang Geng --- .github/workflows/format-typing-check.yml | 2 +- pyproject.toml | 1 + src/nplinker/genomics/bgc.py | 68 +++++++++++ src/nplinker/metabolomics/spectrum.py | 63 ++++++++++ src/nplinker/nplinker.py | 41 +++++++ src/nplinker/scoring/link_graph.py | 133 +++++++++++++++++----- tests/integration/test_nplinker_local.py | 40 +++++++ tests/unit/genomics/test_bgc.py | 76 +++++++++++++ tests/unit/metabolomics/test_spectrum.py | 76 +++++++++++++ tests/unit/scoring/test_link_graph.py | 58 ++++++++++ 10 files changed, 528 insertions(+), 30 deletions(-) diff --git a/.github/workflows/format-typing-check.yml b/.github/workflows/format-typing-check.yml index a5def2b9..10ea0990 100644 --- a/.github/workflows/format-typing-check.yml +++ 
b/.github/workflows/format-typing-check.yml @@ -37,7 +37,7 @@ jobs: - name: Install ruff and mypy run: | pip install ruff mypy typing_extensions \ - types-Deprecated types-beautifulsoup4 types-jsonschema types-networkx pandas-stubs + types-Deprecated types-beautifulsoup4 types-jsonschema types-networkx types-tabulate pandas-stubs - name: Get all changed python files id: changed-python-files uses: tj-actions/changed-files@v44 diff --git a/pyproject.toml b/pyproject.toml index 675f89c4..74d050a0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,6 +63,7 @@ dev = [ "types-beautifulsoup4", "types-jsonschema", "types-networkx", + "types-tabulate", "pandas-stubs", # docs "black", diff --git a/src/nplinker/genomics/bgc.py b/src/nplinker/genomics/bgc.py index 08978587..8decbb81 100644 --- a/src/nplinker/genomics/bgc.py +++ b/src/nplinker/genomics/bgc.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging from typing import TYPE_CHECKING +from typing import Any from deprecated import deprecated from nplinker.strain import Strain from .aa_pred import predict_aa @@ -173,6 +174,73 @@ def is_mibig(self) -> bool: """ return self.id.startswith("BGC") + def to_dict(self) -> dict[str, Any]: + """Convert the BGC object to a dictionary for exporting purpose. + + Returns: + A dictionary containing the following key-value pairs: + + - GCF_id (list[str]): A list of GCF IDs. + - GCF_bigscape_class (list[str]): A list of BiG-SCAPE classes. + - strain_id (str | None): The ID of the strain. + - description (str | None): A description of the BGC. + - BGC_name (str): The name of the BGC. + - product_prediction (list[str]): (predicted) products or product classes of the BGC. + - mibig_bgc_class (list[str] | None): MIBiG biosynthetic classes. + - antismash_id (str | None): The antiSMASH ID. + - antismash_region (int | None): The antiSMASH region number. 
+ """ + # Keys are ordered to make the output easier to analyze + return { + "GCF_id": [gcf.id for gcf in self.parents if gcf.id is not None], + "GCF_bigscape_class": [bsc for bsc in self.bigscape_classes if bsc is not None], + "strain_id": self.strain.id if self.strain is not None else None, + "description": self.description, + "BGC_name": self.id, + "product_prediction": list(self.product_prediction), + "mibig_bgc_class": self.mibig_bgc_class, + "antismash_id": self.antismash_id, + "antismash_region": self.antismash_region, + } + + def to_tabular(self) -> dict[str, str]: + """Convert the BGC object to a tabular format. + + Returns: + dict: A dictionary representing the BGC object in tabular format. + The keys can be treated as headers and values are strings in which tabs are replaced by spaces. + This dict can be exported as a TSV file. + """ + return { + key: self._to_string(value).replace("\t", " ") + for key, value in self.to_dict().items() + } + + @staticmethod + def _to_string(value: Any) -> str: + """Convert various types of values to a string. + + Args: + value: The value to be converted to a string. + Can be a list, dict, or any other JSON-compatible type. + + Returns: + A string representation of the input value. + """ + # Convert list to comma-separated string + if isinstance(value, list): + formatted_value = ", ".join(map(str, value)) + # Convert dict to comma-separated string + elif isinstance(value, dict): + formatted_value = ", ".join([f"{k}:{v}" for k, v in value.items()]) + # Convert None to empty string + elif value is None: + formatted_value = "" + # Convert anything else to string + else: + formatted_value = str(value) + return formatted_value + # CG: why not providing whole product but only amino acid as product monomer? # this property is not used in NPLinker core business. 
@property diff --git a/src/nplinker/metabolomics/spectrum.py b/src/nplinker/metabolomics/spectrum.py index fa65de2e..db9b4c3f 100644 --- a/src/nplinker/metabolomics/spectrum.py +++ b/src/nplinker/metabolomics/spectrum.py @@ -1,6 +1,7 @@ from __future__ import annotations from functools import cached_property from typing import TYPE_CHECKING +from typing import Any import numpy as np from nplinker.strain import Strain from nplinker.strain import StrainCollection @@ -108,3 +109,65 @@ def has_strain(self, strain: Strain) -> bool: True when the given strain exist in the spectrum. """ return strain in self.strains + + def to_dict(self) -> dict[str, Any]: + """Convert the Spectrum object to a dictionary for exporting purpose. + + Returns: + A dictionary containing the following key-value pairs: + + - "spectrum_id" (str): The unique identifier of the spectrum. + - "num_strains_with_spectrum" (int): The number of strains associated with the spectrum. + - "precursor_mz" (float): The precursor m/z value, rounded to four decimal places. + - "rt" (float): The retention time, rounded to three decimal places. + - "molecular_family" (str | None): The identifier of the molecular family. + - "gnps_id" (str | None): The GNPS identifier. + - "gnps_annotations" (dict[str, str]): A dictionary of GNPS annotations. + """ + return { + "spectrum_id": self.id, + "num_strains_with_spectrum": len(self.strains), + "precursor_mz": round(self.precursor_mz, 4), + "rt": round(self.rt, 3), + "molecular_family": self.family.id if self.family else None, + "gnps_id": self.gnps_id, + "gnps_annotations": self.gnps_annotations, + } + + def to_tabular(self) -> dict[str, str]: + """Convert the Spectrum object to a tabular format. + + Returns: + dict: A dictionary representing the Spectrum object in tabular format. + The keys can be treated as headers and values are strings in which tabs are replaced by spaces. + This dict can be exported as a TSV file. 
+ """ + return { + key: self._to_string(value).replace("\t", " ") + for key, value in self.to_dict().items() + } + + @staticmethod + def _to_string(value: Any) -> str: + """Convert various types of values to a string. + + Args: + value: The value to be converted to a string. + Can be a list, dict, or any other JSON-compatible type. + + Returns: + A string representation of the input value. + """ + # Convert list to comma-separated string + if isinstance(value, list): + formatted_value = ", ".join(map(str, value)) + # Convert dict to comma-separated string + elif isinstance(value, dict): + formatted_value = ", ".join([f"{k}:{v}" for k, v in value.items()]) + # Convert None to empty string + elif value is None: + formatted_value = "" + # Convert anything else to string + else: + formatted_value = str(value) + return formatted_value diff --git a/src/nplinker/nplinker.py b/src/nplinker/nplinker.py index a7146dcc..1a42d7a1 100644 --- a/src/nplinker/nplinker.py +++ b/src/nplinker/nplinker.py @@ -1,4 +1,5 @@ from __future__ import annotations +import csv import logging import pickle from collections.abc import Sequence @@ -355,3 +356,43 @@ def save_data( data = (self.bgcs, self.gcfs, self.spectra, self.mfs, self.strains, links) with open(file, "wb") as f: pickle.dump(data, f) + + def objects_to_tsv(self, objects: Sequence[BGC] | Sequence[Spectrum], filename: str) -> None: + """Exports a list of BGC or Spectrum objects to a tsv file. + + Args: + objects (list): A list of BGC or a list of Spectrum objects to be exported. + filename (str): The name of the output file. 
+ """ + if not objects: + raise ValueError("No objects provided to export") + + # Ensure all elements in the list are of the same type + obj_type = type(objects[0]) + if not all(isinstance(obj, obj_type) for obj in objects): + raise TypeError("All objects in the list must be of the same type") + + with open(self._output_dir / filename, "w", newline="") as outfile: + headers = objects[0].to_tabular().keys() + writer = csv.DictWriter(outfile, fieldnames=headers, delimiter="\t") + writer.writeheader() + for obj in objects: + writer.writerow(obj.to_tabular()) + + def to_tsv(self, lg: LinkGraph | None = None) -> None: + """Export data to tsv files. + + This method exports the following data to separate TSV files: + + - BGC objects: `genomics_data.tsv` + - Spectrum objects: `metabolomics_data.tsv` + - LinkGraph object (if given): `links.tsv` + + Args: + lg (LinkGraph | None): An optional LinkGraph object. If provided, + the links data will be exported to 'links.tsv'. + """ + self.objects_to_tsv(self.bgcs, "genomics_data.tsv") + self.objects_to_tsv(self.spectra, "metabolomics_data.tsv") + if lg is not None: + lg.to_tsv(self._output_dir / "links.tsv") diff --git a/src/nplinker/scoring/link_graph.py b/src/nplinker/scoring/link_graph.py index 50151997..e3653398 100644 --- a/src/nplinker/scoring/link_graph.py +++ b/src/nplinker/scoring/link_graph.py @@ -1,6 +1,9 @@ from __future__ import annotations +import csv from collections.abc import Sequence from functools import wraps +from os import PathLike +from typing import Any from typing import Union from networkx import Graph from tabulate import tabulate @@ -76,17 +79,17 @@ def __init__(self) -> None: Display the empty LinkGraph object: >>> lg - | | Object 1 | Object 2 | Metcalf Score | Rosetta Score | - |----|------------|------------|-----------------|-----------------| + | index | genomic_object_id | genomic_object_type | metabolomic_object_id | metabolomic_object_type | metcalf_score | rosetta_score | + 
|---------|---------------------|-----------------------|-------------------------|---------------------------|-----------------|-----------------| Add a link between a GCF and a Spectrum object: >>> lg.add_link(gcf, spectrum, metcalf=Score("metcalf", 1.0, {"cutoff": 0.5})) Display all links in LinkGraph object: >>> lg - | | Object 1 | Object 2 | Metcalf Score | Rosetta Score | - |----|--------------|------------------------|-----------------|-----------------| - | 1 | GCF(id=gcf1) | Spectrum(id=spectrum1) | 1 | - | + | index | genomic_object_id | genomic_object_type | metabolomic_object_id | metabolomic_object_type | metcalf_score | rosetta_score | + |---------|---------------------|-----------------------|-------------------------|---------------------------|-----------------|-----------------| + | 1 | 1 | GCF | 1 | Spectrum | 1.00 | | Get all links for a given object: >>> lg[gcf] @@ -103,6 +106,18 @@ def __init__(self) -> None: Get the link data between two objects: >>> lg.get_link_data(gcf, spectrum) {"metcalf": Score("metcalf", 1.0, {"cutoff": 0.5})} + + Filter the links for `gcf1` and `gcf2`: + >>> new_lg = lg.filter([gcf1, gcf2]) + + Filter the links for `spectrum1` and `spectrum2`: + >>> new_lg = lg.filter([spectrum1, spectrum2]) + + Filter the links between two lists of objects: + >>> new_lg = lg.filter([gcf1, gcf2], [spectrum1, spectrum2]) + + Export the links to a file: + >>> lg.to_tsv("links.tsv") """ self._g: Graph = Graph() @@ -267,6 +282,54 @@ def filter(self, u_nodes: Sequence[Entity], v_nodes: Sequence[Entity] = [], /) - return lg + @staticmethod + def link_to_dict(link: LINK) -> dict[str, Any]: + """Convert a link to a dictionary representation. + + Args: + link: A tuple containing the link information (u, v, data). + + Returns: + A dictionary containing the link information with the following keys: + + - genomic_object_id (str): The ID of the genomic object. + - genomic_object_type (str): The type of the genomic object. 
+ - metabolomic_object_id (str): The ID of the metabolomic object. + - metabolomic_object_type (str): The type of the metabolomic object. + - metcalf_score (float | str): The Metcalf score, rounded to 2 decimal places. + - rosetta_score (float | str): The Rosetta score, rounded to 2 decimal places. + """ + u, v, data = link + genomic_types = (GCF,) + genomic_object = u if isinstance(u, genomic_types) else v + metabolomic_object = v if isinstance(u, genomic_types) else u + metcalf_score = data.get("metcalf") + rosetta_score = data.get("rosetta") + return { + "genomic_object_id": genomic_object.id, + "genomic_object_type": genomic_object.__class__.__name__, + "metabolomic_object_id": metabolomic_object.id, + "metabolomic_object_type": metabolomic_object.__class__.__name__, + "metcalf_score": round(metcalf_score.value, 2) if metcalf_score else "", + "rosetta_score": round(rosetta_score.value, 2) if rosetta_score else "", + } + + def to_tsv(self, file: str | PathLike) -> None: + """Exports the links in the LinkGraph to a TSV file. + + Args: + file: the path to the output TSV file. + + Examples: + >>> lg.to_tsv("links.tsv") + """ + table_data = self._links_to_dicts() + headers = table_data[0].keys() + with open(file, "w", newline="") as f: + writer = csv.DictWriter(f, fieldnames=headers, delimiter="\t") + writer.writeheader() + writer.writerows(table_data) + @validate_u def _filter_one_node(self, u: Entity, lg: LinkGraph) -> None: """Filter the links for a given object and add them to the new LinkGraph object.""" @@ -285,35 +348,47 @@ def _filter_two_nodes(self, u: Entity, v: Entity, lg: LinkGraph) -> None: if link_data is not None: lg.add_link(u, v, **link_data) - def _get_table_repr(self) -> str: + def _get_table_repr(self, display_limit: int | None = 60) -> str: """Generate a table representation of the LinkGraph. - The table is truncated to 60 links. 
- """ - headers = ["", "Object 1", "Object 2", "Metcalf Score", "Rosetta Score"] - table_data = [] - display_limit = 60 + Args: + display_limit: The maximum number of links to display in the table. Defaults to 60. - for index, (u, v, data) in enumerate(self.links, start=1): - metcalf_score = data.get("metcalf") - rosetta_score = data.get("rosetta") + Returns: + A string representation of the table in GitHub-flavored markdown format. If the + number of links exceeds the display limit, the table is truncated and an additional + line indicating the total number of links is appended. + """ + table = tabulate( + self._links_to_dicts(display_limit), + headers="keys", + tablefmt="github", + stralign="right", + ) + + if display_limit is not None and len(self.links) > display_limit: + truncated_info = f"...\n[ {len(self.links)} links ]" + table += f"\n{truncated_info}" - row = [ - index, - str(u if isinstance(u, GCF) else v), - str(v if isinstance(u, GCF) else u), - f"{metcalf_score.value:.2f}" if metcalf_score else "-", - f"{rosetta_score.value:.2f}" if rosetta_score else "-", - ] - table_data.append(row) + return table - if index == display_limit: - break + def _links_to_dicts(self, display_limit: int | None = None) -> list[dict[str, Any]]: + """Generate the table data for the LinkGraph. - table = tabulate(table_data, headers=headers, tablefmt="github", stralign="right") + This method iterates over the links in the LinkGraph and constructs a table + containing information about genomic and metabolomic objects, as well as their + associated scores. Each row in the table represents a link between a genomic + object and a metabolomic object. - if len(self.links) > display_limit: - truncated_info = f"...\n[ {len(self.links)} links ]" - return f"{table}\n{truncated_info}" + Args: + display_limit (int | None): The maximum number of rows to include in the + table. If None, all rows are included. - return table + Returns: + A list of dictionaries containing the table data. 
+ """ + links = self.links[:display_limit] if display_limit else self.links + link_dicts = [] + for idx, link in enumerate(links): + link_dicts.append({"index": idx + 1, **self.link_to_dict(link)}) + return link_dicts diff --git a/tests/integration/test_nplinker_local.py b/tests/integration/test_nplinker_local.py index 54144dd1..2c27a4ab 100644 --- a/tests/integration/test_nplinker_local.py +++ b/tests/integration/test_nplinker_local.py @@ -1,5 +1,6 @@ import os import pickle +from pathlib import Path import pytest from nplinker.genomics import GCF from nplinker.metabolomics import MolecularFamily @@ -106,3 +107,42 @@ def test_save_data(npl): assert obj1 in mfs else: assert False + + +def test_objects_to_tsv(npl, tmp_path): + tsv_file = tmp_path / "test.tsv" + + # Test objects_to_tsv for BGCs + npl.objects_to_tsv(npl.bgcs, tsv_file) + with open(tsv_file, "r") as f: + lines = f.readlines() + assert len(lines) == len(npl.bgcs) + 1 # +1 for header + + # Test objects_to_tsv for Spectra + npl.objects_to_tsv(npl.spectra, tsv_file) + with open(tsv_file, "r") as f: + lines = f.readlines() + assert len(lines) == len(npl.spectra) + 1 # +1 for header + + +def test_to_tsv(npl): + lg = npl.get_links(npl.spectra[:1], "metcalf") + npl.to_tsv(lg) + + # Check the genomics_data.tsv file + genomics_tsv_file = Path(npl.output_dir) / "genomics_data.tsv" + with open(genomics_tsv_file, "r") as f: + lines = f.readlines() + assert len(lines) == len(npl.bgcs) + 1 # +1 for header + + # Check metabolomics_data.tsv file + metabolomics_tsv_file = Path(npl.output_dir) / "metabolomics_data.tsv" + with open(metabolomics_tsv_file, "r") as f: + lines = f.readlines() + assert len(lines) == len(npl.spectra) + 1 # +1 for header + + # Check the links.tsv file + links_tsv_file = Path(npl.output_dir) / "links.tsv" + with open(links_tsv_file, "r") as f: + lines = f.readlines() + assert len(lines) == len(lg.links) + 1 # +1 for header diff --git a/tests/unit/genomics/test_bgc.py 
b/tests/unit/genomics/test_bgc.py index 1cf3f401..fd21dd36 100644 --- a/tests/unit/genomics/test_bgc.py +++ b/tests/unit/genomics/test_bgc.py @@ -24,3 +24,79 @@ def test_add_and_detach_parent(): assert bgc.parents == {gcf} bgc.detach_parent(gcf) assert bgc.parents == set() + + +def test_to_dict(): + bgc = BGC("BGC0000001", "Polyketide", "NRP") + bgc.strain = Strain("sample_strain") + bgc.description = "Sample description" + + dict_repr = bgc.to_dict() + assert dict_repr["GCF_id"] == list() + assert dict_repr["GCF_bigscape_class"] == list() + assert dict_repr["BGC_name"] == "BGC0000001" + assert dict_repr["product_prediction"] == ["Polyketide", "NRP"] + assert dict_repr["mibig_bgc_class"] is None + assert dict_repr["description"] == "Sample description" + assert dict_repr["strain_id"] == "sample_strain" + assert dict_repr["antismash_id"] is None + assert dict_repr["antismash_region"] is None + + bgc.add_parent(GCF("1")) + bgc.mibig_bgc_class = [ + "NRP", + ] + bgc.antismash_id = "ABC_0001" + bgc.antismash_region = 1 + dict_repr = bgc.to_dict() + assert dict_repr["GCF_id"] == [ + "1", + ] + assert dict_repr["GCF_bigscape_class"] == list() + assert dict_repr["mibig_bgc_class"] == [ + "NRP", + ] + assert dict_repr["antismash_id"] == "ABC_0001" + assert dict_repr["antismash_region"] == 1 + + +def test__to_string(): + assert BGC._to_string([]) == "" + assert BGC._to_string([1, 2.0, "a"]) == "1, 2.0, a" + assert BGC._to_string(dict()) == "" + assert BGC._to_string({"key1": 1, "key2": "value2"}) == "key1:1, key2:value2" + assert BGC._to_string(None) == "" + assert BGC._to_string(0) == "0" + assert BGC._to_string(0.0) == "0.0" + assert BGC._to_string(100.2) == "100.2" + assert BGC._to_string(False) == "False" + + +def test_to_tabular(): + bgc = BGC("BGC0000001", "Polyketide", "NRP") + bgc.strain = Strain("sample_strain") + bgc.description = "Sample description" + + tabular_repr = bgc.to_tabular() + assert tabular_repr["GCF_id"] == "" + assert 
tabular_repr["GCF_bigscape_class"] == "" + assert tabular_repr["BGC_name"] == "BGC0000001" + assert tabular_repr["product_prediction"] == "Polyketide, NRP" + assert tabular_repr["mibig_bgc_class"] == "" + assert tabular_repr["description"] == "Sample description" + assert tabular_repr["strain_id"] == "sample_strain" + assert tabular_repr["antismash_id"] == "" + assert tabular_repr["antismash_region"] == "" + + bgc.add_parent(GCF("1")) + bgc.mibig_bgc_class = [ + "NRP", + ] + bgc.antismash_id = "ABC_0001" + bgc.antismash_region = 1 + tabular_repr = bgc.to_tabular() + assert tabular_repr["GCF_id"] == "1" + assert tabular_repr["GCF_bigscape_class"] == "" + assert tabular_repr["mibig_bgc_class"] == "NRP" + assert tabular_repr["antismash_id"] == "ABC_0001" + assert tabular_repr["antismash_region"] == "1" diff --git a/tests/unit/metabolomics/test_spectrum.py b/tests/unit/metabolomics/test_spectrum.py index e5262194..e81bec30 100644 --- a/tests/unit/metabolomics/test_spectrum.py +++ b/tests/unit/metabolomics/test_spectrum.py @@ -69,3 +69,79 @@ def test_has_strain(): spec.strains.add(strain1) assert spec.has_strain(strain1) assert not spec.has_strain(strain2) + + +def test_to_dict(): + """Test the to_dict method.""" + spec = Spectrum("spec1", [100, 200], [0.1, 0.2], 150, 1, 0, {"info": "test"}) + spec.strains.add(Strain("strain1")) + spec.strains.add(Strain("strain2")) + + dict_repr = spec.to_dict() + assert dict_repr["spectrum_id"] == "spec1" + assert dict_repr["num_strains_with_spectrum"] == 2 + assert dict_repr["precursor_mz"] == 150.0 + assert dict_repr["rt"] == 0 + assert dict_repr["molecular_family"] is None + assert dict_repr["gnps_id"] is None + assert dict_repr["gnps_annotations"] == dict() + + # Test with gnps information + spec.gnps_id = "GNPS0001" + spec.gnps_annotations = {"annotation1": "value1"} + + # Test with molecular family + class MockMolecularFamily: + def __init__(self, id): + self.id = id + + spec.family = MockMolecularFamily("family1") + + dict_repr 
= spec.to_dict() + assert dict_repr["molecular_family"] == "family1" + assert dict_repr["gnps_id"] == "GNPS0001" + assert dict_repr["gnps_annotations"] == {"annotation1": "value1"} + + +def test__to_string(): + assert Spectrum._to_string([]) == "" + assert Spectrum._to_string([1, 2.0, "a"]) == "1, 2.0, a" + assert Spectrum._to_string(dict()) == "" + assert Spectrum._to_string({"key1": 1, "key2": "value2"}) == "key1:1, key2:value2" + assert Spectrum._to_string(None) == "" + assert Spectrum._to_string(0) == "0" + assert Spectrum._to_string(0.0) == "0.0" + assert Spectrum._to_string(100.2) == "100.2" + assert Spectrum._to_string(False) == "False" + + +def test_to_tabular(): + """Test the to_tabular method.""" + spec = Spectrum("spec1", [100, 200], [0.1, 0.2], 150, 1, 0, {"info": "test"}) + spec.strains.add(Strain("strain1")) + spec.strains.add(Strain("strain2")) + + tabular_repr = spec.to_tabular() + assert tabular_repr["spectrum_id"] == "spec1" + assert tabular_repr["num_strains_with_spectrum"] == "2" + assert tabular_repr["precursor_mz"] == "150" + assert tabular_repr["rt"] == "0" + assert tabular_repr["molecular_family"] == "" + assert tabular_repr["gnps_id"] == "" + assert tabular_repr["gnps_annotations"] == "" + + # Test with molecular family + class MockMolecularFamily: + def __init__(self, id): + self.id = id + + spec.family = MockMolecularFamily("family1") + + # Test with gnps information + spec.gnps_id = "GNPS0001" + spec.gnps_annotations = {"key1": "value1", "key2": "value2"} + + tabular_repr = spec.to_tabular() + assert tabular_repr["molecular_family"] == "family1" + assert tabular_repr["gnps_id"] == "GNPS0001" + assert tabular_repr["gnps_annotations"] == "key1:value1, key2:value2" diff --git a/tests/unit/scoring/test_link_graph.py b/tests/unit/scoring/test_link_graph.py index 9f7c9d7d..85ea247c 100644 --- a/tests/unit/scoring/test_link_graph.py +++ b/tests/unit/scoring/test_link_graph.py @@ -112,3 +112,61 @@ def test_filter(gcfs, spectra, score): # test 
filtering with GCFs and Spectra lg_filtered = lg.filter(u_nodes, v_nodes) assert len(lg_filtered) == 4 + + +def test_link_to_dict(lg, gcfs, spectra, score): + link = lg.links[0] + dict_repr = lg.link_to_dict(link) + assert type(dict_repr) is dict + assert dict_repr["genomic_object_type"] == gcfs[0].__class__.__name__ + assert dict_repr["genomic_object_id"] == gcfs[0].id + assert dict_repr["metabolomic_object_type"] == spectra[0].__class__.__name__ + assert dict_repr["metabolomic_object_id"] == spectra[0].id + assert dict_repr["metcalf_score"] == round(score.value, 2) + assert dict_repr["rosetta_score"] == "" + + +def test__links_to_dicts(lg, gcfs, spectra, score): + # add a second link + lg.add_link(gcfs[1], spectra[1], metcalf=score) + + table_data = lg._links_to_dicts() + assert type(table_data) is list + assert type(table_data[0]) is dict + assert len(table_data) == 2 + assert table_data[0]["index"] == 1 + assert table_data[1]["index"] == 2 + + display_limit = 1 + table_data = lg._links_to_dicts(display_limit) + assert len(table_data) == 1 + + +def test_to_tsv(lg, gcfs, mfs, score, tmp_path): + lg.add_link(gcfs[1], mfs[0], metcalf=score) + + tsv_file = tmp_path / "links.tsv" + lg.to_tsv(tsv_file) + + with open(tsv_file, "r") as f: + lines = f.readlines() + + # Check the header + expected_header_names = [ + "index", + "genomic_object_id", + "genomic_object_type", + "metabolomic_object_id", + "metabolomic_object_type", + "metcalf_score", + "rosetta_score", + ] + assert lines[0].rstrip("\n").split("\t") == expected_header_names + + # Check first link data + expected_line = ["1", "gcf1", "GCF", "spectrum1", "Spectrum", "1.0", ""] + assert lines[1].rstrip("\n").split("\t") == expected_line + + # Check second link data + expected_line = ["2", "gcf2", "GCF", "mf1", "MolecularFamily", "1.0", ""] + assert lines[2].rstrip("\n").split("\t") == expected_line