Skip to content

Commit 8b013e7

Browse files
committed
feat: support vefaas deployment
Signed-off-by: Meinhard Zhou <[email protected]>
1 parent d88b177 commit 8b013e7

File tree

6 files changed

+297
-3
lines changed

6 files changed

+297
-3
lines changed

src/swerex/deployment/config.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,53 @@ def get_deployment(self) -> AbstractDeployment:
186186

187187
return DummyDeployment.from_config(self)
188188

189+
class VeFaasDeploymentConfig(BaseModel):
    """Configuration for a deployment that runs inside a veFaaS sandbox.

    NOTE(review): no ``type`` discriminator field is declared here, although
    ``RemoteRuntimeConfig`` carries one for (de)serialization/CLI use. Confirm
    whether the deployment config union needs one for this class as well.
    """

    model_config = ConfigDict(extra="forbid")

    image: str = "python:3.11"
    """Container image the sandbox is created from."""
    platform: str | None = None
    """Target platform; may also be supplied as ``--platform`` inside ``docker_args`` (see validator)."""
    startup_timeout: float = 180.0
    """Seconds to wait for the runtime to report alive after the sandbox is created."""
    ak: str | None = None
    """Volcengine access key."""
    sk: str | None = None
    """Volcengine secret key."""
    region: str | None = None
    """Volcengine region handed to the SDK configuration."""
    mount_path: str = "/mnt/nas/"
    """Directory inside the sandbox that contains the ``SWE-ReX`` checkout/virtualenv."""
    apigateway_service_id: str | None = None
    """ID of the API gateway service whose domain is used to reach the runtime."""
    function_id: str | None = None
    """ID of the veFaaS function the sandbox is created under."""
    request_timeout: int = 300
    """``request_timeout`` value sent with the create-sandbox request."""
    client_side_validation: bool = True
    # NOTE(review): not read anywhere in the deployment code visible in this change — confirm its purpose.

    @model_validator(mode="before")
    @classmethod
    def validate_platform_args(cls, data: dict) -> dict:
        """Move a ``--platform`` flag found in ``docker_args`` into the ``platform`` field.

        Both ``--platform=value`` and ``--platform value`` spellings are understood.
        Non-dict input is passed through untouched.

        NOTE(review): ``docker_args`` is not a declared field of this model, so with
        ``extra="forbid"`` any ``docker_args`` key left in the data is still rejected
        by pydantic afterwards. Confirm whether this validator is needed here at all.

        Raises:
            ValueError: If the platform is given both ways, or ``--platform`` has no value.
        """
        if not isinstance(data, dict):
            return data

        # Work on a shallow copy so the caller's dict is never modified in place.
        data = dict(data)
        docker_args = data.get("docker_args") or []
        platform = data.get("platform")

        platform_arg_idx = next((i for i, arg in enumerate(docker_args) if arg.startswith("--platform")), -1)
        if platform_arg_idx == -1:
            return data

        if platform is not None:
            msg = "Cannot specify platform both via 'platform' field and '--platform' in docker_args"
            raise ValueError(msg)

        flag = docker_args[platform_arg_idx]
        if "=" in flag:
            # --platform=value: the value lives in the same list element.
            data["platform"] = flag.split("=", 1)[1]
            data["docker_args"] = docker_args[:platform_arg_idx] + docker_args[platform_arg_idx + 1 :]
        elif platform_arg_idx + 1 < len(docker_args):
            # --platform value: drop both the flag and the element that follows it.
            data["platform"] = docker_args[platform_arg_idx + 1]
            data["docker_args"] = docker_args[:platform_arg_idx] + docker_args[platform_arg_idx + 2 :]
        else:
            msg = "--platform argument must be followed by a value"
            raise ValueError(msg)

        return data

    def get_deployment(self) -> AbstractDeployment:
        """Build the ``VeFaasDeployment`` described by this configuration."""
        # Imported lazily, as the deployment module itself imports this config module.
        from swerex.deployment.vefaas import VeFaasDeployment

        return VeFaasDeployment.from_config(self)
189236

190237
class DaytonaDeploymentConfig(BaseModel):
191238
"""Configuration for Daytona deployment."""
@@ -211,6 +258,7 @@ def get_deployment(self) -> AbstractDeployment:
211258
| RemoteDeploymentConfig
212259
| DummyDeploymentConfig
213260
| DaytonaDeploymentConfig
261+
| VeFaasDeploymentConfig
214262
)
215263
"""Union of all deployment configurations. Useful for type hints."""
216264

