Skip to content

Commit 662f2c3

Browse files
[Benchmark] Explicitly flush after tqdm.write().
Apparently, calls to `tqdm.write()` do *not* flush the output to the output stream (`sys.stdout`). We now do this manually after calls to `tqdm.write()`.
1 parent 7c45938 commit 662f2c3

File tree

7 files changed

+40
-13
lines changed

7 files changed

+40
-13
lines changed

benchmark/Benchmark.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ def perform_experiment(yml, conn, info, results) -> list():
255255
experiment_times = conn.execute(N_RUNS, params)
256256
except connector.ConnectorException as ex:
257257
tqdm.write(f"\nAn error occurred for {system} while executing {path_to_file}: {str(ex)}\n")
258+
sys.stdout.flush()
258259
raise BenchmarkError()
259260

260261
# Add measurements to result
@@ -350,10 +351,12 @@ def perform_experiment(yml, conn, info, results) -> list():
350351
if output_csv_file:
351352
if not os.path.isfile(output_csv_file): # file does not yet exist
352353
tqdm.write(f'Writing measurements to \'{output_csv_file}\'.')
354+
sys.stdout.flush()
353355
with open(output_csv_file, 'w') as csv:
354356
csv.write('commit,date,version,suite,benchmark,experiment,name,config,case,time\n')
355357
else:
356358
tqdm.write(f'Adding measurements to \'{output_csv_file}\'.')
359+
sys.stdout.flush()
357360

358361
# A central object to collect all measurements of all experiments. Has the following structure:
359362
#
@@ -384,6 +387,7 @@ def perform_experiment(yml, conn, info, results) -> list():
384387
# Validate schema
385388
if not validate_schema(path_to_file, YML_SCHEMA):
386389
tqdm.write(f'Benchmark file "{path_to_file}" violates schema.')
390+
sys.stdout.flush()
387391
continue
388392

389393
with open(path_to_file, 'r') as yml_file:
@@ -409,6 +413,7 @@ def perform_experiment(yml, conn, info, results) -> list():
409413
tqdm.write('\n\n==========================================================')
410414
tqdm.write(f'Perform benchmarks in \'{path_to_file}\'.')
411415
tqdm.write('==========================================================')
416+
sys.stdout.flush()
412417

413418
# Perform experiment for each system
414419
for system in yml.get('systems').keys():

benchmark/__init__.py

Whitespace-only changes.

benchmark/database_connectors/__init__.py

Whitespace-only changes.

benchmark/database_connectors/duckdb.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
from .connector import *
22

3-
import os
3+
from tqdm import tqdm
44
import json
5+
import os
56
import subprocess
6-
from tqdm import tqdm
7+
import sys
78

89

910
TMP_DB = 'tmp.duckdb'
@@ -22,6 +23,7 @@ def execute(self, n_runs, params: dict):
2223
experiment = params['name']
2324
configname = f'DuckDB ({get_num_cores()} cores)' if self.multithreaded else 'DuckDB (single core)'
2425
tqdm.write(f'` Perform experiment {suite}/{benchmark}/{experiment} with configuration {configname}.')
26+
sys.stdout.flush()
2527

2628
self.clean_up()
2729

@@ -74,6 +76,7 @@ def execute(self, n_runs, params: dict):
7476
if self.verbose and not verbose_printed:
7577
verbose_printed = True
7678
tqdm.write(combined_query)
79+
sys.stdout.flush()
7780

7881
benchmark_info = f"{suite}/{benchmark}/{experiment} [{configname}]"
7982
try:
@@ -102,6 +105,7 @@ def execute(self, n_runs, params: dict):
102105
if self.verbose and not verbose_printed:
103106
verbose_printed = True
104107
tqdm.write(combined_query)
108+
sys.stdout.flush()
105109

