Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ena-submission/cchf_ena_submission_list.json

Large diffs are not rendered by default.

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions ena-submission/config/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ time_between_iterations: 10
min_between_github_requests: 2
min_between_ena_checks: 5
ena_http_timeout_seconds: 60
backend_http_timeout_seconds: 3600
ena_public_search_timeout_seconds: 120
ncbi_public_search_timeout_seconds: 120
ena_http_get_retry_attempts: 3
Expand Down
5 changes: 5 additions & 0 deletions ena-submission/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ dependencies:
- python-dotenv=1.2.1
- pytest=8.4.2
- unidecode=1.3.8
  - tqdm
  - orjson
- orjsonl
- zstandard
- deepdiff
88 changes: 58 additions & 30 deletions ena-submission/src/ena_deposition/call_loculus.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
import json
import logging
import os
import shutil
import tempfile
import uuid
from collections.abc import Iterator
from http import HTTPMethod
from typing import Any

import orjson
import orjsonl
import requests
from tqdm import tqdm

from .config import Config
from .loculus_models import Group, GroupDetails

logger = logging.getLogger(__name__)

# Constants for error logging truncation
MAX_LOG_LINE_LENGTH = 400
LOG_SNIPPET_LENGTH = 200


def backend_url(config: Config) -> str:
"""Right strip the URL to remove trailing slashes"""
Expand Down Expand Up @@ -152,40 +160,60 @@ def fetch_released_entries(config: Config, organism: str) -> Iterator[dict[str,

request_id = str(uuid.uuid4())
url = f"{organism_url(config, organism)}/get-released-data"
params = {"compression": "zstd"}

headers = {
"Content-Type": "application/json",
"X-Request-ID": request_id,
}
logger.info(f"Fetching released data from {url} with request id {request_id}")

with requests.get(url, headers=headers, timeout=3600, stream=True) as response:
response.raise_for_status()
for line_no, line in enumerate(response.iter_lines(chunk_size=65536), start=1):
if not line:
continue

try:
full_json = orjson.loads(line)
except orjson.JSONDecodeError as e:
head = line[:200]
tail = line[-200:] if len(line) > 200 else line # noqa: PLR2004

error_msg = (
f"Invalid NDJSON from {url}\n"
f"request_id={request_id}\n"
f"line={line_no}\n"
f"bytes={len(line)}\n"
f"json_error={e}\n"
f"head={head!r}\n"
f"tail={tail!r}"
)

logger.error(error_msg)
raise RuntimeError(error_msg) from e

yield {
k: v
for k, v in full_json.items()
if k in {"metadata", "unalignedNucleotideSequences"}
}
with tempfile.TemporaryDirectory() as temp_dir:
temp_file_path = os.path.join(temp_dir, "downloaded_data.zst")

with requests.get(
url,
headers=headers,
params=params,
timeout=config.backend_http_timeout_seconds,
stream=True,
) as response:
response.raise_for_status()

# Ensure we get raw bytes to preserve compression
response.raw.decode_content = False

with open(temp_file_path, "wb") as f:
shutil.copyfileobj(response.raw, f)

try:
wanted_keys = {"metadata", "unalignedNucleotideSequences"}
with tqdm(orjsonl.stream(temp_file_path), unit=" records", mininterval=2.0) as pbar:
for full_json in pbar:
yield {k: v for k, v in full_json.items() if k in wanted_keys}
except orjson.JSONDecodeError as e:
line_content = getattr(e, "doc", "")
if len(line_content) > MAX_LOG_LINE_LENGTH:
if isinstance(line_content, bytes):
line_content = (
line_content[:LOG_SNIPPET_LENGTH]
+ b"..."
+ line_content[-LOG_SNIPPET_LENGTH:]
)
else:
line_content = (
line_content[:LOG_SNIPPET_LENGTH]
+ "..."
+ line_content[-LOG_SNIPPET_LENGTH:]
)

error_msg = (
f"Invalid NDJSON from {url}\n"
f"request_id={request_id}\n"
f"line_no={pbar.n + 1}\n"
f"json_error={e}\n"
f"line={line_content!r}"
)

logger.error(error_msg)
raise RuntimeError(error_msg) from e
1 change: 1 addition & 0 deletions ena-submission/src/ena_deposition/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class Config(BaseModel):
set_alias_suffix: str | None = None # Add to test revisions in dev

ena_http_timeout_seconds: int = 60
backend_http_timeout_seconds: int = 3600
ena_public_search_timeout_seconds: int = 120
ncbi_public_search_timeout_seconds: int = 120
ena_http_get_retry_attempts: int = 3
Expand Down
Loading