Skip to content

Conversation

@wangxiaoteng888
Copy link
Contributor

@wangxiaoteng888 wangxiaoteng888 commented Oct 29, 2025

What this PR does / why we need it?

Refactored the layerwise code to send to the D node first, preventing P-node hangs due to communication timeouts when DP > 1.

Does this PR introduce any user-facing change?

No

How was this patch tested?

By ci

@github-actions
Copy link

👋 Hi! Thank you for contributing to the vLLM Ascend project. The following points will speed up your PR merge:‌‌

  • A PR should do only one thing, smaller PRs enable faster reviews.
  • Every PR should include unit tests and end-to-end tests ‌to ensure it works and is not broken by other future PRs.
  • Write the commit message by fulfilling the PR description to help reviewer and future developers understand.

If CI fails, you can run linting and testing checks locally according Contributing and Testing.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the mooncake_layerwise_connector.py to implement a "D first plan", where the decoder (consumer) initiates the KV cache transfer by having the producer (prefiller) query it for metadata. This is a significant logic change that simplifies some parts of the code by removing the old handshake mechanism and task tracking. The changes are extensive and involve refactoring several classes and methods. My review identifies a few critical and high-severity issues that should be addressed before merging, including a potential NameError that could crash the worker, an invalid type hint, a redundant attribute initialization, and leftover debugging print statements.

Comment on lines 474 to 475
self._reqs_need_recv: dict[str, list[int], tuple[Request,
list[int]]] = {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The type hint for _reqs_need_recv is syntactically incorrect. A dict type hint takes two arguments: dict[KeyType, ValueType]. Based on its usage, the value is a tuple. The current type hint will cause issues with static type checkers.

        self._reqs_need_recv: dict[str, tuple["Request", list[int], list[int]]] = {}

Comment on lines 995 to 1011
try:
# path = make_zmq_path("tcp", req_meta_update.remote_host, req_meta_update.remote_port)
# msg_encoder = msgspec.msgpack.Encoder()
encoded_data = self.encoder.encode((GET_META_MSG, req_id))
# with zmq_ctx(zmq.REQ, path) as sock: # type: ignore
sock = self._get_remote_socket(req_meta_update.remote_host,
req_meta_update.remote_port)
ensure_zmq_send(sock, encoded_data)
metadata_bytes = ensure_zmq_recv(sock, self.remote_poller)
agent_meta = self.decoder.decode(metadata_bytes)
# ack = sock.recv()
# if ack != b"ACK":
# raise ValueError(f"Unexpected ACK response: {ack}")
except Exception as e:
logger.error(
f"Query te port and kv base addr for request {req_id} from {req_meta_update.remote_host}:{req_meta_update.remote_port} fail with error: {e}"
)
assert req_meta_update.remote_engine_id != self.engine_id, (
f"Conflict engine id {req_meta_update.remote_engine_id} with local engine id "
f"{self.local_engine_id}.")
self.remote_kv_caches_base_addr[req_meta_update.remote_engine_id][
req_meta_update.remote_port] = agent_meta.kv_caches_base_addr
self.remote_te_port[req_meta_update.remote_engine_id][
req_meta_update.remote_port] = agent_meta.te_rpc_port
logger.info(
f"Query te port and kv base addr for request {req_id} from {req_meta_update.remote_host}:{req_meta_update.remote_port} success {agent_meta.kv_caches_base_addr=} {agent_meta.te_rpc_port=}"
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

In update_decoder_info, if an exception occurs during the ZMQ communication within the try block, agent_meta will not be defined. However, the code continues execution after the except block and attempts to access agent_meta on line 1016, which will raise a NameError and crash the worker. The logic that depends on agent_meta should be moved inside the try block, or the except block should re-raise the exception to prevent this crash.

            try:
                # path = make_zmq_path("tcp", req_meta_update.remote_host, req_meta_update.remote_port)
                # msg_encoder = msgspec.msgpack.Encoder()
                encoded_data = self.encoder.encode((GET_META_MSG, req_id))
                # with zmq_ctx(zmq.REQ, path) as sock:  # type: ignore
                sock = self._get_remote_socket(req_meta_update.remote_host,
                                               req_meta_update.remote_port)
                ensure_zmq_send(sock, encoded_data)
                metadata_bytes = ensure_zmq_recv(sock, self.remote_poller)
                agent_meta = self.decoder.decode(metadata_bytes)
                # ack = sock.recv()
                # if ack != b"ACK":
                #     raise ValueError(f"Unexpected ACK response: {ack}")
                assert req_meta_update.remote_engine_id != self.engine_id, (
                    f"Conflict engine id {req_meta_update.remote_engine_id} with local engine id "
                    f"{self.engine_id}.")
                self.remote_kv_caches_base_addr[req_meta_update.remote_engine_id][
                    req_meta_update.remote_port] = agent_meta.kv_caches_base_addr
                self.remote_te_port[req_meta_update.remote_engine_id][
                    req_meta_update.remote_port] = agent_meta.te_rpc_port
                logger.info(
                    f"Query te port and kv base addr for request {req_id} from {req_meta_update.remote_host}:{req_meta_update.remote_port} success {agent_meta.kv_caches_base_addr=} {agent_meta.te_rpc_port=}"
                )
            except Exception as e:
                logger.error(
                    f"Query te port and kv base addr for request {req_id} from {req_meta_update.remote_host}:{req_meta_update.remote_port} fail with error: {e}"
                )
                raise

self.k_buffer = torch.zeros(first_kv_cache.numel() + alignment,
dtype=first_kv_cache.dtype,
device=first_kv_cache.device)
print(f"before {self.k_buffer=}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There are several print() statements in the code, likely left over from debugging (e.g., lines 95, 99, 112, 132, 176). These should be removed or replaced with appropriate logging calls (e.g., logger.debug()) before merging.

self.remote_sockets: dict[ # type: ignore
str, deque[zmq.Socket]] = defaultdict( # type: ignore
deque)
self.remote_poller = zmq.Poller() # type: ignore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The attribute self.remote_poller is initialized twice in the __init__ method. The second initialization on this line overwrites the one on line 695. This is redundant and should be removed to avoid confusion.

@github-actions
Copy link

This pull request has conflicts, please resolve those before we can evaluate the pull request.

@wangxiaoteng888 wangxiaoteng888 force-pushed the new_layerwise branch 3 times, most recently from 3c39356 to 18f166a Compare October 30, 2025 03:05
wangxiaoteng888 and others added 2 commits October 30, 2025 11:19
Signed-off-by: wangxiaoteng <[email protected]>
@wangxiyuan wangxiyuan merged commit 2c291bc into vllm-project:main Oct 30, 2025
25 checks passed
@wangxiaoteng888 wangxiaoteng888 changed the title [bugfix] layerwise D first plan [bugfix][P/D] layerwise D first plan Nov 3, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants