|
2 | 2 | import os |
3 | 3 | import subprocess |
4 | 4 | import sys |
5 | | -# import pytest |
| 5 | +import pytest |
6 | 6 |
|
7 | 7 | testdir = os.path.dirname(os.path.realpath(__file__)) |
8 | 8 |
|
@@ -61,3 +61,60 @@ def test_mock_pairsam(): |
61 | 61 | assert int(cur_pair[3]) >= int(prev_pair[3]) |
62 | 62 |
|
63 | 63 | prev_pair = cur_pair |
| 64 | + |
| 65 | + |
| 66 | +def test_custom_column_warning(tmpdir): |
| 67 | + """Test that a warning is emitted when sorting with a custom column not in pairsam format.""" |
| 68 | + # Create a temporary .pairsam file with custom_col in the header |
| 69 | + mock_pairsam_path = os.path.join(tmpdir, "test.pairsam") |
| 70 | + output_path = os.path.join(tmpdir, "sorted_output.pairsam") |
| 71 | + |
| 72 | + # Write a minimal .pairsam file |
| 73 | + with open(mock_pairsam_path, "w") as f: |
| 74 | + f.write("## pairs format v1.0\n") |
| 75 | + f.write("#columns: readID chr1 pos1 chr2 pos2 strand1 strand2 pair_type custom_col\n") |
| 76 | + f.write("read1\tchr1\t100\tchr2\t200\t+\t-\tUU\t42\n") |
| 77 | + f.write("read2\tchr2\t150\tchr1\t250\t-\t+\tUU\t99\n") |
| 78 | + |
| 79 | + # Run sort command with a custom column |
| 80 | + cmd = [ |
| 81 | + "python", |
| 82 | + "-m", |
| 83 | + "pairtools", |
| 84 | + "sort", |
| 85 | + mock_pairsam_path, |
| 86 | + "--output", |
| 87 | + output_path, |
| 88 | + "--extra-col", |
| 89 | + "custom_col", |
| 90 | + ] |
| 91 | + |
| 92 | + # Capture stderr to check for warning |
| 93 | + process = subprocess.Popen( |
| 94 | + cmd, |
| 95 | + stderr=subprocess.PIPE, |
| 96 | + stdout=subprocess.PIPE, |
| 97 | + universal_newlines=True, |
| 98 | + ) |
| 99 | + stdout, stderr = process.communicate() |
| 100 | + |
| 101 | + # Check that the command completed successfully |
| 102 | + assert process.returncode == 0, f"Command failed: {stderr}" |
| 103 | + |
| 104 | + # Verify warning was emitted |
| 105 | + assert "Column 'custom_col' not found in pairsam format definitions" in stderr |
| 106 | + assert "Assuming string type for sorting" in stderr |
| 107 | + |
| 108 | + # Verify output file exists and has content |
| 109 | + assert os.path.exists(output_path) |
| 110 | + with open(output_path, "r") as f: |
| 111 | + output_lines = f.readlines() |
| 112 | + assert len(output_lines) > 0 |
| 113 | + |
| 114 | + # Check that the output is sorted (basic check on header and body) |
| 115 | + output_header = [line.strip() for line in output_lines if line.startswith("#")] |
| 116 | + output_body = [ |
| 117 | + line.strip() for line in output_lines if not line.startswith("#") and line.strip() |
| 118 | + ] |
| 119 | + assert any(line.startswith("#sorted") for line in output_header) |
| 120 | + assert len(output_body) > 0 |
0 commit comments