diff --git a/setup.py b/setup.py
index 2c3b8f5..24295a9 100644
--- a/setup.py
+++ b/setup.py
@@ -5,6 +5,8 @@
 import os
+import sys
 
 from setuptools import Extension, setup
+from setuptools.command.build_ext import build_ext
 
 cchardet_dir = "src/cchardet/"
 uchardet_dir = "src/ext/uchardet/src"
@@ -12,6 +14,7 @@
 sources = cchardet_sources
 
 uchardet_sources = [
+    os.path.join(cchardet_dir, "_cchardet.pyx"),
     os.path.join(uchardet_dir, "LangModels/LangArabicModel.cpp"),
     os.path.join(uchardet_dir, "LangModels/LangBelarusianModel.cpp"),
     os.path.join(uchardet_dir, "LangModels/LangBulgarianModel.cpp"),
@@ -75,6 +78,79 @@
     os.path.join(uchardet_dir, "uchardet.cpp"),
 ]
 sources += uchardet_sources
+
+
+class cchardet_build_ext(build_ext):
+    """Build extensions, cythonizing ``.pyx`` sources only when required.
+
+    A ``.pyx`` source is swapped for its already generated C/C++ file when
+    that file exists, unless ``--cython-always`` is given.
+    """
+
+    user_options = build_ext.user_options + [
+        ("cython-always", None, "run cythonize() even if .c files are present"),
+        (
+            "cython-annotate",
+            None,
+            "Produce a colorized HTML version of the Cython source.",
+        ),
+        ("cython-directives=", None, "Cython compiler directives"),
+    ]
+
+    def initialize_options(self):
+        """Give the extra Cython options their defaults."""
+        super().initialize_options()
+        self.cython_always = False
+        self.cython_annotate = False
+        self.cython_directives = None
+
+    def finalize_options(self):
+        """Resolve ``.pyx`` sources, then run the regular option handling."""
+        need_cythonize = self.cython_always
+
+        for extension in self.distribution.ext_modules:
+            # Cython writes .cpp instead of .c for C++ extensions.
+            suffix = ".cpp" if extension.language == "c++" else ".c"
+            for i, sfile in enumerate(extension.sources):
+                if not sfile.endswith(".pyx"):
+                    continue
+                generated = os.path.splitext(sfile)[0] + suffix
+                if os.path.exists(generated) and not self.cython_always:
+                    extension.sources[i] = generated
+                else:
+                    need_cythonize = True
+
+        if need_cythonize:
+            # Imported lazily so that a build from pre-generated sources
+            # does not need Cython installed.
+            try:
+                from Cython.Build import cythonize
+            except ImportError as exc:
+                raise RuntimeError(
+                    "please install cython to compile cchardet from source"
+                ) from exc
+
+            directives = {}
+            if self.cython_directives:
+                for directive in self.cython_directives.split(","):
+                    k, _, v = directive.partition("=")
+                    # elif, not if: once v is a bool it has no .lower().
+                    if v.lower() == "false":
+                        v = False
+                    elif v.lower() == "true":
+                        v = True
+                    directives[k] = v
+                self.cython_directives = directives
+
+            self.distribution.ext_modules[:] = cythonize(
+                self.distribution.ext_modules,
+                compiler_directives=directives,
+                annotate=self.cython_annotate,
+                emit_linenums=self.debug,
+            )
+
+        return super().finalize_options()
+
 
 setup(
     package_dir={"": "src"},
@@ -87,7 +163,9 @@
             sources=sources,
             include_dirs=[uchardet_dir],
             language="c++",
-            extra_compile_args=['-std=c++11'],
+            # -std=c++11 is a GCC/Clang flag; MSVC builds with its own default.
+            extra_compile_args=["-std=c++11"] if sys.platform != "win32" else [],
         )
     ],
+    cmdclass={"build_ext": cchardet_build_ext},
 )
diff --git a/src/cchardet/__init__.py b/src/cchardet/__init__.py
index f616d7f..704233e 100644
--- a/src/cchardet/__init__.py
+++ b/src/cchardet/__init__.py
@@ -1,10 +1,14 @@
-from . import _cchardet
+from ._cchardet import (
+    UniversalDetector as UniversalDetector,
+    detect_with_confidence as detect_with_confidence,
+)
+from .typedefs import DecodeResultDict
 
 version = (2, 2, 0, "alpha", 3)
 __version__ = "2.2.0a3"
 
 
