Skip to content

Commit ce71483

Browse files
author
Tobias Kopp
committed
[Benchmark] Kill child processes on timeout
1 parent e78ba5f commit ce71483

File tree

1 file changed

+60
-63
lines changed

1 file changed

+60
-63
lines changed

benchmark/database_connectors/hyper.py

Lines changed: 60 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,6 @@ def execute(self, n_runs, params: dict()):
4747
result = None
4848
if self.multithreaded:
4949
result = HyPer._execute(n_runs, params)
50-
try:
51-
result = HyPer._execute(n_runs, params)
52-
except Exception as ex:
53-
tqdm.write(str(ex))
54-
sys.stdout.flush()
55-
return dict()
5650

5751
else:
5852
path = os.getcwd()
@@ -61,29 +55,26 @@ def execute(self, n_runs, params: dict()):
6155
sys.path.insert(0, '{path}/benchmark')
6256
import database_connectors.hyper
6357
print(repr(database_connectors.hyper.HyPer._execute({n_runs}, {repr(params)})))
58+
sys.stdout.flush()
6459
'''
6560
args = ['taskset', '-c', '2', 'python3', '-c', script]
6661
if self.verbose:
6762
tqdm.write(f" $ {' '.join(args)}")
6863
sys.stdout.flush()
69-
# try:
70-
# P = subprocess.run(
71-
# args=args,
72-
# capture_output=True,
73-
# text=True,
74-
# cwd=path
75-
# )
76-
# result = eval(P.stdout)
77-
# except Exception as ex:
78-
# tqdm.write(str(ex))
79-
# return dict()
64+
8065
P = subprocess.run(
81-
args=args,
82-
capture_output=True,
83-
text=True,
84-
cwd=path
85-
)
86-
result = eval(P.stdout)
66+
args=args,
67+
capture_output=True,
68+
text=True,
69+
cwd=path
70+
)
71+
72+
# Check returncode
73+
if P.returncode==0:
74+
result = eval(P.stdout)
75+
else:
76+
raise ConnectorException(f"Process failed with return code {P.returncode}")
77+
return dict()
8778

8879
patched_result = dict()
8980
for key, val in result.items():
@@ -143,33 +134,34 @@ def _execute(n_runs, params: dict()):
143134
timeout = TIMEOUT_PER_CASE
144135
times = None
145136
p = multiprocessing.Process(target=run_single_query, args=(connection, query))
146-
try:
147-
p.start()
148-
p.join(timeout=timeout)
137+
p.start()
138+
p.join(timeout=timeout)
139+
if p.is_alive():
140+
# timeout happened
141+
num_timeout_cases += 1
142+
time = timeout * 1000 # in ms
143+
144+
p.terminate() # try to shut down gracefully
145+
p.join(timeout=1) # wait for process to terminate
149146
if p.is_alive():
150-
# timeout happened
151-
num_timeout_cases += 1
152-
time = timeout * 1000 # in ms
153-
p.terminate()
154-
p.join()
155-
else:
156-
# no timeout, extract result
157-
matches = hyperconf.filter_results(
158-
hyperconf.extract_results(),
159-
{ 'k': 'query-end'},
160-
[ hyperconf.MATCH_SELECT ]
161-
)
162-
times = map(lambda m: m['v']['execution-time'] * 1000, matches)
163-
times = list(map(lambda t: f'{t:.3f}', times))
164-
case_idx = list(params['cases'].keys()).index(case)
165-
time = times[run_id * len(list(params['cases'].keys())) + case_idx - num_timeout_cases]
147+
p.kill() # kill if process did not terminate in time
148+
assert(not p.is_alive())
149+
else:
150+
# no timeout, extract result
151+
matches = hyperconf.filter_results(
152+
hyperconf.extract_results(),
153+
{ 'k': 'query-end'},
154+
[ hyperconf.MATCH_SELECT ]
155+
)
156+
times = map(lambda m: m['v']['execution-time'] * 1000, matches)
157+
times = list(map(lambda t: f'{t:.3f}', times))
158+
case_idx = list(params['cases'].keys()).index(case)
159+
time = times[run_id * len(list(params['cases'].keys())) + case_idx - num_timeout_cases]
166160

167-
if case not in measurement_times.keys():
168-
measurement_times[case] = list()
169-
measurement_times[case].append(time)
161+
if case not in measurement_times.keys():
162+
measurement_times[case] = list()
163+
measurement_times[case].append(time)
170164

171-
except Exception as ex:
172-
raise(ConnectorException(ex))
173165

174166

175167

@@ -182,34 +174,40 @@ def _execute(n_runs, params: dict()):
182174
queries = HyPer.get_cases_queries(params, table_defs)
183175
data = HyPer.get_data(params, table_defs)
184176

177+
185178
timeout = DEFAULT_TIMEOUT + TIMEOUT_PER_CASE * len(params['cases'])
186179
times = None
187180
q = multiprocessing.Queue()
188181
p = multiprocessing.Process(target=run_multiple_queries, args=(connection, queries.values(), data, q))
189-
try:
190-
p.start()
191-
p.join(timeout=timeout)
182+
p.start()
183+
p.join(timeout=timeout)
184+
if p.is_alive():
185+
# timeout happened
186+
times = list(zip(queries.keys(), [TIMEOUT_PER_CASE for _ in range(len(params['cases']))]))
187+
188+
p.terminate() # try to shut down gracefully
189+
p.join(timeout=1) # wait for process to terminate
192190
if p.is_alive():
193-
# timeout happened
194-
times = times = list(zip(queries.keys(), [TIMEOUT_PER_CASE for _ in range(len(params['cases']))]))
195-
else:
196-
times = q.get()
197-
times = list(zip(queries.keys(), list(map(lambda t: float(f'{t:.3f}'), times))))
191+
p.kill() # kill if process did not terminate in time
192+
assert(not p.is_alive())
193+
else:
194+
times = q.get()
195+
times = list(zip(queries.keys(), list(map(lambda t: float(f'{t:.3f}'), times))))
198196

199-
for case, time in times:
200-
if case not in measurement_times.keys():
201-
measurement_times[case] = list()
202-
measurement_times[case].append(time)
197+
for case, time in times:
198+
if case not in measurement_times.keys():
199+
measurement_times[case] = list()
200+
measurement_times[case].append(time)
203201

204-
except Exception as ex:
205-
raise(ConnectorException(ex))
202+
connection.close()
203+
hyper.close()
204+
continue
206205

207206

208207

209208
return {'HyPer': measurement_times}
210209

211210

212-
213211
# returns dict of {table_name: table_def} for each table_def
214212
@staticmethod
215213
def get_all_table_defs(params: dict):
@@ -289,4 +287,3 @@ def get_data(params: dict, table_defs: dict):
289287
}
290288
result.append((table_defs[table_name], file, par))
291289
return result
292-

0 commit comments

Comments
 (0)