Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions a_sync/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ async def __aiter__(self, pop: bool = False) -> AsyncIterator[Tuple[K, V]]:
else:
while not self._init_loader.done():
await self._wait_for_next_key()
while unyielded := tuple(key for key in self if key not in yielded):
while unyielded := tuple(filterfalse(yielded.__contains__, self)):
if ready := tuple(key for key in unyielded if self[key].done()):
if pop:
self_pop = self.pop
Expand All @@ -259,7 +259,7 @@ async def __aiter__(self, pop: bool = False) -> AsyncIterator[Tuple[K, V]]:
else:
await self._next.wait()
# loader is already done by this point, but we need to check for exceptions
await self._init_loader
self._init_loader.result()
# if there are any tasks that still need to complete, we need to await them and yield them
if unyielded := {key: self[key] for key in self if key not in yielded}:
if pop:
Expand Down Expand Up @@ -575,13 +575,18 @@ async def _tasks_for_iterables(
for iterable in iterables
if not isinstance(iterable, AsyncIterable) and isinstance(iterable, Iterable)
)
for iterable in containers:
async for key in _yield_keys(iterable):

i = 0
for container in containers:
for key in container:
yield key, self[key]
i += 1
if not i % 5_000:
await yield_to_loop()

if remaining := tuple(iterable for iterable in iterables if iterable not in containers):
if remaining := tuple(filterfalse(containers.__contains__, iterables)):
try:
async for key in as_yielded(*(_yield_keys(iterable) for iterable in remaining)): # type: ignore [attr-defined]
async for key in as_yielded(*map(_yield_keys, remaining)):
yield key, self[key] # ensure task is running
except _EmptySequenceError:
if len(iterables) == 1:
Expand All @@ -599,13 +604,18 @@ async def _start_tasks_for_iterables(
for iterable in iterables
if not isinstance(iterable, AsyncIterable) and isinstance(iterable, Iterable)
)
for iterable in containers:
async for key in _yield_keys(iterable):

i = 0
for container in containers:
for key in container:
yield key, self.__start_task(key)
i += 1
if not i % 5_000:
await yield_to_loop()

if remaining := tuple(iterable for iterable in iterables if iterable not in containers):
if remaining := tuple(filterfalse(containers.__contains__, iterables)):
try:
async for key in as_yielded(*(_yield_keys(iterable) for iterable in remaining)): # type: ignore [attr-defined]
async for key in as_yielded(*map(_yield_keys, remaining)):
yield key, self.__start_task(key)
except _EmptySequenceError:
if len(iterables) == 1:
Expand Down