src/swerex/deployment/vefaas.py

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
import logging
2+
import time
3+
import uuid
4+
5+
from typing import Any
6+
from typing_extensions import Self
7+
8+
from swerex.exceptions import DeploymentNotStartedError, DeploymentStartupError
9+
10+
from swerex.deployment.abstract import AbstractDeployment
11+
from swerex.deployment.config import VeFaasDeploymentConfig
12+
from swerex.deployment.hooks.abstract import DeploymentHook, CombinedDeploymentHook
13+
from swerex.runtime.abstract import IsAliveResponse
14+
from swerex.runtime.remote import RemoteRuntime
15+
from swerex.runtime.config import RemoteRuntimeConfig
16+
17+
from swerex.utils.log import get_logger
18+
from swerex.utils.wait import _wait_until_alive
19+
20+
21+
from volcenginesdkvefaas import (
22+
VEFAASApi,
23+
InstanceImageInfoForCreateSandboxInput,
24+
CreateSandboxRequest,
25+
CreateSandboxResponse,
26+
KillSandboxRequest,
27+
)
28+
from volcenginesdkcore import ApiClient, Configuration
29+
30+
from volcenginesdkapig import APIGApi
31+
from volcenginesdkapig.models import (
32+
GetGatewayServiceRequest,
33+
GetGatewayServiceResponse,
34+
)
35+
36+
class VeFaasDeployment(AbstractDeployment):
    """Deployment that runs the SWE-ReX server inside a veFaaS sandbox.

    ``start`` creates a sandbox through the veFaaS API, looks up the domain of the
    configured API gateway service and connects a ``RemoteRuntime`` to it.
    ``stop`` closes the runtime and kills the sandbox.
    """

    def __init__(
        self,
        *,
        logger: logging.Logger | None = None,
        **kwargs: Any,
    ):
        """Create the deployment without contacting any remote service.

        Args:
            logger: Logger to use; defaults to the ``rex-deploy`` logger.
            **kwargs: Keyword arguments forwarded to ``VeFaasDeploymentConfig``.
        """
        self._config = VeFaasDeploymentConfig(**kwargs)
        self._runtime: RemoteRuntime | None = None
        self._container_name: str | None = None
        self._hooks = CombinedDeploymentHook()
        self.logger = logger or get_logger("rex-deploy")
        self._runtime_timeout = 0.15
        self._api_client: ApiClient | None = None
        # Empty string means "no sandbox currently owned by this deployment".
        self._sandbox_id = ""

    @classmethod
    def from_config(cls, config: VeFaasDeploymentConfig) -> Self:
        """Alternate constructor taking an already-built configuration object."""
        return cls(**config.model_dump())

    def add_hook(self, hook: DeploymentHook):
        """Register a deployment hook."""
        self._hooks.add_hook(hook)

    def _get_token(self) -> str:
        """Return a fresh random auth token for the runtime server."""
        return str(uuid.uuid4())

    async def is_alive(self, *, timeout: float | None = None) -> IsAliveResponse:
        """Checks if the runtime is alive. The return value can be
        tested with bool().

        Raises:
            RuntimeError: If the deployment was not started.
        """
        if self._runtime is None:
            msg = "Runtime not started"
            raise RuntimeError(msg)
        return await self._runtime.is_alive(timeout=timeout)

    async def _wait_until_alive(self, timeout: float = 10.0):
        """Poll ``is_alive`` until it succeeds; stop the deployment on timeout.

        Raises:
            TimeoutError: Re-raised after cleanup if the runtime did not come up in time.
        """
        try:
            # The bare name resolves to the module-level helper, not to this method.
            return await _wait_until_alive(self.is_alive, timeout=timeout, function_timeout=self._runtime_timeout)
        except TimeoutError:
            # No container output is available here, so the message must not promise any.
            self.logger.error("Runtime did not start within timeout.")
            await self.stop()
            raise

    def _get_domain(self, apigs_id):
        """Return a domain of the API gateway service ``apigs_id``.

        A domain starting with ``https://`` is preferred; otherwise the first
        domain is returned, or ``None`` if the service has no domains.

        Raises:
            DeploymentStartupError: If the API returns an unexpected response object.
        """
        api_instance = APIGApi(self._get_api_client())
        response = api_instance.get_gateway_service(GetGatewayServiceRequest(id=apigs_id))
        if not isinstance(response, GetGatewayServiceResponse):
            emsg = f"Unexpected response while looking up gateway service: {response!r}"
            raise DeploymentStartupError(emsg)

        # `domains` may be missing on the response; treat that like an empty list.
        domains = [d.domain for d in (response.gateway_service.domains or []) if d.domain]
        for domain in domains:
            if domain.startswith("https://"):
                return domain
        return domains[0] if domains else None

    def _get_container_name(self) -> str:
        """Return a display name derived from the last path segment of the image.

        ``_``, ``:`` and ``.`` are replaced by ``-`` and the trailing 14 characters
        are dropped. If that would leave nothing (short image names such as the
        default ``python:3.11``), the untrimmed sanitized name is returned instead.

        NOTE(review): the reason for trimming exactly 14 characters is not evident
        from this file, and the result is not unique per deployment — confirm intent.
        """
        image_name = self._config.image.split("/")[-1]
        sanitized = image_name.translate(str.maketrans("_:.", "---"))
        return sanitized[:-14] or sanitized

    def _get_api_client(self) -> ApiClient:
        """Return the cached Volcengine API client, creating it on first use.

        Credentials come from the config (``ak`` / ``sk``); when those are unset the
        ``VOLCENGINE_ACCESS_KEY`` / ``VOLCENGINE_SECRET_KEY`` environment variables
        named in the error message are used as a fallback.

        Raises:
            DeploymentStartupError: If no access key or secret key is available.
        """
        if self._api_client is not None:
            return self._api_client

        import os

        access_key = self._config.ak or os.environ.get("VOLCENGINE_ACCESS_KEY")
        secret_key = self._config.sk or os.environ.get("VOLCENGINE_SECRET_KEY")
        if not access_key or not secret_key:
            emsg = "VOLCENGINE_ACCESS_KEY and VOLCENGINE_SECRET_KEY must be set"
            raise DeploymentStartupError(emsg)

        config = Configuration()
        config.ak = access_key
        config.sk = secret_key
        if self._config.region:
            # Leave the SDK's own default in place when no region is configured.
            config.region = self._config.region

        self._api_client = ApiClient(config)
        return self._api_client

    async def create_sandbox(self, function_id, image, cmd, request_timeout) -> str:
        """Create a sandbox running ``cmd`` in ``image`` and return its ID.

        Args:
            function_id: veFaaS function the sandbox is created under.
            image: Container image for the sandbox instance.
            cmd: Command the instance runs.
            request_timeout: ``request_timeout`` value of the create request.

        Raises:
            DeploymentStartupError: If the response is unexpected or carries no sandbox ID.
        """
        client = VEFAASApi(self._get_api_client())

        instance_image_info = InstanceImageInfoForCreateSandboxInput(
            image=image,
            port=8000,
            command=cmd,
        )
        # NOTE: the SDK call is synchronous, so it blocks the event loop until it returns.
        response = client.create_sandbox(
            CreateSandboxRequest(
                function_id=function_id,
                instance_image_info=instance_image_info,
                request_timeout=request_timeout,
                timeout=120,
            )
        )
        if not isinstance(response, CreateSandboxResponse):
            emsg = "Failed to create sandbox"
            raise DeploymentStartupError(emsg)
        if not response.sandbox_id:
            emsg = "Failed to create sandbox: no sandbox id"
            raise DeploymentStartupError(emsg)
        return response.sandbox_id

    async def kill_sandbox(self) -> None:
        """Kill the sandbox owned by this deployment, if any, and forget its ID."""
        if not self._sandbox_id:
            # Nothing to kill; in particular do not require credentials in this case.
            return

        # Not part of the module-level SDK import, hence brought in locally.
        from volcenginesdkvefaas import KillSandboxResponse

        client = VEFAASApi(self._get_api_client())
        response = client.kill_sandbox(
            KillSandboxRequest(
                function_id=self._config.function_id,
                sandbox_id=self._sandbox_id,
            )
        )
        # Compare against the kill response type; checking for the create response
        # type would report every successful kill as a failure.
        if not isinstance(response, KillSandboxResponse):
            self.logger.warning(f"Kill Sandbox {self._sandbox_id} Failed")
        self._sandbox_id = ""

    async def start(self):
        """Create the sandbox, connect the runtime and wait until it is alive.

        Raises:
            DeploymentStartupError: If the deployment is already started, the sandbox
                cannot be created, or the gateway service exposes no domain.
            TimeoutError: If the runtime does not become alive within ``startup_timeout``.
        """
        if self._container_name is not None:
            # Explicit check instead of `assert`, which is stripped under `python -O`.
            emsg = "Deployment already started"
            raise DeploymentStartupError(emsg)
        self._container_name = self._get_container_name()

        self.logger.info(f"Starting container {self._container_name}")

        # Command that launches the swe-rex server inside the sandbox.
        token = self._get_token()
        mount_path = self._config.mount_path
        cmd = f"source {mount_path}/SWE-ReX/bin/activate && python3 {mount_path}/SWE-ReX/src/swerex/server.py --auth-token {token}"

        self._sandbox_id = await self.create_sandbox(
            self._config.function_id, self._config.image, cmd, self._config.request_timeout
        )

        try:
            domain = self._get_domain(self._config.apigateway_service_id)
            if not domain:
                emsg = f"No domain found for API gateway service {self._config.apigateway_service_id}"
                raise DeploymentStartupError(emsg)
            self._runtime = RemoteRuntime.from_config(
                RemoteRuntimeConfig(
                    host=domain,
                    timeout=self._runtime_timeout,
                    auth_token=token,
                    faas_instance_name=self._sandbox_id,
                )
            )
        except Exception:
            # The sandbox already exists at this point; do not leave it running.
            await self.stop()
            raise

        t0 = time.time()
        await self._wait_until_alive(timeout=self._config.startup_timeout)
        self.logger.info(f"Runtime started in {time.time() - t0:.2f}s")

    async def stop(self):
        """Close the runtime (if any) and kill the sandbox (if any)."""
        try:
            if self._runtime is not None:
                await self._runtime.close()
                self._runtime = None
        finally:
            # Kill the sandbox even if closing the runtime failed.
            await self.kill_sandbox()
            # Allow `start` to be called again after a stop.
            self._container_name = None

    @property
    def runtime(self) -> RemoteRuntime:
        """Returns the runtime if running.

        Raises:
            DeploymentNotStartedError: If the deployment was not started.
        """
        if self._runtime is None:
            raise DeploymentNotStartedError()
        return self._runtime

