Skip to content

Conversation

@KoalaBryson
Copy link
Collaborator

No description provided.

Copy link
Collaborator

@CiCi503 CiCi503 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. 去掉无关的 md 文件和测试脚本,提交前要看一遍提交的内容
  2. 代码结构调整:在 service 下新增 gateway 模块,将在 route 里和 serverlessapi 下的关于 cpu 函数的复杂逻辑移至 gateway 下

service.start(constants.AUTO_LAUNCH_SNAPSHOT_NAME)

print(f"Initializing function with ComfyUI mode: {constants.COMFYUI_MODE}")
service.start('latest-dev', nodes_map={}) # 使用latest-dev快照,与_tryStartIGService保持一致
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这不行,需要区分 dev 环境和 prod 环境。
prod 环境的 initialize 需要基于环境变量里配置的 prod-xxx 的 snapshot 启动

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cpu 的 ws 处理需要大改:

  1. ws 中不止包含 status 信息,这也意味着我们不能只返回 status 信息,还有一些插件的状态是在 ws 里返回的,比如:
image 更合理的实现是: - 1 个 clientid -> 1 个 ws -> 1 个 gpu 实例 - 将特定 gpu 实例的 ws messages 全部返回给指定客户端

print(f"[Enhanced History] Error getting all persisted history: {e}")
return {}

def _get_persisted_history_by_prompt_id(self, api_service, prompt_id):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不要将这些复杂的业务逻辑放到 routes 里。
在 service 下新建一个 gateway 目录,将 get_history、status_poller 相关逻辑移动至 gateway 中

else:
return []

def get_status_from_store_incremental(self, task_id: str):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个逻辑和 serverless api 无关,移走

@KoalaBryson KoalaBryson force-pushed the comfyui-cluster branch 2 times, most recently from 969857b to 12b563e Compare November 6, 2025 06:22

_original_print = builtins.print

def timestamped_print(*args, **kwargs):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可以 rebase cap,用 agent/utils 中的 logger


# 使用环境变量指定的snapshot,默认为latest-dev
snapshot_name = os.environ.get('AUTO_LAUNCH_SNAPSHOT_NAME', 'latest-dev')
print(f"Initializing function with ComfyUI mode: {constants.COMFYUI_MODE}, snapshot: {snapshot_name}")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. 这里的 print/logger 使用 util 中的 logger
  2. 系统相关的日志默认设置为 DEBUG,默认用户在日志中不需要看到

service.start(constants.AUTO_LAUNCH_SNAPSHOT_NAME)

# 使用环境变量指定的snapshot,默认为latest-dev
snapshot_name = os.environ.get('AUTO_LAUNCH_SNAPSHOT_NAME', 'latest-dev')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

latest-dev 的 snapshot 是在哪里生成的?


# 发送初始状态消息(模拟ComfyUI原生行为)
client_id = f"cpu_client_{int(time.time() * 1000)}"
ws.send(json.dumps({
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add_connection 中有 _send_initial_status ,这里也 set status 了,为啥需要有两份呢?

print(f"ws connected: {json.dumps(conn_info, indent=2)}")

# 设置TCP_NODELAY禁用Nagle算法,确保消息立即发送
self._set_tcp_nodelay(ws)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

禁用 Nagle 算法是有必要的吗?解决了什么问题?需要引入这个复杂度吗?

# 任务状态推送功能
self._task_subscriptions: Dict[str, Set] = {} # task_id -> set of websockets
self._client_subscriptions: Dict = {} # websocket -> set of task_ids
self._client_id_mapping: Dict[str, Set] = {} # client_id -> set of websockets
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

client_id_mapping 为啥需要一个 set 呢?一个 client_id 应该对应一个 ws?

self._task_subscriptions: Dict[str, Set] = {} # task_id -> set of websockets
self._client_subscriptions: Dict = {} # websocket -> set of task_ids
self._client_id_mapping: Dict[str, Set] = {} # client_id -> set of websockets
self._ws_client_id_mapping: Dict = {} # websocket -> client_id
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ws_client_id_mapping 没有存在的必要吧, getattr(ws, '_comfyui_client_id') 可以获取到 client_id 呀

"message": "Please start your comfyui/sd service first"
}), 500
# CPU模式:接收ComfyUI原生的WebSocket连接,但推送基于任务队列的真实状态
if constants.COMFYUI_MODE == "cpu":
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

当前的 routes 代码结构职责混乱,有代码有路由,还有很多 cpu/gpu 的判断散落在各处。建议将 cpu_router 和 gpu_router 分开。
image

return _task_queue_manager

# 为了向后兼容,提供一个属性访问器
class TaskQueueManagerProxy:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个 TaskQueueManagerProxy 有啥用?

@KoalaBryson KoalaBryson force-pushed the comfyui-cluster branch 3 times, most recently from 047dbcf to 5326170 Compare November 24, 2025 02:51
}
}), 500

def _register_userdata_block(self):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. 优化函数名,_register_userdata_block => _register_userdata_handler?
  2. 设置一个环境变量吧,默认不允许 save userdata,可以通过环境变量开关控制是否 save userdata

elif fc_request_id:
return fc_request_id, "x-fc-request-id"
else:
return None, "none"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

没有获取到 taskid 是非常严重的异常,要打 error 日志

def _async_handle_task_failure(self, task_id: str, status_data: dict):
"""异步处理任务失败(向后兼容方法)
注意:此方法已被 _async_handle_task_failure_after_update 替代
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

此方法已经被替代了,还留着干啥呢?

yinsu.zs added 2 commits November 25, 2025 21:38
Change-Id: Id6716878f840d1c44ef7f861f1cd459e59b166d8
Change-Id: I7a5940740cd62ea924248a701e18e81826ee99b1
Change-Id: Ia021faa5fdde460c3df814ca9f7a6c7ce4e29066
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants