@@ -34,7 +34,7 @@ use parquet::file::properties::WriterProperties;
34
34
use parquet:: file:: reader:: { ChunkReader , Length } ;
35
35
use parquet:: file:: FOOTER_SIZE ;
36
36
use parquet:: format:: PageLocation ;
37
- use std:: collections:: HashMap ;
37
+ use std:: collections:: { BTreeMap , HashMap } ;
38
38
use std:: io:: Read ;
39
39
use std:: ops:: Range ;
40
40
use std:: sync:: { Arc , Mutex } ;
@@ -221,18 +221,22 @@ struct TestColumnChunk {
221
221
/// The name of the column
222
222
name : String ,
223
223
224
- /// The location of the column chunk in the file
225
- byte_range : Range < usize > ,
224
+ /// The location of the entire column chunk in the file including data pages
225
+ /// and data pages.
226
+ location : Range < usize > ,
226
227
227
- /// The location of the data pages in the column chunk
228
+ /// The offset of the start of of the dictionary page if any
229
+ dictionary_page_location : Option < i64 > ,
230
+
231
+ /// The location of the data pages in the file
228
232
page_locations : Vec < PageLocation > ,
229
233
}
230
234
231
235
/// Information about the pages in a single row group
232
236
#[ derive( Debug ) ]
233
237
struct TestRowGroup {
234
238
/// Maps column_name -> Information about the column chunk
235
- columns : HashMap < String , TestColumnChunk > ,
239
+ columns : BTreeMap < String , TestColumnChunk > ,
236
240
}
237
241
238
242
/// Information about all the row groups in a Parquet file, extracted from its metadata
@@ -255,17 +259,29 @@ impl TestRowGroups {
255
259
. enumerate ( )
256
260
. map ( |( col_idx, col_meta) | {
257
261
let column_name = col_meta. column_descr ( ) . name ( ) . to_string ( ) ;
258
- let locations = offset_index[ rg_index] [ col_idx] . page_locations ( ) ;
259
- let ( start_offset, end_offset) = col_meta. byte_range ( ) ;
262
+ let page_locations = offset_index[ rg_index] [ col_idx] . page_locations ( ) . to_vec ( ) ;
263
+ let dictionary_page_location = col_meta
264
+ . dictionary_page_offset ( ) ;
265
+
266
+ // We can find the byte range of the entire column chunk
267
+ let ( start_offset, length) = col_meta. byte_range ( ) ;
268
+ let start_offset = start_offset as usize ;
269
+ let end_offset = start_offset + length as usize ;
270
+
260
271
let test_column_chunk = TestColumnChunk {
261
272
name : column_name. clone ( ) ,
262
- byte_range : start_offset as usize ..end_offset as usize ,
263
- page_locations : locations. to_vec ( ) ,
273
+ location : start_offset ..end_offset,
274
+ dictionary_page_location,
275
+ page_locations,
264
276
} ;
265
277
266
- ( column_name, test_column_chunk)
278
+ test_column_chunk
279
+ } )
280
+ . map ( |test_column_chunk| {
281
+ // make key=value pairs to insert into the BTreeMap
282
+ ( test_column_chunk. name . clone ( ) , test_column_chunk)
267
283
} )
268
- . collect :: < HashMap < _ , _ > > ( ) ;
284
+ . collect :: < BTreeMap < _ , _ > > ( ) ;
269
285
TestRowGroup { columns }
270
286
} )
271
287
. collect ( ) ;
@@ -329,6 +345,9 @@ impl OperationLog {
329
345
330
346
/// Adds entries to the operation log for each interesting object that is
331
347
/// accessed by the specified range
348
+ ///
349
+ /// This function checks the ranges in order against possible locations
350
+ /// and adds the appropriate operation to the log for the first match found.
332
351
fn log_access ( & self , range : & Range < usize > ) {
333
352
let start = range. start as i64 ;
334
353
let end = range. end as i64 ;
@@ -339,50 +358,80 @@ impl OperationLog {
339
358
) ;
340
359
341
360
// figure out what logical part of the file this range corresponds to
342
- if self . footer_location . contains ( & range. start ) && self . footer_location . contains ( & ( range. end -1 ) ) {
361
+ if self . metadata_location . contains ( & range. start ) || self . metadata_location . contains ( & ( range. end -1 ) ) {
343
362
self . add_operation ( format ! (
344
- "Read Footer : {location_description}"
363
+ "Read Metadata : {location_description}"
345
364
) ) ;
346
365
return ;
347
366
}
348
- if self . metadata_location . contains ( & range. start ) && self . metadata_location . contains ( & ( range. end -1 ) ) {
367
+
368
+ if self . footer_location . contains ( & range. start ) || self . footer_location . contains ( & ( range. end -1 ) ) {
349
369
self . add_operation ( format ! (
350
- "Read Metadata : {location_description}"
370
+ "Read Footer : {location_description}"
351
371
) ) ;
352
- return ;
372
+ return ;
353
373
}
354
374
355
-
356
- let mut found = false ;
375
+ // Search for the location in each column chunk.
376
+ //
377
+ // The actual parquet reader must in general decode the page headers
378
+ // and determine the byte ranges of the pages. However, for this test
379
+ // we assume the following layout:
380
+ //
381
+ // ```text
382
+ // (Dictionary Page)
383
+ // (Data Page)
384
+ // ...
385
+ // (Data Page)
386
+ // ```
387
+ //
388
+ // We also assume that `self.page_locations` holds the location of all
389
+ // data pages, so any read operation that overlaps with a data page
390
+ // location is considered a read of that page, and any other read must
391
+ // be a dictionary page read.
357
392
for ( row_group_index, row_group) in self . row_groups . iter ( ) . enumerate ( ) {
358
393
for ( column_name, test_column_chunk) in & row_group. columns {
359
- let column_byte_range = & test_column_chunk. byte_range ;
394
+ // Check if the range overlaps with any data page locations
395
+ let page_locations = test_column_chunk. page_locations . iter ( ) ;
396
+ for ( page_index, page_location) in page_locations. enumerate ( ) {
397
+ let page_offset = page_location. offset as i64 ;
398
+ let page_end = page_offset + page_location. compressed_page_size as i64 ;
399
+ if start >= page_offset && end <= page_end {
400
+ self . add_operation ( format ! (
401
+ "Read Row Group {row_group_index}, column '{column_name}', Data Page {page_index}: {location_description}" ,
402
+ ) ) ;
403
+ return ;
404
+ }
405
+ }
406
+
407
+ // Check if the range overlaps with the dictionary page location
408
+ if let Some ( dict_page_offset) = test_column_chunk. dictionary_page_location {
409
+ let dict_page_end = dict_page_offset + test_column_chunk. location . len ( ) as i64 ;
410
+ if start >= dict_page_offset as i64 && end < dict_page_end {
411
+ self . add_operation ( format ! (
412
+ "Read Row Group {row_group_index}, column '{column_name}': Dictionary Page: {location_description}" ,
413
+ ) ) ;
414
+ return ;
415
+ }
416
+ }
417
+
418
+ let column_byte_range = & test_column_chunk. location ;
360
419
if column_byte_range. contains ( & range. start )
361
420
&& column_byte_range. contains ( & ( range. end - 1 ) )
362
421
{
363
- found = true ;
364
422
self . add_operation ( format ! (
365
423
"Read Row Group {row_group_index}, column '{column_name}': {location_description}" ,
366
424
) ) ;
367
- }
368
- // Check if the range overlaps with any of the page locations
369
- let page_locations = test_column_chunk. page_locations . iter ( ) ;
370
- for ( page_index, page_location) in page_locations. enumerate ( ) {
371
- if page_location. offset >= start && page_location. offset < end {
372
- found = true ;
373
- self . add_operation ( format ! (
374
- "Read Row Group {row_group_index}, column '{column_name}', page {page_index}: {location_description}" ,
375
- ) ) ;
376
- }
425
+ return ;
377
426
}
378
427
}
379
428
}
380
429
381
- if !found {
430
+ // If we reach here, the range does not match any known logical part of the file
382
431
self . add_operation ( format ! (
383
432
"UNKNOWN: {location_description}"
384
433
) ) ;
385
- }
434
+
386
435
}
387
436
}
388
437
0 commit comments