Skip to content

Commit b783ebe

Browse files
authored
Merge pull request #550 from NERSC/run-podman-in-event-loop
Run podman calls synchronously in worker threads to prevent starving the thread that updates the agent KV.
2 parents 1320bf9 + 95e679c commit b783ebe

File tree

1 file changed

+46
-35
lines changed

1 file changed

+46
-35
lines changed

backend/agent/interactem/agent/agent.py

Lines changed: 46 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,11 @@ async def _cleanup_containers(self):
239239
with PodmanClient(
240240
base_url=self._podman_service_uri, max_pool_size=PODMAN_MAX_POOL_SIZE
241241
) as client:
242-
containers = client.containers.list(
243-
filters={"label": f"agent.id={self.id}"}
242+
containers = await to_thread.run_sync(
243+
functools.partial(
244+
client.containers.list,
245+
filters={"label": f"agent.id={self.id}"},
246+
)
244247
)
245248
# Exclude vector container from cleanup
246249
operator_containers = [
@@ -289,20 +292,23 @@ async def _start_vector_container(self) -> Container:
289292
"Type": "k8s-file",
290293
"Config": {"path": f"{cfg.LOG_DIR}/vector.log"},
291294
}
292-
container = client.containers.create(
293-
image=VECTOR_IMAGE,
294-
environment=GLOBAL_ENV,
295-
name=f"vector-{self.id}",
296-
detach=True,
297-
stdout=True,
298-
stderr=True,
299-
log_config=log_config,
300-
network_mode="host",
301-
remove=True,
302-
labels={"agent.id": str(self.id), "container.type": "vector"},
303-
mounts=[mount.model_dump() for mount in cfg.vector_mounts],
295+
container = await to_thread.run_sync(
296+
functools.partial(
297+
client.containers.create,
298+
image=VECTOR_IMAGE,
299+
environment=GLOBAL_ENV,
300+
name=f"vector-{self.id}",
301+
detach=True,
302+
stdout=True,
303+
stderr=True,
304+
log_config=log_config,
305+
network_mode="host",
306+
remove=True,
307+
labels={"agent.id": str(self.id), "container.type": "vector"},
308+
mounts=[mount.model_dump() for mount in cfg.vector_mounts],
309+
)
304310
)
305-
container.start()
311+
await to_thread.run_sync(container.start)
306312
return container
307313

308314
async def receive_cancellation(self, event: AgentDeploymentStopEvent):
@@ -698,7 +704,7 @@ async def _start_operator(
698704
)
699705
if not container:
700706
raise RuntimeError(f"Failed to create container for operator {operator.id}")
701-
container.start()
707+
await to_thread.run_sync(container.start)
702708

703709
return operator, container
704710

@@ -834,7 +840,7 @@ async def monitor_containers(self):
834840
tracker.unmark()
835841
continue
836842
try:
837-
tracker.container.reload()
843+
await to_thread.run_sync(tracker.container.reload)
838844
if tracker.container.status == "running":
839845
continue
840846
if tracker.num_restarts >= ContainerTracker.MAX_RESTARTS:
@@ -875,7 +881,7 @@ async def restart_operator(self, tracker: ContainerTracker):
875881
new_container = await create_container(
876882
self.id, client, operator, self._current_deployment.deployment_id
877883
)
878-
new_container.start()
884+
await to_thread.run_sync(new_container.start)
879885
self.container_trackers[operator.id].container = new_container
880886

881887
async def _set_status(
@@ -965,20 +971,23 @@ async def create_container(
965971
if not cfg.LOCAL and operator.requires_gpus:
966972
create_kwargs["gpu"] = True
967973

968-
return client.containers.create(
969-
image=operator.image,
970-
environment=operator.env,
971-
name=op_name,
972-
command=operator.command,
973-
detach=True,
974-
stdout=True,
975-
stderr=True,
976-
log_config=log_config,
977-
network_mode=network_mode,
978-
remove=True,
979-
labels={"agent.id": str(agent_id)},
980-
mounts=[mount.model_dump() for mount in operator.all_mounts],
981-
**create_kwargs,
974+
return await to_thread.run_sync(
975+
functools.partial(
976+
client.containers.create,
977+
image=operator.image,
978+
environment=operator.env,
979+
name=op_name,
980+
command=operator.command,
981+
detach=True,
982+
stdout=True,
983+
stderr=True,
984+
log_config=log_config,
985+
network_mode=network_mode,
986+
remove=True,
987+
labels={"agent.id": str(agent_id)},
988+
mounts=[mount.model_dump() for mount in operator.all_mounts],
989+
**create_kwargs,
990+
)
982991
)
983992

984993
raise RuntimeError(
@@ -991,7 +1000,7 @@ class ContainerStillRunning(Exception):
9911000
pass
9921001

9931002
try:
994-
container.reload()
1003+
await to_thread.run_sync(container.reload)
9951004
except podman.errors.exceptions.NotFound:
9961005
logger.warning(f"Container {container.name} not found during reload")
9971006
return
@@ -1002,7 +1011,7 @@ class ContainerStillRunning(Exception):
10021011
await to_thread.run_sync(container.stop)
10031012
for attempt in stamina.retry_context(on=ContainerStillRunning):
10041013
with attempt:
1005-
container.reload()
1014+
await to_thread.run_sync(container.reload)
10061015
if container.status == "running":
10071016
raise ContainerStillRunning(
10081017
f"Container {container.name} is still running"
@@ -1020,6 +1029,8 @@ async def handle_name_conflict(client: PodmanClient, container_name: str) -> Non
10201029
f"Container name conflict detected for {container_name}. "
10211030
f"Attempting to remove the conflicting container."
10221031
)
1023-
conflicting_container = client.containers.get(container_name)
1032+
conflicting_container = await to_thread.run_sync(
1033+
functools.partial(client.containers.get, container_name)
1034+
)
10241035
await stop_and_remove_container(conflicting_container)
10251036
logger.info(f"Conflicting container {conflicting_container.id} removed. ")

0 commit comments

Comments (0)