@@ -87,15 +87,33 @@ def get_status_incremental(self, task_id: str):
8787 return []
8888
8989 try :
90- # 使用 os.open + os.fstat 绕过 Python 的 stat 缓存
91- # os.fstat(fd) 直接从文件描述符获取
90+ import time as perf_time
91+ perf_start = perf_time .time ()
92+
93+ # 强制刷新 NFS 目录缓存:先 listdir 触发目录元数据更新
94+ listdir_start = perf_time .time ()
95+ try :
96+ dir_path = os .path .dirname (file_path )
97+ os .listdir (dir_path ) # 强制刷新目录缓存
98+ except Exception as e :
99+ log ("DEBUG" , f"listdir failed for { task_id } : { e } " )
100+ listdir_cost = (perf_time .time () - listdir_start ) * 1000
101+ log ("DEBUG" , f"[Perf][{ task_id } ] listdir cost: { listdir_cost :.1f} ms" )
102+
103+ # 关键:使用 os.open + os.fstat 绕过 Python 的 stat 缓存
104+ # os.stat() 可能读取缓存的元数据,而 os.fstat(fd) 直接从文件描述符获取
105+ fstat_start = perf_time .time ()
92106 try :
93107 fd = os .open (file_path , os .O_RDONLY )
94108 file_stat = os .fstat (fd )
95109 os .close (fd )
96110 except Exception as e :
97111 log ("DEBUG" , f"Fallback to regular stat for { task_id } : { e } " )
98112 file_stat = os .stat (file_path )
113+
114+ stat_time = (perf_time .time () - fstat_start ) * 1000
115+ log ("DEBUG" , f"[Perf][{ task_id } ] fstat cost: { stat_time :.1f} ms" )
116+
99117 current_size = file_stat .st_size
100118 current_modified = file_stat .st_mtime
101119
@@ -120,6 +138,8 @@ def get_status_incremental(self, task_id: str):
120138
121139 # 读取新增内容 - 使用无缓冲模式
122140 new_statuses = []
141+ read_start = perf_time .time ()
142+
123143 # 使用 buffering=0 和 binary 模式读取,绕过 Python 的缓冲层
124144 with open (file_path , 'rb' , buffering = 0 ) as f :
125145 # 定位到上次读取位置
@@ -128,30 +148,40 @@ def get_status_incremental(self, task_id: str):
128148 # 读取新增内容(二进制模式)
129149 new_content_bytes = f .read ()
130150 new_position = f .tell ()
131-
132- # 解码为文本
133- new_content = new_content_bytes .decode ('utf-8' , errors = 'ignore' )
134-
135- if new_content .strip ():
136- # 按行解析JSON
137- for line in new_content .strip ().split ('\n ' ):
138- if line .strip ():
139- try :
140- status_data = json .loads (line )
141- new_statuses .append (status_data )
142- except json .JSONDecodeError as e :
143- log ("WARNING" , f"Failed to parse status line for task { task_id } : { line [:100 ]} ... Error: { e } " )
144- continue
151+
152+ read_cost = (perf_time .time () - read_start ) * 1000
153+ log ("DEBUG" , f"[Perf][{ task_id } ] file read cost: { read_cost :.1f} ms, bytes={ len (new_content_bytes )} " )
154+
155+ parse_start = perf_time .time ()
156+ # 解码为文本
157+ new_content = new_content_bytes .decode ('utf-8' , errors = 'ignore' )
158+
159+ if new_content .strip ():
160+ # 按行解析JSON
161+ for line in new_content .strip ().split ('\n ' ):
162+ if line .strip ():
163+ try :
164+ status_data = json .loads (line )
165+ new_statuses .append (status_data )
166+ except json .JSONDecodeError as e :
167+ log ("WARNING" , f"Failed to parse status line for task { task_id } : { line [:100 ]} ... Error: { e } " )
168+ continue
169+
170+ parse_cost = (perf_time .time () - parse_start ) * 1000
171+ log ("DEBUG" , f"[Perf][{ task_id } ] parse cost: { parse_cost :.1f} ms" )
145172
146173 # 更新缓存
147174 self ._file_read_cache [task_id ] = {
148175 "position" : new_position ,
149176 "last_modified" : current_modified
150177 }
151178
179+ total_cost = (perf_time .time () - perf_start ) * 1000
180+
152181 if new_statuses :
153- log ("DEBUG" , f"Read { len (new_statuses )} new status updates for task { task_id } "
154- f"(position: { last_position } -> { new_position } )" )
182+ log ("INFO" , f"[Perf][{ task_id } ] get_status_incremental total: { total_cost :.1f} ms "
183+ f"(listdir={ listdir_cost :.1f} ms, fstat={ stat_time :.1f} ms, read={ read_cost :.1f} ms, parse={ parse_cost :.1f} ms), "
184+ f"got { len (new_statuses )} statuses, position: { last_position } -> { new_position } " )
155185
156186 return new_statuses
157187
0 commit comments