diff --git a/silo-import/pyproject.toml b/silo-import/pyproject.toml index 4a5586fde1..a83aba8c51 100644 --- a/silo-import/pyproject.toml +++ b/silo-import/pyproject.toml @@ -5,6 +5,7 @@ requires-python = ">=3.11" dependencies = [ "zstandard>=0.22,<0.26", "requests>=2.31,<3.0", + "orjsonl>=0.3,<1.0", ] [project.scripts] diff --git a/silo-import/src/silo_import/decompressor.py b/silo-import/src/silo_import/decompressor.py index ba2053757c..2ae80fdbdf 100644 --- a/silo-import/src/silo_import/decompressor.py +++ b/silo-import/src/silo_import/decompressor.py @@ -2,13 +2,11 @@ from __future__ import annotations -import io -import json import logging from dataclasses import dataclass from pathlib import Path -import zstandard +import orjsonl logger = logging.getLogger(__name__) @@ -18,7 +16,7 @@ class NdjsonAnalysis: """Result of analyzing an NDJSON file.""" record_count: int - pipeline_versions: set[int] + pipeline_version: int | None def analyze_ndjson(path: Path) -> NdjsonAnalysis: @@ -35,30 +33,17 @@ def analyze_ndjson(path: Path) -> NdjsonAnalysis: RuntimeError: If decompression or JSON parsing fails """ record_count = 0 - pipeline_versions: set[int] = set() - decompressor = zstandard.ZstdDecompressor() + pipeline_version: int | None = None try: - with path.open("rb") as compressed, decompressor.stream_reader(compressed) as reader: - text_stream = io.TextIOWrapper(reader, encoding="utf-8") - for line in text_stream: - line_stripped = line.strip() - if not line_stripped: - continue - record_count += 1 - try: - obj = json.loads(line_stripped) - except json.JSONDecodeError as exc: - msg = f"Invalid JSON record: {exc}" - raise RuntimeError(msg) from exc + for record in orjsonl.stream(path): + record_count += 1 + if pipeline_version is None: + pipeline_version = record.get("metadata", {}).get("pipelineVersion") # type: ignore - metadata = obj.get("metadata") if isinstance(obj, dict) else None - if isinstance(metadata, dict): - pipeline_version = metadata.get("pipelineVersion") - if pipeline_version: - pipeline_versions.add(int(pipeline_version)) - except zstandard.ZstdError as exc: + except Exception as exc: msg = f"Failed to decompress {path}: {exc}" + logger.error(msg) raise RuntimeError(msg) from exc - return NdjsonAnalysis(record_count=record_count, pipeline_versions=pipeline_versions) + return NdjsonAnalysis(record_count=record_count, pipeline_version=pipeline_version) diff --git a/silo-import/src/silo_import/download_manager.py b/silo-import/src/silo_import/download_manager.py index 313e72b8fb..0df45cd9c1 100644 --- a/silo-import/src/silo_import/download_manager.py +++ b/silo-import/src/silo_import/download_manager.py @@ -45,7 +45,7 @@ class DownloadResult: directory: Path data_path: Path etag: str - pipeline_versions: set[int] + pipeline_version: int | None def _download_file( @@ -202,7 +202,7 @@ def download_release( directory=download_dir, data_path=data_path, etag=etag_value, - pipeline_versions=analysis.pipeline_versions, + pipeline_version=analysis.pipeline_version, ) except ( diff --git a/silo-import/src/silo_import/lineage.py b/silo-import/src/silo_import/lineage.py index bb4c5e55f4..35abbdd236 100644 --- a/silo-import/src/silo_import/lineage.py +++ b/silo-import/src/silo_import/lineage.py @@ -13,7 +13,7 @@ def update_lineage_definitions( - pipeline_versions: set[int], + pipeline_version: int | None, config: ImporterConfig, paths: ImporterPaths, ) -> None: @@ -21,17 +21,12 @@ def update_lineage_definitions( logger.info("LINEAGE_DEFINITIONS not provided; skipping lineage configuration") return - if not pipeline_versions: + if not pipeline_version: # required for dummy organisms logger.info("No pipeline version found; writing empty lineage definitions") write_text(paths.lineage_definition_file, "{}\n") return - if len(pipeline_versions) > 1: - msg = "Multiple pipeline versions found in released data" - raise RuntimeError(msg) - - pipeline_version = next(iter(pipeline_versions)) lineage_url: str | None = config.lineage_definitions.get(int(pipeline_version)) if not lineage_url: msg = f"No lineage definition URL configured for pipeline version {pipeline_version}" diff --git a/silo-import/src/silo_import/runner.py b/silo-import/src/silo_import/runner.py index 818edf49e0..9bd35ab991 100644 --- a/silo-import/src/silo_import/runner.py +++ b/silo-import/src/silo_import/runner.py @@ -79,7 +79,7 @@ def run_once(self) -> None: return try: - update_lineage_definitions(download.pipeline_versions, self.config, self.paths) + update_lineage_definitions(download.pipeline_version, self.config, self.paths) except Exception: logger.exception("Failed to download lineage definitions; cleaning up input") safe_remove(self.paths.silo_input_data_path)