src/swerex/exceptions.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ def __init__(self, message="Deployment not started"):
4242
super().__init__(message)
4343

4444

45-
class DeploymentStartupError(SwerexException, RuntimeError): ...
45+
class DeploymentStartupError(SwerexException, RuntimeError):
46+
def __init__(self, message="Deployment startup error"):
47+
super().__init__(message)
4648

4749

4850
class DockerPullError(DeploymentStartupError): ...

src/swerex/runtime/config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ class RemoteRuntimeConfig(BaseModel):
2929
timeout: float = 0.15
3030
"""The timeout for the runtime."""
3131

32+
faas_instance_name: str | None = None
33+
"""For Vefaas instance."""
34+
3235
type: Literal["remote"] = "remote"
3336
"""Discriminator for (de)serialization/CLI. Do not change."""
3437

src/swerex/runtime/remote.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,13 @@ def _get_timeout(self, timeout: float | None = None) -> float:
7171
@property
7272
def _headers(self) -> dict[str, str]:
7373
"""Request headers to use for authentication."""
74+
headers = {}
75+
7476
if self._config.auth_token:
75-
return {"X-API-Key": self._config.auth_token}
76-
return {}
77+
headers["X-API-Key"] = self._config.auth_token
78+
if self._config.faas_instance_name:
79+
headers["x-faas-instance-name"] = self._config.faas_instance_name
80+
return headers
7781

