|
1 | 1 | import json |
| 2 | +import os |
| 3 | +from pathlib import Path |
2 | 4 | import pytest |
3 | 5 | from nplinker.strain import Strain |
4 | 6 | from nplinker.strain import StrainCollection |
| 7 | +from nplinker.strain.utils import create_strain_mappings |
| 8 | +from nplinker.strain.utils import extract_features_metabolome_id |
| 9 | +from nplinker.strain.utils import extract_strain_metadata |
5 | 10 | from nplinker.strain.utils import load_user_strains |
| 11 | +from nplinker.strain.utils import merge_bgcs_features |
6 | 12 | from nplinker.strain.utils import podp_generate_strain_mappings |
7 | 13 |
|
8 | 14 |
|
@@ -88,3 +94,177 @@ def test_podp_generate_strain_mappings(monkeypatch, tmp_path): |
88 | 94 | # check output file |
89 | 95 | sc = StrainCollection.read_json(output_file) |
90 | 96 | assert sc == expected_sc |
| 97 | + |
class FakeAntismashBGCLoader:
    """Test double for the antiSMASH BGC loader.

    Accepts the same constructor argument as the real loader but never touches
    the filesystem; it serves a fixed genome-to-BGC mapping instead.
    """

    def __init__(self, bgc_path):
        # bgc_path is accepted only for interface compatibility; it is unused.
        pass

    def get_genome_bgcs_mapping(self):
        """Return a canned genome-ID -> BGC-ID-list mapping (fresh dict per call)."""
        return {
            "genome_1": ["bgc_1", "bgc_2"],
            "genome_2": ["bgc_3"],
            "genome_3": ["bgc_4", "bgc_5"],
        }
| 108 | + |
# Variant of extract_bgcs_genome_id that uses FakeAntismashBGCLoader so no
# antiSMASH result folder is required.
def extract_bgcs_genome_id_test(strain_genome, bgc_path):
    """Map each strain to the BGCs of its genome using the fake loader.

    Args:
        strain_genome: dict of strain ID -> genome ID (output of
            extract_strain_metadata).
        bgc_path: path of the folder of antismash results (ignored by the
            fake loader).

    Returns:
        dict of strain ID -> list of BGC IDs. Strains whose genome is absent
        from the loader's mapping are silently dropped.
    """
    genome_to_bgcs = FakeAntismashBGCLoader(bgc_path).get_genome_bgcs_mapping()
    return {
        strain_id: genome_to_bgcs[genome_id]
        for strain_id, genome_id in strain_genome.items()
        if genome_id in genome_to_bgcs
    }
| 127 | + |
def test_extract_bgcs_genome_id():
    """Strains whose genome has no BGC entry must be dropped from the result."""
    strain_genome = {
        "strain_1": "genome_1",
        "strain_2": "genome_2",
        "strain_3": "genome_3",
        "strain_4": "genome_4",  # genome_4 is unknown to the fake loader
    }

    # bgc_path is irrelevant for the fake loader, so pass None to avoid any
    # filesystem access
    result = extract_bgcs_genome_id_test(strain_genome, None)

    assert result == {
        "strain_1": ["bgc_1", "bgc_2"],
        "strain_2": ["bgc_3"],
        "strain_3": ["bgc_4", "bgc_5"],
    }
| 144 | + |
def test_extract_strain_metadata(tmp_path):
    """Check extract_strain_metadata on genomics and metabolomics TSV files.

    Fixture data is built with explicit "\\n" joins rather than indented
    triple-quoted strings, so no leading whitespace from the test's source
    indentation can leak into the written file and corrupt the strain IDs.
    The vacuous ``.suffix`` assertions on the test's own literals were removed.
    """
    # genomics mapping: StrainID -> GenomeID (tab-separated, with header row)
    data_genome = "StrainID\tGenomeID\nstrain1\tgenome1\nstrain2\tgenome2"
    genome_file = tmp_path / "strain_metadata_genomics.txt"
    genome_file.write_text(data_genome)

    actual_genome = extract_strain_metadata(genome_file)
    assert isinstance(actual_genome, dict), "The output should be a dictionary"
    assert all(
        isinstance(k, str) for k in actual_genome
    ), "All keys in the output dictionary should be StrainIDs (strings)"
    # each strain maps to a list of genome IDs
    assert actual_genome == {
        "strain1": ["genome1"],
        "strain2": ["genome2"],
    }

    # metabolomics mapping: StrainID -> spectra filename
    data_spectra = "StrainID\tSpectraID\nstrain1\tspectra1.mzML\nstrain2\tspectra2.mzML"
    spectra_file = tmp_path / "strain_metadata_metabolomics.txt"
    spectra_file.write_text(data_spectra)

    actual_spectra = extract_strain_metadata(spectra_file)
    assert isinstance(actual_spectra, dict), "The output should be a dictionary"
    assert all(
        isinstance(k, str) for k in actual_spectra
    ), "All keys in the output dictionary should be StrainIDs (strings)"
    assert actual_spectra == {
        "strain1": ["spectra1.mzML"],
        "strain2": ["spectra2.mzML"],
    }
