Skip to content

Commit 4241f72

Browse files
[Benchmark] Fix PostgreSQL timeout and clean-up.
- The *process* of the query within the PostgreSQL server can reach a timeout. In that case, we kill the `psql` client. This, however, does not stop the *process* within the server. Consequently, the clean-up after an experiment, which tries to run `DROP DATABASE`, will trigger an error, because a table in the database to drop is still in use. To properly *"kill"* the server *process*, we install a timeout *in PostgreSQL* with `set statement_timeout`. This has the server kill *processes* that reach the specified timeout.
- The clean-up after an experiment must `DROP DATABASE` the database that holds the benchmark input data. This was run incorrectly in one iteration of a loop, right before the next iteration would attempt to connect to that exact database, which by then would already have been dropped.

We generally change the structure of the code to use *local* `psycopg2` connections and always close them properly in a `finally` section. Further, `run_command()` no longer runs `clean_up()` itself.
1 parent e0478b0 commit 4241f72

File tree

1 file changed

+72
-73
lines changed

1 file changed

+72
-73
lines changed

benchmark/database_connectors/postgresql.py

Lines changed: 72 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,6 @@ def execute(self, n_runs, params: dict):
3030
experiment = params['name']
3131
tqdm.write(f'` Perform experiment {suite}/{benchmark}/{experiment} with configuration PostgreSQL.')
3232

33-
try:
34-
self.clean_up()
35-
except psycopg2.OperationalError as ex:
36-
raise ConnectorException(str(ex))
37-
3833
# map that is returned with the measured times
3934
measurement_times = dict()
4035

@@ -47,21 +42,24 @@ def execute(self, n_runs, params: dict):
4742

4843
verbose_printed = False
4944
for _ in range(n_runs):
45+
# Set up database
46+
self.setup()
47+
48+
# Connect to database and set up tables
49+
connection = psycopg2.connect(**db_options)
5050
try:
51-
# Set up database
52-
self.setup()
53-
connection = psycopg2.connect(**db_options)
5451
connection.autocommit = True
5552
cursor = connection.cursor()
5653
cursor.execute("set jit=off;")
5754
self.create_tables(cursor, params['data'], with_scale_factors)
55+
finally:
5856
connection.close()
5957

60-
61-
# If tables contain scale factors, they have to be loaded separately for every case
62-
if (with_scale_factors or not bool(params.get('readonly'))):
63-
for case, query_stmt in params['cases'].items():
64-
connection = psycopg2.connect(**db_options)
58+
# If tables contain scale factors, they have to be loaded separately for every case
59+
if (with_scale_factors or not bool(params.get('readonly'))):
60+
for case, query_stmt in params['cases'].items():
61+
connection = psycopg2.connect(**db_options)
62+
try:
6563
connection.autocommit = True
6664
cursor = connection.cursor()
6765
# Create tables from tmp tables with scale factor
@@ -74,49 +72,18 @@ def execute(self, n_runs, params: dict):
7472
num_rows = round((table['lines_in_file'] - header) * sf)
7573
cursor.execute(f"DELETE FROM {table_name};") # empty existing table
7674
cursor.execute(f"INSERT INTO {table_name} SELECT * FROM {table_name}_tmp LIMIT {num_rows};") # copy data with scale factor
75+
finally:
7776
connection.close()
7877

79-
# Write case/query to a file that will be passed to the command to execute
80-
with open(TMP_SQL_FILE, "w") as tmp:
81-
tmp.write("\\timing on\n")
82-
tmp.write(query_stmt + '\n')
83-
tmp.write("\\timing off\n")
84-
85-
# Execute query as benchmark and get measurement time
86-
command = f"psql -U {db_options['user']} -d {db_options['dbname']} -f {TMP_SQL_FILE} | grep 'Time' | cut -d ' ' -f 2"
87-
if self.verbose:
88-
tqdm.write(f" $ {command}")
89-
if not verbose_printed:
90-
verbose_printed = True
91-
with open(TMP_SQL_FILE) as tmp:
92-
tqdm.write(" " + " ".join(tmp.readlines()))
93-
94-
timeout = TIMEOUT_PER_CASE
95-
benchmark_info = f"{suite}/{benchmark}/{experiment} [PostgreSQL]"
96-
try:
97-
durations = self.run_command(command, timeout, benchmark_info)
98-
except ExperimentTimeoutExpired as ex:
99-
if case not in measurement_times.keys():
100-
measurement_times[case] = list()
101-
measurement_times[case].append(TIMEOUT_PER_CASE * 1000)
102-
else:
103-
for idx, line in enumerate(durations):
104-
time = float(line.replace("\n", "").replace(",", ".")) # in milliseconds
105-
if case not in measurement_times.keys():
106-
measurement_times[case] = list()
107-
measurement_times[case].append(time)
108-
109-
110-
# Otherwise, tables have to be created just once before the measurements (done above)
111-
else:
112-
# Write cases/queries to a file that will be passed to the command to execute
78+
# Write case/query to a file that will be passed to the command to execute
11379
with open(TMP_SQL_FILE, "w") as tmp:
80+
tmp.write(f'set statement_timeout = {TIMEOUT_PER_CASE * 1000:.0f};\n')
11481
tmp.write("\\timing on\n")
115-
for case_query in params['cases'].values():
116-
tmp.write(case_query + '\n')
82+
tmp.write(query_stmt + '\n')
11783
tmp.write("\\timing off\n")
84+
tmp.write(f'set statement_timeout = 0;\n')
11885

