@@ -20,22 +20,25 @@ fn entry_not_found() -> std::io::Error {
2020 std:: io:: Error :: new ( ErrorKind :: NotFound , "Couldn't find entry" )
2121}
2222
23- // struct BucketMap {
24- //
25- // }
23+ #[ derive( Debug ) ]
24+ struct BucketMap {
25+ buckets : HashMap < Chunk , EntryType > ,
26+ zone_to_entry : Array2 < Option < Chunk > > ,
27+ }
2628
2729#[ derive( Debug ) ]
2830pub struct Cache {
2931 // Make sure to lock buckets before locking zone_to_entry, to avoid deadlock errors
30- buckets : RwLock < HashMap < Chunk , EntryType > > ,
31- zone_to_entry : Mutex < Array2 < Option < Chunk > > > ,
32+ bm : RwLock < BucketMap >
3233}
3334
3435impl Cache {
3536 pub fn new ( num_zones : usize , chunks_per_zone : usize ) -> Self {
3637 Self {
37- buckets : RwLock :: new ( HashMap :: new ( ) ) ,
38- zone_to_entry : Mutex :: new ( ArrayBase :: from_elem ( ( num_zones, chunks_per_zone) , None ) ) ,
38+ bm : RwLock :: new ( BucketMap {
39+ buckets : HashMap :: new ( ) ,
40+ zone_to_entry : ArrayBase :: from_elem ( ( num_zones, chunks_per_zone) , None )
41+ } ) ,
3942 }
4043 }
4144
@@ -57,9 +60,9 @@ impl Cache {
5760
5861 loop {
5962 // Bucket read locked -- no one can write to map
60- let bucket_guard = log_async_read_lock ! ( self . buckets ) ;
63+ let bucket_guard = self . bm . read ( ) . await ;
6164
62- if let Some ( state) = bucket_guard. get ( & key) {
65+ if let Some ( state) = bucket_guard. buckets . get ( & key) {
6366 // We now have the entry
6467 let bucket_state_guard = Arc :: clone ( state) ;
6568 let bucket_state_guard = bucket_state_guard. read ( ) . await ;
@@ -84,10 +87,10 @@ impl Cache {
8487
8588 loop {
8689 // Bucket write locked -- no one can read or write to map
87- let mut bucket_guard = log_async_write_lock ! ( self . buckets ) ;
90+ let mut bucket_guard = self . bm . write ( ) . await ;
8891
8992 // Incase it was inserted inbetween
90- if let Some ( state) = bucket_guard. get ( & key) {
93+ if let Some ( state) = bucket_guard. buckets . get ( & key) {
9194 // We now have the entry
9295 let bucket_state_guard = Arc :: clone ( state) ;
9396 let bucket_state_guard = bucket_state_guard. read ( ) . await ;
@@ -111,7 +114,7 @@ impl Cache {
111114 let mut chunk_loc_guard = locked_chunk_location. write ( ) . await ;
112115 // We now have something in the waiting state
113116 // It is locked and should not be unlocked until it's out of the waiting state
114- bucket_guard. insert ( key. clone ( ) , Arc :: clone ( & locked_chunk_location) ) ; // Place locked waiting state
117+ bucket_guard. buckets . insert ( key. clone ( ) , Arc :: clone ( & locked_chunk_location) ) ; // Place locked waiting state
115118 drop ( bucket_guard) ; // Bucket write unlocked -- Other writes can proceed on the outer map
116119 let write_result = writer ( ) . await ;
117120 match write_result {
@@ -123,8 +126,8 @@ impl Cache {
123126 } ;
124127 drop ( chunk_loc_guard) ; // Prevent deadlock
125128
126- let mut bucket_guard = log_async_write_lock ! ( self . buckets ) ;
127- bucket_guard. remove ( & key) ;
129+ let mut bucket_guard = self . bm . write ( ) . await ;
130+ bucket_guard. buckets . remove ( & key) ;
128131
129132 if let Some ( n) = notify {
130133 n. notify_waiters ( ) ;
@@ -136,8 +139,8 @@ impl Cache {
136139 * chunk_loc_guard = ChunkState :: Ready ( Arc :: new ( location. clone ( ) ) ) ;
137140 drop ( chunk_loc_guard) ;
138141
139- let mut reverse_mapping_guard = log_async_mutex_lock ! ( self . zone_to_entry ) ;
140- reverse_mapping_guard[ location. as_index ( ) ] = Some ( key) ;
142+ let mut reverse_mapping_guard = self . bm . write ( ) . await ;
143+ reverse_mapping_guard. zone_to_entry [ location. as_index ( ) ] = Some ( key) ;
141144 }
142145 }
143146
@@ -151,15 +154,14 @@ impl Cache {
151154 /// Internal use: won't remove them from the map if they don't
152155 /// exist in the reverse mapping
153156 pub async fn remove_zones ( & self , zone_indices : & [ usize ] ) -> tokio:: io:: Result < ( ) > {
154- let mut map_guard = log_async_write_lock ! ( self . buckets) ; // DEADLOCK
155- let mut reverse_mapping_guard = log_async_mutex_lock ! ( self . zone_to_entry) ;
157+ let mut map_guard = self . bm . write ( ) . await ;
156158
157159 // Loop over zones
158160 for zone_index in zone_indices {
159161 // Get slice representing a zone
160162 let zone_slice = s ! [ * zone_index, ..] ;
161163 // loop over zone chunks
162- let zone_view = reverse_mapping_guard . slice ( zone_slice) ;
164+ let zone_view = map_guard . zone_to_entry . slice ( zone_slice) ;
163165
164166 // Collect all valid chunks first
165167 let chunks_to_remove: Vec < _ > = zone_view
@@ -170,7 +172,7 @@ impl Cache {
170172
171173 // Now safely mutate map_guard and reverse_mapping
172174 for chunk in chunks_to_remove {
173- let entry = match map_guard. remove ( & chunk) {
175+ let entry = match map_guard. buckets . remove ( & chunk) {
174176 Some ( v) => v,
175177 None => {
176178 return Err ( io:: Error :: new (
@@ -184,7 +186,7 @@ impl Cache {
184186 }
185187
186188 // Now safe to mutate reverse_mapping
187- reverse_mapping_guard
189+ map_guard . zone_to_entry
188190 . slice_mut ( zone_slice)
189191 . map_inplace ( |v| * v = None ) ;
190192 }
@@ -205,22 +207,21 @@ impl Cache {
205207 {
206208 // to_relocate is a list of ChunkLocations that the caller wants to update
207209 // We pass in each chunk location and the writer function should return back with the list of updated chunk locations
208- let mut bucket_guard = log_async_write_lock ! ( self . buckets) ;
209- let mut reverse_mapping_guard = log_async_mutex_lock ! ( self . zone_to_entry) ;
210+ let mut bucket_guard = self . bm . write ( ) . await ;
210211 let mut entry_lock_list = Vec :: new ( ) ;
211212
212213 // Remove to_relocate elements from the reverse_mapping, and
213214 // change their entries to be in the waiting state.
214215 for location in to_relocate {
215- let chunk_id = match reverse_mapping_guard [ location. as_index ( ) ] . clone ( ) {
216+ let chunk_id = match bucket_guard . zone_to_entry [ location. as_index ( ) ] . clone ( ) {
216217 Some ( id) => {
217- reverse_mapping_guard [ location. as_index ( ) ] . take ( ) ;
218+ bucket_guard . zone_to_entry [ location. as_index ( ) ] . take ( ) ;
218219 id
219220 }
220221 None => return Err ( io:: Error :: new ( ErrorKind :: NotFound , "Couldn't find chunk" ) ) ,
221222 } ;
222223
223- let state_guard = bucket_guard
224+ let state_guard = bucket_guard. buckets
224225 . get ( & chunk_id)
225226 . ok_or ( entry_not_found ( ) ) ?
226227 . clone ( ) ;
@@ -229,7 +230,7 @@ impl Cache {
229230 ChunkState :: Ready ( loc) => {
230231 assert ! ( * * loc == * location) ;
231232 let entry = new_entry ( ) ;
232- bucket_guard. insert ( chunk_id. clone ( ) , entry. clone ( ) ) ;
233+ bucket_guard. buckets . insert ( chunk_id. clone ( ) , entry. clone ( ) ) ;
233234 entry_lock_list. push ( entry) ;
234235 }
235236 ChunkState :: Waiting ( _notify) => {
@@ -271,37 +272,35 @@ impl Cache {
271272 pub async fn remove_entry ( & self , chunk : & ChunkLocation ) -> tokio:: io:: Result < ( ) > {
272273 // to_relocate is a list of ChunkLocations that the caller wants to update
273274 // We pass in each chunk location and the writer function should return back with the list of updated chunk locations
274- let mut bucket_guard = log_async_write_lock ! ( self . buckets) ;
275- let mut reverse_mapping_guard = log_async_mutex_lock ! ( self . zone_to_entry) ;
275+ let mut bucket_guard = self . bm . write ( ) . await ;
276276
277- let chunk_id = match reverse_mapping_guard [ chunk. as_index ( ) ] . clone ( ) {
277+ let chunk_id = match bucket_guard . zone_to_entry [ chunk. as_index ( ) ] . clone ( ) {
278278 Some ( id) => {
279- reverse_mapping_guard [ chunk. as_index ( ) ] . take ( ) ;
279+ bucket_guard . zone_to_entry [ chunk. as_index ( ) ] . take ( ) ;
280280 id
281281 }
282282 None => return Err ( io:: Error :: new ( ErrorKind :: NotFound , "Couldn't find chunk" ) ) ,
283283 } ;
284284
285- bucket_guard. remove ( & chunk_id) . ok_or ( entry_not_found ( ) ) ?;
285+ bucket_guard. buckets . remove ( & chunk_id) . ok_or ( entry_not_found ( ) ) ?;
286286 Ok ( ( ) )
287287 }
288288
289289 pub async fn remove_entries ( & self , chunks : & [ ChunkLocation ] ) -> tokio:: io:: Result < ( ) > {
290290 // to_relocate is a list of ChunkLocations that the caller wants to update
291291 // We pass in each chunk location and the writer function should return back with the list of updated chunk locations
292- let mut bucket_guard = log_async_write_lock ! ( self . buckets) ;
293- let mut reverse_mapping_guard = log_async_mutex_lock ! ( self . zone_to_entry) ;
292+ let mut bucket_guard = self . bm . write ( ) . await ;
294293
295294 for chunk in chunks {
296- let chunk_id = match reverse_mapping_guard [ chunk. as_index ( ) ] . clone ( ) {
295+ let chunk_id = match bucket_guard . zone_to_entry [ chunk. as_index ( ) ] . clone ( ) {
297296 Some ( id) => {
298- reverse_mapping_guard [ chunk. as_index ( ) ] . take ( ) ;
297+ bucket_guard . zone_to_entry [ chunk. as_index ( ) ] . take ( ) ;
299298 id
300299 }
301300 None => return Err ( io:: Error :: new ( ErrorKind :: NotFound , "Couldn't find chunk" ) ) ,
302301 } ;
303302
304- bucket_guard. remove ( & chunk_id) . ok_or ( entry_not_found ( ) ) ?;
303+ bucket_guard. buckets . remove ( & chunk_id) . ok_or ( entry_not_found ( ) ) ?;
305304 }
306305
307306 Ok ( ( ) )
@@ -318,8 +317,8 @@ impl Cache {
318317 W : FnOnce ( ) -> WFut + Send + ' static ,
319318 WFut : Future < Output = tokio:: io:: Result < ChunkLocation > > + Send + ' static ,
320319 {
321- let mut bucket_guard = log_async_write_lock ! ( self . buckets ) ;
322- let state = bucket_guard. get ( & key) . ok_or ( std:: io:: Error :: new (
320+ let mut bucket_guard = self . bm . write ( ) . await ;
321+ let state = bucket_guard. buckets . get ( & key) . ok_or ( std:: io:: Error :: new (
323322 ErrorKind :: NotFound ,
324323 format ! ( "Couldn't find entry {:?}" , key) ,
325324 ) ) ?;
@@ -339,14 +338,14 @@ impl Cache {
339338 ChunkState :: Ready ( _) => {
340339 let locked_chunk_location = new_entry ( ) ;
341340 let mut chunk_loc_guard = locked_chunk_location. write ( ) . await ;
342- bucket_guard. insert ( key. clone ( ) , Arc :: clone ( & locked_chunk_location) ) ;
341+ bucket_guard. buckets . insert ( key. clone ( ) , Arc :: clone ( & locked_chunk_location) ) ;
343342 drop ( bucket_guard) ; // Bucket write unlocked -- Other writes can proceed on the outer map
344343
345344 let write_result = writer ( ) . await ;
346345 match write_result {
347346 Err ( e) => {
348- let mut bucket_guard = log_async_write_lock ! ( self . buckets ) ;
349- bucket_guard. remove ( & key) ;
347+ let mut bucket_guard = self . bm . write ( ) . await ;
348+ bucket_guard. buckets . remove ( & key) ;
350349 match & * chunk_loc_guard {
351350 ChunkState :: Waiting ( notify) => {
352351 notify. notify_waiters ( ) ;
@@ -361,8 +360,8 @@ impl Cache {
361360 }
362361 Ok ( location) => {
363362 * chunk_loc_guard = ChunkState :: Ready ( Arc :: new ( location. clone ( ) ) ) ;
364- let mut reverse_mapping_guard = log_async_mutex_lock ! ( self . zone_to_entry ) ;
365- reverse_mapping_guard[ location. as_index ( ) ] = Some ( key) ;
363+ let mut reverse_mapping_guard = self . bm . write ( ) . await ;
364+ reverse_mapping_guard. zone_to_entry [ location. as_index ( ) ] = Some ( key) ;
366365 Ok ( ( ) )
367366 }
368367 }
0 commit comments