| 186 | + |
def extract_mappings_ms_filename_spectrum_id(_):
    """Fake replacement returning a fixed spectrum-ID -> feature-ID mapping.

    The single argument is ignored; it only mirrors the real function's
    signature.
    """
    spectrum_to_features = {
        "spectrum1": ["featureA", "featureB"],
        "spectrum2": ["featureC"],
        "spectrum3": ["featureA", "featureD"],
    }
    return spectrum_to_features
| 194 | + |
| 195 | + |
def extract_features_metabolome_id_test(strain_spectra, _):
    """Fake strain -> feature-ID extractor used by the tests.

    Mirrors the real extractor's shape but draws the spectrum -> feature
    mapping from a fixed table (the canned data previously served by the fake
    extract_mappings_ms_filename_spectrum_id helper, inlined here) instead of
    parsing GNPS output. The second argument is ignored.
    """
    spectrum_to_features = {
        "spectrum1": ["featureA", "featureB"],
        "spectrum2": ["featureC"],
        "spectrum3": ["featureA", "featureD"],
    }

    strain_features = {}
    for strain_id, spectra in strain_spectra.items():
        # the header-style "StrainID" entry carries no spectra and is skipped
        if strain_id == "StrainID":
            continue
        # a bare string is treated as a single-spectrum list
        spectra_list = [spectra] if isinstance(spectra, str) else spectra

        collected = set()
        for spectrum in spectra_list:
            collected.update(spectrum_to_features.get(spectrum, ()))

        # deterministic ordering for easy equality checks in tests
        strain_features[strain_id] = sorted(collected)
    return strain_features
| 215 | + |
| 216 | + |
def test_extract_features_metabolome_id():
    """Exercise strain -> feature-ID extraction.

    Two checks:
    1. Hermetic assertions through extract_features_metabolome_id_test — the
       fake defined above, which the original test set up but never actually
       called (unlike the BGC test, which does use its fake).
    2. The original smoke check against the real function and the bundled
       GNPS nodes file, kept unchanged.
    """
    strain_spectra = {
        "StrainID": "ExtractID",  # header-style entry, ignored by the extractor
        "Strain1": ["15b.mzXML", "12c.mzXML"],
        "Strain2": "15a.mzXML",  # bare string must be handled as a one-item list
    }

    # 1) hermetic: none of these spectra exist in the fake mapping, so each
    # real strain maps to an empty feature list and the header row is dropped
    assert extract_features_metabolome_id_test(strain_spectra, None) == {
        "Strain1": [],
        "Strain2": [],
    }
    # and a positive case where spectra do resolve to (deduplicated) features
    assert extract_features_metabolome_id_test(
        {"StrainA": ["spectrum1", "spectrum3"], "StrainB": "spectrum2"}, None
    ) == {
        "StrainA": ["featureA", "featureB", "featureD"],
        "StrainB": ["featureC"],
    }

    # 2) original smoke check against the real implementation; path is built
    # relative to this file so the test works from any working directory
    test_file = Path(__file__).parent.parent / "data/gnps/nodes.tsv"
    result = extract_features_metabolome_id(strain_spectra, str(test_file))
    # only the two non-header strains should appear in the result
    assert len(result) == 2
| 234 | + |
| 235 | + |
def test_merge_bgcs_features():
    """merge_bgcs_features should concatenate BGC and feature aliases per strain."""
    strain_bgcs_fake = {"strain_1": ["bgc1", "bgc2", "bgc3", "bgc4"]}
    strain_features_fake = {"strain_1": ["feature1", "feature2", "feature3", "feature4"]}

    strain_bgcs_features = merge_bgcs_features(strain_bgcs_fake, strain_features_fake)

    # BGC aliases come first, followed by the metabolomics features
    expected = {
        "strain_1": [
            "bgc1",
            "bgc2",
            "bgc3",
            "bgc4",
            "feature1",
            "feature2",
            "feature3",
            "feature4",
        ]
    }
    assert strain_bgcs_features == expected, f"Test failed! Expected {expected}, but got {strain_bgcs_features}"
| 242 | + |
def test_create_strain_mappings(tmp_path):
    """Check create_strain_mappings merges aliases and writes the JSON file.

    Uses pytest's ``tmp_path`` fixture so the output file lands in a
    temporary directory — the original test wrote ``strain_mappings.json``
    into the current working directory and never removed it, polluting the
    repository checkout between runs.
    """
    strain_bgcs = {
        "Strain1": ["BGC1", "BGC2"],
        "Strain2": ["BGC3"],
    }
    strain_features = {"Strain1": ["featureA", "featureB"], "Strain2": ["featureC"]}

    expected_output = {
        "version": "1.0",
        "strain_mappings": [
            {"strain_id": "Strain1", "strain_alias": ["BGC1", "BGC2", "featureA", "featureB"]},
            {"strain_id": "Strain2", "strain_alias": ["BGC3", "featureC"]},
        ],
    }

    output_file = tmp_path / "strain_mappings.json"
    # NOTE(review): assumes create_strain_mappings accepts a full path for
    # `filename`, not just a bare name — confirm against its implementation
    result = create_strain_mappings(
        strain_bgcs, strain_features, version="1.0", filename=str(output_file)
    )
    assert result == expected_output

    # check that the json file was created with the expected content
    assert output_file.exists(), f"File {output_file} was not created."
    with open(output_file, "r") as json_file:
        file_content = json.load(json_file)
    assert (
        file_content == expected_output
    ), f"File content {file_content} does not match expected output {expected_output}"
0 commit comments