[bugfix_v0.11.0-dev] layerwise D first plan #3907
Signed-off-by: nwpu-zxr <[email protected]>
Signed-off-by: liziyu <[email protected]>
👋 Hi! Thank you for contributing to the vLLM Ascend project. The following points will speed up your PR merge:
If CI fails, you can run linting and testing checks locally according to Contributing and Testing.
Code Review
This pull request introduces a significant architectural change to the layer-wise execution flow, implementing a "Decoder First" strategy. The changes are extensive, refactoring the proxy server example, the mooncake layer-wise connector, and its corresponding test suite. My review focuses on ensuring the robustness and correctness of this new implementation. I've identified several critical areas concerning error handling and potential performance bottlenecks that could impact the stability and efficiency of the system. Specifically, there are cases where exceptions could lead to hanging services or crashing worker threads. Additionally, synchronous network operations within the critical forward pass path could introduce latency. Addressing these points will improve the resilience of the new architecture.
     if ret < 0:
-        logger.error("Mooncake transfer failed for request %s",
-                     req_meta.req_id)
+        logger.error("Mooncake transfer failed for request %s", req_id)
         raise RuntimeError(f"Mooncake transfer failed, ret: {ret}")
When a mooncake transfer fails, a RuntimeError is raised. Since this happens within the _handle_request method called from the thread's main loop, this unhandled exception will terminate the KVCacheSendingLayerThread. As this thread is a daemon, the main process won't crash, but this worker will lose the capability to send any subsequent KV caches, effectively breaking it for layer-wise decoding. Instead of raising an exception that kills the thread, consider implementing a more resilient error handling strategy, such as logging the error and marking the specific request as failed, without stopping the thread.
Suggested change:
     if ret < 0:
-        logger.error("Mooncake transfer failed for request %s", req_id)
-        raise RuntimeError(f"Mooncake transfer failed, ret: {ret}")
+        logger.error("Mooncake transfer failed for request %s with ret: %d", req_id, ret)
+        # Don't raise an exception that would kill the thread.
+        # The failure of one request should not stop the worker from processing others.
+        return
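As an alternative to returning silently, the thread's main loop can catch per-request failures and record them, so `_handle_request` may still raise. The following is a minimal sketch, not the PR's code: `SendingThread`, `transfer`, `tasks`, and `failed` are hypothetical names standing in for `KVCacheSendingLayerThread` and its internals.

```python
import logging
import queue
import threading

logger = logging.getLogger(__name__)


class SendingThread(threading.Thread):
    """Hypothetical stand-in for KVCacheSendingLayerThread."""

    def __init__(self, transfer):
        super().__init__(daemon=True)
        self.transfer = transfer   # assumed callable; returns < 0 on failure
        self.tasks = queue.Queue()
        self.failed = set()        # ids of requests whose transfer failed

    def run(self):
        while True:
            req_id = self.tasks.get()
            if req_id is None:     # sentinel: stop the loop
                break
            try:
                self._handle_request(req_id)
            except Exception:
                # One bad request is logged and recorded; the loop goes on.
                logger.exception("Transfer failed for request %s", req_id)
                self.failed.add(req_id)

    def _handle_request(self, req_id):
        ret = self.transfer(req_id)
        if ret < 0:
            raise RuntimeError(f"Mooncake transfer failed, ret: {ret}")
```

With this shape the failure stays visible to callers through `failed`, and later requests are still processed.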
    except Exception as e:
        logger.error(
            f"Query to port and kv base addr for request {req_id} from {req_meta_update.remote_host}:{req_meta_update.remote_port} fail with error: {e}"
        )
The try...except block catches any exception during the ZMQ communication but only logs it. It does not re-raise the exception or handle the failure path. If an exception occurs, agent_meta will not be defined, and the code will crash with an UnboundLocalError on line 1005. This will bring down the worker thread. The exception should be re-raised to ensure the failure is handled properly.
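The failure mode can be reproduced in isolation. This is a minimal sketch with hypothetical names (`query_agent_meta`, `fetch`, `failing_fetch`); only the variable name `agent_meta` comes from the comment above.

```python
def query_agent_meta(fetch):
    try:
        agent_meta = fetch()
    except Exception as e:
        print(f"query failed: {e}")  # logged, but not re-raised
    # If fetch() raised, agent_meta was never assigned.
    return agent_meta


def failing_fetch():
    raise ConnectionError("peer unreachable")


try:
    query_agent_meta(failing_fetch)
except UnboundLocalError as e:
    # The original ConnectionError is hidden behind this secondary error.
    print(type(e).__name__)
```

Re-raising inside the `except` block keeps the original exception as the one the caller sees.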
            except Exception as e:
                logger.error(
                    f"Query to port and kv base addr for request {req_id} from {req_meta_update.remote_host}:{req_meta_update.remote_port} fail with error: {e}"
                )
                raise

    kv_transfer_params = await request.json()

    request_id = kv_transfer_params["request_id"]
    assert request_id in proxy_state.req_data_dict
Using assert for input validation in an API endpoint is not robust. If the assertion fails (e.g., due to a race condition or a bug where req_data_dict is cleared prematurely), it will raise an AssertionError and cause a 500 Internal Server Error, which is not a clean API response. It's better to handle this case gracefully by returning a specific HTTP error, like a 404 Not Found, to provide a clearer error to the client. You will need to add JSONResponse to the imports from fastapi.responses.
Suggested change:
-    assert request_id in proxy_state.req_data_dict
+    if request_id not in proxy_state.req_data_dict:
+        logger.error(f"Request ID {request_id} not found in req_data_dict.")
+        return JSONResponse(status_code=404, content={"error": f"Request ID {request_id} not found"})
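A further reason to avoid `assert` here is that Python removes assert statements when run with `-O`, so the check would vanish entirely. The explicit check can be sketched without any web framework; `lookup_request` is a hypothetical helper, and the `(status, body)` return shape is an assumption made only for illustration.

```python
from http import HTTPStatus


def lookup_request(req_data_dict, request_id):
    """Return (status, body) instead of asserting on client input."""
    if request_id not in req_data_dict:
        return HTTPStatus.NOT_FOUND, {"error": f"Request ID {request_id} not found"}
    return HTTPStatus.OK, req_data_dict[request_id]
```

In the endpoint, the returned status and body would be passed to `JSONResponse(status_code=..., content=...)` as in the suggestion above.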
    except Exception as e:
        logger.error(f"Post metaserver failed with: {str(e)}")
The broad except Exception as e: block only logs the error. Since it doesn't return any response, the client (which is another service in this architecture) will hang until it times out. This can lead to resource exhaustion and cascading failures. It's better to return an appropriate HTTP error response (e.g., 500 Internal Server Error) to the client. You will need to add JSONResponse to the imports from fastapi.responses.
Suggested change:
     except Exception as e:
         logger.error(f"Post metaserver failed with: {str(e)}")
+        return JSONResponse(status_code=500, content={"error": "Internal server error in metaserver"})
        msg = decoder.decode(payload[0])
        if msg[0] == GET_META_MSG:
            logger.info("Got GET META INFO for request %s", msg[0])
            sock.send_multipart((identity, b"", encoded_data))
        elif msg[0] == DONE_SENDING_MSG:
            logger.debug("Got DONE_RECVING_MSG for request %s",
                         msg[1])
            request_id = msg[1]
            self.update_task(request_id)
            sock.send_multipart((identity, b"", b"ACK"))
        else:
            logger.error(
                "Connection listener got unexpected message %s",
                msg)
    except Exception as e:
        logger.error("Failed to decode message: %s", e)
When an unexpected or undecodable message is received, the error is logged, but no response is sent back to the client. The client, which sent the message using a zmq.ROUTER socket, will be left waiting for a reply and will eventually time out. This can lead to resource leaks and hanging clients. It's important to always send a reply, even if it's an error message, to unblock the client.
Suggested change:
         msg = decoder.decode(payload[0])
         if msg[0] == GET_META_MSG:
             logger.info("Got GET META INFO for request %s", msg[0])
             sock.send_multipart((identity, b"", encoded_data))
         elif msg[0] == DONE_SENDING_MSG:
             logger.debug("Got DONE_RECVING_MSG for request %s",
                          msg[1])
             request_id = msg[1]
             self.update_task(request_id)
             sock.send_multipart((identity, b"", b"ACK"))
         else:
             logger.error(
                 "Connection listener got unexpected message %s",
                 msg)
+            sock.send_multipart((identity, b"", b"NACK_UNEXPECTED_MSG"))
     except Exception as e:
         logger.error("Failed to decode message: %s", e)
+        if 'identity' in locals():
+            sock.send_multipart((identity, b"", b"NACK_DECODE_ERROR"))
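One way to guarantee a reply on every path is to separate "decide the reply" from "send the reply". The sketch below makes several assumptions: `build_reply` is a hypothetical helper, `json` stands in for the real decoder, and the message-type values are invented for illustration.

```python
import json

# Hypothetical values; the real constants are defined in the connector.
GET_META_MSG = "get_meta"
DONE_SENDING_MSG = "done_sending"


def build_reply(raw, encoded_meta, on_done):
    """Return the reply frame for one request; every path returns bytes."""
    try:
        msg = json.loads(raw)
        kind = msg[0]
    except (ValueError, TypeError, LookupError):
        # Undecodable payload, or a payload that is not a non-empty sequence.
        return b"NACK_DECODE_ERROR"
    if kind == GET_META_MSG:
        return encoded_meta
    if kind == DONE_SENDING_MSG and len(msg) > 1:
        on_done(msg[1])  # e.g. mark the request's task as finished
        return b"ACK"
    return b"NACK_UNEXPECTED_MSG"
```

The listener loop then issues a single `sock.send_multipart((identity, b"", reply))` per received message, which also removes the need for the `'identity' in locals()` check as long as `identity` is read before this function is called.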
        layer_index=self.current_layer,
        key=key,
        value=value)
    req_meta_update = self.update_decoder_info(req_id, req_meta)
The update_decoder_info method is called from within save_kv_layer, which is on the critical path of the model's forward pass. This method can perform synchronous network I/O to fetch metadata if it's not already cached. This can introduce significant latency and impact performance. Consider moving this metadata fetching logic to an earlier stage, outside of the forward pass, for instance, when the request is first received or scheduled.
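The idea of fetching early can be sketched as a small cache that is filled at scheduling time and only read during the forward pass. All names here (`DecoderMetaCache`, `prefetch`, `get`, `discard`, `fetch`) are hypothetical and not part of the connector.

```python
import threading


class DecoderMetaCache:
    """Hypothetical cache for decoder metadata, filled before the forward pass."""

    def __init__(self, fetch):
        self._fetch = fetch  # assumed blocking call, e.g. a network query
        self._meta = {}
        self._lock = threading.Lock()

    def prefetch(self, req_id):
        """Call when the request is received or scheduled."""
        meta = self._fetch(req_id)  # blocking I/O happens outside the lock
        with self._lock:
            self._meta[req_id] = meta

    def get(self, req_id):
        """Hot path: dictionary lookup only; returns None if not prefetched."""
        with self._lock:
            return self._meta.get(req_id)

    def discard(self, req_id):
        """Drop the entry once the request has finished."""
        with self._lock:
            self._meta.pop(req_id, None)
```

A `None` result from `get` tells the caller that the prefetch has not completed, so the caller decides how to handle a miss and the hot path never issues a hidden network call.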
    except Exception as e:
        logger.error(
            f"Sending done sending signal for request {req_id} to {req_meta.remote_host}:{req_meta.remote_port} fail with error: {e}"
        )
The try...except block catches any exception during the ZMQ communication but only logs it. If sending the DONE_SENDING_MSG fails, the remote (decoder) side will never receive this completion signal and may hang waiting for it. This can lead to stuck requests and resource leaks. The exception should be handled more robustly, for example by re-raising it to make the failure visible.
        except Exception as e:
            logger.error(
                f"Sending done sending signal for request {req_id} to {req_meta.remote_host}:{req_meta.remote_port} fail with error: {e}"
            )
            raise
    Signed-off-by: wangxiaoteng <[email protected]>
What this PR does / why we need it?
Refactored the layerwise code to send to the D node first, preventing P-node hangs due to communication timeouts when DP > 1.
Does this PR introduce any user-facing change?
No
How was this patch tested?
By CI.