-def detect(msg):
+def detect(msg: bytes) -> DecodeResultDict:
     """
     Args:
-        msg: str
+        msg: bytes
@@ -14,8 +18,8 @@ def detect(msg):
             "confidence": float
         }
     """
-    encoding, confidence = _cchardet.detect_with_confidence(msg)
-    if isinstance(encoding, bytes):
+    encoding, confidence = detect_with_confidence(msg)
+    if encoding is not None:
         encoding = encoding.decode()
 
     if encoding == "MAC-CENTRALEUROPE":
@@ -24,33 +28,10 @@ def detect(msg):
     return {"encoding": encoding, "confidence": confidence}
 
 
-class UniversalDetector(object):
-    def __init__(self):
-        self._detector = _cchardet.UniversalDetector()
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exception_type, exception_value, traceback):
-        self.close()
-        return False
-
-    def reset(self):
-        self._detector.reset()
-
-    def feed(self, data):
-        self._detector.feed(data)
-
-    def close(self):
-        self._detector.close()
-
-    @property
-    def done(self):
-        return self._detector.done
-
-    @property
-    def result(self):
-        encoding, confidence = self._detector.result
-        if isinstance(encoding, bytes):
-            encoding = encoding.decode()
-        return {"encoding": encoding, "confidence": confidence}
+__all__ = (
+    "detect",
+    "detect_with_confidence",
+    "DecodeResultDict",
+    "UniversalDetector",
+    "__version__",
+)
diff --git a/src/cchardet/_cchardet.pyi b/src/cchardet/_cchardet.pyi
new file mode 100644
index 0000000..38ce5a9
--- /dev/null
+++ b/src/cchardet/_cchardet.pyi
@@ -0,0 +1,70 @@
+import sys
+from typing import Union
+
+from .typedefs import DecodeResultDict
+
+if sys.version_info >= (3, 11):
+    from typing import Self
+else:
+    from typing_extensions import Self
+
+
+def detect_with_confidence(msg: bytes) -> Union[tuple[bytes, float], tuple[None, None]]:
+    """Same as ``detect`` but returns a ``(encoding, confidence)`` tuple.
+
+    Args:
+        msg: A given string of bytes to detect
+
+    Raises:
+        MemoryError: If the internal handle could not be allocated
+    """
+    ...
+
+
+class UniversalDetector:
+    """Detects character encodings from an input stream"""
+
+    def __init__(self) -> None:
+        ...
+
+    def reset(self) -> None:
+        """
+        Resets the universal detector, allowing it to handle a
+        new stream of data
+        """
+
+    def feed(self, msg: bytes) -> None:
+        """
+        Feeds a stream of characters for detection.
+
+        Args:
+            msg: a stream of data to pass through
+
+        Raises:
+            MemoryError: if memory couldn't be allocated
+                during handling of the given data
+        """
+
+    def close(self) -> None:
+        """Closes handle and frees internal memory"""
+
+    @property
+    def closed(self) -> bool:
+        """Determines if UniversalDetector was closed or not"""
+
+    @property
+    def done(self) -> bool:
+        """
+        Determines if character detection has finished
+        """
+
+    @property
+    def result(self) -> DecodeResultDict:
+        """
+        The result for the given stream;
+        both values are None while no
+        charset has been detected
+        """
+
+    def __enter__(self) -> Self: ...
+    def __exit__(self, *args: object) -> None: ...
diff --git a/src/cchardet/_cchardet.pyx b/src/cchardet/_cchardet.pyx
index 27d9f55..da7ba7b 100644
--- a/src/cchardet/_cchardet.pyx
+++ b/src/cchardet/_cchardet.pyx
@@ -1,44 +1,43 @@
 # coding: utf-8