7882
@property
7983
def _api_url(self) -> str:

tests/test_faas_deployment.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import pytest
2+
3+
from swerex.deployment.vefaas import VeFaasDeployment
4+
5+
async def test_faas_deployment():
    """Exercise the start / is_alive / stop cycle against a real veFaaS sandbox.

    The pre-start check runs everywhere; the remote part is skipped unless
    credentials are supplied through ``VOLCENGINE_ACCESS_KEY`` and
    ``VOLCENGINE_SECRET_KEY``.
    """
    import os

    ak = os.environ.get("VOLCENGINE_ACCESS_KEY", "")
    sk = os.environ.get("VOLCENGINE_SECRET_KEY", "")

    f = VeFaasDeployment(
        image="enterprise-public-cn-beijing.cr.volces.com/swe-bench/sweb.eval.x86_64.django_1776_django-15414:latest",
        ak=ak,
        sk=sk,
        region="cn-beijing",
        function_id="awokjltn",
        apigateway_service_id="sd2on64i5ni4n75n9unpg",
    )
    # Needs no network: a deployment that was never started has no runtime.
    with pytest.raises(RuntimeError):
        await f.is_alive()

    if not ak or not sk:
        pytest.skip("VOLCENGINE_ACCESS_KEY and VOLCENGINE_SECRET_KEY are required for the remote part")

    try:
        await f.start()
        assert await f.is_alive()
    finally:
        # Always tear the sandbox down, even if an assertion above failed.
        await f.stop()

0 commit comments

Comments
 (0)