Commit 9c4af45

Author: Tobias Kopp
[Benchmark] Remove cache influence in duckdb and postgres connector

1 parent: dad095e

File tree: 2 files changed, +65 -41 lines

benchmark/database_connectors/duckdb.py

Lines changed: 35 additions & 25 deletions
@@ -6,7 +6,8 @@
 import os
 
 
-TMP_DB = 'tmp.duckdb'
+TMP_DB: str = 'tmp.duckdb'
+COMPLETE_TABLE_SUFFIX: str = '_complete'
 
 
 @typechecked
@@ -56,7 +57,9 @@ def execute(self, n_runs: int, params: dict[str, Any]) -> ConnectorResult:
 
         try:
             # Set up database
-            create_tbl_stmts: list[str] = self.generate_create_table_stmts(params['data'], self.check_with_scale_factors(params))
+            complete_tables: list[str]
+            actual_tables: list[str]
+            complete_tables, actual_tables = self.generate_create_table_stmts(params['data'])
 
             if self.check_execute_single_cases(params):
                 # Execute cases singly
@@ -69,8 +72,9 @@ def execute(self, n_runs: int, params: dict[str, Any]) -> ConnectorResult:
 
                 statements: list[str] = list()
                 if i == 0:
-                    # Also use the create_tbl_stmts in this case
-                    statements.extend(create_tbl_stmts)
+                    # Also use the CREATE TABLE stmts in this case
+                    statements.extend(complete_tables)
+                    statements.extend(actual_tables)
 
                 # Create tables from tmp tables with scale factor
                 for table_name, table in params['data'].items():
@@ -82,7 +86,7 @@ def execute(self, n_runs: int, params: dict[str, Any]) -> ConnectorResult:
                         sf = 1
                     header: int = int(table.get('header', 0))
                     num_rows: int = round((table['lines_in_file'] - header) * sf)
-                    statements.append(f'INSERT INTO "{table_name}" SELECT * FROM "{table_name}_tmp" LIMIT {num_rows};')
+                    statements.append(f'INSERT INTO "{table_name}" SELECT * FROM "{table_name}{COMPLETE_TABLE_SUFFIX}" LIMIT {num_rows};')
 
                 statements.append(".timer on")
                 statements.append(query_stmt)  # Actual query from this case
@@ -114,13 +118,23 @@ def execute(self, n_runs: int, params: dict[str, Any]) -> ConnectorResult:
             timeout: int = DEFAULT_TIMEOUT + TIMEOUT_PER_CASE * len(cases) * n_runs
 
             statements: list[str] = list()
-            statements.extend(create_tbl_stmts)
+            statements.extend(complete_tables)
+            statements.extend(actual_tables)
+
+            # Dropping and recreating tables in between runs removes any cache influences
+            refill_stmts: list[str] = list()
+            for name, table in params['data'].items():
+                refill_stmts.append(f'DROP TABLE "{name}";')
+            refill_stmts.extend(actual_tables)
+            for name, table in params['data'].items():
+                refill_stmts.append(f'INSERT INTO "{name}" (SELECT * FROM "{name}{COMPLETE_TABLE_SUFFIX}");')
 
-            statements.append(".timer on")
             for _ in range(n_runs):
+                statements.extend(refill_stmts)
+                statements.append(".timer on")
                 for case_query in cases.values():
                     statements.append(case_query)
-            statements.append(".timer off")
+                statements.append(".timer off")
 
             combined_query: str = "\n".join(statements)
 
@@ -157,20 +171,17 @@ def clean_up(self) -> None:
             os.remove(TMP_DB)
 
 
-    # Creates tables in the database and copies contents of given files into them
-    # Call with 'with_scale_factors'=False if data should be loaded as a whole
-    # Call with 'with_scale_factors'=True if data should be placed in tmp tables
-    # and copied for each case with different scale factor
-    def generate_create_table_stmts(self, data: dict[str, dict[str, Any]], with_scale_factors: bool) -> list[str]:
-        statements: list[str] = list()
+    # Creates tables in the database and copies contents of given files into them.
+    # The complete data is in the 'T_complete' tables. For the individual cases the actual table T
+    # can be filled using 'INSERT INTO T (SELECT * FROM T_complete LIMIT x)'
+    def generate_create_table_stmts(self, data: dict[str, dict[str, Any]]) -> tuple[list[str],list[str]]:
+        complete_tables: list[str] = list()
+        actual_tables: list[str] = list()
         for table_name, table in data.items():
             columns: str = Connector.parse_attributes(self.DUCKDB_TYPE_PARSER, table['attributes'])
 
-            if with_scale_factors:
-                table_name += "_tmp"
-
-            create: str = f'CREATE TABLE "{table_name}" {columns};'
-            copy: str = f'COPY "{table_name}" FROM \'{table["file"]}\' ( '
+            create: str = f'CREATE TABLE "{table_name}{COMPLETE_TABLE_SUFFIX}" {columns};'
+            copy: str = f'COPY "{table_name}{COMPLETE_TABLE_SUFFIX}" FROM \'{table["file"]}\' ( '
             if 'delimiter' in table:
                 delim = table['delimiter'].replace("'", "")
                 copy += f" DELIMITER \'{delim}\',"
@@ -181,14 +192,13 @@ def generate_create_table_stmts(self, data: dict[str, dict[str, Any]], with_scal
 
             copy = copy[:-1] + " );"
 
-            statements.append(create)
-            statements.append(copy)
+            complete_tables.append(create)
+            complete_tables.append(copy)
 
-            if with_scale_factors:
-                # Create actual table that will be used for experiment
-                statements.append(f'CREATE TABLE "{table_name[:-4]}" {columns};')
+            # Create actual table that will be used for experiment
+            actual_tables.append(f'CREATE TABLE "{table_name}" {columns};')
 
-        return statements
+        return complete_tables, actual_tables
 
 
     # Parse `results` for timings
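Note on the change above: rather than loading the data once and timing all runs against warm tables, the connector now rebuilds each benchmark table from its '_complete' copy before every run. A minimal self-contained sketch of the statements this construction produces, assuming a hypothetical table 'lineitem' with illustrative columns (the table name and columns are not from this commit):

    COMPLETE_TABLE_SUFFIX: str = '_complete'

    # Hypothetical inputs: one benchmark table and its CREATE TABLE statement.
    data: dict[str, dict] = {'lineitem': {}}
    actual_tables: list[str] = ['CREATE TABLE "lineitem" (id INTEGER, price DOUBLE);']

    # Same construction as in execute() above.
    refill_stmts: list[str] = []
    for name in data:
        refill_stmts.append(f'DROP TABLE "{name}";')
    refill_stmts.extend(actual_tables)
    for name in data:
        refill_stmts.append(f'INSERT INTO "{name}" (SELECT * FROM "{name}{COMPLETE_TABLE_SUFFIX}");')

    # refill_stmts now contains, prepended to every run:
    #   DROP TABLE "lineitem";
    #   CREATE TABLE "lineitem" (id INTEGER, price DOUBLE);
    #   INSERT INTO "lineitem" (SELECT * FROM "lineitem_complete");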

benchmark/database_connectors/postgresql.py

Lines changed: 30 additions & 16 deletions
@@ -118,17 +118,26 @@ def execute(self, n_runs: int, params: dict[str, Any]) -> ConnectorResult:
 
         else:
             # Prepare db
-            self.prepare_db(params)
+            actual_tables: list[str] = self.prepare_db(params)
+
+            # Dropping and recreating tables in between runs removes any cache influences
+            refill_stmts: list[str] = list()
+            for name, table in params['data'].items():
+                refill_stmts.append(f'DROP TABLE "{name}";')
+            refill_stmts.extend(actual_tables)
+            for name, table in params['data'].items():
+                refill_stmts.append(f'INSERT INTO "{name}" (SELECT * FROM "{name}{COMPLETE_TABLE_SUFFIX}");')
 
-            # Otherwise, tables have to be created just once before the measurements (done above)
             # Write cases/queries to a file that will be passed to the command to execute
             with open(TMP_SQL_FILE, "w") as tmp:
                 tmp.write(f'set statement_timeout = {TIMEOUT_PER_CASE * 1000:.0f};\n')
-                tmp.write("\\timing on\n")
                 for _ in range(n_runs):
+                    for stmt in refill_stmts:
+                        tmp.write(stmt + '\n')
+                    tmp.write("\\timing on\n")
                     for case_query in cases.values():
                         tmp.write(case_query + '\n')
-                tmp.write("\\timing off\n")
+                    tmp.write("\\timing off\n")
                 tmp.write(f'set statement_timeout = 0;\n')
 
             # Execute query file and collect measurement data
@@ -184,7 +193,7 @@ def clean_up(self) -> None:
         os.remove(TMP_SQL_FILE)
 
 
-    def prepare_db(self, params: dict[str, Any]) -> None:
+    def prepare_db(self, params: dict[str, Any]) -> list[str]:
         # Set up database
         self.setup()
 
@@ -194,23 +203,24 @@
             connection.autocommit = True
            cursor = connection.cursor()
             cursor.execute("set jit=off;")
-            self.create_tables(cursor, params['data'], self.check_execute_single_cases(params))
+            actual_tables: list[str] = self.create_tables(cursor, params['data'])
         finally:
             connection.close()
             del connection
-
-
-    # Creates tables in the database and copies contents of given files into them
-    # Call with 'with_scale_factors'=False if data should be loaded as a whole
-    # Call with 'with_scale_factors'=True if data should be placed in tmp tables
-    # and copied for each case with different scale factor
-    def create_tables(self, cursor: psycopg2.extensions.cursor, data: dict[str, dict[str, Any]], with_scale_factors: bool) -> None:
+        return actual_tables
+
+    # Creates tables in the database and copies contents of given files into them.
+    # The complete data is in the 'T_complete' tables. For the individual cases the actual table T
+    # can be filled using 'INSERT INTO T (SELECT * FROM T_complete LIMIT x)'
+    # Returns the list of actual table statements
+    def create_tables(self, cursor: psycopg2.extensions.cursor, data: dict[str, dict[str, Any]]) -> list[str]:
+        actual_tables: list[str] = list()
         for table_name, table in data.items():
             columns: str = Connector.parse_attributes(self.POSTGRESQL_TYPE_PARSER, table['attributes'])
 
             # Use an additional table with the *complete* data set to quickly recreate the table with the benchmark
             # data, in case of varying scale factor.
-            complete_table_name: str = table_name + COMPLETE_TABLE_SUFFIX if with_scale_factors else table_name
+            complete_table_name: str = table_name + COMPLETE_TABLE_SUFFIX
             quoted_table_name: str = f'"{complete_table_name}"'
 
             create: str = f"CREATE UNLOGGED TABLE {quoted_table_name} {columns};"
@@ -232,8 +242,12 @@ def create_tables(self, cursor: psycopg2.extensions.cursor, data: dict[str, dict
             except psycopg2.errors.BadCopyFileFormat as ex:
                 raise ConnectorException(str(ex))
 
-            if with_scale_factors:
-                cursor.execute(f'CREATE UNLOGGED TABLE "{table_name}" {columns};')  # Create actual table that will be used for experiment
+            # Create actual table that will be used for experiment
+            actual: str = f'CREATE UNLOGGED TABLE "{table_name}" {columns};'
+            cursor.execute(actual)
+            actual_tables.append(actual)
+
+        return actual_tables
 
 
     # Parse `results` for timings
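Note on the change above: the SQL script handed to psql now interleaves the refill statements with the timed queries, so every run starts from freshly recreated tables. A minimal sketch of the write loop and resulting file layout, assuming a hypothetical table 'nation', one hypothetical case query, n_runs = 2, and an illustrative timeout ('tmp_cases.sql' stands in for TMP_SQL_FILE):

    n_runs: int = 2
    refill_stmts: list[str] = [
        'DROP TABLE "nation";',
        'CREATE UNLOGGED TABLE "nation" (n_nationkey INTEGER);',  # hypothetical columns
        'INSERT INTO "nation" (SELECT * FROM "nation_complete");',
    ]
    cases: dict[int, str] = {1: 'SELECT COUNT(*) FROM "nation";'}  # hypothetical case query

    # Same layout as the write loop in execute() above.
    with open('tmp_cases.sql', 'w') as tmp:
        tmp.write('set statement_timeout = 10000;\n')  # illustrative: TIMEOUT_PER_CASE * 1000
        for _ in range(n_runs):
            for stmt in refill_stmts:
                tmp.write(stmt + '\n')
            tmp.write('\\timing on\n')
            for case_query in cases.values():
                tmp.write(case_query + '\n')
            tmp.write('\\timing off\n')
        tmp.write('set statement_timeout = 0;\n')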