119-
# Execute query file and collect measurement data
86+
# Execute query as benchmark and get measurement time
12087
command = f"psql -U {db_options['user']} -d {db_options['dbname']} -f {TMP_SQL_FILE} | grep 'Time' | cut -d ' ' -f 2"
12188
if self.verbose:
12289
tqdm.write(f" $ {command}")
@@ -125,27 +92,59 @@ def execute(self, n_runs, params: dict):
12592
with open(TMP_SQL_FILE) as tmp:
12693
tqdm.write(" " + " ".join(tmp.readlines()))
12794

128-
timeout = DEFAULT_TIMEOUT + TIMEOUT_PER_CASE * len(params['cases'])
95+
timeout = TIMEOUT_PER_CASE
12996
benchmark_info = f"{suite}/{benchmark}/{experiment} [PostgreSQL]"
13097
try:
13198
durations = self.run_command(command, timeout, benchmark_info)
13299
except ExperimentTimeoutExpired as ex:
133-
for case in params['cases'].keys():
134-
if case not in measurement_times.keys():
135-
measurement_times[case] = list()
136-
measurement_times[case].append(TIMEOUT_PER_CASE * 1000)
100+
if case not in measurement_times.keys():
101+
measurement_times[case] = list()
102+
measurement_times[case].append(TIMEOUT_PER_CASE * 1000)
137103
else:
138104
for idx, line in enumerate(durations):
139105
time = float(line.replace("\n", "").replace(",", ".")) # in milliseconds
140-
case = list(params['cases'].keys())[idx]
141106
if case not in measurement_times.keys():
142107
measurement_times[case] = list()
143108
measurement_times[case].append(time)
144109

145-
finally:
146-
if(connection):
147-
connection.close()
148-
self.clean_up()
110+
# Otherwise, tables have to be created just once before the measurements (done above)
111+
else:
112+
# Write cases/queries to a file that will be passed to the command to execute
113+
with open(TMP_SQL_FILE, "w") as tmp:
114+
tmp.write(f'set statement_timeout = {TIMEOUT_PER_CASE * 1000:.0f};\n')
115+
tmp.write("\\timing on\n")
116+
for case_query in params['cases'].values():
117+
tmp.write(case_query + '\n')
118+
tmp.write("\\timing off\n")
119+
tmp.write(f'set statement_timeout = 0;\n')
120+
121+
# Execute query file and collect measurement data
122+
command = f"psql -U {db_options['user']} -d {db_options['dbname']} -f {TMP_SQL_FILE} | grep 'Time' | cut -d ' ' -f 2"
123+
if self.verbose:
124+
tqdm.write(f" $ {command}")
125+
if not verbose_printed:
126+
verbose_printed = True
127+
with open(TMP_SQL_FILE) as tmp:
128+
tqdm.write(" " + " ".join(tmp.readlines()))
129+
130+
timeout = DEFAULT_TIMEOUT + TIMEOUT_PER_CASE * len(params['cases'])
131+
benchmark_info = f"{suite}/{benchmark}/{experiment} [PostgreSQL]"
132+
try:
133+
durations = self.run_command(command, timeout, benchmark_info)
134+
except ExperimentTimeoutExpired as ex:
135+
for case in params['cases'].keys():
136+
if case not in measurement_times.keys():
137+
measurement_times[case] = list()
138+
measurement_times[case].append(TIMEOUT_PER_CASE * 1000)
139+
else:
140+
for idx, line in enumerate(durations):
141+
time = float(line.replace("\n", "").replace(",", ".")) # in milliseconds
142+
case = list(params['cases'].keys())[idx]
143+
if case not in measurement_times.keys():
144+
measurement_times[case] = list()
145+
measurement_times[case].append(time)
146+
147+
self.clean_up()
149148

150149
return {'PostgreSQL': measurement_times}
151150

@@ -154,25 +153,26 @@ def execute(self, n_runs, params: dict):
154153
def setup(self):
155154
# Delete existing 'benchmark_tmp' database and create a new empty one
156155
connection = psycopg2.connect(user=db_options['user'])
157-
connection.autocommit = True
158-
cursor = connection.cursor()
159-
cursor.execute(f"DROP DATABASE IF EXISTS {db_options['dbname']};")
160-
cursor.execute(f"CREATE DATABASE {db_options['dbname']};")
161-
connection.close()
156+
try:
157+
connection.autocommit = True
158+
cursor = connection.cursor()
159+
cursor.execute(f"DROP DATABASE IF EXISTS {db_options['dbname']};")
160+
cursor.execute(f"CREATE DATABASE {db_options['dbname']};")
161+
finally:
162+
connection.close()
162163

163164

164165
# Deletes the used temporary database and file
165166
def clean_up(self):
166167
connection = psycopg2.connect(user=db_options['user'])
167-
connection.autocommit = True
168-
cursor = connection.cursor()
169168
try:
169+
connection.autocommit = True
170+
cursor = connection.cursor()
170171
cursor.execute(f"DROP DATABASE IF EXISTS {db_options['dbname']};")
171-
except Exception as ex:
172-
tqdm.write(f"Unexpeced error while executing 'DROP DATABASE IF EXISTS {db_options['dbname']}' : {ex}")
173-
connection.close()
174-
if os.path.exists(TMP_SQL_FILE):
175-
os.remove(TMP_SQL_FILE)
172+
finally:
173+
connection.close()
174+
if os.path.exists(TMP_SQL_FILE):
175+
os.remove(TMP_SQL_FILE)
176176

177177

178178
# Parse attributes of one table, return as string ready for a CREATE TABLE query
@@ -245,7 +245,6 @@ def run_command(self, command, timeout, benchmark_info):
245245
out, err = process.communicate("".encode('latin-1'), timeout=timeout)
246246
except subprocess.TimeoutExpired:
247247
process.kill()
248-
self.clean_up()
249248
raise ExperimentTimeoutExpired(f'Query timed out after {timeout} seconds')
250249
finally:
251250
if process.poll() is None: # if process is still alive

0 commit comments

Comments
 (0)