Commit 7324b96
ikaadil, YuhanLiu11, Fabhiahn, Shaoting-Feng, and zerofishnoodles authored
[Router] Replace httpx with aiohttp in vllm_router for enhanced high-concurrency performance (#589)
* feat: update requirements to include aiohttp
  Signed-off-by: Ifta khairul Alam Adil <[email protected]>
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* feat: replace HTTPX with aiohttp for asynchronous client handling
  Signed-off-by: Ifta khairul Alam Adil <[email protected]>
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* feat: replace HTTPXClientWrapper with AiohttpClientWrapper for improved async handling
  Signed-off-by: Ifta khairul Alam Adil <[email protected]>
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* feat: replace HTTPXClientWrapper with AiohttpClientWrapper in lifespan management
  Signed-off-by: Ifta khairul Alam Adil <[email protected]>
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* feat: remove httpx from requirements for aiohttp integration
  Signed-off-by: Ifta khairul Alam Adil <[email protected]>
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* refactor: switch from httpx to aiohttp for asynchronous client handling in service discovery
  Signed-off-by: Ifta khairul Alam Adil <[email protected]>
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* feat: add AiohttpClientWrapper for asynchronous HTTP client handling
  Signed-off-by: Ifta khairul Alam Adil <[email protected]>
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* chore: pin aiohttp version to 3.9.5 in requirements
  Signed-off-by: Ifta khairul Alam Adil <[email protected]>
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* refactor: migrate from httpx to aiohttp for asynchronous HTTP requests in request service
  Signed-off-by: Ifta khairul Alam Adil <[email protected]>
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* refactor: update file upload implementation to use aiohttp for asynchronous requests
  Signed-off-by: Ifta khairul Alam Adil <[email protected]>
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* chore: replace httpx with aiohttp in project dependencies and test requirements
  Signed-off-by: Ifta khairul Alam Adil <[email protected]>
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* chore: format dependencies in pyproject.toml for consistency
  Signed-off-by: Ifta khairul Alam Adil <[email protected]>
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* chore: clean up imports and ensure consistent aiohttp version in requirements
  Signed-off-by: Ifta khairul Alam Adil <[email protected]>
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* fix: update aiohttp client closure method for consistency and accuracy in logging
  Signed-off-by: Ifta khairul Alam Adil <[email protected]>
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* fix: correct status code handling in request processing for improved response accuracy
  Signed-off-by: Ifta khairul Alam Adil <[email protected]>
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* bumping version (#593)
  Signed-off-by: YuhanLiu11 <[email protected]>
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* Added option to specify priority class (#557)
  Signed-off-by: Fabijan Marič Vild <[email protected]>
  Co-authored-by: Yuhan Liu <[email protected]>
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* [CI/Build] Change CI runner to L4 (#595)
  * Add env clean up before run
    Signed-off-by: Shaoting Feng <[email protected]>
  * Delete all pods
    Signed-off-by: Shaoting Feng <[email protected]>
  * Delete all release
    Signed-off-by: Shaoting Feng <[email protected]>
  * test ci runner
    Signed-off-by: Shaoting <[email protected]>
  * test ci runner
    Signed-off-by: Shaoting <[email protected]>
  * Test again
    Signed-off-by: Shaoting Feng <[email protected]>
  * Test again
    Signed-off-by: Shaoting Feng <[email protected]>
  * Change python version from 3.12 to 3.10
    Signed-off-by: Shaoting Feng <[email protected]>
  * delete get helm
    Signed-off-by: Shaoting Feng <[email protected]>
  * change back python version
    Signed-off-by: Shaoting Feng <[email protected]>
  * trigger
    Signed-off-by: Shaoting Feng <[email protected]>
  * Add cache: pip
    Signed-off-by: Shaoting Feng <[email protected]>
  * Try check-latest: true
    Signed-off-by: Shaoting Feng <[email protected]>
  * Changing to 3.10
    Signed-off-by: Shaoting Feng <[email protected]>
  * change to 3.9
    Signed-off-by: Shaoting Feng <[email protected]>
  * Specify 3.9.2
    Signed-off-by: Shaoting Feng <[email protected]>
  * Use conda python
    Signed-off-by: Shaoting Feng <[email protected]>
  * Fix vllm without conda
    Signed-off-by: Shaoting Feng <[email protected]>
  ---------
  Signed-off-by: Shaoting Feng <[email protected]>
  Signed-off-by: Shaoting <[email protected]>
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* [Bugfix] fix dynamic config (#598)
  * [hotfix] fix dynamic config
    Signed-off-by: Rui Zhang <[email protected]>
  * [Bugfix] Add missing field for reconfiguration
    Signed-off-by: Rui Zhang <[email protected]>
  ---------
  Signed-off-by: Rui Zhang <[email protected]>
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* [refactor] redesign RST documentation (#592)
  * initial reorg
    Signed-off-by: Kobe Chen <[email protected]>
  * pass CICD
    Signed-off-by: Kobe Chen <[email protected]>
  * fix
    Signed-off-by: Kobe Chen <[email protected]>
  * fix
    Signed-off-by: Kobe Chen <[email protected]>
  * fix: update helm.rst
    Signed-off-by: Kobe Chen <[email protected]>
  * fix
    Signed-off-by: Kobe Chen <[email protected]>
  * fix
    Signed-off-by: Kobe Chen <[email protected]>
  * Sharing KV Cache Across Instances
    Signed-off-by: Kobe Chen <[email protected]>
  * docker
    Signed-off-by: Kobe Chen <[email protected]>
  * community meeting
    Signed-off-by: Kobe Chen <[email protected]>
  * add deployment 3 options
    Signed-off-by: Kobe Chen <[email protected]>
  * remove A:
    Signed-off-by: Kobe Chen <[email protected]>
  * docker
    Signed-off-by: Kobe Chen <[email protected]>
  ---------
  Signed-off-by: Kobe Chen <[email protected]>
  Co-authored-by: Yuhan Liu <[email protected]>
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* [Misc] revert uv.lock (#604)
  Signed-off-by: Kobe Chen <[email protected]>
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* fix: update response content iteration method for improved performance
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* Replace httpx with aiohttp for prefill and decode clients in K8sServiceNameServiceDiscovery, updating timeout handling accordingly.
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* Update file upload function to include file parameter in aiohttp post request
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* Refactor response handling in request service to return JSON data and correctly set response status code in sleep/wakeup routing.
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* Add initialization of aiohttp ClientSession in service discovery during app startup
  - Implemented method in , , and classes to set up aiohttp ClientSession for prefill and decode endpoints.
  - Updated function in to call if available, ensuring proper session management during application startup.
  - Adjusted logging statements in to reference the attribute of the client sessions for accurate logging of routing requests.
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
* Refactor whitespace and formatting in app.py for improved readability
  Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
---------
Signed-off-by: Ifta khairul Alam Adil <[email protected]>
Signed-off-by: Ifta Khairul Alam Adil <[email protected]>
Signed-off-by: YuhanLiu11 <[email protected]>
Signed-off-by: Fabijan Marič Vild <[email protected]>
Signed-off-by: Shaoting Feng <[email protected]>
Signed-off-by: Shaoting <[email protected]>
Signed-off-by: Rui Zhang <[email protected]>
Signed-off-by: Kobe Chen <[email protected]>
Co-authored-by: Yuhan Liu <[email protected]>
Co-authored-by: Fabhiahn <[email protected]>
Co-authored-by: Shaoting <[email protected]>
Co-authored-by: Rui Zhang <[email protected]>
Co-authored-by: Kobe Chen <[email protected]>
1 parent a547d6a commit 7324b96

9 files changed: +134 -102 lines changed

docs/source/conf.py

Lines changed: 1 addition & 1 deletion
@@ -95,7 +95,7 @@ def add_line(self, line: str, source: str, *lineno: int) -> None:
     "prometheus_client",
     "uhashring",
     "lmcache",
-    "httpx",
+    "aiohttp",
     "transformers",
     "os",
 ]

pyproject.toml

Lines changed: 2 additions & 2 deletions
@@ -15,12 +15,12 @@ dependencies = [
     "aiofiles==24.1.0",
     "black>=25.1.0",
     "fastapi==0.115.8",
-    "httpx==0.28.1",
+    "aiohttp==3.9.1",
     "kubernetes==32.0.0",
     "numpy==1.26.4",
     "prometheus-client==0.21.1",
     "python-multipart==0.0.20",
-    "sentry-sdk[fastapi,httpx]==2.27.0",
+    "sentry-sdk[fastapi]==2.27.0",
     "uhashring==2.3",
     "uvicorn==0.34.0",
     "xxhash==3.5.0",
Lines changed: 13 additions & 10 deletions
@@ -1,22 +1,23 @@
 import argparse
 
-import httpx
+import aiohttp
 
 
-def upload_file(server_url: str, file_path: str):
+async def upload_file(server_url: str, file_path: str):
     """Uploads a file to the production stack."""
     try:
         with open(file_path, "rb") as file:
             files = {"file": (file_path, file, "application/octet-stream")}
             data = {"purpose": "unknown"}
 
-            with httpx.Client() as client:
-                response = client.post(server_url, files=files, data=data)
-
-            if response.status_code == 200:
-                print("File uploaded successfully:", response.json())
-            else:
-                print("Failed to upload file:", response.text)
+            async with aiohttp.ClientSession() as client:
+                async with client.post(server_url, files=files, data=data) as response:
+                    if response.status == 200:
+                        result = await response.json()
+                        print("File uploaded successfully:", result)
+                    else:
+                        text = await response.text()
+                        print("Failed to upload file:", text)
     except Exception as e:
         print(f"Error: {e}")
 
@@ -31,7 +32,9 @@ def parse_args():
 
 
 if __name__ == "__main__":
+    import asyncio
+
     args = parse_args()
     endpoint = args.url
     file_to_upload = args.path
-    upload_file(endpoint, file_to_upload)
+    asyncio.run(upload_file(endpoint, file_to_upload))
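
Note on the upload script: httpx accepts a `files=` keyword on `post`, but as far as I know aiohttp's `ClientSession.post` does not, so the new call would likely raise a `TypeError` that the broad `except` then prints as `Error: ...`. Below is a minimal sketch of the same upload built with `aiohttp.FormData`, assuming the server still expects a `file` part and a `purpose` field as in the original script. `build_upload_form` is a hypothetical helper name, not something from the commit.

```python
import asyncio

import aiohttp


def build_upload_form(file_path: str, file_obj, purpose: str = "unknown") -> aiohttp.FormData:
    """Build a multipart body with a `purpose` field and a `file` part.

    Hypothetical helper: mirrors the `files`/`data` dicts used with httpx.
    """
    form = aiohttp.FormData()
    form.add_field("purpose", purpose)
    form.add_field(
        "file",
        file_obj,
        filename=file_path,
        content_type="application/octet-stream",
    )
    return form


async def upload_file(server_url: str, file_path: str) -> None:
    """Upload one file; the file must stay open until the request completes."""
    with open(file_path, "rb") as file_obj:
        form = build_upload_form(file_path, file_obj)
        async with aiohttp.ClientSession() as session:
            # aiohttp takes the multipart body through `data=`.
            async with session.post(server_url, data=form) as response:
                if response.status == 200:
                    print("File uploaded successfully:", await response.json())
                else:
                    print("Failed to upload file:", await response.text())
```

It would be driven the same way as in the diff: `asyncio.run(upload_file(endpoint, file_to_upload))`.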

src/tests/requirements.txt

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
+aiohttp
 fastapi
-httpx
 openai
 uvicorn
 vllm

src/vllm_router/httpx_client.py renamed to src/vllm_router/aiohttp_client.py

Lines changed: 9 additions & 10 deletions
@@ -11,39 +11,38 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import httpx
+import aiohttp
 
 from vllm_router.log import init_logger
 
 logger = init_logger(__name__)
 
 
-class HTTPXClientWrapper:
+class AiohttpClientWrapper:
 
     async_client = None
 
     def start(self):
         """Instantiate the client. Call from the FastAPI startup hook."""
         # To fully leverage the router's concurrency capabilities,
         # we set the maximum number of connections to be unlimited.
-        limits = httpx.Limits(max_connections=None)
-        self.async_client = httpx.AsyncClient(limits=limits)
-        logger.info(f"httpx AsyncClient instantiated. Id {id(self.async_client)}")
+        self.async_client = aiohttp.ClientSession()
+        logger.info(f"aiohttp ClientSession instantiated. Id {id(self.async_client)}")
 
     async def stop(self):
         """Gracefully shutdown. Call from FastAPI shutdown hook."""
         logger.info(
-            f"httpx async_client.is_closed(): {self.async_client.is_closed} - Now close it. Id (will be unchanged): {id(self.async_client)}"
+            f"aiohttp async_client.closed: {self.async_client.closed} - Now close it. Id (will be unchanged): {id(self.async_client)}"
         )
-        await self.async_client.aclose()
+        await self.async_client.close()
         logger.info(
-            f"httpx async_client.is_closed(): {self.async_client.is_closed}. Id (will be unchanged): {id(self.async_client)}"
+            f"aiohttp async_client.closed: {self.async_client.closed}. Id (will be unchanged): {id(self.async_client)}"
         )
         self.async_client = None
-        logger.info("httpx AsyncClient closed")
+        logger.info("aiohttp ClientSession closed")
 
     def __call__(self):
-        """Calling the instantiated HTTPXClientWrapper returns the wrapped singleton."""
+        """Calling the instantiated AiohttpClientWrapper returns the wrapped singleton."""
         # Ensure we don't use it if not started / running
         assert self.async_client is not None
         return self.async_client
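
Note on the connection limit: the comment kept in `start()` says the number of connections is unlimited, which matched `httpx.Limits(max_connections=None)`. A bare `aiohttp.ClientSession()` uses a default `TCPConnector`, whose documented default `limit` is 100 simultaneous connections. The sketch below shows one way the unlimited behavior could be kept, assuming that is still the intent. `make_unbounded_session` and `connector_limit_demo` are hypothetical names, not part of the commit.

```python
import asyncio

import aiohttp


async def make_unbounded_session() -> aiohttp.ClientSession:
    """Create a session whose connector has no total connection cap.

    Built inside a coroutine so that a running event loop is available.
    """
    connector = aiohttp.TCPConnector(limit=0)  # 0 means "no limit" in aiohttp
    return aiohttp.ClientSession(connector=connector)


async def connector_limit_demo() -> int:
    """Open the session, read back the configured limit, and close cleanly."""
    session = await make_unbounded_session()
    try:
        return session.connector.limit
    finally:
        await session.close()
```

Whether an unbounded pool is desirable depends on how many backends the router fronts; a large finite `limit` is the more conservative choice.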

src/vllm_router/app.py

Lines changed: 9 additions & 4 deletions
@@ -19,13 +19,13 @@
 import uvicorn
 from fastapi import FastAPI
 
+from vllm_router.aiohttp_client import AiohttpClientWrapper
 from vllm_router.dynamic_config import (
     DynamicRouterConfig,
     get_dynamic_config_watcher,
     initialize_dynamic_config_watcher,
 )
 from vllm_router.experimental import get_feature_gates, initialize_feature_gates
-from vllm_router.httpx_client import HTTPXClientWrapper
 from vllm_router.parsers.parser import parse_args
 from vllm_router.routers.batches_router import batches_router
 from vllm_router.routers.files_router import files_router
@@ -82,11 +82,16 @@
 
 @asynccontextmanager
 async def lifespan(app: FastAPI):
-    app.state.httpx_client_wrapper.start()
+    app.state.aiohttp_client_wrapper.start()
     if hasattr(app.state, "batch_processor"):
         await app.state.batch_processor.initialize()
+
+    service_discovery = get_service_discovery()
+    if hasattr(service_discovery, "initialize_client_sessions"):
+        await service_discovery.initialize_client_sessions()
+
     yield
-    await app.state.httpx_client_wrapper.stop()
+    await app.state.aiohttp_client_wrapper.stop()
 
     # Close the threaded-components
     logger.info("Closing engine stats scraper")
@@ -265,7 +270,7 @@ def initialize_all(app: FastAPI, args):
     app.include_router(files_router)
     app.include_router(batches_router)
     app.include_router(metrics_router)
-    app.state.httpx_client_wrapper = HTTPXClientWrapper()
+    app.state.aiohttp_client_wrapper = AiohttpClientWrapper()
     app.state.semantic_cache_available = semantic_cache_available
 
 
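
Note on the startup order in `lifespan`: the wrapper is started first, then the optional `initialize_client_sessions` hook runs only if the service discovery object defines it, and the wrapper is stopped after `yield`. The sketch below reproduces that ordering with stand-in classes so it can be run without FastAPI or aiohttp. `FakeWrapper`, `FakeDiscovery`, `LegacyDiscovery`, and `run_once` are hypothetical, and the `try`/`finally` is an addition of this sketch, not something the diff does.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeWrapper:
    """Stand-in for AiohttpClientWrapper that records lifecycle calls."""

    def __init__(self, events):
        self.events = events

    def start(self):
        self.events.append("wrapper.start")

    async def stop(self):
        self.events.append("wrapper.stop")


class FakeDiscovery:
    """Discovery backend that implements the optional async hook."""

    def __init__(self, events):
        self.events = events

    async def initialize_client_sessions(self):
        self.events.append("discovery.init")


class LegacyDiscovery:
    """Discovery backend without the hook; startup must still succeed."""

    def __init__(self, events):
        self.events = events


@asynccontextmanager
async def lifespan(wrapper, discovery):
    wrapper.start()
    # The hook is optional, so probe for it instead of assuming it exists.
    if hasattr(discovery, "initialize_client_sessions"):
        await discovery.initialize_client_sessions()
    try:
        yield
    finally:
        # Runs even if the body raises, so the client is always closed.
        await wrapper.stop()


async def run_once(discovery_cls):
    events = []
    async with lifespan(FakeWrapper(events), discovery_cls(events)):
        events.append("serving")
    return events
```

Probing with `hasattr` keeps backends that lack the hook working unchanged, at the cost of silently skipping a misspelled method name.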

src/vllm_router/requirements.txt

Lines changed: 2 additions & 2 deletions
@@ -1,12 +1,12 @@
 aiofiles==24.1.0
+aiohttp==3.9.5
 fastapi==0.115.8
-httpx==0.28.1
 kubernetes==32.0.0
 numpy==1.26.4
 prometheus_client==0.21.1
 psutil==7.0.0
 python-multipart==0.0.20
-sentry-sdk[fastapi,httpx]==2.27.0
+sentry-sdk[fastapi]==2.27.0
 uhashring==2.3
 uvicorn==0.34.0
 xxhash==3.5.0
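
Note on the pins: this commit sets `aiohttp==3.9.1` in `pyproject.toml` but `aiohttp==3.9.5` in `src/vllm_router/requirements.txt`. The sketch below is one possible check for that kind of drift. `parse_pins` and `mismatched_pins` are hypothetical helpers, and the parser only understands simple `name==version` entries (optionally quoted, optionally with extras), which is an assumption about both files.

```python
import re

# Matches entries such as: aiohttp==3.9.5   or   "sentry-sdk[fastapi]==2.27.0",
PIN_RE = re.compile(r'^\s*"?([A-Za-z0-9_.-]+)(?:\[[^\]]*\])?==([^",\s]+)')


def normalize(name: str) -> str:
    """Treat '-', '_' and '.' as equivalent, as package indexes do."""
    return re.sub(r"[-_.]+", "-", name).lower()


def parse_pins(text: str) -> dict:
    """Collect exact pins from requirements-style or pyproject-style lines."""
    pins = {}
    for line in text.splitlines():
        match = PIN_RE.match(line)
        if match:
            pins[normalize(match.group(1))] = match.group(2)
    return pins


def mismatched_pins(first: dict, second: dict) -> dict:
    """Return packages pinned in both inputs but to different versions."""
    return {
        name: (first[name], second[name])
        for name in first.keys() & second.keys()
        if first[name] != second[name]
    }
```

Entries that are not exact pins, such as `black>=25.1.0`, are skipped rather than compared.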

src/vllm_router/service_discovery.py

Lines changed: 59 additions & 34 deletions
@@ -22,7 +22,7 @@
 from dataclasses import dataclass
 from typing import Dict, List, Optional
 
-import httpx
+import aiohttp
 import requests
 from kubernetes import client, config, watch
 
@@ -308,22 +308,29 @@ def get_endpoint_info(self) -> List[EndpointInfo]:
                 model_info=self._get_model_info(model),
             )
             endpoint_infos.append(endpoint_info)
+        return endpoint_infos
+
+    async def initialize_client_sessions(self) -> None:
+        """
+        Initialize aiohttp ClientSession objects for prefill and decode endpoints.
+        This must be called from an async context during app startup.
+        """
         if (
             self.prefill_model_labels is not None
             and self.decode_model_labels is not None
         ):
+            endpoint_infos = self.get_endpoint_info()
             for endpoint_info in endpoint_infos:
                 if endpoint_info.model_label in self.prefill_model_labels:
-                    self.app.state.prefill_client = httpx.AsyncClient(
+                    self.app.state.prefill_client = aiohttp.ClientSession(
                         base_url=endpoint_info.url,
-                        timeout=None,
+                        timeout=aiohttp.ClientTimeout(total=None),
                     )
                 elif endpoint_info.model_label in self.decode_model_labels:
-                    self.app.state.decode_client = httpx.AsyncClient(
+                    self.app.state.decode_client = aiohttp.ClientSession(
                         base_url=endpoint_info.url,
-                        timeout=None,
+                        timeout=aiohttp.ClientTimeout(total=None),
                     )
-        return endpoint_infos
 
 
 class K8sPodIPServiceDiscovery(ServiceDiscovery):
@@ -629,20 +636,7 @@ def _add_engine(
                 namespace=self.namespace,
                 model_info=model_info,
             )
-            if (
-                self.prefill_model_labels is not None
-                and self.decode_model_labels is not None
-            ):
-                if model_label in self.prefill_model_labels:
-                    self.app.state.prefill_client = httpx.AsyncClient(
-                        base_url=f"http://{engine_ip}:{self.port}",
-                        timeout=None,
-                    )
-                elif model_label in self.decode_model_labels:
-                    self.app.state.decode_client = httpx.AsyncClient(
-                        base_url=f"http://{engine_ip}:{self.port}",
-                        timeout=None,
-                    )
+
             # Store model information in the endpoint info
             self.available_engines[engine_name].model_info = model_info
 
@@ -720,6 +714,28 @@ def close(self):
         self.k8s_watcher.stop()
         self.watcher_thread.join()
 
+    async def initialize_client_sessions(self) -> None:
+        """
+        Initialize aiohttp ClientSession objects for prefill and decode endpoints.
+        This must be called from an async context during app startup.
+        """
+        if (
+            self.prefill_model_labels is not None
+            and self.decode_model_labels is not None
+        ):
+            endpoint_infos = self.get_endpoint_info()
+            for endpoint_info in endpoint_infos:
+                if endpoint_info.model_label in self.prefill_model_labels:
+                    self.app.state.prefill_client = aiohttp.ClientSession(
+                        base_url=endpoint_info.url,
+                        timeout=aiohttp.ClientTimeout(total=None),
+                    )
+                elif endpoint_info.model_label in self.decode_model_labels:
+                    self.app.state.decode_client = aiohttp.ClientSession(
+                        base_url=endpoint_info.url,
+                        timeout=aiohttp.ClientTimeout(total=None),
+                    )
+
 
 class K8sServiceNameServiceDiscovery(ServiceDiscovery):
     def __init__(
@@ -1024,20 +1040,7 @@ def _add_engine(self, engine_name: str, model_names: List[str], model_label: str
                 namespace=self.namespace,
                 model_info=model_info,
             )
-            if (
-                self.prefill_model_labels is not None
-                and self.decode_model_labels is not None
-            ):
-                if model_label in self.prefill_model_labels:
-                    self.app.state.prefill_client = httpx.AsyncClient(
-                        base_url=f"http://{engine_name}:{self.port}",
-                        timeout=None,
-                    )
-                elif model_label in self.decode_model_labels:
-                    self.app.state.decode_client = httpx.AsyncClient(
-                        base_url=f"http://{engine_name}:{self.port}",
-                        timeout=None,
-                    )
+
             # Store model information in the endpoint info
             self.available_engines[engine_name].model_info = model_info
 
@@ -1114,6 +1117,28 @@ def close(self):
         self.k8s_watcher.stop()
         self.watcher_thread.join()
 
+    async def initialize_client_sessions(self) -> None:
+        """
+        Initialize aiohttp ClientSession objects for prefill and decode endpoints.
+        This must be called from an async context during app startup.
+        """
+        if (
+            self.prefill_model_labels is not None
+            and self.decode_model_labels is not None
+        ):
+            endpoint_infos = self.get_endpoint_info()
+            for endpoint_info in endpoint_infos:
+                if endpoint_info.model_label in self.prefill_model_labels:
+                    self.app.state.prefill_client = aiohttp.ClientSession(
+                        base_url=endpoint_info.url,
+                        timeout=aiohttp.ClientTimeout(total=None),
+                    )
+                elif endpoint_info.model_label in self.decode_model_labels:
+                    self.app.state.decode_client = aiohttp.ClientSession(
+                        base_url=endpoint_info.url,
+                        timeout=aiohttp.ClientTimeout(total=None),
+                    )
+
 
 def _create_service_discovery(
     service_discovery_type: ServiceDiscoveryType, *args, **kwargs
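
Note on `initialize_client_sessions`: the same body now appears in three service discovery classes, and the hunks shown here do not include a matching close for the prefill and decode sessions. The sketch below shows how the loop could be shared and paired with a cleanup step, assuming no such cleanup exists elsewhere in the router. `assign_pd_clients`, `close_pd_clients`, the `make_client` factory parameter, and the reduced `EndpointInfo` are all hypothetical.

```python
import asyncio
from dataclasses import dataclass
from typing import Callable, Iterable, Optional


@dataclass
class EndpointInfo:
    """Reduced stand-in for the router's EndpointInfo (only the fields used here)."""

    url: str
    model_label: str


def assign_pd_clients(
    state,
    endpoint_infos: Iterable[EndpointInfo],
    prefill_labels: Optional[list],
    decode_labels: Optional[list],
    make_client: Callable[[str], object],
) -> None:
    """Attach prefill/decode clients to `state`, mirroring the loop in the diff.

    As in the diff, a later endpoint with the same label replaces an earlier one.
    """
    if prefill_labels is None or decode_labels is None:
        return
    for info in endpoint_infos:
        if info.model_label in prefill_labels:
            state.prefill_client = make_client(info.url)
        elif info.model_label in decode_labels:
            state.decode_client = make_client(info.url)


async def close_pd_clients(state) -> None:
    """Close whichever of the two clients exist and clear the attributes."""
    for attr in ("prefill_client", "decode_client"):
        client = getattr(state, attr, None)
        if client is not None:
            await client.close()
            setattr(state, attr, None)
```

With aiohttp, `make_client` could be `lambda url: aiohttp.ClientSession(base_url=url, timeout=aiohttp.ClientTimeout(total=None))`, called from a running event loop as the docstring in the diff requires. Passing the factory in also lets the loop be tested without opening real sessions.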
