5
5
from tableauhyperapi import HyperProcess , Telemetry , Connection , CreateMode , NOT_NULLABLE , NULLABLE , SqlType , \
6
6
TableDefinition , Inserter , escape_name , escape_string_literal , HyperException , TableName
7
7
from typeguard import typechecked
8
- from typing import Any
8
+ from typing import Any , Sequence
9
9
import os
10
- import subprocess
11
-
12
10
13
11
14
12
@typechecked
@@ -25,10 +23,9 @@ def execute(self, n_runs: int, params: dict[str, Any]) -> ConnectorResult:
25
23
suffix : str = f' ({ get_num_cores ()} cores)' if self .multithreaded else ' (single core)'
26
24
tqdm_print (f'` Perform experiment { suite } /{ benchmark } /{ experiment } with configuration HyPer{ suffix } .' )
27
25
28
- path : str = os .getcwd ()
29
26
script = f'''
30
27
import sys
31
- sys.path.insert(0, '{ path } /benchmark')
28
+ sys.path.insert(0, '{ os . getcwd () } /benchmark')
32
29
import database_connectors.hyper
33
30
print(repr(database_connectors.hyper.HyPer._execute({ n_runs } , { repr (params )} )))
34
31
sys.stdout.flush()
@@ -44,36 +41,18 @@ def execute(self, n_runs: int, params: dict[str, Any]) -> ConnectorResult:
44
41
tqdm_print (f" $ { ' ' .join (args )} " )
45
42
46
43
timeout : int = n_runs * (DEFAULT_TIMEOUT + TIMEOUT_PER_CASE * len (params ['cases' ]))
47
- process = subprocess .Popen (
48
- args = args ,
49
- stdout = subprocess .PIPE ,
50
- stderr = subprocess .PIPE ,
51
- cwd = path
52
- )
53
44
54
45
try :
55
- process . wait ( timeout = timeout )
56
- except subprocess . TimeoutExpired :
57
- tqdm_print ( f'Benchmark timed out after { timeout } seconds' )
58
- # Set execution time of every case of every run to timeout
59
- times : list [float ] = [float (TIMEOUT_PER_CASE * 1000 ) for _ in range (n_runs )]
46
+ out : str = self . benchmark_query ( command = args , query = '' , timeout = timeout ,
47
+ benchmark_info = f' { suite } / { benchmark } / { experiment } [HyPer { suffix } ]' ,
48
+ verbose = self . verbose )
49
+ except ExperimentTimeoutExpired :
50
+ times : list [float ] = [float (TIMEOUT_PER_CASE * 1000 ) for _ in range (n_runs )]
60
51
config_result : ConfigResult = {case : times for case in params ['cases' ].keys ()}
61
52
result : ConnectorResult = {f'HyPer{ suffix } ' : config_result }
62
53
return result
63
- finally :
64
- if process .poll () is None : # if process is still alive
65
- process .terminate () # try to shut down gracefully
66
- try :
67
- process .wait (timeout = 15 ) # give process 15 seconds to terminate
68
- except subprocess .TimeoutExpired :
69
- process .kill () # kill if process did not terminate in time
70
-
71
- # Check returncode
72
- if process .returncode == 0 :
73
- result : ConnectorResult = eval (process .stdout .read ().decode ('latin-1' ))
74
- else :
75
- raise ConnectorException (f"Process failed with return code { process .returncode } . Details:\n { process .stderr .read ().decode ('latin-1' )} " )
76
54
55
+ result : ConnectorResult = eval (out )
77
56
patched_result : ConnectorResult = dict ()
78
57
for key , val in result .items ():
79
58
patched_result [f'{ key } { suffix } ' ] = val
@@ -255,3 +234,9 @@ def get_data(params: dict[str, Any], table_defs: dict[str, TableDefinition]) ->
255
234
}
256
235
result .append ((table_defs [table_name ], file , par ))
257
236
return result
237
+
238
+ def print_command (self , command : str | bytes | Sequence [str | bytes ], query : str , indent : str = '' ) -> None :
239
+ # hyper connector only uses list[str] as command
240
+ if command is not list [str ]:
241
+ pass
242
+ tqdm_print (f" $ { ' ' .join (command )} " )
0 commit comments