@@ -3,50 +3,103 @@ use std::time::Instant;
3
3
4
4
use csv:: Writer ;
5
5
use tracing:: info;
6
+
7
+ use crate :: block_committer:: input:: { ConfigImpl , Input } ;
8
+
9
+ pub type InputImpl = Input < ConfigImpl > ;
10
+
11
+ #[ derive( Debug , PartialEq , Eq ) ]
12
+ pub enum Action {
13
+ Total ,
14
+ Read ,
15
+ Compute ,
16
+ Write ,
17
+ }
18
+
6
19
pub struct TimeMeasurement {
7
- timer : Option < Instant > ,
8
- total_time : u128 , // Total duration of all blocks (milliseconds).
9
- per_fact_durations : Vec < u64 > , // Average duration (microseconds) per new fact in a block.
10
- n_facts : Vec < usize > ,
11
- block_durations : Vec < u64 > , // Duration of a block (milliseconds).
12
- facts_in_db : Vec < usize > , // Number of facts in the DB prior to the current block.
13
- total_facts : usize ,
20
+ pub block_timer : Option < Instant > ,
21
+ pub read_timer : Option < Instant > ,
22
+ pub compute_timer : Option < Instant > ,
23
+ pub writer_timer : Option < Instant > ,
24
+ pub total_time : u64 , // Total duration of all blocks (milliseconds).
25
+ pub n_new_facts : Vec < usize > ,
26
+ pub n_read_facts : Vec < usize > ,
27
+ pub block_durations : Vec < u64 > , // Duration of a block (milliseconds).
28
+ pub read_durations : Vec < u64 > , // Duration of a read (milliseconds).
29
+ pub compute_durations : Vec < u64 > , // Duration of a compute (milliseconds).
30
+ pub write_durations : Vec < u64 > , // Duration of a write (milliseconds).
31
+ pub facts_in_db : Vec < usize > , // Number of facts in the DB prior to the current block.
32
+ pub block_number : usize ,
33
+ pub total_facts : usize ,
14
34
}
15
35
16
36
impl TimeMeasurement {
17
- pub fn new ( n_iterations : usize ) -> Self {
37
+ pub fn new ( size : usize ) -> Self {
18
38
Self {
19
- timer : None ,
39
+ block_timer : None ,
40
+ read_timer : None ,
41
+ compute_timer : None ,
42
+ writer_timer : None ,
20
43
total_time : 0 ,
21
- per_fact_durations : Vec :: with_capacity ( n_iterations) ,
22
- n_facts : Vec :: with_capacity ( n_iterations) ,
23
- block_durations : Vec :: with_capacity ( n_iterations) ,
24
- facts_in_db : Vec :: with_capacity ( n_iterations) ,
44
+ n_new_facts : Vec :: with_capacity ( size) ,
45
+ n_read_facts : Vec :: with_capacity ( size) ,
46
+ read_durations : Vec :: with_capacity ( size) ,
47
+ compute_durations : Vec :: with_capacity ( size) ,
48
+ write_durations : Vec :: with_capacity ( size) ,
49
+ block_durations : Vec :: with_capacity ( size) ,
50
+ facts_in_db : Vec :: with_capacity ( size) ,
51
+ block_number : 0 ,
25
52
total_facts : 0 ,
26
53
}
27
54
}
28
55
29
- pub fn start_measurement ( & mut self ) {
30
- self . timer = Some ( Instant :: now ( ) ) ;
56
+ fn get_mut_timer ( & mut self , action : & Action ) -> & mut Option < Instant > {
57
+ match action {
58
+ Action :: Total => & mut self . block_timer ,
59
+ Action :: Read => & mut self . read_timer ,
60
+ Action :: Compute => & mut self . compute_timer ,
61
+ Action :: Write => & mut self . writer_timer ,
62
+ }
31
63
}
32
64
33
- pub fn stop_measurement ( & mut self , facts_count : usize ) {
34
- let duration =
35
- self . timer . expect ( "stop_measurement called before start_measurement" ) . elapsed ( ) ;
65
+ pub fn start_measurement ( & mut self , action : Action ) {
66
+ * self . get_mut_timer ( & action) = Some ( Instant :: now ( ) ) ;
67
+ }
68
+
69
+ /// Stop the measurement for the given action and add the duration to the corresponding vector.
70
+ /// facts_count is either the number of facts read from the DB for Read action, or the number of
71
+ /// new facts written to the DB for the Total action.
72
+ pub fn stop_measurement ( & mut self , facts_count : Option < usize > , action : Action ) {
73
+ let duration = self
74
+ . get_mut_timer ( & action)
75
+ . expect ( "stop_measurement called before start_measurement" )
76
+ . elapsed ( ) ;
36
77
info ! (
37
78
"Time elapsed for iteration {}: {} milliseconds" ,
38
79
self . n_results( ) ,
39
80
duration. as_millis( )
40
81
) ;
41
- let millis = duration. as_millis ( ) ;
82
+ let millis: u64 = duration. as_millis ( ) . try_into ( ) . unwrap ( ) ;
83
+ match action {
84
+ Action :: Total => {
85
+ self . block_durations . push ( millis) ;
42
86
self . total_time += millis;
43
- #[ allow( clippy:: as_conversions) ]
44
- self . per_fact_durations
45
- . push ( duration. div_f32 ( facts_count as f32 ) . as_micros ( ) . try_into ( ) . unwrap ( ) ) ;
46
- self . block_durations . push ( millis. try_into ( ) . unwrap ( ) ) ;
47
- self . n_facts . push ( facts_count) ;
87
+ self . n_new_facts . push ( facts_count. unwrap ( ) ) ;
48
88
self . facts_in_db . push ( self . total_facts ) ;
49
- self . total_facts += facts_count;
89
+ self . total_facts += facts_count. unwrap ( ) ;
90
+ self . block_number += 1 ;
91
+ }
92
+ Action :: Read => {
93
+ self . read_durations . push ( millis) ;
94
+ self . n_read_facts . push ( facts_count. unwrap ( ) ) ;
95
+ }
96
+ Action :: Compute => {
97
+ self . compute_durations . push ( millis) ;
98
+ }
99
+ Action :: Write => {
100
+ self . write_durations . push ( millis) ;
101
+ }
102
+ }
50
103
}
51
104
52
105
pub fn n_results ( & self ) -> usize {
@@ -72,7 +125,7 @@ impl TimeMeasurement {
72
125
let sum: u64 =
73
126
self . block_durations [ window_start..window_start + window_size] . iter ( ) . sum ( ) ;
74
127
let sum_of_facts: usize =
75
- self . n_facts [ window_start..window_start + window_size] . iter ( ) . sum ( ) ;
128
+ self . n_new_facts [ window_start..window_start + window_size] . iter ( ) . sum ( ) ;
76
129
#[ allow( clippy:: as_conversions) ]
77
130
averages. push ( 1000.0 * sum as f64 / sum_of_facts as f64 ) ;
78
131
}
@@ -116,26 +169,26 @@ impl TimeMeasurement {
116
169
let mut wtr = Writer :: from_writer ( file) ;
117
170
wtr. write_record ( [
118
171
"block_number" ,
119
- "n_facts " ,
120
- "facts_in_db " ,
121
- "time_per_fact_micros " ,
172
+ "n_new_facts " ,
173
+ "n_read_facts " ,
174
+ "initial_facts_in_db " ,
122
175
"block_duration_millis" ,
176
+ "read_duration_millis" ,
177
+ "compute_duration_millis" ,
178
+ "write_duration_millis" ,
123
179
] )
124
180
. expect ( "Failed to write CSV header." ) ;
125
- for ( i, ( ( ( & per_fact, & n_facts) , & duration) , & facts_in_db) ) in self
126
- . per_fact_durations
127
- . iter ( )
128
- . zip ( self . n_facts . iter ( ) )
129
- . zip ( self . block_durations . iter ( ) )
130
- . zip ( self . facts_in_db . iter ( ) )
131
- . enumerate ( )
132
- {
181
+ let n_results = self . n_results ( ) ;
182
+ for i in 0 ..n_results {
133
183
wtr. write_record ( & [
134
- i. to_string ( ) ,
135
- n_facts. to_string ( ) ,
136
- facts_in_db. to_string ( ) ,
137
- per_fact. to_string ( ) ,
138
- duration. to_string ( ) ,
184
+ ( self . block_number - n_results + i) . to_string ( ) ,
185
+ self . n_new_facts [ i] . to_string ( ) ,
186
+ self . n_read_facts [ i] . to_string ( ) ,
187
+ self . facts_in_db [ i] . to_string ( ) ,
188
+ self . block_durations [ i] . to_string ( ) ,
189
+ self . read_durations [ i] . to_string ( ) ,
190
+ self . compute_durations [ i] . to_string ( ) ,
191
+ self . write_durations [ i] . to_string ( ) ,
139
192
] )
140
193
. expect ( "Failed to write CSV record." ) ;
141
194
}
0 commit comments