silo-import/pyproject.toml (1 change: 1 addition & 0 deletions)

@@ -5,6 +5,7 @@ requires-python = ">=3.11"
 dependencies = [
     "zstandard>=0.22,<0.26",
     "requests>=2.31,<3.0",
+    "orjsonl>=0.3,<1.0",
 ]
 
 [project.scripts]
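For context: orjsonl streams NDJSON straight from compressed files, inferring the codec from the file extension, which is what lets decompressor.py below drop its manual zstandard plumbing. A minimal sketch (file name invented for illustration):

```python
import orjsonl

# Yields one parsed object per NDJSON line; .zst decompression is
# handled transparently based on the file extension.
for record in orjsonl.stream("release.ndjson.zst"):
    print(record.get("metadata", {}).get("pipelineVersion"))
```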
silo-import/src/silo_import/decompressor.py (35 changes: 10 additions & 25 deletions)

@@ -2,13 +2,11 @@
 
 from __future__ import annotations
 
-import io
-import json
 import logging
 from dataclasses import dataclass
 from pathlib import Path
 
-import zstandard
+import orjsonl
 
 logger = logging.getLogger(__name__)
 
@@ -18,7 +16,7 @@ class NdjsonAnalysis:
     """Result of analyzing an NDJSON file."""
 
     record_count: int
-    pipeline_versions: set[int]
+    pipeline_version: int | None
 
 
 def analyze_ndjson(path: Path) -> NdjsonAnalysis:
@@ -35,30 +33,17 @@ def analyze_ndjson(path: Path) -> NdjsonAnalysis:
         RuntimeError: If decompression or JSON parsing fails
     """
     record_count = 0
-    pipeline_versions: set[int] = set()
-    decompressor = zstandard.ZstdDecompressor()
+    pipeline_version: int | None = None
 
     try:
-        with path.open("rb") as compressed, decompressor.stream_reader(compressed) as reader:
-            text_stream = io.TextIOWrapper(reader, encoding="utf-8")
-            for line in text_stream:
-                line_stripped = line.strip()
-                if not line_stripped:
-                    continue
-                record_count += 1
-                try:
-                    obj = json.loads(line_stripped)
-                except json.JSONDecodeError as exc:
-                    msg = f"Invalid JSON record: {exc}"
-                    raise RuntimeError(msg) from exc
+        for record in orjsonl.stream(path):
+            record_count += 1
+            if pipeline_version is None:
[Review thread on the `if pipeline_version is None:` line above]

Contributor: That's not helping efficiency; we should count lines using a dedicated tool that's fast, like `zstdcat` and `wc`. And parse the pipeline version just by reading the first line.

Member: All other things being equal, using subprocess is quite a bit more brittle than doing things in code, and orjsonl is indeed very quick, so I'm a bit uncertain here.

Member: (but yeah, sorry, agree we don't actually need to parse all the JSON at all)

Contributor Author: I agree that this is definitely an improvement over the current state, but potentially a subprocess with jq is more efficient.
+                pipeline_version = record.get("metadata", {}).get("pipelineVersion")  # type: ignore
-
-                metadata = obj.get("metadata") if isinstance(obj, dict) else None
-                if isinstance(metadata, dict):
-                    pipeline_version = metadata.get("pipelineVersion")
-                    if pipeline_version:
-                        pipeline_versions.add(int(pipeline_version))
-    except zstandard.ZstdError as exc:
+    except Exception as exc:
         msg = f"Failed to decompress {path}: {exc}"
         logger.error(msg)
         raise RuntimeError(msg) from exc
 
-    return NdjsonAnalysis(record_count=record_count, pipeline_versions=pipeline_versions)
+    return NdjsonAnalysis(record_count=record_count, pipeline_version=pipeline_version)
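For reference, a minimal sketch of the alternative floated in the review thread above: count records with `zstdcat` piped into `wc -l`, and decompress only far enough to parse the first record for the pipeline version. The function name, error handling, and the trailing-newline caveat are assumptions for illustration, not code from this PR.

```python
from __future__ import annotations

import io
import json
import subprocess
from pathlib import Path

import zstandard


def analyze_ndjson_via_subprocess(path: Path) -> tuple[int, int | None]:
    """Hypothetical variant of analyze_ndjson that shells out for counting."""
    # Count records with zstdcat | wc -l. Note: wc -l counts newlines, so a
    # file whose last record lacks a trailing newline is undercounted by one.
    zstdcat = subprocess.Popen(["zstdcat", str(path)], stdout=subprocess.PIPE)
    wc = subprocess.run(["wc", "-l"], stdin=zstdcat.stdout, capture_output=True, check=True)
    zstdcat.stdout.close()
    if zstdcat.wait() != 0:
        raise RuntimeError(f"zstdcat failed for {path}")
    record_count = int(wc.stdout)

    # Parse only the first line for the pipeline version, decompressing
    # just enough of the stream to read it.
    with path.open("rb") as compressed:
        reader = zstandard.ZstdDecompressor().stream_reader(compressed)
        first_line = io.TextIOWrapper(reader, encoding="utf-8").readline()
    if not first_line.strip():
        return record_count, None
    version = json.loads(first_line).get("metadata", {}).get("pipelineVersion")
    return record_count, int(version) if version is not None else None
```

As the thread notes, the subprocess route trades robustness for speed; jq could replace the Python-side first-line parse, at the cost of another external dependency.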
silo-import/src/silo_import/download_manager.py (4 changes: 2 additions & 2 deletions)

@@ -45,7 +45,7 @@ class DownloadResult:
     directory: Path
     data_path: Path
     etag: str
-    pipeline_versions: set[int]
+    pipeline_version: int | None
 
 
 def _download_file(
@@ -202,7 +202,7 @@ def download_release(
             directory=download_dir,
             data_path=data_path,
             etag=etag_value,
-            pipeline_versions=analysis.pipeline_versions,
+            pipeline_version=analysis.pipeline_version,
         )
 
     except (
silo-import/src/silo_import/lineage.py (9 changes: 2 additions & 7 deletions)

@@ -13,25 +13,20 @@
 
 
 def update_lineage_definitions(
-    pipeline_versions: set[int],
+    pipeline_version: int | None,
     config: ImporterConfig,
     paths: ImporterPaths,
 ) -> None:
     if not config.lineage_definitions:
         logger.info("LINEAGE_DEFINITIONS not provided; skipping lineage configuration")
         return
 
-    if not pipeline_versions:
+    if not pipeline_version:
         # required for dummy organisms
         logger.info("No pipeline version found; writing empty lineage definitions")
         write_text(paths.lineage_definition_file, "{}\n")
         return
 
-    if len(pipeline_versions) > 1:
-        msg = "Multiple pipeline versions found in released data"
-        raise RuntimeError(msg)
-
-    pipeline_version = next(iter(pipeline_versions))
     lineage_url: str | None = config.lineage_definitions.get(int(pipeline_version))
     if not lineage_url:
         msg = f"No lineage definition URL configured for pipeline version {pipeline_version}"
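To make the simplified contract concrete, a hypothetical walk through the two remaining branches; the mapping and URL below are invented, and ImporterConfig's real shape is not part of this diff:

```python
# Hypothetical config data: maps pipeline version -> lineage definition URL.
lineage_definitions: dict[int, str] = {
    2: "https://example.org/lineage-definitions-v2.yaml",
}

pipeline_version: int | None = 2

if not pipeline_version:
    # Dummy organisms carry no pipeline version: write an empty mapping.
    lineage_definition_text = "{}\n"
else:
    lineage_url = lineage_definitions.get(int(pipeline_version))
    if lineage_url is None:
        raise RuntimeError(
            f"No lineage definition URL configured for pipeline version {pipeline_version}"
        )
```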
silo-import/src/silo_import/runner.py (2 changes: 1 addition & 1 deletion)

@@ -79,7 +79,7 @@ def run_once(self) -> None:
             return
 
         try:
-            update_lineage_definitions(download.pipeline_versions, self.config, self.paths)
+            update_lineage_definitions(download.pipeline_version, self.config, self.paths)
         except Exception:
             logger.exception("Failed to download lineage definitions; cleaning up input")
             safe_remove(self.paths.silo_input_data_path)