lla-dane
Contributor

@lla-dane lla-dane commented Jul 10, 2025

Fixed a TODO: Implement throttle on async validators in libp2p/pubsub/pubsub.py::validate_msg().

Used a semaphore to limit how many async validators run concurrently:

semaphore = trio.Semaphore(MAX_CONCURRENT_VALIDATORS)

async def run_async_validator(func: AsyncValidatorFn) -> None:
    async with semaphore:
        result = await func(msg_forwarder, msg)
        results.append(result)

async with trio.open_nursery() as nursery:
    for async_validator in async_topic_validators:
        nursery.start_soon(run_async_validator, async_validator)
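
To see the throttling effect in isolation, here is a minimal sketch. It swaps in asyncio.Semaphore and asyncio.gather from the standard library in place of trio.Semaphore and the nursery, so it runs without extra dependencies. The limit of 2, the validate_all and make_validator helpers, and the stats dict are illustrative assumptions, not code from this PR.

```python
import asyncio

MAX_CONCURRENT_VALIDATORS = 2  # illustrative value, not the PR's default


async def validate_all(validators, msg):
    """Run every validator concurrently, at most MAX_CONCURRENT_VALIDATORS at once."""
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_VALIDATORS)
    results = []

    async def run_async_validator(func):
        # Only MAX_CONCURRENT_VALIDATORS tasks can be inside this block at a time.
        async with semaphore:
            results.append(await func(msg))

    await asyncio.gather(*(run_async_validator(v) for v in validators))
    return all(results)


def make_validator(stats):
    """Build a hypothetical validator that records how many run at once."""

    async def validator(msg):
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
        await asyncio.sleep(0.01)  # stand-in for real validation work
        stats["running"] -= 1
        return bool(msg)

    return validator


def demo():
    stats = {"running": 0, "peak": 0}
    validators = [make_validator(stats) for _ in range(6)]
    ok = asyncio.run(validate_all(validators, b"payload"))
    return ok, stats["peak"]
```

Calling demo() starts six validators, but the recorded peak never exceeds the semaphore's limit.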

@lla-dane
Contributor Author

lla-dane commented Jul 10, 2025

I will add tests for this and make the concurrency limit configurable. In the meantime, please check whether the solution is correct @seetadev @pacrob

@lla-dane
Contributor Author

Added tests to check that the concurrency limit is respected. Please check whether everything is alright and whether the default concurrency limit is appropriate. @pacrob @seetadev

@paschal533
Contributor

Hi @lla-dane, this looks like a solid fix for the async validator throttling TODO, but there's a critical issue: the default parameter limit: trio.Semaphore = trio.Semaphore(MAX_CONCURRENT_VALIDATORS) creates a new semaphore on every call instead of sharing one globally. You should make the semaphore an instance variable in the __init__ method, like self._validator_semaphore = trio.Semaphore(MAX_CONCURRENT_VALIDATORS), and use that directly in the run_async_validator function. Also consider making the concurrency limit configurable via a constructor parameter. The test could be simplified to focus more on the actual semaphore behavior rather than mocking the entire validate_msg method. Overall it's a good implementation that properly addresses resource exhaustion concerns; it just needs that parameter fix to work correctly.
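
The difference between a semaphore created per call and one shared on the instance can be sketched as follows. This is a hedged illustration, not the PR's code: ThrottledValidator, its max_concurrent parameter, and the validate_shared / validate_per_call methods are hypothetical names, and asyncio stands in for trio so the example needs only the standard library.

```python
import asyncio


class ThrottledValidator:
    """Hypothetical stand-in for a class that owns a validator semaphore."""

    def __init__(self, max_concurrent=2):
        # One semaphore per instance, shared by every validate_shared call.
        self._validator_semaphore = asyncio.Semaphore(max_concurrent)
        self._max_concurrent = max_concurrent
        self.running = 0
        self.peak = 0

    async def _work(self):
        self.running += 1
        self.peak = max(self.peak, self.running)
        await asyncio.sleep(0.01)  # stand-in for real validation work
        self.running -= 1

    async def validate_shared(self):
        # Every call contends for the same instance-level semaphore.
        async with self._validator_semaphore:
            await self._work()

    async def validate_per_call(self):
        # A fresh semaphore per call never blocks anyone: no real throttle.
        async with asyncio.Semaphore(self._max_concurrent):
            await self._work()


async def peak_concurrency(method_name, calls=6):
    # The instance is created inside the running event loop on purpose.
    validator = ThrottledValidator(max_concurrent=2)
    method = getattr(validator, method_name)
    await asyncio.gather(*(method() for _ in range(calls)))
    return validator.peak


def demo():
    shared = asyncio.run(peak_concurrency("validate_shared"))
    per_call = asyncio.run(peak_concurrency("validate_per_call"))
    return shared, per_call
```

In this sketch the shared semaphore caps the peak at the configured limit, while the per-call variant lets all six calls run at once.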

@lla-dane
Contributor Author

Hey @paschal533

Hi @lla-dane, this looks like a solid fix for the async validator throttling TODO, but there's a critical issue: the default parameter limit: trio.Semaphore = trio.Semaphore(MAX_CONCURRENT_VALIDATORS) creates a new semaphore on every call instead of sharing one globally. You should make the semaphore an instance variable in the __init__ method, like self._validator_semaphore = trio.Semaphore(MAX_CONCURRENT_VALIDATORS), and use that directly in the run_async_validator function. Also consider making the concurrency limit configurable via a constructor parameter.

Done: I added self._validator_semaphore = trio.Semaphore(MAX_CONCURRENT_VALIDATORS) in the Pubsub constructor, and the limit is also configurable via a constructor parameter.

the test could be simplified to focus more on the actual semaphore behavior rather than mocking the entire validate_msg method.

For this, I added the concurrency check to the original validate_msg test, so the throttle and validate_msg are now tested in the same test.

Does this work @paschal533 ?

@lla-dane lla-dane force-pushed the todo/throttle-async-val branch from 79620cf to 0731202 Compare July 12, 2025 02:56
@seetadev
Contributor

@lla-dane : Hi Abhinav. Thank you for submitting the PR. Appreciate your great efforts and initiative.

I wanted to ask whether you've had a chance to review Varun's work in #647 and #710 (reference issue: #709).

@pacrob
Member

pacrob commented Jul 12, 2025

@lla-dane - Looking good! I'm concerned about copying so much code directly from Pubsub though. What do you think about extracting run_async_validator out to a class method _run_async_validator and then just mocking that? You'd need to adjust arguments, but way less copied code in test.

@paschal533
Contributor

paschal533 commented Jul 13, 2025

Hi @lla-dane, this looks like a solid fix for the async validator throttling TODO, but there's a critical issue: the default parameter limit: trio.Semaphore = trio.Semaphore(MAX_CONCURRENT_VALIDATORS) creates a new semaphore on every call instead of sharing one globally. You should make the semaphore an instance variable in the __init__ method, like self._validator_semaphore = trio.Semaphore(MAX_CONCURRENT_VALIDATORS), and use that directly in the run_async_validator function. Also consider making the concurrency limit configurable via a constructor parameter.

Done: I added self._validator_semaphore = trio.Semaphore(MAX_CONCURRENT_VALIDATORS) in the Pubsub constructor, and the limit is also configurable via a constructor parameter.

the test could be simplified to focus more on the actual semaphore behavior rather than mocking the entire validate_msg method.

For this, I added the concurrency check to the original validate_msg test, so the throttle and validate_msg are now tested in the same test.

Does this work @paschal533 ?

This is a great approach. It fixes the main issue, and integrating the concurrency check into the existing validate_msg test is actually cleaner than having separate tests. The combined test makes sense because it checks that throttling works within the actual validation flow rather than in isolation. The semaphore will now limit concurrency across all calls to validate_msg on the same Pubsub instance, which is exactly what we want for resource management. Great job and well done, @lla-dane 👏

Comment on lines +715 to +724
async def _run_async_validator(
    self,
    func: AsyncValidatorFn,
    msg_forwarder: ID,
    msg: rpc_pb2.Message,
    results: list[bool],
) -> None:
    async with self._validator_semaphore:
        result = await func(msg_forwarder, msg)
        results.append(result)
Contributor Author

@pacrob: I extracted run_async_validator into a separate class method as you suggested. Now only this much code from pubsub.py is duplicated in the tests:

async def mock_run_async_validator(
    self,
    func: AsyncValidatorFn,
    msg_forwarder: ID,
    msg: rpc_pb2.Message,
    results: list[bool],
) -> None:
    async with self._validator_semaphore:
        async with lock:
            state["concurrency_counter"] += 1
            if state["concurrency_counter"] > state["max_observed"]:
                state["max_observed"] = state["concurrency_counter"]

        try:
            result = await func(msg_forwarder, msg)
            results.append(result)
        finally:
            async with lock:
                state["concurrency_counter"] -= 1
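
A self-contained version of this testing pattern might look like the sketch below. It rests on several assumptions: Validator is a hypothetical stand-in for the class under test, asyncio replaces trio so only the standard library is needed, and unittest.mock.patch.object swaps in the instrumented method. It is an illustration of the idea, not the test from this PR.

```python
import asyncio
from unittest.mock import patch


class Validator:
    """Hypothetical stand-in for the class under test."""

    def __init__(self, limit):
        self._validator_semaphore = asyncio.Semaphore(limit)

    async def _run_async_validator(self, func, msg, results):
        async with self._validator_semaphore:
            results.append(await func(msg))

    async def validate_msg(self, funcs, msg):
        results = []
        await asyncio.gather(
            *(self._run_async_validator(f, msg, results) for f in funcs)
        )
        return all(results)


async def run_check(limit=2, count=5):
    state = {"concurrency_counter": 0, "max_observed": 0}

    async def mock_run_async_validator(self, func, msg, results):
        # Mirrors the real method, plus bookkeeping inside the semaphore.
        async with self._validator_semaphore:
            state["concurrency_counter"] += 1
            state["max_observed"] = max(
                state["max_observed"], state["concurrency_counter"]
            )
            try:
                results.append(await func(msg))
            finally:
                state["concurrency_counter"] -= 1

    async def slow_validator(msg):
        await asyncio.sleep(0.01)  # stand-in for real validation work
        return True

    validator = Validator(limit)
    # Patch on the class so the replacement is bound like a normal method.
    with patch.object(Validator, "_run_async_validator", mock_run_async_validator):
        ok = await validator.validate_msg([slow_validator] * count, b"data")
    return ok, state["max_observed"]
```

In this sketch the counter updates contain no await, so no extra lock is used around them; whether the lock in the snippet above is needed depends on test code not shown here.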

@lla-dane
Contributor Author

@lla-dane - Looking good! I'm concerned about copying so much code directly from Pubsub though. What do you think about extracting run_async_validator out to a class method _run_async_validator and then just mocking that? You'd need to adjust arguments, but way less copied code in test.

@pacrob: I made a few changes as you suggested here. Please check whether everything looks alright.

Comment on lines 697 to 700
  async def run_async_validator(func: AsyncValidatorFn) -> None:
-     result = await func(msg_forwarder, msg)
-     results.append(result)
+     async with self._validator_semaphore:
+         result = await func(msg_forwarder, msg)
+         results.append(result)
Member

Since you extracted the logic out, this code can be deleted, right?

Contributor Author

My bad, forgot to remove this. Now removed!!

@lla-dane lla-dane force-pushed the todo/throttle-async-val branch from e3bc7bb to 3ef5fe8 Compare July 18, 2025 04:55
@pacrob pacrob merged commit 11560f5 into libp2p:main Jul 18, 2025
28 checks passed
@lla-dane lla-dane deleted the todo/throttle-async-val branch September 1, 2025 11:41