From 141f61361ad8d17d9114e9baf9dd40d93f00878a Mon Sep 17 00:00:00 2001 From: BobTheBuidler Date: Sat, 29 Mar 2025 04:31:52 +0000 Subject: [PATCH 1/4] feat: PyObject_CallMethodObjArgs --- a_sync/a_sync/_helpers.pyx | 6 +- a_sync/a_sync/modifiers/manager.pyx | 2 +- a_sync/async_property/proxy.pyx | 25 +++-- a_sync/asyncio/create_task.pyx | 10 +- a_sync/primitives/_debug.pyx | 9 +- a_sync/primitives/_loggable.pyx | 7 +- a_sync/primitives/locks/prio_semaphore.pyx | 117 +++++++++++++-------- a_sync/primitives/locks/semaphore.pyx | 7 +- 8 files changed, 116 insertions(+), 67 deletions(-) diff --git a/a_sync/a_sync/_helpers.pyx b/a_sync/a_sync/_helpers.pyx index 893dca86..6a03596e 100644 --- a/a_sync/a_sync/_helpers.pyx +++ b/a_sync/a_sync/_helpers.pyx @@ -6,6 +6,7 @@ and converting synchronous functions to asynchronous ones. from asyncio import iscoroutinefunction, new_event_loop, set_event_loop from asyncio import get_event_loop as _get_event_loop from asyncio.futures import _chain_future +from cpython.object cimport PyObject, PyObject_CallMethodObjArgs from a_sync import exceptions from a_sync._typing import * @@ -46,7 +47,7 @@ cdef object _await(object awaitable): - :func:`asyncio.run`: For running the main entry point of an asyncio program. """ try: - return get_event_loop().run_until_complete(awaitable) + return PyObject_CallMethodObjArgs(get_event_loop(), "run_until_complete", awaitable, NULL) except RuntimeError as e: if str(e) == "This event loop is already running": raise exceptions.SyncModeInAsyncContextError from None @@ -111,8 +112,7 @@ cdef object _asyncify(object func, executor: Executor): # type: ignore [misc] @wraps(func) async def _asyncify_wrap(*args: P.args, **kwargs: P.kwargs) -> T: - loop = get_event_loop() - fut = loop.create_future() + fut = PyObject_CallMethodObjArgs(get_event_loop(), "create_future", NULL) cf_fut = submit(func, *args, **kwargs) _chain_future(cf_fut, fut) return await fut diff --git a/a_sync/a_sync/modifiers/manager.pyx b/a_sync/a_sync/modifiers/manager.pyx index 4514f13b..f9c0f1f1 100644 --- a/a_sync/a_sync/modifiers/manager.pyx +++ b/a_sync/a_sync/modifiers/manager.pyx @@ -238,7 +238,7 @@ cdef class ModifierManager: >>> list(iter(manager)) ['cache_type'] """ - return self._modifiers.__iter__() + return iter(self._modifiers) def __len__(self) -> uint8_t: """Returns the number of modifiers. diff --git a/a_sync/async_property/proxy.pyx b/a_sync/async_property/proxy.pyx index 65fe8110..0fd73371 100644 --- a/a_sync/async_property/proxy.pyx +++ b/a_sync/async_property/proxy.pyx @@ -1,14 +1,21 @@ +from cpython.object cimport PyObject, PyObject_CallFunctionObjArgs, PyObject_CallMethodObjArgs + + cdef class AwaitableOnly: """This wraps a coroutine will call it on await.""" - def __init__(self, coro): + def __cinit__(self, object coro): self._coro = coro def __repr__(self): return f'' def __await__(self): - return self._coro().__await__() + return PyObject_CallMethodObjArgs( + PyObject_CallFunctionObjArgs(self._coro, NULL), + "__await__", + NULL, + ) # PURE PYTHON @@ -422,7 +429,7 @@ class ObjectProxy(metaclass=_ObjectProxyMetaType): del self.__wrapped__[i:j] def __enter__(self): - return self.__wrapped__.__enter__() + return PyObject_CallMethodObjArgs(self.__wrapped__, "__enter__", NULL) def __exit__(self, *args, **kwargs): return self.__wrapped__.__exit__(*args, **kwargs) @@ -450,19 +457,23 @@ class ObjectProxy(metaclass=_ObjectProxyMetaType): class AwaitableProxy(ObjectProxy): def __await__(self): - return self.__get_wrapped().__await__() + return PyObject_CallMethodObjArgs( + PyObject_CallMethodObjArgs(self, "_AwaitableProxy__get_wrapped", NULL), + "__await__", + NULL, + ) async def __aenter__(self): - return await self.__wrapped__.__aenter__() + return await PyObject_CallMethodObjArgs(self.__wrapped__, "__aenter__", NULL) async def __aexit__(self, *args, **kwargs): return await self.__wrapped__.__aexit__(*args, **kwargs) async def __aiter__(self): - return await self.__wrapped__.__aiter__() + return await PyObject_CallMethodObjArgs(self.__wrapped__, "__aiter__", NULL) async def __anext__(self): - return await self.__wrapped__.__anext__() + return await PyObject_CallMethodObjArgs(self.__wrapped__, "__anext__", NULL) async def __get_wrapped(self): return self.__wrapped__ diff --git a/a_sync/asyncio/create_task.pyx b/a_sync/asyncio/create_task.pyx index 522e7f6b..3909079d 100644 --- a/a_sync/asyncio/create_task.pyx +++ b/a_sync/asyncio/create_task.pyx @@ -5,6 +5,7 @@ manage task lifecycle, and enhance error handling. import asyncio.tasks as aiotasks from asyncio import Future, InvalidStateError, Task, get_running_loop, iscoroutine +from cpython.object cimport PyObject, PyObject_CallMethodObjArgs from logging import getLogger from a_sync._smart import SmartTask, smart_task_factory @@ -180,7 +181,12 @@ cdef void __prune_persisted_tasks(): } if task._source_traceback: context["source_traceback"] = task._source_traceback - task._loop.call_exception_handler(context) + PyObject_CallMethodObjArgs( + task._loop, + "call_exception_handler", + context, + NULL, + ) cdef inline bint _is_done(fut: Future): @@ -200,7 +206,7 @@ cdef object _get_exception(fut: Future): fut._Future__log_traceback = False return fut._exception if state == "CANCELLED": - raise fut._make_cancelled_error() + raise PyObject_CallMethodObjArgs(fut, "_make_cancelled_error", NULL) raise InvalidStateError('Exception is not set.') diff --git a/a_sync/primitives/_debug.pyx b/a_sync/primitives/_debug.pyx index d48d6ebe..bc9d8152 100644 --- a/a_sync/primitives/_debug.pyx +++ b/a_sync/primitives/_debug.pyx @@ -7,6 +7,7 @@ The mixin provides a framework for managing a debug daemon task, which can be us import os from asyncio import AbstractEventLoop, Future, Task from asyncio.events import _running_loop +from cpython.object cimport PyObject_CallMethodObjArgs from threading import Lock from typing import Optional @@ -134,9 +135,9 @@ cdef class _DebugDaemonMixin(_LoopBoundMixin): cdef object _c_start_debug_daemon(self, tuple[object] args, dict[str, object] kwargs): cdef object loop = self._c_get_loop() - if self.check_debug_logs_enabled() and loop.is_running(): + if self.check_debug_logs_enabled() and PyObject_CallMethodObjArgs(loop, "is_running", NULL) is True: return ccreate_task_simple(self._debug_daemon(*args, **kwargs)) - return loop.create_future() + return PyObject_CallMethodObjArgs(loop, "create_future", NULL) def _ensure_debug_daemon(self, *args, **kwargs) -> None: """ @@ -173,7 +174,7 @@ cdef class _DebugDaemonMixin(_LoopBoundMixin): daemon.add_done_callback(self._stop_debug_daemon) self._daemon = daemon else: - self._daemon = self._c_get_loop().create_future() + self._daemon = PyObject_CallMethodObjArgs(self._c_get_loop(), "create_future", NULL) self._has_daemon = True @@ -202,6 +203,6 @@ cdef class _DebugDaemonMixin(_LoopBoundMixin): """ if t and t != self._daemon: raise ValueError(f"{t} is not {self._daemon}") - t.cancel() + PyObject_CallMethodObjArgs(t, "cancel", NULL) self._daemon = None self._has_daemon = False diff --git a/a_sync/primitives/_loggable.pyx b/a_sync/primitives/_loggable.pyx index 10578c84..efd18566 100644 --- a/a_sync/primitives/_loggable.pyx +++ b/a_sync/primitives/_loggable.pyx @@ -3,9 +3,12 @@ This module provides a mixin class to add debug logging capabilities to other classes. """ +from cpython.object cimport PyObject, PyObject_CallMethodObjArgs from logging import Logger, getLogger, DEBUG +cdef PyObject* _DEBUG = DEBUG + cdef class _LoggerMixin: """ A mixin class that adds logging capabilities to other classes. @@ -80,10 +83,10 @@ cdef class _LoggerMixin: See Also: - :attr:`logging.Logger.isEnabledFor` """ - return self.get_logger().isEnabledFor(DEBUG) + return PyObject_CallMethodObjArgs(self.get_logger(), "isEnabledFor", _DEBUG, NULL) cdef inline bint check_debug_logs_enabled(self): - return self.get_logger().isEnabledFor(DEBUG) + return PyObject_CallMethodObjArgs(self.get_logger(), "isEnabledFor", _DEBUG, NULL) cdef dict[str, object] _class_loggers = {} diff --git a/a_sync/primitives/locks/prio_semaphore.pyx b/a_sync/primitives/locks/prio_semaphore.pyx index 373edff3..f58bb76f 100644 --- a/a_sync/primitives/locks/prio_semaphore.pyx +++ b/a_sync/primitives/locks/prio_semaphore.pyx @@ -7,7 +7,9 @@ processed before lower priority ones. from asyncio import Future from collections import deque -from heapq import heappop, heappush +from cpython.object cimport PyObject, PyObject_CallFunctionObjArgs, PyObject_CallMethodObjArgs +from heapq import heappop as _heappop +from heapq import heappush as _heappush from logging import DEBUG, getLogger from a_sync._typing import * @@ -16,7 +18,9 @@ from a_sync.primitives.locks.semaphore cimport Semaphore logger = getLogger(__name__) cdef object c_logger = logger - +cdef object heappush = _heappush +cdef object heappop = _heappop +cdef object cdeque = deque class Priority(Protocol): def __lt__(self, other) -> bool: ... @@ -135,19 +139,26 @@ cdef class _AbstractPrioritySemaphore(Semaphore): return self.c_getitem(priority) cdef object c_getitem(self, object priority): - if self._Semaphore__value is None: - raise ValueError(self._Semaphore__value) - cdef _AbstractPrioritySemaphoreContextManager context_manager + cdef dict[object, _AbstractPrioritySemaphoreContextManager] context_managers + context_managers = self._context_managers priority = self._top_priority if priority is None else priority - if priority not in self._context_managers: - context_manager = self._context_manager_class( - self, priority, name=self.name - ) - heappush(self._Semaphore__waiters, context_manager) # type: ignore [misc] - self._context_managers[priority] = context_manager - return self._context_managers[priority] + context_manager = context_managers.get(priority) + if context_manager is not None: + return context_manager + + context_manager = self._context_manager_class( + self, priority, name=self.name + ) + PyObject_CallFunctionObjArgs( + heappush, + self._Semaphore__waiters, + context_manager, + NULL + ) + context_managers[priority] = context_manager + return context_manager cpdef bint locked(self): """Checks if the semaphore is locked. @@ -209,14 +220,14 @@ cdef class _AbstractPrioritySemaphore(Semaphore): cdef Py_ssize_t start_len, end_len cdef bint woke_up - cdef bint debug_logs = c_logger.isEnabledFor(DEBUG) + cdef list potential_lost_waiters = self._potential_lost_waiters + cdef bint debug_logs = PyObject_CallFunctionObjArgs(_logger_is_enabled_for, _DEBUG, NULL) while self._Semaphore__waiters: manager = heappop(self._Semaphore__waiters) if len(manager) == 0: # There are no more waiters, get rid of the empty manager if debug_logs: - c_logger._log( - DEBUG, + log_debug( "manager %s has no more waiters, popping from %s", (manager._c_repr_no_parent_(), self), ) @@ -226,19 +237,20 @@ cdef class _AbstractPrioritySemaphore(Semaphore): woke_up = False start_len = len(manager) + manager_waiters = manager._Semaphore__waiters if debug_logs: - c_logger._log(DEBUG, "waking up next for %s", (manager._c_repr_no_parent_(), )) - if not manager._Semaphore__waiters: - c_logger._log(DEBUG, "not manager._Semaphore__waiters") + log_debug("waking up next for %s", (manager._c_repr_no_parent_(), )) + if not manager_waiters: + log_debug("not manager._Semaphore__waiters", ()) - while manager._Semaphore__waiters: - waiter = manager._Semaphore__waiters.popleft() - self._potential_lost_waiters.remove(waiter) + while manager_waiters: + waiter = PyObject_CallMethodObjArgs(manager_waiters, "popleft", NULL) + potential_lost_waiters.remove(waiter) if _is_not_done(waiter): - waiter.set_result(None) + PyObject_CallMethodObjArgs(waiter, "set_result", NULL, NULL) woke_up = True if debug_logs: - c_logger._log(DEBUG, "woke up %s", (waiter, )) + log_debug("woke up %s", (waiter, )) break if not woke_up: @@ -251,7 +263,12 @@ cdef class _AbstractPrioritySemaphore(Semaphore): if end_len: # There are still waiters, put the manager back - heappush(self._Semaphore__waiters, manager) # type: ignore [misc] + PyObject_CallFunctionObjArgs( + heappush, + self._Semaphore__waiters, + manager, + NULL, + ) else: # There are no more waiters, get rid of the empty manager self._context_managers.pop(manager._priority) @@ -259,22 +276,22 @@ cdef class _AbstractPrioritySemaphore(Semaphore): # emergency procedure (hopefully temporary): if not debug_logs: - while self._potential_lost_waiters: - waiter = self._potential_lost_waiters.pop(0) + while potential_lost_waiters: + waiter = potential_lost_waiters.pop(0) if _is_not_done(waiter): - waiter.set_result(None) + PyObject_CallMethodObjArgs(waiter, "set_result", NULL, NULL) return return - while self._potential_lost_waiters: - waiter = self._potential_lost_waiters.pop(0) - c_logger._log(DEBUG, "we found a lost waiter %s", (waiter, )) + while potential_lost_waiters: + waiter = potential_lost_waiters.pop(0) + log_debug("we found a lost waiter %s", (waiter, )) if _is_not_done(waiter): - waiter.set_result(None) - c_logger._log(DEBUG, "woke up lost waiter %s", (waiter, )) + PyObject_CallMethodObjArgs(waiter, "set_result", NULL, NULL) + log_debug("woke up lost waiter %s", (waiter, )) return - c_logger._log(DEBUG, "%s has no waiters to wake", (self, )) + log_debug("%s has no waiters to wake", (self, )) cdef class _AbstractPrioritySemaphoreContextManager(Semaphore): @@ -361,26 +378,26 @@ cdef class _AbstractPrioritySemaphoreContextManager(Semaphore): """ if self._parent._Semaphore__value <= 0: self._c_ensure_debug_daemon((),{}) - return self.__acquire() + return PyObject_CallMethodObjArgs(self, "_AbstractPrioritySemaphoreContextManager__acquire", NULL) cdef object c_acquire(self): if self._parent._Semaphore__value <= 0: self._c_ensure_debug_daemon((),{}) - return self.__acquire() + return PyObject_CallMethodObjArgs(self, "_AbstractPrioritySemaphoreContextManager__acquire", NULL) async def __acquire(self) -> Literal[True]: - cdef object loop, fut + cdef object fut while self._parent._Semaphore__value <= 0: if self._Semaphore__waiters is None: - self._Semaphore__waiters = deque() - fut = self._c_get_loop().create_future() + self._Semaphore__waiters = PyObject_CallFunctionObjArgs(cdeque, NULL) + fut = PyObject_CallMethodObjArgs(self._c_get_loop(), "create_future", NULL) self._Semaphore__waiters.append(fut) self._parent._potential_lost_waiters.append(fut) try: await fut except: # See the similar code in Queue.get. - fut.cancel() + PyObject_CallMethodObjArgs(fut, "cancel", NULL) if self._parent._Semaphore__value > 0 and _is_not_cancelled(fut): self._parent._wake_up_next() raise @@ -447,7 +464,7 @@ cdef class _PrioritySemaphoreContextManager(_AbstractPrioritySemaphoreContextMan raise TypeError(f"{other} is not type {self.__class__.__name__}") return self._priority < other._priority -cdef class PrioritySemaphore(_AbstractPrioritySemaphore): # type: ignore [type-var] +cdef class PrioritySemaphore(_AbstractPrioritySemaphore): """Semaphore that uses numeric priorities for waiters. This class extends :class:`_AbstractPrioritySemaphore` and provides a concrete implementation @@ -519,10 +536,20 @@ cdef class PrioritySemaphore(_AbstractPrioritySemaphore): # type: ignore [type- cdef dict[int, _PrioritySemaphoreContextManager] context_managers = self._context_managers if priority not in context_managers: context_manager = _PrioritySemaphoreContextManager(self, priority, name=self.name) - heappush( - self._Semaphore__waiters, - context_manager, - ) # type: ignore [misc] + PyObject_CallFunctionObjArgs( + heappush, + self._Semaphore__waiters, + context_manager, + NULL + ) context_managers[priority] = context_manager return context_manager - return context_managers[priority] \ No newline at end of file + return context_managers[priority] + + +cdef object _logger_log = c_logger._log +cdef object _logger_is_enabled_for = c_logger.isEnabledFor +cdef PyObject* _DEBUG = DEBUG + +cdef void log_debug(str message, tuple args): + PyObject_CallFunctionObjArgs(_logger_log, message, args) \ No newline at end of file diff --git a/a_sync/primitives/locks/semaphore.pyx b/a_sync/primitives/locks/semaphore.pyx index 39139eb2..5dbe7224 100644 --- a/a_sync/primitives/locks/semaphore.pyx +++ b/a_sync/primitives/locks/semaphore.pyx @@ -5,6 +5,7 @@ a dummy semaphore that does nothing, and a threadsafe semaphore for use in multi from asyncio import CancelledError, Future, iscoroutinefunction, sleep from collections import defaultdict, deque +from cpython.object cimport PyObject, PyObject_CallMethodObjArgs from libc.string cimport strcpy from libc.stdlib cimport malloc, free from threading import Thread, current_thread @@ -204,20 +205,20 @@ cdef class Semaphore(_DebugDaemonMixin): self._Semaphore__value -= 1 return _noop() - return self.__acquire() + return PyObject_CallMethodObjArgs(self, "_Semaphore__acquire", NULL) async def __acquire(self) -> Literal[True]: # Finally block should be called before the CancelledError # handling as we don't want CancelledError to call # _wake_up_first() and attempt to wake up itself. - cdef object fut = self._c_get_loop().create_future() + cdef object fut = PyObject_CallMethodObjArgs(self._c_get_loop(), "create_future", NULL) self._Semaphore__waiters.append(fut) try: try: await fut finally: - self._Semaphore__waiters.remove(fut) + PyObject_CallMethodObjArgs(self._Semaphore__waiters, "remove", fut, NULL) except CancelledError: if _is_not_cancelled(fut): self._Semaphore__value += 1 From f62c9fc3b4260f50787ab5ad47d61f19c1bc0a45 Mon Sep 17 00:00:00 2001 From: BobTheBuidler Date: Sat, 29 Mar 2025 04:33:40 +0000 Subject: [PATCH 2/4] probably good, test --- a_sync/primitives/locks/prio_semaphore.pyx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/a_sync/primitives/locks/prio_semaphore.pyx b/a_sync/primitives/locks/prio_semaphore.pyx index f58bb76f..a341f550 100644 --- a/a_sync/primitives/locks/prio_semaphore.pyx +++ b/a_sync/primitives/locks/prio_semaphore.pyx @@ -223,7 +223,7 @@ cdef class _AbstractPrioritySemaphore(Semaphore): cdef list potential_lost_waiters = self._potential_lost_waiters cdef bint debug_logs = PyObject_CallFunctionObjArgs(_logger_is_enabled_for, _DEBUG, NULL) while self._Semaphore__waiters: - manager = heappop(self._Semaphore__waiters) + manager = PyObject_CallFunctionObjArgs(heappop, self._Semaphore__waiters, NULL) if len(manager) == 0: # There are no more waiters, get rid of the empty manager if debug_logs: @@ -552,4 +552,4 @@ cdef object _logger_is_enabled_for = c_logger.isEnabledFor cdef PyObject* _DEBUG = DEBUG cdef void log_debug(str message, tuple args): - PyObject_CallFunctionObjArgs(_logger_log, message, args) \ No newline at end of file + PyObject_CallFunctionObjArgs(_logger_log, _DEBUG, message, args) \ No newline at end of file From 79fd256f810fa33ac76d9ccd0a39085bcbde9025 Mon Sep 17 00:00:00 2001 From: BobTheBuidler Date: Sat, 29 Mar 2025 04:53:06 +0000 Subject: [PATCH 3/4] feat: PyObject_CallMethodObjArgs --- a_sync/primitives/locks/prio_semaphore.pyx | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/a_sync/primitives/locks/prio_semaphore.pyx b/a_sync/primitives/locks/prio_semaphore.pyx index a341f550..f74ad73c 100644 --- a/a_sync/primitives/locks/prio_semaphore.pyx +++ b/a_sync/primitives/locks/prio_semaphore.pyx @@ -220,10 +220,12 @@ cdef class _AbstractPrioritySemaphore(Semaphore): cdef Py_ssize_t start_len, end_len cdef bint woke_up + cdef list self_waiters = self._Semaphore__waiters + cdef PyObject* waiters_pointer = self_waiters cdef list potential_lost_waiters = self._potential_lost_waiters cdef bint debug_logs = PyObject_CallFunctionObjArgs(_logger_is_enabled_for, _DEBUG, NULL) - while self._Semaphore__waiters: - manager = PyObject_CallFunctionObjArgs(heappop, self._Semaphore__waiters, NULL) + while self_waiters: + manager = PyObject_CallFunctionObjArgs(heappop, waiters_pointer, NULL) if len(manager) == 0: # There are no more waiters, get rid of the empty manager if debug_logs: From 7c4a0fd516afc5fd1e3368c50cbaa3307be7a85c Mon Sep 17 00:00:00 2001 From: BobTheBuidler Date: Sat, 29 Mar 2025 05:44:35 +0000 Subject: [PATCH 4/4] fix: Future.set_result() takes exactly one argument (0 given) --- a_sync/primitives/locks/prio_semaphore.pyx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/a_sync/primitives/locks/prio_semaphore.pyx b/a_sync/primitives/locks/prio_semaphore.pyx index f74ad73c..573ef468 100644 --- a/a_sync/primitives/locks/prio_semaphore.pyx +++ b/a_sync/primitives/locks/prio_semaphore.pyx @@ -249,7 +249,7 @@ cdef class _AbstractPrioritySemaphore(Semaphore): waiter = PyObject_CallMethodObjArgs(manager_waiters, "popleft", NULL) potential_lost_waiters.remove(waiter) if _is_not_done(waiter): - PyObject_CallMethodObjArgs(waiter, "set_result", NULL, NULL) + PyObject_CallMethodObjArgs(waiter, "set_result", None, NULL) woke_up = True if debug_logs: log_debug("woke up %s", (waiter, )) @@ -281,7 +281,7 @@ cdef class _AbstractPrioritySemaphore(Semaphore): while potential_lost_waiters: waiter = potential_lost_waiters.pop(0) if _is_not_done(waiter): - PyObject_CallMethodObjArgs(waiter, "set_result", NULL, NULL) + PyObject_CallMethodObjArgs(waiter, "set_result", None, NULL) return return @@ -289,7 +289,7 @@ cdef class _AbstractPrioritySemaphore(Semaphore): waiter = potential_lost_waiters.pop(0) log_debug("we found a lost waiter %s", (waiter, )) if _is_not_done(waiter): - PyObject_CallMethodObjArgs(waiter, "set_result", NULL, NULL) + PyObject_CallMethodObjArgs(waiter, "set_result", None, NULL) log_debug("woke up lost waiter %s", (waiter, )) return