-#cython: embedsignature=True, c_string_encoding=ascii, language_level=3
+#cython: embedsignature=True, c_string_encoding=ascii, language_level=3, freethreading_compatible=True
+
+from libc.string cimport memcpy, strlen
+from cpython.bool cimport PyBool_FromLong
+from cpython.bytes cimport PyBytes_GET_SIZE, PyBytes_FromString
+from cpython.bytearray cimport PyByteArray_Resize, PyByteArray_AS_STRING, PyByteArray_FromStringAndSize, PyByteArray_GET_SIZE
 
-cdef extern from *:
-    ctypedef char* const_char_ptr "const char*"
-    ctypedef unsigned long size_t
 
 # uchardet v0.0.8
 cdef extern from "uchardet.h":
     ctypedef void* uchardet_t
     cdef uchardet_t uchardet_new()
     cdef void uchardet_delete(uchardet_t ud)
-    cdef int uchardet_handle_data(uchardet_t ud, const_char_ptr data, size_t length)
+    cdef int uchardet_handle_data(uchardet_t ud, const char* data, size_t length)
     cdef void uchardet_data_end(uchardet_t ud)
     cdef void uchardet_reset(uchardet_t ud)
-    cdef const_char_ptr uchardet_get_charset(uchardet_t ud)
+    cdef const char* uchardet_get_charset(uchardet_t ud)
     cdef float uchardet_get_confidence(uchardet_t ud, size_t i)
-    # cdef const_char_ptr uchardet_get_encoding(uchardet_t ud, size_t i)
-    # cdef const_char_ptr uchardet_get_language(uchardet_t ud, size_t i)
+    # cdef const char* uchardet_get_encoding(uchardet_t ud, size_t i)
+    # cdef const char* uchardet_get_language(uchardet_t ud, size_t i)
 
 def detect_with_confidence(bytes msg):
-    cdef size_t length = len(msg)
+    """same as detect but it returns back a tuple[encoding, confidence]"""
+    cdef size_t length = PyBytes_GET_SIZE(msg)
 
     cdef uchardet_t ud = uchardet_new()
+    if ud == NULL:
+        raise MemoryError
 
-    cdef int result = uchardet_handle_data(ud, msg, length)
-    if result == -1:
+    if uchardet_handle_data(ud, msg, length) == -1:
         uchardet_delete(ud)
-        raise Exception("Handle data error")
+        raise MemoryError
 
     uchardet_data_end(ud)
 
-    cdef bytes detected_charset
-    # cdef bytes detected_encoding
-    # cdef const_char_ptr detected_language
-    cdef float detected_confidence
-
-    detected_charset = uchardet_get_charset(ud)
-    # detected_encoding = uchardet_get_encoding(ud, 0)
-    # detected_language = uchardet_get_language(ud, 0)
-    detected_confidence = uchardet_get_confidence(ud, 0)
+    cdef bytes detected_charset = PyBytes_FromString(uchardet_get_charset(ud))
+    # cdef bytes detected_encoding = uchardet_get_encoding(ud, 0)
+    # cdef const char* detected_language = uchardet_get_language(ud, 0)
+    cdef float detected_confidence = uchardet_get_confidence(ud, 0)
 
     uchardet_reset(ud)
     uchardet_delete(ud)
@@ -48,76 +47,122 @@ def detect_with_confidence(bytes msg):
 
     return None, None
 
+
+cdef inline int set_to_bytearray(bytearray arr, const char* data) except -1:
+    # Overwrite ``arr`` in place with the NUL-terminated string ``data``.
+    cdef Py_ssize_t data_size = strlen(data)
+
+    # Resize even for an empty string so a stale value never lingers.
+    if PyByteArray_Resize(arr, data_size) < 0:
+        return -1
+
+    if data_size:
+        memcpy(PyByteArray_AS_STRING(arr), data, data_size)
+    return 0
+
 
 cdef class UniversalDetector:
-    cdef uchardet_t _ud
-    cdef int _done
-    cdef int _closed
-    cdef bytes _detected_charset
-    # cdef bytes _detected_encoding
-    # cdef const_char_ptr _detected_language
-    cdef float _detected_confidence
-
-    def __init__(self):
+    """Detects character encodings from an input stream"""
+    cdef:
+        uchardet_t _ud
+        bytearray _detected_charset
+        float _detected_confidence
+        bint _done
+        bint _closed
+
+    def __cinit__(self):
         self._ud = uchardet_new()
