1
+ from .connector import *
2
+
3
+ import duckdb
4
+ import os
5
+ import json
6
+
7
+
8
+ TMP_DB = 'tmp.duckdb'
9
+ TMP_SQL_FILE = 'tmp.sql'
10
+
11
+ # TODO way of measuring time is wrong. Use duckdb_cli like in older version.
12
+
13
+ class DuckDB (Connector ):
14
+
15
+ def __new__ (cls , * args , ** kwargs ):
16
+ return super ().__new__ (cls )
17
+
18
+
19
+ def __init__ (self , duckdb_cli , verbose = False ):
20
+ self .duckdb_cli = duckdb_cli
21
+
22
+
23
+ # Runs an experiment 'n_runs' times, all parameters are in 'params'
24
+ def execute (self , n_runs , params : dict ):
25
+ self .clean_up ()
26
+
27
+ measurement_times = dict () # map that is returned with the measured times
28
+
29
+ # Check wether tables contain scale factors
30
+ with_scale_factors = False
31
+ for table in params ['data' ].values ():
32
+ if (table .get ('scale_factors' )):
33
+ with_scale_factors = True
34
+ break
35
+
36
+ for _ in range (n_runs ):
37
+ try :
38
+ # Set up database
39
+ self .generate_create_table_stmts (params ['data' ], with_scale_factors )
40
+
41
+
42
+ # If tables contain scale factors, they have to be loaded separately for every case
43
+ if (with_scale_factors and bool (params .get ('readonly' ))):
44
+ # Write cases/queries to a file that will be passed to the command to execute
45
+ statements = list ()
46
+ for case , query_stmt in params ['cases' ].items ():
47
+ # Create tables from tmp tables with scale factor
48
+ for table_name , table in params ['data' ].items ():
49
+ statements .append (f"DELETE FROM { table_name } ;" ) # empty existing table
50
+ if table .get ('scale_factors' ):
51
+ sf = table ['scale_factors' ][case ]
52
+ else :
53
+ sf = 1
54
+ header = int (table .get ('header' , 0 ))
55
+ num_rows = round ((table ['lines_in_file' ] - header ) * sf )
56
+ statements .append (f"INSERT INTO { table_name } SELECT * FROM { table_name } _tmp LIMIT { num_rows } ;" )
57
+
58
+ statements .append (".timer on" )
59
+ statements .append (query_stmt ) # Actual query from this case
60
+ statements .append (".timer off" )
61
+
62
+ # Append statements to file
63
+ with open (TMP_SQL_FILE , "a+" ) as tmp :
64
+ for stmt in statements :
65
+ tmp .write (stmt + "\n " )
66
+
67
+
68
+
69
+ # Otherwise, tables have to be created just once before the measurements (done above)
70
+ else :
71
+ # Write cases/queries to a file that will be passed to the command to execute
72
+ with open (TMP_SQL_FILE , "a+" ) as tmp :
73
+ tmp .write (".timer on\n " )
74
+ for case_query in params ['cases' ].values ():
75
+ tmp .write (case_query + '\n ' )
76
+ tmp .write (".timer off\n " )
77
+
78
+
79
+ # Execute query file and collect measurement data
80
+ command = f"./{ self .duckdb_cli } { TMP_DB } < { TMP_SQL_FILE } " + " | grep 'Run Time' | cut -d ' ' -f 5 | awk '{print $1 * 1000;}'"
81
+ stream = os .popen (f'{ command } ' )
82
+ for idx , line in enumerate (stream ):
83
+ time = float (line .replace ("\n " , "" ).replace ("," , "." )) # in milliseconds
84
+ case = list (params ['cases' ].keys ())[idx ]
85
+ if case not in measurement_times .keys ():
86
+ measurement_times [case ] = list ()
87
+ measurement_times [case ].append (time )
88
+ stream .close ()
89
+
90
+
91
+ finally :
92
+ self .clean_up ()
93
+
94
+ return {'DuckDB' : measurement_times }
95
+
96
+
97
+ # Deletes the used temporary database
98
+ def clean_up (self ):
99
+ if os .path .exists (TMP_DB ):
100
+ os .remove (TMP_DB )
101
+ if os .path .exists (TMP_SQL_FILE ):
102
+ os .remove (TMP_SQL_FILE )
103
+
104
+
105
+ # Parse attributes of one table, return as string
106
+ def parse_attributes (self , attributes : dict ):
107
+ columns = '('
108
+ for column_name , ty in attributes .items ():
109
+ not_null = 'NOT NULL' if 'NOT NULL' in ty else ''
110
+ ty = ty .split (' ' )
111
+ match (ty [0 ]):
112
+ case 'INT' :
113
+ typ = 'INT'
114
+ case 'CHAR' :
115
+ typ = f'CHAR({ ty [1 ]} )'
116
+ case 'DECIMAL' :
117
+ typ = f'DECIMAL({ ty [1 ]} ,{ ty [2 ]} )'
118
+ case 'DATE' :
119
+ typ = 'DATE'
120
+ case 'DOUBLE' :
121
+ typ = 'DOUBLE'
122
+ case 'FLOAT' :
123
+ typ = 'REAL'
124
+ case 'BIGINT' :
125
+ typ = 'BIGINT'
126
+ case _:
127
+ raise Exception (f"Unknown type given for '{ column_name } '" )
128
+ columns += f"{ column_name } { typ } { not_null } , "
129
+ columns = columns [:- 2 ] + ')'
130
+ return columns
131
+
132
+
133
+ # Creates tables in the database and copies contents of given files into them
134
+ # Call with 'with_scale_factors'=False if data should be loaded as a whole
135
+ # Call with 'with_scale_factors'=True if data should be placed in tmp tables
136
+ # and copied for each case with different scale factor
137
+ def generate_create_table_stmts (self , data : dict , with_scale_factors ):
138
+ statements = list ()
139
+ for table_name , table in data .items ():
140
+ columns = self .parse_attributes (table ['attributes' ])
141
+
142
+ delimiter = table .get ('delimiter' )
143
+ header = table .get ('header' )
144
+ format = table ['format' ].upper ()
145
+
146
+ if with_scale_factors :
147
+ table_name += "_tmp"
148
+
149
+ create = f"CREATE TABLE { table_name } { columns } ;"
150
+ copy = f"COPY { table_name } FROM '{ table ['file' ]} ' ( "
151
+ if delimiter :
152
+ delim = delimiter .replace ("'" , "" )
153
+ copy += f" DELIMITER \' { delim } \' ,"
154
+ if format :
155
+ copy += f" FORMAT { format } ,"
156
+ if header :
157
+ copy += f" HEADER," if (header == 1 ) else ""
158
+
159
+ copy = copy [:- 1 ] + " );"
160
+
161
+ statements .append (create )
162
+ statements .append (copy )
163
+
164
+ if with_scale_factors :
165
+ # Create actual table that will be used for experiment
166
+ statements .append (f"CREATE TABLE { table_name [:- 4 ]} { columns } ;" )
167
+
168
+ with open (TMP_SQL_FILE , "w" ) as tmp :
169
+ for stmt in statements :
170
+ tmp .write (stmt + "\n " )
0 commit comments