106110
benchmark_info = f"{suite}/{benchmark}/{experiment} [{configname}]"
107111
try:
@@ -209,6 +213,7 @@ def run_query(self, query, timeout, benchmark_info):
209213
except subprocess.TimeoutExpired:
210214
process.kill()
211215
tqdm.write(f" ! Query \n'{query}'\n' timed out after {timeout} seconds")
216+
sys.stdout.flush()
212217
raise ExperimentTimeoutExpired(f'Query timed out after {timeout} seconds')
213218
finally:
214219
if process.poll() is None: # if process is still alive
@@ -231,6 +236,7 @@ def run_query(self, query, timeout, benchmark_info):
231236
{err}
232237
==================
233238
''')
239+
sys.stdout.flush()
234240
if process.returncode:
235241
raise ConnectorException(f'Benchmark failed with return code {process.returncode}.')
236242

@@ -240,4 +246,4 @@ def run_query(self, query, timeout, benchmark_info):
240246
durations.remove('')
241247
durations = [float(i.replace("\n", "").replace(",", ".")) for i in durations]
242248

243-
return durations
249+
return durations

benchmark/database_connectors/hyper.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33

44
from tableauhyperapi import HyperProcess, Telemetry, Connection, CreateMode, NOT_NULLABLE, NULLABLE, SqlType, \
55
TableDefinition, Inserter, escape_name, escape_string_literal, HyperException, TableName
6-
import time
7-
import os
8-
import sys
9-
import subprocess
106
from tqdm import tqdm
117
import multiprocessing
8+
import os
9+
import subprocess
10+
import sys
11+
import sys
12+
import time
1213

1314

1415
# Converting table names to lower case is needed because then
@@ -41,6 +42,7 @@ def execute(self, n_runs, params: dict()):
4142
experiment = params['name']
4243
suffix = f' ({get_num_cores()} cores)' if self.multithreaded else ' (single core)'
4344
tqdm.write(f'` Perform experiment {suite}/{benchmark}/{experiment} with configuration HyPer{suffix}.')
45+
sys.stdout.flush()
4446

4547
result = None
4648
if self.multithreaded:
@@ -49,6 +51,7 @@ def execute(self, n_runs, params: dict()):
4951
result = HyPer._execute(n_runs, params)
5052
except Exception as ex:
5153
tqdm.write(str(ex))
54+
sys.stdout.flush()
5255
return dict()
5356

5457
else:
@@ -62,6 +65,7 @@ def execute(self, n_runs, params: dict()):
6265
args = ['taskset', '-c', '2', 'python3', '-c', script]
6366
if self.verbose:
6467
tqdm.write(f" $ {' '.join(args)}")
68+
sys.stdout.flush()
6569
# try:
6670
# P = subprocess.run(
6771
# args=args,

benchmark/database_connectors/mutable.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
from tqdm import tqdm
44
import os
55
import pandas
6-
import subprocess
76
import re
7+
import subprocess
8+
import sys
89

910
class BenchmarkError(Exception):
1011
pass
@@ -84,6 +85,7 @@ def run_configuration(self, experiment, config_name, config, yml):
8485
tqdm.write(f'` Perform experiment {suite}/{benchmark}/{experiment} with configuration {config_name}.')
8586
else:
8687
tqdm.write(f'` Perform experiment {suite}/{benchmark}/{experiment}.')
88+
sys.stdout.flush()
8789

8890
# Get database schema
8991
schema = os.path.join(os.path.dirname(path_to_file), 'data', 'schema.sql')
@@ -125,6 +127,7 @@ def run_configuration(self, experiment, config_name, config, yml):
125127
durations = self.benchmark_query(command, query, yml['pattern'], timeout, path_to_file)
126128
except BenchmarkTimeoutException as ex:
127129
tqdm.write(str(ex))
130+
sys.stdout.flush()
128131
# Add timeout durations
129132
for case in cases.keys():
130133
execution_times[case] = TIMEOUT_PER_CASE * 1000
@@ -147,11 +150,13 @@ def run_configuration(self, experiment, config_name, config, yml):
147150
durations = self.benchmark_query(command, query_str, yml['pattern'], timeout, path_to_file)
148151
except BenchmarkTimeoutException as ex:
149152
tqdm.write(str(ex))
153+
sys.stdout.flush()
150154
execution_times[case] = timeout * 1000
151155
else:
152156
execution_times[case] = durations[0]
153157
except BenchmarkError as ex:
154158
tqdm.write(str(ex))
159+
sys.stdout.flush()
155160

156161
return execution_times
157162

@@ -195,6 +200,7 @@ def benchmark_query(self, command, query, pattern, timeout, path_to_file):
195200
{err}
196201
==================
197202
''')
203+
sys.stdout.flush()
198204
if process.returncode:
199205
raise ConnectorException(f'Benchmark failed with return code {process.returncode}.')
200206

@@ -253,3 +259,4 @@ def print_command(command :list, query :str, indent = ''):
253259
query_str = query.strip().replace('\n', ' ').replace('"', '\\"')
254260
command_str = ' '.join(command)
255261
tqdm.write(f'{indent}$ echo "{query_str}" | {command_str}')
262+
sys.stdout.flush()

benchmark/database_connectors/postgresql.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
from .connector import *
22

3-
import time
3+
from psycopg2.extras import LoggingConnection, LoggingCursor
4+
from tqdm import tqdm
5+
import logging
46
import os
57
import psycopg2
68
import psycopg2.extensions
7-
from psycopg2.extras import LoggingConnection, LoggingCursor
8-
import logging
9-
import subprocess
109
import shlex
11-
from tqdm import tqdm
10+
import subprocess
11+
import sys
12+
import time
1213

1314
db_options = {
1415
'dbname': 'benchmark_tmp',
@@ -29,6 +30,7 @@ def execute(self, n_runs, params: dict):
2930
benchmark = params['benchmark']
3031
experiment = params['name']
3132
tqdm.write(f'` Perform experiment {suite}/{benchmark}/{experiment} with configuration PostgreSQL.')
33+
sys.stdout.flush()
3234

3335
# map that is returned with the measured times
3436
measurement_times = dict()
@@ -91,6 +93,7 @@ def execute(self, n_runs, params: dict):
9193
verbose_printed = True
9294
with open(TMP_SQL_FILE) as tmp:
9395
tqdm.write(" " + " ".join(tmp.readlines()))
96+
sys.stdout.flush()
9497

9598
timeout = TIMEOUT_PER_CASE
9699
benchmark_info = f"{suite}/{benchmark}/{experiment} [PostgreSQL]"
@@ -126,6 +129,7 @@ def execute(self, n_runs, params: dict):
126129
verbose_printed = True
127130
with open(TMP_SQL_FILE) as tmp:
128131
tqdm.write(" " + " ".join(tmp.readlines()))
132+
sys.stdout.flush()
129133

130134
timeout = DEFAULT_TIMEOUT + TIMEOUT_PER_CASE * len(params['cases'])
131135
benchmark_info = f"{suite}/{benchmark}/{experiment} [PostgreSQL]"
@@ -269,6 +273,7 @@ def run_command(self, command, timeout, benchmark_info):
269273
{err}
270274
==================
271275
''')
276+
sys.stdout.flush()
272277
if process.returncode:
273278
raise ConnectorException(f'Benchmark failed with return code {process.returncode}.')
274279

0 commit comments

Comments
 (0)