diff --git a/pyproject.toml b/pyproject.toml
index 4245af6d..233c1867 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -81,6 +81,7 @@ dependencies = [
     "curies>=0.10.6",
     "python-dateutil",
     "networkx>=3.4",
+    "httpx[http2]",
     # Resource Downloaders
     "drugbank_downloader",
     "chembl_downloader",
diff --git a/src/pyobo/sources/antibodyregistry.py b/src/pyobo/sources/antibodyregistry.py
index 04e0bd8f..f8388d1b 100644
--- a/src/pyobo/sources/antibodyregistry.py
+++ b/src/pyobo/sources/antibodyregistry.py
@@ -1,19 +1,21 @@
-"""Converter for the Antibody Registry.
-
-TODO use API https://www.antibodyregistry.org/api/antibodies?page=1&size=100
-"""
+"""Converter for the Antibody Registry."""
 
+import json
 import logging
 from collections.abc import Iterable, Mapping
 
-import pandas as pd
+import lxml.html
+import pystow
+from httpx import Client, Timeout, Cookies, URL as HTTPX_URL, RequestError
+from pystow import ConfigError
+
 from bioregistry.utils import removeprefix
 from tqdm.auto import tqdm
+from curies import Prefix
 
 from pyobo import Obo, Reference, Term
-from pyobo.api.utils import get_version
+from pyobo.constants import RAW_MODULE
 from pyobo.struct.typedef import has_citation
-from pyobo.utils.path import ensure_df
 
 __all__ = [
     "AntibodyRegistryGetter",
@@ -22,25 +24,12 @@
 logger = logging.getLogger(__name__)
 
 PREFIX = "antibodyregistry"
-URL = "http://antibodyregistry.org/php/fileHandler.php"
-CHUNKSIZE = 20_000
-
-
-def get_chunks(*, force: bool = False, version: str | None = None) -> pd.DataFrame:
-    """Get the BioGRID identifiers mapping dataframe."""
-    if version is None:
-        version = get_version(PREFIX)
-    df = ensure_df(
-        PREFIX,
-        url=URL,
-        name="results.csv",
-        force=force,
-        version=version,
-        sep=",",
-        chunksize=CHUNKSIZE,
-        usecols=[0, 1, 2, 3, 5],
-    )
-    return df
+BASE_URL = "https://www.antibodyregistry.org/api/antibodies"
+PAGE_SIZE = 10_000
+TIMEOUT = 180.0
+RAW_DATA_MODULE = RAW_MODULE.module(PREFIX)
+RAW_DATA_PARTS = RAW_DATA_MODULE.module("parts")
+RAW_CACHE = RAW_DATA_MODULE.base.joinpath("results.json")
 
 
 class AntibodyRegistryGetter(Obo):
@@ -48,10 +37,30 @@ class AntibodyRegistryGetter(Obo):
 
     ontology = bioversions_key = PREFIX
     typedefs = [has_citation]
+    dynamic_version = True
 
     def iter_terms(self, force: bool = False) -> Iterable[Term]:
         """Iterate over terms in the ontology."""
-        return iter_terms(force=force, version=self._version_or_raise)
+        return iter_terms(force=force)
+
+
+def iter_terms(force: bool = False, retries: int = 10) -> Iterable[Term]:
+    """Iterate over Antibody Registry terms."""
+    raw_data = []
+    for i in range(max(1, retries)):
+        try:
+            raw_data = get_data(force=force)
+            break
+        except RequestError:
+            if i >= retries - 1:
+                logger.error("All retries failed for the Antibody Registry.")
+                raise
+            continue
+
+    needs_curating: set[str] = set()
+    for item in raw_data:
+        term = _get_term(item, needs_curating=needs_curating)
+        yield term
 
 
 # TODO there are tonnnnsss of mappings to be curated
@@ -73,32 +82,171 @@ def iter_terms(self, force: bool = False) -> Iterable[Term]:
 }
 
 
-def iter_terms(*, force: bool = False, version: str | None = None) -> Iterable[Term]:
-    """Iterate over antibodies."""
-    chunks = get_chunks(force=force, version=version)
-    needs_curating = set()
-    # df['vendor'] = df['vendor'].map(bioregistry.normalize_prefix)
-    it = tqdm(chunks, desc=f"{PREFIX}, chunkssize={CHUNKSIZE}")
-    for chunk in it:
-        for identifier, name, vendor, catalog_number, defining_citation in chunk.values:
-            if pd.isna(identifier):
+def _get_term(json_data: dict[str, None | str | list[str]],
+              needs_curating: set[str]) -> Term:
+    # TODO: make use of more fields from the JSON? All fields:
+    # catalogNum, vendorName, clonality, epitope, comments, url, abName,
+    # abTarget, cloneId, commercialType, definingCitation, productConjugate,
+    # productForm, productIsotype, sourceOrganism, targetSpecies, uniprotId,
+    # applications, kitContents, abTargetEntrezId, abTargetUniprotId,
+    # numOfCitation, accession, status, feedback, abId, catAlt, curateTime,
+    # curatorComment, discDate, insertTime, targetModification,
+    # targetSubregion, vendorId, lastEditTime, ix, showLink, vendorUrl
+    identifier = json_data["abId"]
+    name = json_data["abName"]
+    vendor = json_data["vendorName"]
+    catalog_number = json_data["catalogNum"]
+    defining_citation = json_data["definingCitation"]
+    term = Term.from_triple(prefix=PREFIX, identifier=identifier, name=name)
+
+    if vendor not in MAPPING:
+        if vendor not in needs_curating:
+            needs_curating.add(vendor)
+            if all(x not in vendor for x in SKIP):
+                logger.debug(f"! vendor {vendor} for {identifier}")
+    elif MAPPING[vendor] is not None and catalog_number:
+        term.append_xref((MAPPING[vendor], catalog_number))
+    if defining_citation:
+        for pubmed_id in defining_citation.split(","):
+            pubmed_id = removeprefix(pubmed_id.strip(), prefix="PMID:").strip()
+            if not pubmed_id:
+                continue
+            term.append_provenance(
+                Reference(prefix=Prefix("pubmed"), identifier=pubmed_id, name=None)
+            )
+    return term
+
+
+def get_data(
+    force: bool = False,
+    timeout: float = TIMEOUT,
+) -> list[dict[str, str | None | list[str]]]:
+    """Download, cache, and return the full list of Antibody Registry records."""
+    # Only abId, abName, vendorName, catalogNum, and definingCitation are used downstream
+    if RAW_CACHE.is_file() and not force:
+        # Load the merged cache
+        with open(RAW_CACHE) as file:
+            return json.load(file)
+
+    # Check for existing parts, sort in ascending order of filename
+    parts = sorted((p for p in RAW_DATA_PARTS.base.glob("page*json")), key=lambda x: x.name)
+
+    # Get the first non-existing page, unless we force
+    existing_pages = {int(p.stem.removeprefix("page")) for p in parts} if not force else set()
+    if not existing_pages:
+        # No existing pages, start from page 1
+        first_page = 1
+    else:
+        # Find the first missing page
+        logger.info(f"Found {len(existing_pages)} existing pages.")
+        first_page = min(
+            set(range(1, max(existing_pages) + 1)) - existing_pages
+            # if all pages exist, fall back to -1
+            or {-1}
+        )
+
+    if first_page == -1:
+        logger.info("All pages exist, returning cached data.")
+        cache = []
+        for page in RAW_DATA_PARTS.base.glob("page*json"):
+            with page.open("r") as file:
+                cache.extend(json.load(file))
+        with RAW_CACHE.open("w") as file:
+            json.dump(cache, file)
+        return cache
+
+    # Get the first missing page
+    cookies = antibodyregistry_login(timeout=timeout)
+    with Client(http2=True, timeout=Timeout(timeout)) as client:
+        r = client.get(
+            HTTPX_URL(BASE_URL),
+            cookies=cookies,
+            params={"page": first_page, "size": PAGE_SIZE},
+        )
+        r.raise_for_status()
+        res_json = r.json()
+
+        # Get the total count and calculate the total number of pages
+        total_count = res_json["totalElements"]
+        total_pages = total_count // PAGE_SIZE + (1 if total_count % PAGE_SIZE else 0)
+        # Check if the first page has the expected number of items (unless it's the last page)
+        if len(res_json["items"]) != PAGE_SIZE and first_page != total_pages:
+            logger.error("The first page does not have the expected number of items.")
+            raise ValueError(
+                f"Number of items on the first page is not {PAGE_SIZE}. "
+                "Consider reducing PAGE_SIZE."
+            )
+
+        # Write the first page to the cache
+        with RAW_DATA_PARTS.base.joinpath(f"page{first_page}.json").open("w") as file:
+            json.dump(res_json["items"], file)
+
+        # Now, iterate over the remaining pages
+        for page in tqdm(
+            range(1, total_pages + 1),
+            desc=PREFIX,
+            total=total_pages,
+        ):
+            # Skip if the page already exists, unless we are forcing
+            part_file = RAW_DATA_PARTS.base.joinpath(f"page{page}.json")
+            if part_file.is_file() and not force:
                 continue
-            identifier = removeprefix(identifier, "AB_")
-            term = Term.from_triple(PREFIX, identifier, name if pd.notna(name) else None)
-            if vendor not in MAPPING:
-                if vendor not in needs_curating:
-                    needs_curating.add(vendor)
-                    if all(x not in vendor for x in SKIP):
-                        logger.debug(f"! vendor {vendor} for {identifier}")
-            elif MAPPING[vendor] is not None and pd.notna(catalog_number) and catalog_number:
-                term.append_xref((MAPPING[vendor], catalog_number))  # type:ignore
-            if defining_citation and pd.notna(defining_citation):
-                for pubmed_id in defining_citation.split(","):
-                    pubmed_id = pubmed_id.strip()
-                    if not pubmed_id:
-                        continue
-                    term.append_provenance(Reference(prefix="pubmed", identifier=pubmed_id))
-            yield term
+
+            r = client.get(
+                HTTPX_URL(BASE_URL),
+                cookies=cookies,
+                params={"page": page, "size": PAGE_SIZE},
+            )
+            r.raise_for_status()
+            res_json = r.json()
+            with part_file.open("w") as file:
+                json.dump(res_json["items"], file)
+
+    # Now merge all the pages and write to the cache
+    cache = []
+    for page in RAW_DATA_PARTS.base.glob("page*json"):
+        with page.open("r") as file:
+            cache.extend(json.load(file))
+    with RAW_CACHE.open("w") as file:
+        json.dump(cache, file)
+    return cache
+
+
+def antibodyregistry_login(timeout: float = TIMEOUT) -> Cookies:
+    """Log in to the Antibody Registry and return the session cookies."""
+    logger.info("Logging in to the Antibody Registry")
+    try:
+        username = pystow.get_config("pyobo", "antibodyregistry_username", raise_on_missing=True)
+        password = pystow.get_config("pyobo", "antibodyregistry_password", raise_on_missing=True)
+    except ConfigError:
+        logger.error("You must register at https://www.antibodyregistry.org to use this source.")
+        raise
+
+    with Client(
+        follow_redirects=True,
+        http2=True,
+        timeout=Timeout(timeout),
+    ) as client:
+        r = client.get(HTTPX_URL("https://www.antibodyregistry.org/login"))
+        r.raise_for_status()
+
+        cookies = r.cookies
+        tree = lxml.html.fromstring(r.content)
+        login_post_url = HTTPX_URL(tree.xpath('//form[@id="kc-form-login"]/@action')[0])
+
+        r = client.post(
+            login_post_url,
+            cookies=cookies,
+            data={
+                "username": username,
+                "password": password,
+                "rememberMe": "on",
+                "credentialId": "",
+            },
+        )
+        r.raise_for_status()
+
+        cookies = r.history[1].cookies  # take the cookies set during the login redirect chain
+        return cookies
 
 
 if __name__ == "__main__":
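
Usage sketch (illustrative, not part of the diff): the downloader requires an Antibody Registry account, with the username and password resolvable by pystow under the "pyobo" namespace, for example via the PYOBO_ANTIBODYREGISTRY_USERNAME and PYOBO_ANTIBODYREGISTRY_PASSWORD environment variables or the pyobo configuration file (assuming pystow's usual get_config resolution). With that in place, the new functions could be exercised roughly like this:

    # Hypothetical driver script; get_data() and iter_terms() come from the module above.
    from pyobo.sources.antibodyregistry import get_data, iter_terms

    # Logs in, downloads the registry page by page into the pystow "parts" cache,
    # then merges everything into results.json and returns the raw records.
    records = get_data()
    print(f"cached {len(records)} antibody records")

    # Converts the cached records into PyOBO terms, with vendor xrefs and
    # PubMed provenance where available.
    for term in iter_terms():
        print(term)
        break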