-        self._done = 0
-        self._closed = 0
-        self._detected_charset = b""
+        if self._ud == NULL:
+            raise MemoryError
+
+        self._done = False
+        self._closed = False
         # self._detected_encoding = b""
         # self._detected_language = b""
+
+        # NOTE: these are internal so bytearrays should be acceptable here
+        self._detected_charset = PyByteArray_FromStringAndSize(NULL, 0)
         self._detected_confidence = 0.0
+
+    def __dealloc__(self):
+        # Free only the C handle: Python attributes may already be cleared
+        # here, and __dealloc__ also runs when __cinit__ failed (_ud NULL).
+        if self._ud != NULL and not self._closed:
+            uchardet_delete(self._ud)
+            self._ud = NULL
 
     def reset(self):
         if not self._closed:
-            self._done = 0
-            self._closed = 0
-            self._detected_charset = b""
-            # self._detected_encoding = b""
-            # self._detected_language = b""
+            self._done = False
+            self._closed = False
+            # reset bytearray
+            PyByteArray_Resize(self._detected_charset, 0)
             self._detected_confidence = 0.0
             uchardet_reset(self._ud)
 
     def feed(self, bytes msg):
-        cdef int length
+        cdef Py_ssize_t length
         cdef int result
 
         if self._closed:
             return
 
-        length = len(msg)
+        length = PyBytes_GET_SIZE(msg)
         if length > 0:
             result = uchardet_handle_data(self._ud, msg, length)
             if result == -1:
-                self._closed = 1
+                self._closed = True
                 uchardet_delete(self._ud)
-                raise Exception("Handle data error")
+                raise MemoryError
             elif result == 0:
-                self._done = 1
+                self._done = True
 
-            self._detected_charset = uchardet_get_charset(self._ud)
+            # Raises on failure: set_to_bytearray is declared ``except -1``.
+            set_to_bytearray(self._detected_charset, uchardet_get_charset(self._ud))
             # self._detected_encoding = uchardet_get_encoding(self._ud, 0)
             # self._detected_language = uchardet_get_language(self._ud, 0)
             self._detected_confidence = uchardet_get_confidence(self._ud, 0)
 
-    def close(self):
+    cpdef object close(self):
         if not self._closed:
             uchardet_data_end(self._ud)
 
-            self._detected_charset = uchardet_get_charset(self._ud)
+            set_to_bytearray(self._detected_charset, uchardet_get_charset(self._ud))
             # self._detected_encoding = uchardet_get_encoding(self._ud, 0)
             # self._detected_language = uchardet_get_language(self._ud, 0)
             self._detected_confidence = uchardet_get_confidence(self._ud, 0)
 
             uchardet_delete(self._ud)
-            self._closed = 1
+            self._closed = True
+
+    @property
+    def closed(self):
+        """Determines if UniversalDetector was closed or not"""
+        return PyBool_FromLong(self._closed)
 
     @property
     def done(self):
-        return bool(self._done)
+        """
+        Determines if character detection is over: True when done,
+        False otherwise
+        """
+        return PyBool_FromLong(self._done)
 
     @property
     def result(self):
-        if len(self._detected_charset):
-            return self._detected_charset, self._detected_confidence
+        if PyByteArray_GET_SIZE(self._detected_charset):
+            return {
+                "encoding": self._detected_charset.decode('utf-8', 'surrogateescape'),
+                "confidence": self._detected_confidence
+            }
         else:
-            return None, None
+            return {
+                "encoding": None,
+                "confidence": None
+            }
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        return self.close()
diff --git a/src/cchardet/py.typed b/src/cchardet/py.typed
new file mode 100644
index 0000000..e69de29
diff --git a/src/cchardet/typedefs.py b/src/cchardet/typedefs.py
new file mode 100644
index 0000000..4ef0787
--- /dev/null
+++ b/src/cchardet/typedefs.py
@@ -0,0 +1,8 @@
+from typing import Optional, TypedDict
+
+
+class DecodeResultDict(TypedDict):
+    """Type hints for the dictionary returned by the detection functions."""
+
+    encoding: Optional[str]
+    confidence: Optional[float]