diff --git a/mardi_importer/mardi_importer/arxiv/ArxivPublication.py b/mardi_importer/mardi_importer/arxiv/ArxivPublication.py index ad9037f..770125b 100644 --- a/mardi_importer/mardi_importer/arxiv/ArxivPublication.py +++ b/mardi_importer/mardi_importer/arxiv/ArxivPublication.py @@ -13,42 +13,167 @@ from mardi_importer.logger.logging_utils import get_logger_safe -taxonomy = ["cs.AI", "cs.AR", "cs.CC", "cs.CE", "cs.CG", "cs.CL", "cs.CR", \ - "cs.CV", "cs.CY", "cs.DB", "cs.DC", "cs.DL", "cs.DM", "cs.DS", \ - "cs.ET", "cs.FL", "cs.GL", "cs.GR", "cs.GT", "cs.HC", "cs.IR", \ - "cs.IT", "cs.LG", "cs.LO", "cs.MA", "cs.MM", "cs.MS", "cs.NA", \ - "cs.NE", "cs.NI", "cs.OH", "cs.OS", "cs.PF", "cs.PL", "cs.RO", \ - "cs.SC", "cs.SD", "cs.SE", "cs.SI", "cs.SY", "econ.EM",\ - "econ.GN", "econ.TH", "eess.AS", "eess.IV", "eess.SP", \ - "eess.SY", "math.AC", "math.AG", "math.AP", "math.AT", \ - "math.CA", "math.CO", "math.CT", "math.CV", "math.DG", \ - "math.DS", "math.FA", "math.GM", "math.GN", "math.GR", \ - "math.GT", "math.HO", "math.IT", "math.KT", "math.LO", \ - "math.MG", "math.MP", "math.NA", "math.NT", "math.OA", \ - "math.OC", "math.PR", "math.QA", "math.RA", "math.RT", \ - "math.SG", "math.SP", "math.ST", "astro-ph.CO", "astro-ph.EP", \ - "astro-ph.GA", "astro-ph.HE", "astro-ph.IM", "astro-ph.SR", \ - "cond-mat.dis-nn", "cond-mat.mes-hall", "cond-mat.mtrl-sci", \ - "cond-mat.other", "cond-mat.quant-gas", "cond-mat.soft", \ - "cond-mat.stat-mech", "cond-mat.str-el", "cond-mat.supr-con", \ - "gr-qc", "hep-ex", "hep-lat", "hep-ph", "hep-th", "math-ph", \ - "nlin.AO", "nlin.CD", "nlin.CG", "nlin.PS", "nlin.SI", \ - "nucl-ex", "nucl-th", "physics.acc-ph", "physics.ao-ph", \ - "physics.app-ph", "physics.atm-clus", "physics.atom-ph", \ - "physics.bio-ph", "physics.chem-ph", "physics.class-ph", \ - "physics.comp-ph", "physics.data-an", "physics.ed-ph", \ - "physics.flu-dyn", "physics.gen-ph", "physics.geo-ph", \ - "physics.hist-ph", "physics.ins-det", "physics.med-ph", \ - "physics.optics", "physics.plasm-ph", "physics.pop-ph", \ - "physics.soc-ph", "physics.space-ph", "quant-ph", "q-bio.BM", \ - "q-bio.CB", "q-bio.GN", "q-bio.MN", "q-bio.NC", "q-bio.OT", \ - "q-bio.PE", "q-bio.QM", "q-bio.SC", "q-bio.TO", "q-fin.CP", \ - "q-fin.EC", "q-fin.GN", "q-fin.MF", "q-fin.PM", "q-fin.PR", \ - "q-fin.RM", "q-fin.ST", "q-fin.TR", "stat.AP", "stat.CO", \ - "stat.ME", "stat.ML", "stat.OT", "stat.TH"] +taxonomy = [ + "cs.AI", + "cs.AR", + "cs.CC", + "cs.CE", + "cs.CG", + "cs.CL", + "cs.CR", + "cs.CV", + "cs.CY", + "cs.DB", + "cs.DC", + "cs.DL", + "cs.DM", + "cs.DS", + "cs.ET", + "cs.FL", + "cs.GL", + "cs.GR", + "cs.GT", + "cs.HC", + "cs.IR", + "cs.IT", + "cs.LG", + "cs.LO", + "cs.MA", + "cs.MM", + "cs.MS", + "cs.NA", + "cs.NE", + "cs.NI", + "cs.OH", + "cs.OS", + "cs.PF", + "cs.PL", + "cs.RO", + "cs.SC", + "cs.SD", + "cs.SE", + "cs.SI", + "cs.SY", + "econ.EM", + "econ.GN", + "econ.TH", + "eess.AS", + "eess.IV", + "eess.SP", + "eess.SY", + "math.AC", + "math.AG", + "math.AP", + "math.AT", + "math.CA", + "math.CO", + "math.CT", + "math.CV", + "math.DG", + "math.DS", + "math.FA", + "math.GM", + "math.GN", + "math.GR", + "math.GT", + "math.HO", + "math.IT", + "math.KT", + "math.LO", + "math.MG", + "math.MP", + "math.NA", + "math.NT", + "math.OA", + "math.OC", + "math.PR", + "math.QA", + "math.RA", + "math.RT", + "math.SG", + "math.SP", + "math.ST", + "astro-ph.CO", + "astro-ph.EP", + "astro-ph.GA", + "astro-ph.HE", + "astro-ph.IM", + "astro-ph.SR", + "cond-mat.dis-nn", + "cond-mat.mes-hall", + "cond-mat.mtrl-sci", + "cond-mat.other", + "cond-mat.quant-gas", + "cond-mat.soft", + "cond-mat.stat-mech", + "cond-mat.str-el", + "cond-mat.supr-con", + "gr-qc", + "hep-ex", + "hep-lat", + "hep-ph", + "hep-th", + "math-ph", + "nlin.AO", + "nlin.CD", + "nlin.CG", + "nlin.PS", + "nlin.SI", + "nucl-ex", + "nucl-th", + "physics.acc-ph", + "physics.ao-ph", + "physics.app-ph", + "physics.atm-clus", + "physics.atom-ph", + "physics.bio-ph", + "physics.chem-ph", + "physics.class-ph", + "physics.comp-ph", + "physics.data-an", + "physics.ed-ph", + "physics.flu-dyn", + "physics.gen-ph", + "physics.geo-ph", + "physics.hist-ph", + "physics.ins-det", + "physics.med-ph", + "physics.optics", + "physics.plasm-ph", + "physics.pop-ph", + "physics.soc-ph", + "physics.space-ph", + "quant-ph", + "q-bio.BM", + "q-bio.CB", + "q-bio.GN", + "q-bio.MN", + "q-bio.NC", + "q-bio.OT", + "q-bio.PE", + "q-bio.QM", + "q-bio.SC", + "q-bio.TO", + "q-fin.CP", + "q-fin.EC", + "q-fin.GN", + "q-fin.MF", + "q-fin.PM", + "q-fin.PR", + "q-fin.RM", + "q-fin.ST", + "q-fin.TR", + "stat.AP", + "stat.CO", + "stat.ME", + "stat.ML", + "stat.OT", + "stat.TH", +] + @dataclass -class Arxiv(): +class Arxiv: arxiv_id: str _title: str = None _abstract: str = None @@ -67,7 +192,7 @@ def __post_init__(self) -> None: ) if self.api is None: - self.api = Importer.get_api('arxiv') + self.api = Importer.get_api("arxiv") @property def title(self) -> str: @@ -78,7 +203,7 @@ def title(self) -> str: """ if not self._title: title = self.entry.title - self._title = title.replace('\n', ' ') + self._title = title.replace("\n", " ") return self._title @property @@ -90,7 +215,7 @@ def abstract(self) -> str: """ if not self._abstract: abstract = self.entry.summary - self._abstract = abstract.replace('\n', ' ') + self._abstract = abstract.replace("\n", " ") return self._abstract @property @@ -113,7 +238,7 @@ def authors(self) -> List[Author]: """Get the list of authors for the entry Returns: - List[Author]: + List[Author]: The list of authors for the entry, which can include an arXiv author ID, if found """ @@ -132,7 +257,7 @@ def arxiv_classification(self) -> List[str]: """ if not self._arxiv_classification: for t in self.entry.tags: - self._arxiv_classification.append(t['term']) + self._arxiv_classification.append(t["term"]) return self._arxiv_classification def disambiguate_autor(self, name: str) -> Author: @@ -149,57 +274,67 @@ def disambiguate_autor(self, name: str) -> Author: Author: Author object with the arXiv author ID, if found. """ # Logic to determine if 'Author' is the class or the module containing the class - if hasattr(Author, 'Author') and not isinstance(Author, type): + if hasattr(Author, "Author") and not isinstance(Author, type): author_factory = Author.Author else: author_factory = Author if not callable(author_factory): - raise TypeError(f"Could not resolve a callable Author class. Check your imports.") + raise TypeError( + "Could not resolve a callable Author class. Check your imports." + ) - author_split = name.lower().split(' ') + author_split = name.lower().split(" ") finish = False i = 1 while not finish: arxiv_author_id = "_".join([author_split[-1], author_split[0][0], str(i)]) i += 1 headers = { - 'Access-Control-Allow-Origin': '*', - 'Access-Control-Allow-Methods': 'GET', - 'Access-Control-Allow-Headers': 'Content-Type', - 'Access-Control-Max-Age': '3600', - 'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0' + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Methods": "GET", + "Access-Control-Allow-Headers": "Content-Type", + "Access-Control-Max-Age": "3600", + "User-Agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0", } base_url = "https://arxiv.org/a/" req = requests.get(base_url + arxiv_author_id + ".html", headers=headers) - soup = BeautifulSoup(req.content, 'html.parser') + soup = BeautifulSoup(req.content, "html.parser") try: - author_html = soup.find("div", id="content").find("h1").get_text(strip=True) + author_html = ( + soup.find("div", id="content").find("h1").get_text(strip=True) + ) except AttributeError: author_html = "Not Found" if author_html == "Not Found": finish = True else: - author_html = author_html.replace('\'s articles on arXiv', '') - author_initials = author_html.split(' ') + author_html = author_html.replace("'s articles on arXiv", "") + author_initials = author_html.split(" ") author_initials = author_initials[0][0] + ". " + author_initials[-1] if author_html == name or author_initials == name: articles = soup.find_all("div", class_="list-title") for article in articles: article = article.get_text() - article = article.replace('\n', '').replace('Title: ', '').replace(' ',' ') + article = ( + article.replace("\n", "") + .replace("Title: ", "") + .replace(" ", " ") + ) if article == self.title: finish = True orcid = self.get_orcid(soup) # Return using the resolved factory - return author_factory(self.api, - name=name, - orcid=orcid, - arxiv_id=arxiv_author_id) + return author_factory( + self.api, + name=name, + orcid=orcid, + arxiv_id=arxiv_author_id, + ) # Fallback return using the resolved factory return author_factory(self.api, name=name) @@ -208,16 +343,20 @@ def disambiguate_autor(self, name: str) -> Author: def arxiv_api(arxiv_id: str) -> FeedParserDict: log = get_logger_safe(__name__) - clean_id = re.sub(r'^arxiv:', '', arxiv_id, flags=re.IGNORECASE).strip() + clean_id = re.sub(r"^arxiv:", "", arxiv_id, flags=re.IGNORECASE).strip() - api_url = 'http://export.arxiv.org/api/query?id_list=' + api_url = "http://export.arxiv.org/api/query?id_list=" full_url = api_url + clean_id - log.debug(f"Fetching arXiv entry for {clean_id} (original: {arxiv_id}) - using: {full_url}") + log.debug( + f"Fetching arXiv entry for {clean_id} (original: {arxiv_id}) - using: {full_url}" + ) response = requests.get(full_url) feed = feedparser.parse(response.text) if not feed.entries: - log.warning(f"No arXiv entries found for ID: {clean_id}. The list is empty.") + log.warning( + f"No arXiv entries found for ID: {clean_id}. The list is empty." + ) return None return feed.entries[0] @@ -226,13 +365,14 @@ def arxiv_api(arxiv_id: str) -> FeedParserDict: def get_orcid(soup): links = soup.find_all("a") for link in links: - orcid = re.search('https://orcid.org/(.{4}-.{4}-.{4}-.{4})', link['href']) + orcid = re.search("https://orcid.org/(.{4}-.{4}-.{4}-.{4})", link["href"]) if orcid: return orcid.groups()[0] return None + @dataclass -class ArxivPublication(): +class ArxivPublication: arxiv_id: str api: Optional[MardiClient] = None metadata: Arxiv = None @@ -242,42 +382,44 @@ class ArxivPublication(): def __post_init__(self): if self.api is None: - self.api = Importer.get_api('arxiv') - if ' ' in self.arxiv_id: - self.arxiv_id = self.arxiv_id.split(' ')[0] - self.metadata = Arxiv(arxiv_id =self.arxiv_id) + self.api = Importer.get_api("arxiv") + if " " in self.arxiv_id: + self.arxiv_id = self.arxiv_id.split(" ")[0] + self.metadata = Arxiv(arxiv_id=self.arxiv_id) self.title = self.metadata.title self.authors = self.metadata.authors - arxiv_id = 'wdt:P818' + arxiv_id = "wdt:P818" QID_results = self.api.search_entity_by_value(arxiv_id, self.arxiv_id) - if QID_results: self.QID = QID_results[0] + if QID_results: + self.QID = QID_results[0] if self.QID: # Get authors. item = self.api.item.get(self.QID) - author_QID = item.get_value('wdt:P50') + author_QID = item.get_value("wdt:P50") or [] for QID in author_QID: author_item = self.api.item.get(entity_id=QID) - name = str(author_item.labels.get('en')) - orcid = author_item.get_value('wdt:P496') + name = str(author_item.labels.get("en")) + orcid = author_item.get_value("wdt:P496") orcid = orcid[0] if orcid else None - arxiv_author_id = author_item.get_value('wdt:P4594') + arxiv_author_id = author_item.get_value("wdt:P4594") arxiv_author_id = arxiv_author_id[0] if arxiv_author_id else None aliases = [] - if author_item.aliases.get('en'): - for alias in author_item.aliases.get('en'): + if author_item.aliases.get("en"): + for alias in author_item.aliases.get("en"): aliases.append(str(alias)) - author = Author(self.api, - name=name, - orcid=orcid, - arxiv_id=arxiv_author_id, - _aliases=aliases, - _QID=QID) + author = Author( + self.api, + name=name, + orcid=orcid, + arxiv_id=arxiv_author_id, + _aliases=aliases, + _QID=QID, + ) self.authors.append(author) - - def create(self): + def create(self): log = get_logger_safe(__name__) log.debug("Start creating wiki item for arXiv publication") @@ -289,30 +431,29 @@ def create(self): item.labels.set(language="en", value=self.title) if self.title: item.descriptions.set( - language="en", - value="scientific article from arXiv" + language="en", value="scientific article from arXiv" ) # Instance of: scholary article - item.add_claim('wdt:P31','wd:Q13442814') + item.add_claim("wdt:P31", "wd:Q13442814") # Publication date - item.add_claim('wdt:P577', self.metadata.publication_date) + item.add_claim("wdt:P577", self.metadata.publication_date) # Arxiv ID - item.add_claim('wdt:P818', self.arxiv_id) + item.add_claim("wdt:P818", self.arxiv_id) # Arxiv classification category_claims = [] - pattern_msc = re.compile(r'\d\d(?:-(?:XX|\d\d)|[A-Z](?:xx|\d\d))') - pattern_acm = re.compile(r'^[ABCDEFGHIJK]\.[0-9m](\.[0-9m])?$') + pattern_msc = re.compile(r"\d\d(?:-(?:XX|\d\d)|[A-Z](?:xx|\d\d))") + pattern_acm = re.compile(r"^[ABCDEFGHIJK]\.[0-9m](\.[0-9m])?$") for category in self.metadata.arxiv_classification: if pattern_msc.match(category): # MSC ID msc_categories = re.findall(pattern_msc, category) for msc_cat in msc_categories: - claim = self.api.get_claim('wdt:P3285', msc_cat) + claim = self.api.get_claim("wdt:P3285", msc_cat) category_claims.append(claim) elif pattern_acm.match(category) or ";" in category: # ACM Computing Classification System (1998) @@ -321,9 +462,9 @@ def create(self): continue elif category in taxonomy: # arXiv classification - claim = self.api.get_claim('wdt:P820', category) + claim = self.api.get_claim("wdt:P820", category) category_claims.append(claim) - + if category_claims: log.debug( "arxiv category_claims types=%s values=%s", @@ -333,10 +474,7 @@ def create(self): item.add_claims(category_claims) # Authors - author_QID = self.__preprocess_authors() - claims = [] - for author in author_QID: - claims.append(self.api.get_claim("wdt:P50", author)) + claims = self.__preprocess_authors() log.debug( "arxiv author claims types=%s values=%s", [type(c).__name__ for c in claims], @@ -345,33 +483,42 @@ def create(self): item.add_claims(claims) # DOI - doi = '10.48550/arXiv.' + self.arxiv_id - item.add_claim('wdt:P356', doi.upper()) - + doi = "10.48550/arXiv." + self.arxiv_id + item.add_claim("wdt:P356", doi.upper()) + self.QID = item.write().id if self.QID: - log.debug(f"arXiv preprint with arXiv id: {self.arxiv_id} created with ID {self.QID}.") + log.debug( + f"arXiv preprint with arXiv id: {self.arxiv_id} created with ID {self.QID}." + ) return self.QID else: - log.debug(f"arXiv preprint with arXiv id: {self.arxiv_id} could not be created.") + log.debug( + f"arXiv preprint with arXiv id: {self.arxiv_id} could not be created." + ) return None def __preprocess_authors(self) -> List[str]: """Processes the author information of each publication. - Create the author if it does not exist already as an + Create the author if it does not exist already as an entity in wikibase. - + + If an author has no ORCID and no existing QID, store the author as + an author name string (P2093) instead of creating an author item. + Returns: - List[str]: - QIDs corresponding to each author. + List: + Author claims to be added (P50 entity claims and/or P2093 string claims). """ - log = get_logger_safe(__name__) - author_QID = [] + claims = [] for author in self.authors: - if not author.QID: - log.debug(f"Creating author: {author}") - author.create() - author_QID.append(author.QID) - return author_QID + if author.orcid or author.arxiv_id or author.QID: + if not author.QID: + author.create() + claims.append(self.api.get_claim("wdt:P50", author.QID)) + else: + if author.name: + claims.append(self.api.get_claim("wdt:P2093", author.name)) + return claims diff --git a/mardi_importer/mardi_importer/cran/RPackage.py b/mardi_importer/mardi_importer/cran/RPackage.py index b892223..a010447 100644 --- a/mardi_importer/mardi_importer/cran/RPackage.py +++ b/mardi_importer/mardi_importer/cran/RPackage.py @@ -17,7 +17,9 @@ import re import logging -log = logging.getLogger('CRANlogger') + +log = logging.getLogger("CRANlogger") + @dataclass class RPackage: @@ -51,6 +53,7 @@ class RPackage: _QID: Package QID """ + date: str label: str description: str @@ -77,15 +80,15 @@ class RPackage: def __post_init__(self): if self.api is None: - self.api = Importer.get_api('cran') + self.api = Importer.get_api("cran") if self.wdi is None: self.wdi = WikidataImporter() if self.crossref is None: - self.crossref = Importer.create_source('crossref') + self.crossref = Importer.create_source("crossref") if self.arxiv is None: - self.arxiv = Importer.create_source('arxiv') + self.arxiv = Importer.create_source("arxiv") if self.zenodo is None: - self.zenodo = Importer.create_source('zenodo') + self.zenodo = Importer.create_source("zenodo") @property def QID(self) -> str: @@ -97,7 +100,7 @@ def QID(self) -> str: Returns: str: The entity QID representing the R package. """ - self._QID = self._QID or self.item.is_instance_of('wd:Q73539779') + self._QID = self._QID or self.item.is_instance_of("wd:Q73539779") return self._QID @property @@ -115,10 +118,7 @@ def item(self) -> MardiItem: description = self.description if self.label == self.description: description += " (R Package)" - self._item.descriptions.set( - language="en", - value=description - ) + self._item.descriptions.set(language="en", value=description) return self._item def exists(self) -> str: @@ -153,19 +153,19 @@ def pull(self): try: page = requests.get(self.url) - soup = BeautifulSoup(page.content, 'lxml') + soup = BeautifulSoup(page.content, "lxml") except: log.warning(f"Package {self.label} package not found in CRAN.") return None else: - if soup.find_all('table'): - self.long_description = soup.find_all('p')[0].get_text() or "" + if soup.find_all("table"): + self.long_description = soup.find_all("p")[0].get_text() or "" self.parse_publications(self.long_description) self.long_description = re.sub("\n", "", self.long_description).strip() self.long_description = re.sub("\t", "", self.long_description).strip() - table = soup.find_all('table')[0] - package_df = self.clean_package_list(table) + table = soup.find_all("table")[0] + package_df = self.clean_package_list(table) if "Version" in package_df.columns: self.version = package_df.loc[1, "Version"] @@ -182,7 +182,9 @@ def pull(self): self.get_versions() else: - log.warning(f"Metadata table not found in CRAN. Package has probably been archived.") + log.warning( + "Metadata table not found in CRAN. Package has probably been archived." + ) return self def create(self) -> None: @@ -195,16 +197,16 @@ def create(self) -> None: None """ package = self.pull() - + if package: package = package.insert_claims().write() if package: log.info(f"Package created with QID: {package['QID']}.") - #print('package created') + # print('package created') else: log.info(f"Package could not be created.") - #print('package not created') + # print('package not created') def write(self) -> Optional[Dict[str, str]]: """Write the package item to the Wikibase instance. @@ -221,18 +223,19 @@ def write(self) -> Optional[Dict[str, str]]: if self.item.claims: item = self.item.write() if item: - return {'QID': item.id} + return {"QID": item.id} def insert_claims(self): - # Logic to determine if 'Author' is the class or the module containing the class - if hasattr(Author, 'Author') and not isinstance(Author, type): + if hasattr(Author, "Author") and not isinstance(Author, type): author_factory = Author.Author else: author_factory = Author if not callable(author_factory): - raise TypeError(f"Could not resolve a callable Author class. Check your imports.") + raise TypeError( + "Could not resolve a callable Author class. Check your imports." + ) # Instance of: R package self.item.add_claim("wdt:P31", "wd:Q73539779") @@ -256,37 +259,53 @@ def insert_claims(self): qualifier = [self.api.get_claim("wdt:P577", f"+{self.date}T00:00:00Z")] self.item.add_claim("wdt:P348", self.version, qualifiers=qualifier) - # Disambiguate Authors and create corresponding Author items - self.author_pool = author_factory.disambiguate_authors(self.author_pool) + pool_for_items = [] + for a in self.author_pool: + if a.orcid or a is self.maintainer: + pool_for_items.append(a) + + self.author_pool = author_factory.disambiguate_authors(pool_for_items) # Authors for author in self.authors: - author.pull_QID(self.author_pool) - self.item.add_claim("wdt:P50", author.QID) + if author.orcid: + author.pull_QID(self.author_pool) + if not author.QID: + author.create() + self.item.add_claim("wdt:P50", author.QID) + else: + if author.name: + self.item.add_claim("wdt:P2093", author.name) # Maintainer self.maintainer.pull_QID(self.author_pool) + if not self.maintainer.QID: + self.maintainer.create() self.item.add_claim("wdt:P126", self.maintainer.QID) # Licenses if self.license_data: - claims = self.process_claims(self.license_data, 'wdt:P275', 'wdt:P9767') + claims = self.process_claims(self.license_data, "wdt:P275", "wdt:P9767") self.item.add_claims(claims) # Dependencies if self.dependencies: - claims = self.process_claims(self.dependencies, 'wdt:P1547', 'wdt:P348') + claims = self.process_claims(self.dependencies, "wdt:P1547", "wdt:P348") self.item.add_claims(claims) # Imports if self.imports: prop_nr = self.api.get_local_id_by_label("imports", "property") - claims = self.process_claims(self.imports, prop_nr, 'wdt:P348') + claims = self.process_claims(self.imports, prop_nr, "wdt:P348") self.item.add_claims(claims) # Related publications and sources cites_work = "wdt:P2860" - for publications in [self.crossref_publications, self.arxiv_publications, self.zenodo_resources]: + for publications in [ + self.crossref_publications, + self.arxiv_publications, + self.zenodo_resources, + ]: for publication in publications: for author in publication.authors: author.pull_QID(self.author_pool) @@ -298,7 +317,8 @@ def insert_claims(self): # Wikidata QID wikidata_QID = self.get_wikidata_QID() - if wikidata_QID: self.item.add_claim("Wikidata QID", wikidata_QID) + if wikidata_QID: + self.item.add_claim("Wikidata QID", wikidata_QID) return self @@ -317,32 +337,46 @@ def update(self): str: ID of the updated R package. """ # Logic to determine if 'Author' is the class or the module containing the class - if hasattr(Author, 'Author') and not isinstance(Author, type): + if hasattr(Author, "Author") and not isinstance(Author, type): author_factory = Author.Author else: author_factory = Author if not callable(author_factory): - raise TypeError(f"Could not resolve a callable Author class. Check your imports.") + raise TypeError( + "Could not resolve a callable Author class. Check your imports." + ) if self.pull(): # Obtain current Authors - current_authors = self.item.get_value('wdt:P50') + current_authors = self.item.get_value("wdt:P50") for author_qid in current_authors: author_item = self.api.item.get(entity_id=author_qid) - author_label = str(author_item.labels.get('en')) + author_label = str(author_item.labels.get("en")) current_author = Author(self.api, name=author_label) current_author._QID = author_qid self.author_pool += [current_author] - + # Disambiguate Authors and create corresponding Author items - self.author_pool = author_factory.disambiguate_authors(self.author_pool) + pool_for_items = [] + for a in self.author_pool: + if a.orcid or a.QID or a is self.maintainer: + pool_for_items.append(a) + + self.author_pool = author_factory.disambiguate_authors(pool_for_items) # GUID to remove remove_guid = [] - props_to_delete = ['wdt:P50', 'wdt:P275', 'wdt:P1547', 'imports', 'wdt:P2860'] + props_to_delete = [ + "wdt:P50", + "wdt:P2093", + "wdt:P275", + "wdt:P1547", + "imports", + "wdt:P2860", + ] for prop_str in props_to_delete: - prop_nr = self.api.get_local_id_by_label(prop_str, 'property') + prop_nr = self.api.get_local_id_by_label(prop_str, "property") for claim in self.item.claims.get(prop_nr): remove_guid.append(claim.id) @@ -352,34 +386,41 @@ def update(self): # Restart item state self.exists() - if self.item.descriptions.values.get('en') != self.description: + if self.item.descriptions.values.get("en") != self.description: description = self.description if self.label == self.description: description += " (R Package)" - self.item.descriptions.set( - language="en", - value=description - ) + self.item.descriptions.set(language="en", value=description) # Long description - self.item.add_claim("description", self.long_description, action="replace_all") + self.item.add_claim( + "description", self.long_description, action="replace_all" + ) # Last update date - self.item.add_claim("wdt:P5017", f"+{self.date}T00:00:00Z", action="replace_all") + self.item.add_claim( + "wdt:P5017", f"+{self.date}T00:00:00Z", action="replace_all" + ) # Software version identifiers for version, publication_date in self.versions: qualifier = [self.api.get_claim("wdt:P577", publication_date)] self.item.add_claim("wdt:P348", version, qualifiers=qualifier) - + if self.version: qualifier = [self.api.get_claim("wdt:P577", f"+{self.date}T00:00:00Z")] - self.item.add_claim("wdt:P348", self.version, qualifiers=qualifier) + self.item.add_claim("wdt:P348", self.version, qualifiers=qualifier) # Authors for author in self.authors: - author.pull_QID(self.author_pool) - self.item.add_claim("wdt:P50", author.QID) + if author.orcid: + author.pull_QID(self.author_pool) + if not author.QID: + author.create() + self.item.add_claim("wdt:P50", author.QID) + else: + if author.name: + self.item.add_claim("wdt:P2093", author.name) # Maintainer self.maintainer.pull_QID(self.author_pool) @@ -387,23 +428,27 @@ def update(self): # Licenses if self.license_data: - claims = self.process_claims(self.license_data, 'wdt:P275', 'wdt:P9767') + claims = self.process_claims(self.license_data, "wdt:P275", "wdt:P9767") self.item.add_claims(claims) # Dependencies if self.dependencies: - claims = self.process_claims(self.dependencies, 'wdt:P1547', 'wdt:P348') + claims = self.process_claims(self.dependencies, "wdt:P1547", "wdt:P348") self.item.add_claims(claims) # Imports if self.imports: prop_nr = self.api.get_local_id_by_label("imports", "property") - claims = self.process_claims(self.imports, prop_nr, 'wdt:P348') - self.item.add_claims(claims) + claims = self.process_claims(self.imports, prop_nr, "wdt:P348") + self.item.add_claims(claims) # Related publications and sources cites_work = "wdt:P2860" - for publications in [self.crossref_publications, self.arxiv_publications, self.zenodo_resources]: + for publications in [ + self.crossref_publications, + self.arxiv_publications, + self.zenodo_resources, + ]: for publication in publications: for author in publication.authors: author.pull_QID(self.author_pool) @@ -415,25 +460,26 @@ def update(self): # Wikidata QID wikidata_QID = self.get_wikidata_QID() - if wikidata_QID: self.item.add_claim("Wikidata QID", wikidata_QID, action="replace_all") + if wikidata_QID: + self.item.add_claim("Wikidata QID", wikidata_QID, action="replace_all") package = self.write() - + if package: print(f"Package with QID updated: {package['QID']}.") else: print(f"Package could not be updated.") def process_claims(self, data, prop_nr, qualifier_nr=None): - claims = [] for value, qualifier_value in data: qualifier_prop_nr = ( - 'wdt:P2699' if qualifier_value.startswith('https') else qualifier_nr + "wdt:P2699" if qualifier_value.startswith("https") else qualifier_nr ) qualifier = ( [self.api.get_claim(qualifier_prop_nr, qualifier_value)] - if qualifier_value else [] + if qualifier_value + else [] ) claims.append(self.api.get_claim(prop_nr, value, qualifiers=qualifier)) return claims @@ -449,43 +495,49 @@ def parse_publications(self, description): List: List containing the wikibase IDs of mentioned publications. """ - doi_references = re.findall('', description) - arxiv_references = re.findall('', description) - zenodo_references = re.findall('', description) - - doi_references = list(map(lambda x: x[:-1] if x.endswith('.') else x, doi_references)) - arxiv_references = list(map(lambda x: x[:-1] if x.endswith('.') else x, arxiv_references)) - zenodo_references = list(map(lambda x: x[:-1] if x.endswith('.') else x, zenodo_references)) + doi_references = re.findall("", description) + arxiv_references = re.findall("", description) + zenodo_references = re.findall("", description) + + doi_references = list( + map(lambda x: x[:-1] if x.endswith(".") else x, doi_references) + ) + arxiv_references = list( + map(lambda x: x[:-1] if x.endswith(".") else x, arxiv_references) + ) + zenodo_references = list( + map(lambda x: x[:-1] if x.endswith(".") else x, zenodo_references) + ) crossref_references = [] for doi in doi_references: doi = doi.strip().upper() - if re.search('10.48550/', doi): - arxiv_id = doi.replace(":",".") - arxiv_id = arxiv_id.replace('10.48550/arxiv.', '') + if re.search("10.48550/", doi): + arxiv_id = doi.replace(":", ".") + arxiv_id = arxiv_id.replace("10.48550/arxiv.", "") arxiv_references.append(arxiv_id.strip()) - elif re.search('10.5281/', doi): - zenodo_id = doi.replace(":",".") - zenodo_id = doi.replace('10.5281/zenodo.', '') + elif re.search("10.5281/", doi): + zenodo_id = doi.replace(":", ".") + zenodo_id = zenodo_id.replace("10.5281/zenodo.", "") zenodo_references.append(zenodo_id.strip()) else: crossref_references.append(doi) for doi in crossref_references: - publication = self.crossref.new_publication(doi.upper(), create_empty=True) + publication = self.crossref.new_publication(doi.upper()) self.author_pool += publication.authors self.crossref_publications.append(publication) for arxiv_id in arxiv_references: - arxiv_id = arxiv_id.replace(":",".") + arxiv_id = arxiv_id.replace(":", ".") publication = self.arxiv.new_publication(arxiv_id) if publication.title != "Error": self.author_pool += publication.authors self.arxiv_publications.append(publication) for zenodo_id in zenodo_references: - zenodo_id = zenodo_id.replace(":",".") + zenodo_id = zenodo_id.replace(":", ".") publication = self.zenodo.new_resource(zenodo_id) self.author_pool += publication.authors self.zenodo_resources.append(publication) @@ -527,10 +579,16 @@ def clean_package_list(self, table_html): if "License" in package_df.columns: package_df["License"] = package_df["License"].apply(self.parse_license) if "Author" in package_df.columns: - package_df["Author"] = str(table_html.find("td", text="Author:").find_next_sibling("td")).replace('\n', '').replace('\r', '') + package_df["Author"] = ( + str(table_html.find("td", text="Author:").find_next_sibling("td")) + .replace("\n", "") + .replace("\r", "") + ) package_df["Author"] = package_df["Author"].apply(self.parse_authors) if "Maintainer" in package_df.columns: - package_df["Maintainer"] = package_df["Maintainer"].apply(self.parse_maintainer) + package_df["Maintainer"] = package_df["Maintainer"].apply( + self.parse_maintainer + ) return package_df def parse_software(self, software_str: str) -> List[Tuple[str, str]]: @@ -617,9 +675,7 @@ def parse_license(self, x: str) -> List[Tuple[str, str]]: ): license_list.append(licenses[i]) i += 1 - elif re.findall(r"\[", licenses[i]) and not re.findall( - r"\]", licenses[i] - ): + elif re.findall(r"\[", licenses[i]) and not re.findall(r"\]", licenses[i]): j = i + 1 license_aux = licenses[i] closed = False @@ -676,7 +732,9 @@ def parse_license(self, x: str) -> List[Tuple[str, str]]: license_str = license_str.strip() if license_str in ["file LICENSE", "file LICENCE"]: - license_qualifier = f"https://cran.r-project.org/web/packages/{self.label}/LICENSE" + license_qualifier = ( + f"https://cran.r-project.org/web/packages/{self.label}/LICENSE" + ) license_QID = self.get_license_QID(license_str) license_tuples.append((license_QID, license_qualifier)) @@ -704,22 +762,25 @@ def parse_authors(self, x): (Dict): Dictionary of authors and corresponding ORCID ID, if provided. """ # Logic to determine if 'Author' is the class or the module containing the class - if hasattr(Author, 'Author') and not isinstance(Author, type): + if hasattr(Author, "Author") and not isinstance(Author, type): author_factory = Author.Author else: author_factory = Author if not callable(author_factory): - raise TypeError(f"Could not resolve a callable Author class. Check your imports.") + raise TypeError( + "Could not resolve a callable Author class. Check your imports." + ) - td_match = re.match(r'(.*?)', x) - if td_match: x = td_match.groups()[0] + td_match = re.match(r"(.*?)", x) + if td_match: + x = td_match.groups()[0] - x = re.sub("", "", x) # Delete img tags - x = re.sub(r"\(.*?\)", "", x) # Delete text in brackets - x = re.sub(r'"', "", x) # Delete quotation marks - x = re.sub("\t", "", x) # Delete tabs - x = re.sub("ORCID iD", "", x) # Delete orcid id refs + x = re.sub("", "", x) # Delete img tags + x = re.sub(r"\(.*?\)", "", x) # Delete text in brackets + x = re.sub(r'"', "", x) # Delete quotation marks + x = re.sub("\t", "", x) # Delete tabs + x = re.sub("ORCID iD", "", x) # Delete orcid id refs author_list = re.findall(r".*?\]", x) authors = [] @@ -737,7 +798,9 @@ def parse_authors(self, x): author = re.sub(r"^\s?,", "", author) author = re.sub(r"^\s?and\s?", "", author) author = re.sub( - r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+", "", author + r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+", + "", + author, ) author = author.strip() multiple_words = author.split(" ") @@ -749,7 +812,9 @@ def parse_authors(self, x): authors_and = x.split(" and ") if len(authors_and) > len(authors_comma): author = re.sub( - r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+", "", authors_and[0] + r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+", + "", + authors_and[0], ) else: author = re.sub( @@ -775,15 +840,18 @@ def parse_maintainer(self, name: str) -> str: (str): Name of the maintainer """ # Logic to determine if 'Author' is the class or the module containing the class - if hasattr(Author, 'Author') and not isinstance(Author, type): + if hasattr(Author, "Author") and not isinstance(Author, type): author_factory = Author.Author else: author_factory = Author if not callable(author_factory): - raise TypeError(f"Could not resolve a callable Author class. Check your imports.") + raise TypeError( + "Could not resolve a callable Author class. Check your imports." + ) - if pd.isna(name): return name + if pd.isna(name): + return name quotes = re.match(r'"(.*?)"', name) if quotes: @@ -792,7 +860,7 @@ def parse_maintainer(self, name: str) -> str: name = re.sub(r"<.*?>", "", name) name = re.sub(r"\(.*?\)", "", name) name = name.strip() - name = name.split(',') + name = name.split(",") maintainer = author_factory(self.api, name=name[0]) self.author_pool += [maintainer] return maintainer @@ -812,6 +880,7 @@ def get_license_QID(self, license_str: str) -> str: Returns: (str): Wikidata item ID. """ + def get_license(label: str) -> str: license_item = self.api.item.new() license_item.labels.set(language="en", value=label) @@ -819,7 +888,7 @@ def get_license(label: str) -> str: license_mapping = { "ACM": get_license("ACM Software License Agreement"), - "AGPL":"wd:Q28130012", + "AGPL": "wd:Q28130012", "AGPL-3": "wd:Q27017232", "Apache License": "wd:Q616526", "Apache License 2.0": "wd:Q13785927", @@ -894,49 +963,53 @@ def get_wikidata_QID(self) -> Optional[str]: """ results = search_entities( search_string=self.label, - mediawiki_api_url='https://www.wikidata.org/w/api.php' - ) + mediawiki_api_url="https://www.wikidata.org/w/api.php", + ) for result in results: item = self.api.item.get( - entity_id=result, - mediawiki_api_url='https://www.wikidata.org/w/api.php' - ) - if 'P31' in item.claims.get_json().keys(): - instance_claims = item.claims.get('P31') + entity_id=result, mediawiki_api_url="https://www.wikidata.org/w/api.php" + ) + if "P31" in item.claims.get_json().keys(): + instance_claims = item.claims.get("P31") if instance_claims: for claim in instance_claims: claim = claim.get_json() - if claim['mainsnak']['datatype'] == "wikibase-item": + if claim["mainsnak"]["datatype"] == "wikibase-item": # If instance of R package - if 'datavalue' in claim['mainsnak'].keys(): - if claim['mainsnak']['datavalue']['value']['id'] == "Q73539779": + if "datavalue" in claim["mainsnak"].keys(): + if ( + claim["mainsnak"]["datavalue"]["value"]["id"] + == "Q73539779" + ): return result - + def get_versions(self): url = f"https://cran.r-project.org/src/contrib/Archive/{self.label}" try: page = requests.get(url) - soup = BeautifulSoup(page.content, 'lxml') + soup = BeautifulSoup(page.content, "lxml") except: log.warning(f"Version page for package {self.label} not found.") else: - if soup.find_all('table'): - table = soup.find_all('table')[0] + if soup.find_all("table"): + table = soup.find_all("table")[0] versions_df = pd.read_html(StringIO(str(table))) versions_df = versions_df[0] - versions_df = versions_df.drop(columns=['Unnamed: 0', 'Size', 'Description']) - versions_df = versions_df.drop(index= [0, 1]) - + versions_df = versions_df.drop( + columns=["Unnamed: 0", "Size", "Description"] + ) + versions_df = versions_df.drop(index=[0, 1]) + for _, row in versions_df.iterrows(): - name = row['Name'] - publication_date = row['Last modified'] + name = row["Name"] + publication_date = row["Last modified"] if isinstance(name, str): - version = re.sub(f'{self.label}_', '', name) - version = re.sub('.tar.gz', '', version) - + version = re.sub(f"{self.label}_", "", name) + version = re.sub(".tar.gz", "", version) + publication_date = publication_date.split()[0] publication_date = f"+{publication_date}T00:00:00Z" - + self.versions.append((version, publication_date)) diff --git a/mardi_importer/mardi_importer/crossref/CrossrefPublication.py b/mardi_importer/mardi_importer/crossref/CrossrefPublication.py index cf9823c..7e24d3d 100644 --- a/mardi_importer/mardi_importer/crossref/CrossrefPublication.py +++ b/mardi_importer/mardi_importer/crossref/CrossrefPublication.py @@ -5,14 +5,15 @@ from dataclasses import dataclass, field from habanero import Crossref from httpx import HTTPStatusError -from typing import List, Optional +from typing import List, Optional, Any from mardi_importer import Importer from mardi_importer.utils import Author from mardi_importer.logger.logging_utils import get_logger_safe + @dataclass -class CrossrefPublication(): +class CrossrefPublication: doi: str authors: List[Author] = field(default_factory=list) title: str = "" @@ -38,216 +39,233 @@ class CrossrefPublication(): month: str = "" year: str = "" preprint: bool = False - create_empty: bool = False + crossref_ok: bool = False identical: str = "" QID: str = None api: Optional[MardiClient] = None def __post_init__(self): - log = get_logger_safe(__name__) - self.crossref_ok = False if self.api is None: - self.api = Importer.get_api('crossref') + self.api = Importer.get_api("crossref") item = self.api.item.new() item.labels.set(language="en", value=self.title) doi_id = "wdt:P356" QID_results = self.api.search_entity_by_value(doi_id, self.doi) - if QID_results: self.QID = QID_results[0] + if QID_results: + self.QID = QID_results[0] if self.QID: # Get authors. item = self.api.item.get(self.QID) - author_QID = item.get_value('wdt:P50') + author_QID = item.get_value("wdt:P50") for QID in author_QID: author_item = self.api.item.get(entity_id=QID) - name = str(author_item.labels.get('en')) - orcid = author_item.get_value('wdt:P496') + name = str(author_item.labels.get("en")) + orcid = author_item.get_value("wdt:P496") orcid = orcid[0] if orcid else None aliases = [] - if author_item.aliases.get('en'): - for alias in author_item.aliases.get('en'): + if author_item.aliases.get("en"): + for alias in author_item.aliases.get("en"): aliases.append(str(alias)) - author = Author(self.api, - name=name, - orcid=orcid, - _aliases=aliases, - _QID=QID) + author = Author( + self.api, name=name, orcid=orcid, _aliases=aliases, _QID=QID + ) self.authors.append(author) else: try: cr = Crossref() response = cr.works(ids=self.doi) except HTTPStatusError as e: - log.error(f"Publication with doi: {self.doi} not found in Crossref: {str(e)}") + log.error( + f"Publication with doi: {self.doi} not found in Crossref: {str(e)}" + ) return None else: - if response['status'] != 'ok': + if response["status"] != "ok": return None self.crossref_ok = True - metadata = response['message'] - if 'title' in metadata.keys(): - if len(metadata['title']) > 0: - title = metadata['title'][0] + metadata = response["message"] + if "title" in metadata.keys(): + if len(metadata["title"]) > 0: + title = metadata["title"][0] groups = re.search("<([a-z]*)>(.*?)<\/\\1>", title) - while groups: - title = title.replace(groups.group(0),groups.group(2)) + while groups: + title = title.replace(groups.group(0), groups.group(2)) groups = re.search("<([a-z]*)>(.*?)<\/\\1>", title) title = " ".join(title.split()) self.title = title - if 'type' in metadata.keys(): - if metadata['type'] == 'journal-article': - self.instance = 'wd:Q13442814' - self.description = 'scientific article' - if 'relation' in metadata.keys(): - if 'is-preprint-of' in metadata['relation'].keys(): + if "type" in metadata.keys(): + if metadata["type"] == "journal-article": + self.instance = "wd:Q13442814" + self.description = "scientific article" + if "relation" in metadata.keys(): + if "is-preprint-of" in metadata["relation"].keys(): self.description += " preprint" - if 'container-title' in metadata.keys(): - if len(metadata['container-title']) > 0: - self.journal = metadata['container-title'][0] - if 'volume' in metadata.keys(): - self.volume = metadata['volume'] - if 'issue' in metadata.keys(): - self.issue = metadata['issue'] - if 'page' in metadata.keys(): - self.page = metadata['page'] - if 'issn-type' in metadata.keys(): - for issn in metadata['issn-type']: - if issn['type'] == "print": - self.issn_print = issn['value'] - elif issn['type'] == "electronic": - self.issn_online = issn['value'] - elif metadata['type'] == 'book': - self.instance = 'wd:Q571' - self.description = 'academic book' + if "container-title" in metadata.keys(): + if len(metadata["container-title"]) > 0: + self.journal = metadata["container-title"][0] + if "volume" in metadata.keys(): + self.volume = metadata["volume"] + if "issue" in metadata.keys(): + self.issue = metadata["issue"] + if "page" in metadata.keys(): + self.page = metadata["page"] + if "issn-type" in metadata.keys(): + for issn in metadata["issn-type"]: + if issn["type"] == "print": + self.issn_print = issn["value"] + elif issn["type"] == "electronic": + self.issn_online = issn["value"] + elif metadata["type"] == "book": + self.instance = "wd:Q571" + self.description = "academic book" self.book = True - if 'ISBN' in metadata.keys(): - if len(metadata['ISBN']) > 0: - self.isbn = metadata['ISBN'][0] - elif metadata['type'] == 'monograph': - self.instance = 'wd:Q193495' - self.description = 'scholarly monograph' + if "ISBN" in metadata.keys(): + if len(metadata["ISBN"]) > 0: + self.isbn = metadata["ISBN"][0] + elif metadata["type"] == "monograph": + self.instance = "wd:Q193495" + self.description = "scholarly monograph" self.monograph = True - if 'ISBN' in metadata.keys(): - if len(metadata['ISBN']) > 0: - self.isbn = metadata['ISBN'][0] - elif metadata['type'] == 'posted-content': - if 'subtype' in metadata.keys(): - if metadata['subtype'] == 'preprint': + if "ISBN" in metadata.keys(): + if len(metadata["ISBN"]) > 0: + self.isbn = metadata["ISBN"][0] + elif metadata["type"] == "posted-content": + if "subtype" in metadata.keys(): + if metadata["subtype"] == "preprint": self.posted = True - self.instance = 'wd:Q13442814' - self.description = 'scientific article preprint' + self.instance = "wd:Q13442814" + self.description = "scientific article preprint" self.preprint = True - elif metadata['type'] == 'proceedings-article': - self.instance = 'wd:Q23927052' - self.description = 'proceedings article' - if 'container-title' in metadata.keys(): - if len(metadata['container-title']) > 0: - self.proceedings = metadata['container-title'][0] - if 'created' in metadata.keys(): - if 'date-parts' in metadata['created'].keys(): - if len(metadata['created']['date-parts'][0]) > 1: - self.proceedings_month = str(metadata['created']['date-parts'][0][1]) + elif metadata["type"] == "proceedings-article": + self.instance = "wd:Q23927052" + self.description = "proceedings article" + if "container-title" in metadata.keys(): + if len(metadata["container-title"]) > 0: + self.proceedings = metadata["container-title"][0] + if "created" in metadata.keys(): + if "date-parts" in metadata["created"].keys(): + if len(metadata["created"]["date-parts"][0]) > 1: + self.proceedings_month = str( + metadata["created"]["date-parts"][0][1] + ) if len(self.proceedings_month) == 1: - self.proceedings_month = "0" + self.proceedings_month - if len(metadata['created']['date-parts'][0]) > 1: - self.proceedings_year = str(metadata['created']['date-parts'][0][0]) - elif metadata['type'] == 'book-chapter': - self.instance = 'wd:Q1980247' - self.description = 'book chapter' + self.proceedings_month = ( + "0" + self.proceedings_month + ) + if len(metadata["created"]["date-parts"][0]) > 1: + self.proceedings_year = str( + metadata["created"]["date-parts"][0][0] + ) + elif metadata["type"] == "book-chapter": + self.instance = "wd:Q1980247" + self.description = "book chapter" self.book_chapter = True - if 'container-title' in metadata.keys(): - if 'ISBN' in metadata.keys(): - if len(metadata['ISBN']) > 0: - self.isbn = metadata['ISBN'][0] - if len(metadata['container-title']) > 0: - book_title = metadata['container-title'][0] + if "container-title" in metadata.keys(): + if "ISBN" in metadata.keys(): + if len(metadata["ISBN"]) > 0: + self.isbn = metadata["ISBN"][0] + if len(metadata["container-title"]) > 0: + book_title = metadata["container-title"][0] self.container_book = self.__preprocess_book(book_title) - elif metadata['type'] == 'journal-issue': - self.instance = 'wd:Q28869365' - self.description = 'journal issue' - elif metadata['type'] == 'journal-volume': - self.instance = 'wd:Q1238720' - self.description = 'journal volume' - elif metadata['type'] == 'journal': - self.instance = 'wd:Q5633421' - self.description = 'scientific journal' - elif metadata['type'] == 'proceedings': - self.instance = 'wd:Q1143604' - self.description = 'conference proceedings' - elif metadata['type'] == 'dataset': - self.instance = 'wd:Q1172284' - self.description = 'dataset' - elif metadata['type'] == 'report': - self.instance = 'wd:Q10870555' - self.description = 'report' - elif metadata['type'] == 'edited-book': - self.instance = 'wd:Q571' - self.description = 'academic book' - elif metadata['type'] == 'reference-book': - self.instance = 'wd:Q571' - self.description = 'academic book' - elif metadata['type'] == 'book-series': - self.instance = 'wd:Q277759' - self.description = 'book series' - elif metadata['type'] == 'book-set': - self.instance = 'wd:Q28062188' - self.description = 'book set' - elif metadata['type'] == 'book-section': - self.instance = 'wd:Q1931107' - self.description = 'book section' - elif metadata['type'] == 'dissertation': - self.instance = 'wd:Q1385450' - self.description = 'dissertation' + elif metadata["type"] == "journal-issue": + self.instance = "wd:Q28869365" + self.description = "journal issue" + elif metadata["type"] == "journal-volume": + self.instance = "wd:Q1238720" + self.description = "journal volume" + elif metadata["type"] == "journal": + self.instance = "wd:Q5633421" + self.description = "scientific journal" + elif metadata["type"] == "proceedings": + self.instance = "wd:Q1143604" + self.description = "conference proceedings" + elif metadata["type"] == "dataset": + self.instance = "wd:Q1172284" + self.description = "dataset" + elif metadata["type"] == "report": + self.instance = "wd:Q10870555" + self.description = "report" + elif metadata["type"] == "edited-book": + self.instance = "wd:Q571" + self.description = "academic book" + elif metadata["type"] == "reference-book": + self.instance = "wd:Q571" + self.description = "academic book" + elif metadata["type"] == "book-series": + self.instance = "wd:Q277759" + self.description = "book series" + elif metadata["type"] == "book-set": + self.instance = "wd:Q28062188" + self.description = "book set" + elif metadata["type"] == "book-section": + self.instance = "wd:Q1931107" + self.description = "book section" + elif metadata["type"] == "dissertation": + self.instance = "wd:Q1385450" + self.description = "dissertation" # The following types are not associated with an instance or description # ['component', 'report-series', 'standard', 'standard-series', # 'book-part', 'book-track', 'reference-entry', 'other', 'peer-review'] - if 'publisher' in metadata.keys(): - self.publisher = metadata['publisher'] + if "publisher" in metadata.keys(): + self.publisher = metadata["publisher"] - if 'published' in metadata.keys(): - if 'date-parts' in metadata['published'].keys(): - if len(metadata['published']['date-parts'][0]) > 2: - self.day = str(metadata['published']['date-parts'][0][2]) + if "published" in metadata.keys(): + if "date-parts" in metadata["published"].keys(): + if len(metadata["published"]["date-parts"][0]) > 2: + self.day = str(metadata["published"]["date-parts"][0][2]) if len(self.day) == 1: self.day = "0" + self.day - if len(metadata['published']['date-parts'][0]) > 1: - self.month = str(metadata['published']['date-parts'][0][1]) + if len(metadata["published"]["date-parts"][0]) > 1: + self.month = str(metadata["published"]["date-parts"][0][1]) if len(self.month) == 1: self.month = "0" + self.month - self.year = str(metadata['published']['date-parts'][0][0]) + self.year = str(metadata["published"]["date-parts"][0][0]) if self.year and self.book: self.description += f" ({self.year})" # Logic to determine if 'Author' is the class or the module containing the class - if hasattr(Author, 'Author') and not isinstance(Author, type): + if hasattr(Author, "Author") and not isinstance(Author, type): author_factory = Author.Author else: author_factory = Author if not callable(author_factory): - raise TypeError(f"Could not resolve a callable Author class. Check your imports.") - - if 'author' in metadata.keys(): - for author in metadata['author']: - if 'given' in author.keys() and 'family' in author.keys(): - author_label = f"{author['given'].title()} {author['family'].title()}" - if 'ORCID' in author.keys(): - orcid_id = re.findall("\d{4}-\d{4}-\d{4}-.{4}", author['ORCID'])[0] - self.authors.append(author_factory(self.api, name=author_label, orcid=orcid_id)) + raise TypeError( + "Could not resolve a callable Author class. Check your imports." + ) + + if "author" in metadata.keys(): + for author in metadata["author"]: + if "given" in author.keys() and "family" in author.keys(): + author_label = ( + f"{author['given'].title()} {author['family'].title()}" + ) + if "ORCID" in author.keys(): + orcid_id = re.findall( + "\d{4}-\d{4}-\d{4}-.{4}", author["ORCID"] + )[0] + self.authors.append( + author_factory( + self.api, name=author_label, orcid=orcid_id + ) + ) else: - self.authors.append(author_factory(self.api, name=author_label)) + self.authors.append( + author_factory(self.api, name=author_label) + ) - if 'relation' in metadata.keys(): - if 'is-preprint-of' in metadata['relation'].keys(): + if "relation" in metadata.keys(): + if "is-preprint-of" in metadata["relation"].keys(): self.preprint = True - if 'is-identical-to' in metadata['relation'].keys(): - identical_obj = metadata['relation']['is-identical-to'][0] - if 'id' in identical_obj.keys(): - self.identical = identical_obj['id'] + if "is-identical-to" in metadata["relation"].keys(): + identical_obj = metadata["relation"]["is-identical-to"][0] + if "id" in identical_obj.keys(): + self.identical = identical_obj["id"] def create(self): if self.QID: @@ -256,77 +274,71 @@ def create(self): log = get_logger_safe(__name__) log.debug("Start creating wiki item for crossref publication") - if not self.crossref_ok and not self.create_empty: + if not self.crossref_ok: log.warning(f"Skipping creation, DOI {self.doi} not found in Crossref.") return None item = self.api.item.new() if self.title: item.labels.set(language="en", value=self.title) - - if self.description: - item.descriptions.set( - language="en", - value=self.description - ) + + if self.description: + item.descriptions.set(language="en", value=self.description) if self.instance: self.QID = item.is_instance_of_with_property( - self.instance, - "wdt:P356", - self.doi - ) - item.add_claim('wdt:P31', self.instance) + self.instance, "wdt:P356", self.doi + ) + item.add_claim("wdt:P31", self.instance) if self.identical: # Check if an identical crossref publication already exists existing_item = item.is_instance_of_with_property( - self.instance, - "wdt:P356", - self.identical - ) - if existing_item: return existing_item + self.instance, "wdt:P356", self.identical + ) + if existing_item: + return existing_item - item.add_claim('wdt:P356', self.doi) + item.add_claim("wdt:P356", self.doi) if self.journal: journal_id = self.__preprocess_journal() - item.add_claim('wdt:P1433', journal_id) + item.add_claim("wdt:P1433", journal_id) if len(self.volume) > 0: - item.add_claim('wdt:P478', self.volume) + item.add_claim("wdt:P478", self.volume) if len(self.issue) > 0: - item.add_claim('wdt:P433', self.issue) + item.add_claim("wdt:P433", self.issue) if len(self.page) > 0: - item.add_claim('wdt:P304', self.page) + item.add_claim("wdt:P304", self.page) elif (self.book or self.monograph) and self.isbn: if len(self.isbn) == 13: - isbn_prop_nr = 'wdt:P212' + isbn_prop_nr = "wdt:P212" elif len(self.isbn) == 10: - isbn_prop_nr = 'wdt:P957' + isbn_prop_nr = "wdt:P957" item.add_claim(isbn_prop_nr, self.isbn) self.QID = item.is_instance_of_with_property( - self.instance, - isbn_prop_nr, - self.isbn - ) + self.instance, isbn_prop_nr, self.isbn + ) elif self.book_chapter: if len(self.container_book) > 0: - item.add_claim('wdt:P1433', self.container_book) + item.add_claim("wdt:P1433", self.container_book) elif self.proceedings: proceedings_id = self.__preprocess_proceedings() - item.add_claim('wdt:P1433', proceedings_id) + item.add_claim("wdt:P1433", proceedings_id) if len(self.day) > 0: - item.add_claim("wdt:P577", f"+{self.year}-{self.month}-{self.day}T00:00:00Z", precision=11) + item.add_claim( + "wdt:P577", + f"+{self.year}-{self.month}-{self.day}T00:00:00Z", + precision=11, + ) elif len(self.month) > 0: - item.add_claim("wdt:P577", f"+{self.year}-{self.month}-00T00:00:00Z", precision=10) + item.add_claim( + "wdt:P577", f"+{self.year}-{self.month}-00T00:00:00Z", precision=10 + ) elif len(self.year) > 0: item.add_claim("wdt:P577", f"+{self.year}-00-00T00:00:00Z", precision=9) - author_QID = self.__preprocess_authors() - author_claims = [] - for author in author_QID: - author_claims.append(self.api.get_claim("wdt:P50", author)) - + author_claims = self.__preprocess_authors() log.debug( "crossref author_claims types=%s values=%s", [type(c).__name__ for c in author_claims], @@ -334,23 +346,19 @@ def create(self): ) item.add_claims(author_claims) - + if not self.QID: self.QID = item.write().id else: - item.descriptions.set( - language="en", - value="scientific article" - ) + item.descriptions.set(language="en", value="scientific article") - scholarly_article = 'wd:Q13442814' - item.add_claim('wdt:P31', scholarly_article) - item.add_claim('wdt:P356', self.doi) + scholarly_article = "wd:Q13442814" + item.add_claim("wdt:P31", scholarly_article) + item.add_claim("wdt:P356", self.doi) if not self.QID: self.QID = item.write().id - if self.QID: log.info(f"Publication with DOI: {self.doi} created with ID {self.QID}.") return self.QID @@ -358,78 +366,90 @@ def create(self): log.warning(f"Publication with DOI: {self.doi} could not be created.") return None - def __preprocess_authors(self) -> List[str]: - """Processes the author information of each publication. + def __preprocess_authors(self) -> List[Any]: + """Processes the author information of each publication. + + Create the author if it does not exist already as an + entity in wikibase. If an author has no ORCID and no existing + QID, save the author as an author name string instead. - Create the author if it does not exist already as an - entity in wikibase. - Returns: - List[str]: - QIDs corresponding to each author. + List[Any]: + Author claims to be added to the publication item (wdt:P50 for + author entities, wdt:P2093 for author name strings). """ - author_QID = [] + claims = [] log = get_logger_safe(__name__) log.debug(f"Start preprocessing authors for: {self.authors}") for author in self.authors: - if not author.QID: - log.debug(f"Creating author item for: {author}") - author.create() - log.debug(f"Created author with QID: {author.QID}") - author_QID.append(author.QID) - return author_QID + if author.orcid or author.QID: + if not author.QID: + log.debug(f"Creating author item for: {author}") + author.create() + log.debug(f"Created author with QID: {author.QID}") + claims.append(self.api.get_claim("wdt:P50", author.QID)) + else: + # if it does not exist yet and there is not orcid, it should be a name string + if author.name: + claims.append(self.api.get_claim("wdt:P2093", author.name)) + return claims def __preprocess_journal(self): item = self.api.item.new() item.labels.set(language="en", value=self.journal) - item.descriptions.set( - language="en", - value="scientific journal" - ) - journal_id = item.is_instance_of('wd:Q5633421') + item.descriptions.set(language="en", value="scientific journal") + journal_id = item.is_instance_of("wd:Q5633421") if journal_id: return journal_id else: - item.add_claim('wdt:P31', 'wd:Q5633421') + item.add_claim("wdt:P31", "wd:Q5633421") claims = [] if len(self.issn_print) > 0: - qualifier = [self.api.get_claim('wdt:P437', 'wd:Q1261026')] - claims.append(self.api.get_claim('wdt:P236', self.issn_print, qualifiers=qualifier)) + qualifier = [self.api.get_claim("wdt:P437", "wd:Q1261026")] + claims.append( + self.api.get_claim( + "wdt:P236", self.issn_print, qualifiers=qualifier + ) + ) if len(self.issn_online) > 0: - qualifier = [self.api.get_claim('wdt:P437', 'wd:Q1714118')] - claims.append(self.api.get_claim('wdt:P236', self.issn_online, qualifiers=qualifier)) + qualifier = [self.api.get_claim("wdt:P437", "wd:Q1714118")] + claims.append( + self.api.get_claim( + "wdt:P236", self.issn_online, qualifiers=qualifier + ) + ) return item.write().id def __preprocess_book(self, container_book): item = self.api.item.new() item.labels.set(language="en", value=container_book) - item.descriptions.set( - language="en", - value="academic book" - ) - book_id = item.is_instance_of('wd:Q571') + item.descriptions.set(language="en", value="academic book") + book_id = item.is_instance_of("wd:Q571") if book_id: return book_id else: - item.add_claim('wdt:P31', 'wd:Q571') + item.add_claim("wdt:P31", "wd:Q571") if len(self.isbn) == 13: - item.add_claim('wdt:P212', self.isbn) + item.add_claim("wdt:P212", self.isbn) elif len(self.isbn) == 10: - item.add_claim('wdt:P957', self.isbn) + item.add_claim("wdt:P957", self.isbn) return item.write().id def __preprocess_proceedings(self): item = self.api.item.new() item.labels.set(language="en", value=self.proceedings) - proceedings_id = item.is_instance_of('wd:Q1143604') + proceedings_id = item.is_instance_of("wd:Q1143604") if proceedings_id: return proceedings_id else: - item.add_claim('wdt:P31', 'wd:Q1143604') + item.add_claim("wdt:P31", "wd:Q1143604") if len(self.proceedings_month) > 0: - item.add_claim("wdt:P577", f"+{self.proceedings_year}-{self.proceedings_month}-00T00:00:00Z") + item.add_claim( + "wdt:P577", + f"+{self.proceedings_year}-{self.proceedings_month}-00T00:00:00Z", + ) elif len(self.proceedings_year) > 0: item.add_claim("wdt:P577", f"+{self.proceedings_year}-00-00T00:00:00Z") - return item.write().id \ No newline at end of file + return item.write().id diff --git a/mardi_importer/mardi_importer/utils/Author.py b/mardi_importer/mardi_importer/utils/Author.py index 8f91bdc..c63f396 100644 --- a/mardi_importer/mardi_importer/utils/Author.py +++ b/mardi_importer/mardi_importer/utils/Author.py @@ -5,8 +5,10 @@ from typing import List from nameparser import HumanName from nameparser.config import CONSTANTS -CONSTANTS.titles.remove('Mahdi') -CONSTANTS.titles.remove('Bodhisattva') + +CONSTANTS.titles.remove("Mahdi") +CONSTANTS.titles.remove("Bodhisattva") + @dataclass class Author: @@ -38,20 +40,35 @@ def __eq__(self, other): other_name = HumanName(other.name) if self_name.first == other_name.first and self_name.last == other_name.last: return True - if ( self_name.first.lower().replace('-','') == other_name.first.lower().replace('-','') and - self_name.last.lower().replace('-','') == other_name.last.lower().replace('-','') ): + if self_name.first.lower().replace("-", "") == other_name.first.lower().replace( + "-", "" + ) and self_name.last.lower().replace( + "-", "" + ) == other_name.last.lower().replace("-", ""): return True - if ( len(self_name.first) == 2 and self_name.first[0] == other_name.first[0] - and self_name.last == other_name.last ): + if ( + len(self_name.first) == 2 + and self_name.first[0] == other_name.first[0] + and self_name.last == other_name.last + ): return True - if ( len(other_name.first) == 2 and self_name.first[0] == other_name.first[0] - and self_name.last == other_name.last ): + if ( + len(other_name.first) == 2 + and self_name.first[0] == other_name.first[0] + and self_name.last == other_name.last + ): return True - if ( self_name.first == other_name.first and len(self_name.last) == 2 - and self_name.last[0] == other_name.last[0] ): + if ( + self_name.first == other_name.first + and len(self_name.last) == 2 + and self_name.last[0] == other_name.last[0] + ): return True - if ( self_name.first == other_name.first and len(other_name.last) == 2 - and self_name.last[0] == other_name.last[0] ): + if ( + self_name.first == other_name.first + and len(other_name.last) == 2 + and self_name.last[0] == other_name.last[0] + ): return True return False @@ -59,7 +76,7 @@ def __add__(self, other): self_name = HumanName(self.name) other_name = HumanName(other.name) - name_attributes = ['first', 'middle', 'last'] + name_attributes = ["first", "middle", "last"] name_values = [ getattr(self_name, attr) if len(getattr(self_name, attr)) >= len(getattr(other_name, attr)) @@ -75,7 +92,7 @@ def __add__(self, other): aliases.append(other.name) aliases += [x for x in self.aliases if x != long_name] aliases += [x for x in other.aliases if (x != long_name and x not in aliases)] - + orcid = self.orcid if self.orcid else other.orcid arxiv_id = self.arxiv_id if self.arxiv_id else other.arxiv_id affiliation = self.affiliation if self.affiliation else other.affiliation @@ -87,28 +104,30 @@ def __add__(self, other): item.aliases.set(language="en", values=aliases) item.write() - return Author(self.api, - name=long_name, - orcid=orcid, - arxiv_id=arxiv_id, - affiliation=affiliation, - _aliases=aliases, - _QID=QID) + return Author( + self.api, + name=long_name, + orcid=orcid, + arxiv_id=arxiv_id, + affiliation=affiliation, + _aliases=aliases, + _QID=QID, + ) def __repr__(self): - rep = f'Author: {self.name}, ORCID: {self.orcid}, arXiv: {self.arxiv_id}, QID: {self.QID}, {self.aliases}' + rep = f"Author: {self.name}, ORCID: {self.orcid}, arXiv: {self.arxiv_id}, QID: {self.QID}, {self.aliases}" return rep - + @property def aliases(self) -> List[str]: if self._aliases: return self._aliases - + aliases = [] if self.QID: item = self.api.item.get(entity_id=self.QID) - if item.aliases.get('en'): - for alias in item.aliases.get('en'): + if item.aliases.get("en"): + for alias in item.aliases.get("en"): aliases.append(str(alias)) self._aliases = aliases return aliases @@ -120,14 +139,17 @@ def QID(self) -> str: if not self.orcid: return None - - results = self.api.search_entity_by_value('wdt:P496', self.orcid) + + results = self.api.search_entity_by_value("wdt:P496", self.orcid) if results: self._QID = results[0] return self._QID @classmethod def disambiguate_authors(cls, authors): + # Return empty input immediately + if not authors: + return [] disambiguated_authors = [authors[0]] for author in authors: if author not in disambiguated_authors: @@ -145,31 +167,33 @@ def pull_QID(self, author_pool): self._QID = author_pool[index].QID def create(self): - if self.QID: + if self.QID: # Update orcid and arxiv_id if given if self.orcid or self.arxiv_id: update_item = self.api.item.get(entity_id=self.QID) - current_orcid = update_item.get_value('wdt:P496') - current_arxiv_id = update_item.get_value('wdt:P4594') + current_orcid = update_item.get_value("wdt:P496") + current_arxiv_id = update_item.get_value("wdt:P4594") if not current_orcid or not current_arxiv_id: if not current_orcid and self.orcid: - update_item.add_claim('wdt:P496', self.orcid) + update_item.add_claim("wdt:P496", self.orcid) if not current_arxiv_id and self.arxiv_id: - update_item.add_claim('wdt:P4594', self.arxiv_id) + update_item.add_claim("wdt:P4594", self.arxiv_id) update_item.write() return self.QID - teams = {'r foundation': 'Q111430684', - 'the r foundation': 'Q111430684', - 'r core team': 'Q116739338', - 'the r core team': 'Q116739338', - 'cran team': 'Q116739332', - 'microsoft corporation': 'Q2283'} - + teams = { + "r foundation": "Q111430684", + "the r foundation": "Q111430684", + "r core team": "Q116739338", + "the r core team": "Q116739338", + "cran team": "Q116739332", + "microsoft corporation": "Q2283", + } + if self.name.lower() in teams.keys(): - self._QID = self.wdi.query('local_id', teams[self.name.lower()]) + self._QID = self.wdi.query("local_id", teams[self.name.lower()]) return self.QID - + self._item = self.api.item.new() self._item.labels.set(language="en", value=self.name) self._item.aliases.set(language="en", values=self.aliases) @@ -180,7 +204,7 @@ def create(self): # Orcid ID if self.orcid: - self._item.add_claim('ORCID iD', self.orcid) + self._item.add_claim("ORCID iD", self.orcid) if self.QID: self._item = self.api.item.get(self.QID) diff --git a/mardi_importer/mardi_importer/zenodo/ZenodoResource.py b/mardi_importer/mardi_importer/zenodo/ZenodoResource.py index 317ee95..9efb468 100644 --- a/mardi_importer/mardi_importer/zenodo/ZenodoResource.py +++ b/mardi_importer/mardi_importer/zenodo/ZenodoResource.py @@ -9,10 +9,13 @@ from dataclasses import dataclass, field from typing import Dict, List, Optional -CLEANR = re.compile('<.*?>|&([a-z0-9]+|#[0-9]{1,6}|#x[0-9a-f]{1,6});') # used to parse out html tags +CLEANR = re.compile( + "<.*?>|&([a-z0-9]+|#[0-9]{1,6}|#x[0-9a-f]{1,6});" +) # used to parse out html tags + @dataclass -class ZenodoResource(): +class ZenodoResource: zenodo_id: str title: str = None _description: str = None @@ -23,46 +26,58 @@ class ZenodoResource(): _license: str = None _version: str = None _communities: List[Community] = field(default_factory=list) - _projects: List[Project] = field(default_factory = list) + _projects: List[Project] = field(default_factory=list) metadata: Dict[str, object] = field(default_factory=dict) QID: str = None wdi: WikidataImporter = None api: Optional[MardiClient] = None def __post_init__(self): + # Logic to determine if 'Author' is the class or the module containing the class + if hasattr(Author, "Author") and not isinstance(Author, type): + author_factory = Author.Author + else: + author_factory = Author + + if not callable(author_factory): + raise TypeError( + "Could not resolve a callable Author class. Check your imports." + ) + if self.api is None: - self.api = Importer.get_api('zenodo') + self.api = Importer.get_api("zenodo") if self.wdi is None: self.wdi = WikidataImporter() - with urllib.request.urlopen(f"https://zenodo.org/api/records/{self.zenodo_id}") as url: + with urllib.request.urlopen( + f"https://zenodo.org/api/records/{self.zenodo_id}" + ) as url: json_data = json.load(url) - self.metadata = json_data['metadata'] + self.metadata = json_data["metadata"] if self.metadata: - self.title = self.metadata['title'] + self.title = self.metadata["title"] - zenodo_id = 'wdt:P4901' + zenodo_id = "wdt:P4901" QID_results = self.api.search_entity_by_value(zenodo_id, self.zenodo_id) - if QID_results: self.QID = QID_results[0] + if QID_results: + self.QID = QID_results[0] if self.QID: # Get authors. item = self.api.item.get(self.QID) - author_QID = item.get_value('wdt:P50') + author_QID = item.get_value("wdt:P50") for QID in author_QID: author_item = self.api.item.get(entity_id=QID) - name = str(author_item.labels.get('en')) - orcid = author_item.get_value('wdt:P496') + name = str(author_item.labels.get("en")) + orcid = author_item.get_value("wdt:P496") orcid = orcid[0] if orcid else None aliases = [] - if author_item.aliases.get('en'): - for alias in author_item.aliases.get('en'): + if author_item.aliases.get("en"): + for alias in author_item.aliases.get("en"): aliases.append(str(alias)) - author = Author(self.api, - name=name, - orcid=orcid, - _aliases=aliases, - _QID=QID) + author = author_factory( + self.api, name=name, orcid=orcid, _aliases=aliases, _QID=QID + ) self._authors.append(author) return self.QID @@ -71,49 +86,71 @@ def description(self): desc_long = "" if "description" in self.metadata.keys(): desc_long = self.metadata["description"] - desc_long = re.sub(CLEANR, '', desc_long) # parse out html tags from the description - desc_long = re.sub(r'\n|\\N|\t|\\T', ' ', desc_long) # parse out tabs and new lines - desc_long = re.sub(r'^\s+|\s+$', '', desc_long) # parse out leading and trailing white space + desc_long = re.sub( + CLEANR, "", desc_long + ) # parse out html tags from the description + desc_long = re.sub( + r"\n|\\N|\t|\\T", " ", desc_long + ) # parse out tabs and new lines + desc_long = re.sub( + r"^\s+|\s+$", "", desc_long + ) # parse out leading and trailing white space if re.match("\w+", desc_long): self._description = desc_long return self._description - @property def publication_date(self): if not self._publication_date: - if re.match("\d{4}-\d{2}-\d{2}",self.metadata['publication_date']): - publication_date = f"{self.metadata['publication_date']}T00:00:00Z" - self._publication_date = publication_date + pub_date = self.metadata.get("publication_date") + if pub_date and re.match(r"\d{4}-\d{2}-\d{2}", pub_date): + self._publication_date = f"{pub_date}T00:00:00Z" return self._publication_date - + @property def license(self): - if not self._license and ('license' in self.metadata.keys()): - self._license = self.metadata['license'] - return self._license + if not self._license and ("license" in self.metadata.keys()): + self._license = self.metadata["license"] + return self._license @property def version(self): - if not self._version and ('version' in self.metadata.keys()): - self._version = self.metadata['version'] + if not self._version and ("version" in self.metadata.keys()): + self._version = self.metadata["version"] return self._version @property def authors(self): + # Logic to determine if 'Author' is the class or the module containing the class + if hasattr(Author, "Author") and not isinstance(Author, type): + author_factory = Author.Author + else: + author_factory = Author + + if not callable(author_factory): + raise TypeError( + "Could not resolve a callable Author class. Check your imports." + ) + if not self._authors: - for creator in self.metadata['creators']: - name = creator.get('name') - orcid = creator.get('orcid') - affiliation = creator.get('affiliation') - author = Author(self.api, name=name, orcid=orcid, affiliation=affiliation) + for creator in self.metadata.get("creators", []): + name = creator.get("name") + orcid = creator.get("orcid") + affiliation = creator.get("affiliation") + author = author_factory( + self.api, name=name, orcid=orcid, affiliation=affiliation + ) self._authors.append(author) return self._authors @property def resource_type(self): if not self._resource_type: - resource_type = self.metadata['resource_type']['title'] + resource_type_data = self.metadata.get("resource_type", {}) + resource_type = resource_type_data.get("title") + if not resource_type: + return self._resource_type + if resource_type == "Dataset": self._resource_type = "wd:Q1172284" self._mardi_type = "MaRDI dataset profile" @@ -144,14 +181,16 @@ def resource_type(self): @property def communities(self): if not self._communities and "communities" in self.metadata.keys(): - #if "communities" in self.metadata.keys(): - for communityCur in self.metadata["communities"]: - community_id = communityCur.get("id") - if community_id == "mathplus": - community = Community(api = self.api, wdi=self.wdi, community_id = community_id) - self._communities.append(community) + # if "communities" in self.metadata.keys(): + for communityCur in self.metadata["communities"]: + community_id = communityCur.get("id") + if community_id == "mathplus": + community = Community( + api=self.api, wdi=self.wdi, community_id=community_id + ) + self._communities.append(community) return self._communities - + @property def projects(self): community = None @@ -160,15 +199,23 @@ def projects(self): if communityCur.community_id == "mathplus": community = communityCur break - if not self._projects and community and self.metadata.get("related_identifiers"): + if ( + not self._projects + and community + and self.metadata.get("related_identifiers") + ): for related_ids in self.metadata.get("related_identifiers"): - #print("identifier: " + related_ids["identifier"]) + # print("identifier: " + related_ids["identifier"]) if related_ids["identifier"] in Project.get_project_ids(): - project = Project(api = self.api, community = community, project_id = related_ids["identifier"]) + project = Project( + api=self.api, + community=community, + project_id=related_ids["identifier"], + ) self._projects.append(project) return self._projects - def exists(self): + def exists(self): if self.QID: return self.QID @@ -177,23 +224,23 @@ def update(self): zenodo_item = self.api.item.new() zenodo_item.labels.set(language="en", value=self.title) - zenodo_id = zenodo_item.is_instance_of_with_property("wd:Q1172284", "wdt:P4901", self.zenodo_id) + zenodo_id = zenodo_item.is_instance_of_with_property( + "wd:Q1172284", "wdt:P4901", self.zenodo_id + ) new_item = self.api.item.get(entity_id=zenodo_id) - - if self.license['id'] == "cc-by-4.0": + + if self.license["id"] == "cc-by-4.0": new_item.add_claim("wdt:P275", "wd:Q20007257") - elif self.license['id'] == "cc-by-sa-4.0": + elif self.license["id"] == "cc-by-sa-4.0": new_item.add_claim("wdt:P275", "wd:Q18199165") - elif self.license['id'] == "cc-by-nc-sa-4.0": + elif self.license["id"] == "cc-by-nc-sa-4.0": new_item.add_claim("wdt:P275", "wd:Q42553662") - elif self.license['id'] == "mit-license": + elif self.license["id"] == "mit-license": new_item.add_claim("wdt:P275", "wd:Q334661") - return new_item.write() - - - def create(self, update = False): + return new_item.write() + def create(self, update=False): if not update: if self.QID: return self.QID @@ -201,130 +248,123 @@ def create(self, update = False): update_claim = "append_or_replace" else: item = self.api.item.get(entity_id=self.QID) - update_claim = "replace_all" - + update_claim = "replace_all" + if self.title: item.labels.set(language="en", value=self.title) if self.resource_type and self.resource_type != "wd:Q37866906": - desc = f"{self.metadata['resource_type']['title']} published at Zenodo repository. " - item.add_claim('wdt:P31', self.resource_type, action = update_claim) + desc = f"{self.metadata['resource_type']['title']} published at Zenodo repository. " + item.add_claim("wdt:P31", self.resource_type, action=update_claim) else: desc = "Resource published at Zenodo repository. " item.descriptions.set(language="en", value=desc) if self.description: prop_nr = self.api.get_local_id_by_label("description", "property") - item.add_claim(prop_nr, self.description, action = update_claim) - + item.add_claim(prop_nr, self.description, action=update_claim) + # Publication date if self.publication_date: - item.add_claim('wdt:P577', self.publication_date, action = update_claim) - + item.add_claim("wdt:P577", self.publication_date, action=update_claim) + # Authors if update: - # Delete all authors and keep just the new - author_prop_nr = self.api.get_local_id_by_label("wdt:P50", "property") - original_claims = item.claims.get(author_prop_nr) - - new_authors = [] - for creator in self.metadata['creators']: - name = creator.get('name') - orcid = creator.get('orcid') - affiliation = creator.get('affiliation') - author = Author(self.api, name=name, orcid=orcid, affiliation=affiliation) - new_authors.append(author) - - author_QID = self.__preprocess_authors() - new_authors_QID = [] - authors_to_remove_QID = [] - for autor in self.authors: - if autor in new_authors: - new_authors_QID.append(autor.QID) - else: - authors_to_remove_QID.append(autor.QID) - - claims = [] - for author in new_authors_QID: - claims.append(self.api.get_claim("wdt:P50", author)) - item.add_claims(claims) + # Remove all existing P50 and P2093 claims + author_item_prop = self.api.get_local_id_by_label("wdt:P50", "property") + author_string_prop = self.api.get_local_id_by_label("wdt:P2093", "property") - for author in authors_to_remove_QID: - for claim in original_claims: - if claim.mainsnak.datavalue['value']['id'] == author: - claim.remove() + for prop in (author_item_prop, author_string_prop): + old_claims = item.claims.get(prop) + if old_claims: + for c in old_claims: + c.remove() + + # Add new author claims + item.add_claims(self.__preprocess_authors()) else: - author_QID = self.__preprocess_authors() - claims = [] - for author in author_QID: - claims.append(self.api.get_claim("wdt:P50", author)) + claims = self.__preprocess_authors() item.add_claims(claims) # Zenodo ID & DOI if self.zenodo_id: - item.add_claim('wdt:P4901', self.zenodo_id, action = update_claim) + item.add_claim("wdt:P4901", self.zenodo_id, action=update_claim) doi = f"10.5281/zenodo.{self.zenodo_id}" - item.add_claim('wdt:P356', doi.upper(), action = update_claim) + item.add_claim("wdt:P356", doi.upper(), action=update_claim) # License if self.license: - if self.license['id'] == "cc-by-4.0": - item.add_claim("wdt:P275", "wd:Q20007257", action = update_claim) - elif self.license['id'] == "cc-by-sa-4.0": - item.add_claim("wdt:P275", "wd:Q18199165", action = update_claim) - elif self.license['id'] == "cc-by-nc-sa-4.0": - item.add_claim("wdt:P275", "wd:Q42553662", action = update_claim) - elif self.license['id'] == "mit-license": - item.add_claim("wdt:P275", "wd:Q334661", action = update_claim) + if self.license["id"] == "cc-by-4.0": + item.add_claim("wdt:P275", "wd:Q20007257", action=update_claim) + elif self.license["id"] == "cc-by-sa-4.0": + item.add_claim("wdt:P275", "wd:Q18199165", action=update_claim) + elif self.license["id"] == "cc-by-nc-sa-4.0": + item.add_claim("wdt:P275", "wd:Q42553662", action=update_claim) + elif self.license["id"] == "mit-license": + item.add_claim("wdt:P275", "wd:Q334661", action=update_claim) if self.version: if self.resource_type: - if self.resource_type == "wd:Q1172284": #dataset - prop_nr = self.api.get_local_id_by_label("dataset version identifier", "property") - item.add_claim(prop_nr, self.version, action = update_claim) - elif self.resource_type == "wd:Q7397": #software: - item.add_claim("wdt:P348", self.version, action = update_claim) + if self.resource_type == "wd:Q1172284": # dataset + prop_nr = self.api.get_local_id_by_label( + "dataset version identifier", "property" + ) + item.add_claim(prop_nr, self.version, action=update_claim) + elif self.resource_type == "wd:Q7397": # software: + item.add_claim("wdt:P348", self.version, action=update_claim) # Communities if self.communities: for community in self.communities: prop_nr = self.api.get_local_id_by_label("community", "property") - item.add_claim(prop_nr, community.QID, action = update_claim) + item.add_claim(prop_nr, community.QID, action=update_claim) # Projects if self.projects: for project in self.projects: project.create() - prop_nr = self.api.get_local_id_by_label("Internal Project ID", "property") - item.add_claim(prop_nr, project.QID, action = update_claim) + prop_nr = self.api.get_local_id_by_label( + "Internal Project ID", "property" + ) + item.add_claim(prop_nr, project.QID, action=update_claim) if self._mardi_type: - item.add_claim('MaRDI profile type', self._mardi_type, action = update_claim) - + item.add_claim("MaRDI profile type", self._mardi_type, action=update_claim) + self.QID = item.write().id if self.QID: - print(f"Zenodo resource with Zenodo id: {self.zenodo_id} created with ID {self.QID}.") + print( + f"Zenodo resource with Zenodo id: {self.zenodo_id} created with ID {self.QID}." + ) return self.QID else: - print(f"Zenodo resource with Zenodo id: {self.zenodo_id} could not be created.") + print( + f"Zenodo resource with Zenodo id: {self.zenodo_id} could not be created." + ) return None def __preprocess_authors(self) -> List[str]: """Processes the author information of each publication. - Create the author if it does not exist already as an + Create the author if it does not exist already as an entity in wikibase. - + + If an author has no ORCID and no existing QID, store the author as + an author name string (P2093) instead of creating an author item. + Returns: - List[str]: - QIDs corresponding to each author. + List: + Author claims to be added (P50 entity claims and/or P2093 string claims). """ - author_QID = [] + claims = [] for author in self.authors: - if not author.QID: - author.create() - author_QID.append(author.QID) - return author_QID - + if author.orcid or author.QID: + if not author.QID: + author.create() + claims.append(self.api.get_claim("wdt:P50", author.QID)) + else: + if author.name: + claims.append(self.api.get_claim("wdt:P2093", author.name)) + return claims