diff --git a/pydnsbl/__init__.py b/pydnsbl/__init__.py index d03096b..7b0261b 100644 --- a/pydnsbl/__init__.py +++ b/pydnsbl/__init__.py @@ -1 +1,7 @@ from .checker import DNSBLChecker, DNSBLDomainChecker, DNSBLIpChecker + +__all__ = ( + "DNSBLChecker", + "DNSBLDomainChecker", + "DNSBLIpChecker" +) \ No newline at end of file diff --git a/pydnsbl/checker.py b/pydnsbl/checker.py index aa91cf1..dff0bc1 100644 --- a/pydnsbl/checker.py +++ b/pydnsbl/checker.py @@ -7,22 +7,28 @@ print(result.categories) print(result.detected_by) """ + import abc import asyncio -import idna import ipaddress import re import sys import threading import warnings +from typing import Iterable, NamedTuple, Optional, Union import aiodns +import idna +from pycares import ares_query_a_result -from .providers import Provider, BASE_PROVIDERS, BASE_DOMAIN_PROVIDERS +from .providers import BASE_DOMAIN_PROVIDERS, BASE_PROVIDERS, Provider -if sys.platform == 'win32' and sys.version_info >= (3, 8): +if sys.platform == "win32" and sys.version_info >= (3, 8): # fixes https://github.com/dmippolitov/pydnsbl/issues/12 - asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + # Some users may want to use winloop which is a windows version of uvloop + if asyncio.DefaultEventLoopPolicy.__name__ == "WindowsProactorEventLoopPolicy": + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + class DNSBLResult: """ @@ -37,18 +43,23 @@ class DNSBLResult: this dnsbls. dict: {'dnsbl_list_name': list(categories_from_this_dnsbl)} * categories - set of dnsbl categories from all providers (subset of DNSBL_CATEGORIES) """ - def __init__(self, addr=None, results=None): + + def __init__( + self, + addr: Optional[list[Union[str, bytes]]] = None, + results: Optional[list["DNSBLResponse"]] = None, + ): self.addr = addr self._results = results self.blacklisted = False - self.providers = [] - self.failed_providers = [] - self.detected_by = {} + self.providers: list[Provider] = [] + self.failed_providers: list[str] = [] + self.detected_by: dict = {} self.categories = set() self.process_results() - def process_results(self): - """ Process results by providers """ + def process_results(self) -> None: + """Process results by providers""" for result in self._results: provider = result.provider self.providers.append(provider) @@ -63,35 +74,47 @@ def process_results(self): self.categories = self.categories.union(provider_categories) self.detected_by[provider.host] = list(provider_categories) - def __repr__(self): - blacklisted = '[BLACKLISTED]' if self.blacklisted else '' - return "" % (self.addr, blacklisted, len(self.detected_by), - len(self.providers)) + def __repr__(self) -> str: + blacklisted = "[BLACKLISTED]" if self.blacklisted else "" + return "" % ( + self.addr, + blacklisted, + len(self.detected_by), + len(self.providers), + ) -class DNSBLResponse: + +class DNSBLResponse(NamedTuple): """ DNSBL Response object """ - def __init__(self, addr=None, provider=None, response=None, error=None): - self.addr = addr - self.provider = provider - self.response = response - self.error = error + + addr: Optional[Union[str, bytes]] = None + provider: Optional[Provider] = None + response: Optional[list[ares_query_a_result]] = None + error: Optional[aiodns.error.DNSError] = None + class BaseDNSBLChecker(abc.ABC): - """ BASE Checker for DNSBL lists - Arguments: - * providers(list) - list of providers (Provider instance or str) - * timeout(int) - timeout of dns requests will be passed to resolver - * tries(int) - retry times + """BASE Checker for DNSBL lists + Arguments: + * providers(list) - list of providers (Provider instance or str) + * timeout(int) - timeout of dns requests will be passed to resolver + * tries(int) - retry times """ - def __init__(self, providers=BASE_PROVIDERS, timeout=5, - tries=2, concurrency=200, loop=None): + def __init__( + self, + providers: list[Provider] = BASE_PROVIDERS, + timeout: float = 5, + tries: int = 2, + concurrency: int = 200, + loop: Optional[asyncio.AbstractEventLoop] = None, + ): self.providers = [] for provider in providers: if not isinstance(provider, Provider): - raise ValueError('providers should contain only Provider instances') + raise ValueError("providers should contain only Provider instances") self.providers.append(provider) if not loop: if threading.current_thread() == threading.main_thread(): @@ -101,11 +124,20 @@ def __init__(self, providers=BASE_PROVIDERS, timeout=5, asyncio.set_event_loop(self._loop) else: self._loop = loop - self._resolver = aiodns.DNSResolver(timeout=timeout, tries=tries, loop=self._loop) + self._resolver = aiodns.DNSResolver( + timeout=timeout, tries=tries, loop=self._loop + ) self._semaphore = asyncio.Semaphore(concurrency) + async def __aenter__(self): + return self - async def dnsbl_request(self, request, provider): + async def __aexit__(self, *args): + return await self._resolver.close() + + async def dnsbl_request( + self, request: Union[bytes, str], provider: str + ) -> DNSBLResponse: """ Make lookup to dnsbl provider for ip Parameters: @@ -123,32 +155,34 @@ async def dnsbl_request(self, request, provider): dnsbl_query = "%s.%s" % (self.prepare_query(request), provider.host) try: async with self._semaphore: - response = await self._resolver.query(dnsbl_query, 'A') + response = await self._resolver.query(dnsbl_query, "A") except aiodns.error.DNSError as exc: - if exc.args[0] != 4: # 4: domain name not found: + if exc.args[0] != 4: # 4: domain name not found: error = exc - return DNSBLResponse(addr=request, provider=provider, response=response, error=error) + return DNSBLResponse( + addr=request, provider=provider, response=response, error=error + ) @abc.abstractmethod - def prepare_query(self, request): + def prepare_query(self, request: Union[bytes, str]) -> str: """ Prepare query to dnsbl """ return NotImplemented - async def check_async(self, request): - tasks = [] + async def check_async(self, request: Union[bytes, str]): + tasks: list[asyncio.Task[DNSBLResponse]] = [] for provider in self.providers: tasks.append(self.dnsbl_request(request, provider)) results = await asyncio.gather(*tasks) return DNSBLResult(addr=request, results=results) - def check(self, request): + def check(self, request: Union[bytes, str]) -> DNSBLResult: return self._loop.run_until_complete(self.check_async(request)) - def bulk_check(self, requests): - tasks = [] + def bulk_check(self, requests: Iterable[Union[bytes, str]]) -> list[DNSBLResult]: + tasks: list[asyncio.Task[DNSBLResult]] = [] for request in requests: tasks.append(self.check_async(request)) return self._loop.run_until_complete(asyncio.gather(*tasks)) @@ -158,18 +192,17 @@ class DNSBLIpChecker(BaseDNSBLChecker): """ Checker for ips """ - def prepare_query(self, request): + + def prepare_query(self, request: Union[bytes, str]) -> str: address = ipaddress.ip_address(request) if address.version == 4: - return '.'.join(reversed(request.split('.'))) + return ".".join(reversed(request.split("."))) elif address.version == 6: # according to RFC: https://tools.ietf.org/html/rfc5782#section-2.4 - request_stripped = address.exploded.replace(':', '') - return '.'.join(reversed([x for x in request_stripped])) + request_stripped = address.exploded.replace(":", "") + return ".".join(reversed([x for x in request_stripped])) else: - raise ValueError('unknown ip version') - - + raise ValueError("unknown ip version") class DNSBLDomainChecker(BaseDNSBLChecker): @@ -178,33 +211,50 @@ class DNSBLDomainChecker(BaseDNSBLChecker): """ # https://regex101.com/r/vdrgm7/1 - DOMAIN_REGEX = re.compile(r"^(((?!-))(xn--|_{1,1})?[a-z0-9-]{0,61}[a-z0-9]{1,1}\.)*(xn--[a-z0-9][a-z0-9\-]{0,60}|[a-z0-9-]{1,30}\.[a-z]{2,})$") + DOMAIN_REGEX = re.compile( + r"^(((?!-))(xn--|_{1,1})?[a-z0-9-]{0,61}[a-z0-9]{1,1}\.)*(xn--[a-z0-9][a-z0-9\-]{0,60}|[a-z0-9-]{1,30}\.[a-z]{2,})$" + ) - def __init__(self, providers=BASE_DOMAIN_PROVIDERS, timeout=5, - tries=2, concurrency=200, loop=None): - super().__init__(providers=providers, timeout=timeout, - tries=tries, concurrency=concurrency, loop=loop) + def __init__( + self, + providers: list[Provider] = BASE_DOMAIN_PROVIDERS, + timeout: float = 5, + tries: int = 2, + concurrency: int = 200, + loop: Optional[asyncio.AbstractEventLoop] = None, + ): + super().__init__( + providers=providers, + timeout=timeout, + tries=tries, + concurrency=concurrency, + loop=loop, + ) - def prepare_query(self, request): - request = request.lower() # Adding support for capitalized letters in domain name. + def prepare_query(self, request: Union[str, bytes]) -> str: + request = ( + request.lower() + ) # Adding support for capitalized letters in domain name. domain_idna = idna.encode(request).decode() if not self.DOMAIN_REGEX.match(domain_idna): - raise ValueError('should be valid domain, got %s' % domain_idna) + raise ValueError("should be valid domain, got %s" % domain_idna) return domain_idna + # COMPAT class DNSBLChecker(DNSBLIpChecker): """ Will be deprecated, use DNSBLIpChecker """ + def __init__(self, *args, **kwargs): - warnings.warn('deprecated, use DNSBLIpChecker', DeprecationWarning) + warnings.warn("deprecated, use DNSBLIpChecker", DeprecationWarning) super().__init__(*args, **kwargs) def check_ip(self, addr): - warnings.warn('deprecated, use check method instead', DeprecationWarning) + warnings.warn("deprecated, use check method instead", DeprecationWarning) return self.check(addr) def check_ips(self, addrs): - warnings.warn('deprecated, use bulk check method instead', DeprecationWarning) + warnings.warn("deprecated, use bulk check method instead", DeprecationWarning) return self.bulk_check(addrs) diff --git a/pydnsbl/providers.py b/pydnsbl/providers.py index dd91b2a..c4d909b 100644 --- a/pydnsbl/providers.py +++ b/pydnsbl/providers.py @@ -2,23 +2,29 @@ Place to define providers. Most part of _BASE_PROVIDERS was taken from https://github.com/vincecarney/dnsbl """ + ### DNSBL CATEGORIES ### # providers answers could be interpreted in one of the following categories -DNSBL_CATEGORY_UNKNOWN = 'unknown' -DNSBL_CATEGORY_SPAM = 'spam' -DNSBL_CATEGORY_EXPLOITS = 'exploits' -DNSBL_CATEGORY_PHISH = 'phish' -DNSBL_CATEGORY_MALWARE = 'malware' -DNSBL_CATEGORY_CNC = 'cnc' -DNSBL_CATEGORY_ABUSED = 'abused' -DNSBL_CATEGORY_LEGIT = 'legit' +from pycares import ares_query_a_result +from typing import Optional + +DNSBL_CATEGORY_UNKNOWN = "unknown" +DNSBL_CATEGORY_SPAM = "spam" +DNSBL_CATEGORY_EXPLOITS = "exploits" +DNSBL_CATEGORY_PHISH = "phish" +DNSBL_CATEGORY_MALWARE = "malware" +DNSBL_CATEGORY_CNC = "cnc" +DNSBL_CATEGORY_ABUSED = "abused" +DNSBL_CATEGORY_LEGIT = "legit" + class Provider(object): + __slots__ = ("host",) - def __init__(self, host): + def __init__(self, host: str): self.host = host - def process_response(self, response): + def process_response(self, response: list[ares_query_a_result]) -> set[str]: """ Usually DNSBL lists returns ip-codes like this: 127.0.0.2 Some of the lists provides some specification about @@ -34,111 +40,140 @@ def process_response(self, response): set of categories (DNSBL_CATEGORIES subset) """ - result = set() + result: set[str] = set() if response: result.add(DNSBL_CATEGORY_UNKNOWN) return result - def __repr__(self): + def __repr__(self) -> str: return "" % self.host + class ZenSpamhaus(Provider): - """ Combined spamhaus list: - https://www.spamhaus.org/faq/section/DNSBL%20Usage#200 + """Combined spamhaus list: + https://www.spamhaus.org/faq/section/DNSBL%20Usage#200 """ - def __init__(self, host='zen.spamhaus.org'): + def __init__(self, host="zen.spamhaus.org"): Provider.__init__(self, host=host) - def process_response(self, response): - categories = set() + def process_response( + self, response: Optional[list[ares_query_a_result]] + ) -> set[str]: + categories: set[str] = set() for result in response: - if result.host in ['127.0.0.2', '127.0.0.3', '127.0.0.9']: + if result.host in ["127.0.0.2", "127.0.0.3", "127.0.0.9"]: categories.add(DNSBL_CATEGORY_SPAM) - elif result.host in ['127.0.0.4', '127.0.0.5', '127.0.0.6', '127.0.0.7']: + elif result.host in ["127.0.0.4", "127.0.0.5", "127.0.0.6", "127.0.0.7"]: categories.add(DNSBL_CATEGORY_EXPLOITS) else: categories.add(DNSBL_CATEGORY_UNKNOWN) return categories + # this list is converted into list of Providers bellow _BASE_PROVIDERS = [ - 'all.s5h.net', - 'aspews.ext.sorbs.net', - 'b.barracudacentral.org', - 'bl.nordspam.com', - 'blackholes.five-ten-sg.com', - 'blacklist.woody.ch', - 'bogons.cymru.com', - 'cbl.abuseat.org', - 'combined.abuse.ch', - 'combined.rbl.msrbl.net', - 'db.wpbl.info', - 'dnsbl-2.uceprotect.net', - 'dnsbl-3.uceprotect.net', - 'dnsbl.cyberlogic.net', - 'dnsbl.sorbs.net', - 'drone.abuse.ch', - 'images.rbl.msrbl.net', - 'ips.backscatterer.org', - 'korea.services.net', - 'matrix.spfbl.net', - 'phishing.rbl.msrbl.net', - 'proxy.bl.gweep.ca', - 'proxy.block.transip.nl', - 'psbl.surriel.com', - 'rbl.interserver.net', - 'relays.bl.gweep.ca', - 'relays.bl.kundenserver.de', - 'relays.nether.net', - 'residential.block.transip.nl', - 'singular.ttk.pte.hu', - 'spam.dnsbl.sorbs.net', - 'spam.rbl.msrbl.net', - 'spambot.bls.digibase.ca', - 'spamrbl.imp.ch', - 'spamsources.fabel.dk', - 'ubl.lashback.com', - 'virbl.bit.nl', - 'virus.rbl.msrbl.net', - 'virus.rbl.jp', - 'wormrbl.imp.ch', - 'z.mailspike.net', + "all.s5h.net", + "aspews.ext.sorbs.net", + "b.barracudacentral.org", + "bl.nordspam.com", + "blackholes.five-ten-sg.com", + "blacklist.woody.ch", + "bogons.cymru.com", + "cbl.abuseat.org", + "combined.abuse.ch", + "combined.rbl.msrbl.net", + "db.wpbl.info", + "dnsbl-2.uceprotect.net", + "dnsbl-3.uceprotect.net", + "dnsbl.cyberlogic.net", + "dnsbl.sorbs.net", + "drone.abuse.ch", + "images.rbl.msrbl.net", + "ips.backscatterer.org", + "korea.services.net", + "matrix.spfbl.net", + "phishing.rbl.msrbl.net", + "proxy.bl.gweep.ca", + "proxy.block.transip.nl", + "psbl.surriel.com", + "rbl.interserver.net", + "relays.bl.gweep.ca", + "relays.bl.kundenserver.de", + "relays.nether.net", + "residential.block.transip.nl", + "singular.ttk.pte.hu", + "spam.dnsbl.sorbs.net", + "spam.rbl.msrbl.net", + "spambot.bls.digibase.ca", + "spamrbl.imp.ch", + "spamsources.fabel.dk", + "ubl.lashback.com", + "virbl.bit.nl", + "virus.rbl.msrbl.net", + "virus.rbl.jp", + "wormrbl.imp.ch", + "z.mailspike.net", ] + class DblSpamhaus(Provider): - """ Spamhaus domain blacklist - https://www.spamhaus.org/faq/section/Spamhaus%20DBL#291 + """Spamhaus domain blacklist + https://www.spamhaus.org/faq/section/Spamhaus%20DBL#291 """ + CATEGORY_MAPPING = { - '127.0.1.2': {DNSBL_CATEGORY_SPAM}, - '127.0.1.4': {DNSBL_CATEGORY_PHISH}, - '127.0.1.5': {DNSBL_CATEGORY_MALWARE}, - '127.0.1.6': {DNSBL_CATEGORY_CNC}, - '127.0.1.102': {DNSBL_CATEGORY_ABUSED, DNSBL_CATEGORY_LEGIT, DNSBL_CATEGORY_SPAM}, - '127.0.1.103': {DNSBL_CATEGORY_ABUSED, DNSBL_CATEGORY_SPAM}, - '127.0.1.104': {DNSBL_CATEGORY_ABUSED, DNSBL_CATEGORY_LEGIT, DNSBL_CATEGORY_PHISH}, - '127.0.1.105': {DNSBL_CATEGORY_ABUSED, DNSBL_CATEGORY_LEGIT, DNSBL_CATEGORY_MALWARE}, - '127.0.1.106': {DNSBL_CATEGORY_ABUSED, DNSBL_CATEGORY_LEGIT, DNSBL_CATEGORY_CNC} + "127.0.1.2": {DNSBL_CATEGORY_SPAM}, + "127.0.1.4": {DNSBL_CATEGORY_PHISH}, + "127.0.1.5": {DNSBL_CATEGORY_MALWARE}, + "127.0.1.6": {DNSBL_CATEGORY_CNC}, + "127.0.1.102": { + DNSBL_CATEGORY_ABUSED, + DNSBL_CATEGORY_LEGIT, + DNSBL_CATEGORY_SPAM, + }, + "127.0.1.103": {DNSBL_CATEGORY_ABUSED, DNSBL_CATEGORY_SPAM}, + "127.0.1.104": { + DNSBL_CATEGORY_ABUSED, + DNSBL_CATEGORY_LEGIT, + DNSBL_CATEGORY_PHISH, + }, + "127.0.1.105": { + DNSBL_CATEGORY_ABUSED, + DNSBL_CATEGORY_LEGIT, + DNSBL_CATEGORY_MALWARE, + }, + "127.0.1.106": { + DNSBL_CATEGORY_ABUSED, + DNSBL_CATEGORY_LEGIT, + DNSBL_CATEGORY_CNC, + }, } + __slots__ = ( + "host", + "CATEGORY_MAPPING", + ) - def __init__(self, host='dbl.spamhaus.org'): + def __init__(self, host="dbl.spamhaus.org"): Provider.__init__(self, host=host) - def process_response(self, response): + def process_response(self, response: list[ares_query_a_result]) -> set[str]: categories = set() for result in response: - result_categories = self.CATEGORY_MAPPING.get(result.host, {DNSBL_CATEGORY_UNKNOWN}) + result_categories = self.CATEGORY_MAPPING.get( + result.host, {DNSBL_CATEGORY_UNKNOWN} + ) categories.update(result_categories) return categories + # list of domain providers _DOMAIN_PROVIDERS = [ - 'uribl.spameatingmonkey.net', - 'multi.surbl.org', - 'rhsbl.sorbs.net ' + "uribl.spameatingmonkey.net", + "multi.surbl.org", + "rhsbl.sorbs.net ", ] BASE_PROVIDERS = [Provider(host) for host in _BASE_PROVIDERS] + [ZenSpamhaus()] diff --git a/pydnsbl/py.typed b/pydnsbl/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/pydnsbl/tests.py b/pydnsbl/tests.py index 760f446..c5e8657 100644 --- a/pydnsbl/tests.py +++ b/pydnsbl/tests.py @@ -1,84 +1,91 @@ -import asyncio import threading import pytest -from .checker import DNSBLChecker, DNSBLIpChecker, DNSBLDomainChecker + +from .checker import DNSBLChecker, DNSBLDomainChecker, DNSBLIpChecker from .providers import Provider + # IP TESTS -def test_checker(): +def test_checker() -> None: checker = DNSBLIpChecker() - res = checker.check('68.128.212.240') + res = checker.check("68.128.212.240") assert res.blacklisted assert res.categories assert res.detected_by - results = checker.bulk_check(['68.128.212.240', '8.8.8.8']) + results = checker.bulk_check(["68.128.212.240", "8.8.8.8"]) # check bulk check assert results[0].detected_by == res.detected_by assert not results[1].blacklisted -def test_checker_ipv6(): + +def test_checker_ipv6() -> None: checker = DNSBLIpChecker() - res = checker.check('2001:4860:4860::8844') + res = checker.check("2001:4860:4860::8844") assert not res.blacklisted assert not res.categories assert not res.detected_by assert not res.failed_providers - checker = DNSBLIpChecker(providers=[Provider('v6.fullbogons.cymru.com')]) - res = checker.check('::1') + checker = DNSBLIpChecker(providers=[Provider("v6.fullbogons.cymru.com")]) + res = checker.check("::1") assert res.blacklisted assert res.categories assert res.detected_by -def test_providers(): - """ Providers should not mark google ip as bad """ + +def test_providers() -> None: + """Providers should not mark google ip as bad""" checker = DNSBLIpChecker() - res = checker.check('8.8.8.8') + res = checker.check("8.8.8.8") assert not res.blacklisted assert not res.categories assert not res.detected_by assert not res.failed_providers -def test_wrong_ip_format(): - misformated_ips = ['abc', '8.8.8.256'] + +def test_wrong_ip_format() -> None: + misformated_ips = ["abc", "8.8.8.256"] for ip in misformated_ips: checker = DNSBLIpChecker() with pytest.raises(ValueError): - checker.check(ip) + checker.check(ip) + # DOMAIN TESTS -def test_domain_checker(): +def test_domain_checker() -> None: checker = DNSBLDomainChecker() - malicious_domain = 'dbltest.com' + malicious_domain = "dbltest.com" res = checker.check(malicious_domain) assert res.blacklisted assert res.categories assert res.detected_by - results = checker.bulk_check([malicious_domain, 'google.com']) + results = checker.bulk_check([malicious_domain, "google.com"]) # check bulk check assert results[0].detected_by == res.detected_by assert not results[1].blacklisted -def test_domain_providers(): - """ Domain Providers should not mark google.com as bad """ + +def test_domain_providers() -> None: + """Domain Providers should not mark google.com as bad""" checker = DNSBLDomainChecker() - res = checker.check('google.com') + res = checker.check("google.com") assert not res.blacklisted assert not res.categories assert not res.detected_by assert not res.failed_providers -def test_wrong_domain_format(): - misformated_ips = ['abc-', '8.8.8.256', 'bababa'] + +def test_wrong_domain_format() -> None: + misformated_ips = ["abc-", "8.8.8.256", "bababa"] for ip in misformated_ips: checker = DNSBLDomainChecker() with pytest.raises(ValueError): - print(checker.check(ip)) + print(checker.check(ip)) -def test_domain_variants(): +def test_domain_variants() -> None: # capitalized / 3th+ levels, idna - capitalized_domains = ['Google.com', 'дом.рф', 'www.digital.govt.nz'] + capitalized_domains = ["Google.com", "дом.рф", "www.digital.govt.nz"] for domain in capitalized_domains: checker = DNSBLDomainChecker() res = checker.check(domain) @@ -87,50 +94,62 @@ def test_domain_variants(): assert not res.detected_by assert not res.failed_providers + # Threading tests -def test_main_thread(): +def test_main_thread() -> None: result = None + def test(): nonlocal result checker = DNSBLIpChecker() - result = checker.check('68.128.212.240') + result = checker.check("68.128.212.240") + thr = threading.Thread(target=test) thr.start() thr.join() assert result.blacklisted + # ipv6 tests -def test_ipv6_converting(): +def test_ipv6_converting() -> None: # https://datatracker.ietf.org/doc/html/rfc5782#section-2.4 checker = DNSBLIpChecker() - assert checker.prepare_query('2001:db8:1:2:3:4:567:89ab') == "b.a.9.8.7.6.5.0.4.0.0.0.3.0.0.0.2.0.0.0.1.0.0.0.8.b.d.0.1.0.0.2" - assert checker.prepare_query('2600:2600::f03c:91ff:fe50:d2') == "2.d.0.0.0.5.e.f.f.f.1.9.c.3.0.f.0.0.0.0.0.0.0.0.0.0.6.2.0.0.6.2" + assert ( + checker.prepare_query("2001:db8:1:2:3:4:567:89ab") + == "b.a.9.8.7.6.5.0.4.0.0.0.3.0.0.0.2.0.0.0.1.0.0.0.8.b.d.0.1.0.0.2" + ) + assert ( + checker.prepare_query("2600:2600::f03c:91ff:fe50:d2") + == "2.d.0.0.0.5.e.f.f.f.1.9.c.3.0.f.0.0.0.0.0.0.0.0.0.0.6.2.0.0.6.2" + ) ## COMPAT TESTS -def test_checker_compat_0_6(): +def test_checker_compat_0_6() -> None: checker = DNSBLChecker() - res = checker.check_ip('68.128.212.240') + res = checker.check_ip("68.128.212.240") assert res.blacklisted assert res.categories assert res.detected_by - results = checker.check_ips(['68.128.212.240', '8.8.8.8']) + results = checker.check_ips(["68.128.212.240", "8.8.8.8"]) # check bulk check assert results[0].detected_by == res.detected_by assert not results[1].blacklisted -def test_providers_compat_0_6(): - """ Providers should not mark google ip as bad """ + +def test_providers_compat_0_6() -> None: + """Providers should not mark google ip as bad""" checker = DNSBLChecker() - res = checker.check_ip('8.8.8.8') + res = checker.check_ip("8.8.8.8") assert not res.blacklisted assert not res.categories assert not res.detected_by assert not res.failed_providers -def test_wrong_ip_format_compat_0_6(): - misformated_ips = ['abc', '8.8.8.256'] + +def test_wrong_ip_format_compat_0_6() -> None: + misformated_ips = ["abc", "8.8.8.256"] for ip in misformated_ips: checker = DNSBLChecker() with pytest.raises(ValueError): - checker.check_ip(ip) + checker.check_ip(ip)