77
77
current_file ,
78
78
% % current file handle since the last fsync?
79
79
current_file_handle ,
80
- % % file handle cache
80
+ % % current write file offset
81
81
current_file_offset ,
82
+ % % messages that were potentially removed from the current write file
83
+ current_file_removes = [],
82
84
% % TRef for our interval timer
83
85
sync_timer_ref ,
84
86
% % files that had removes
@@ -1150,7 +1152,11 @@ write_message(MsgId, Msg, CRef,
1150
1152
end , CRef , State1 )
1151
1153
end .
1152
1154
1153
- remove_message (MsgId , CRef , State = # msstate { index_ets = IndexEts }) ->
1155
+ remove_message (MsgId , CRef ,
1156
+ State = # msstate {
1157
+ index_ets = IndexEts ,
1158
+ current_file = CurrentFile ,
1159
+ current_file_removes = Removes }) ->
1154
1160
case should_mask_action (CRef , MsgId , State ) of
1155
1161
{true , _Location } ->
1156
1162
State ;
@@ -1162,22 +1168,32 @@ remove_message(MsgId, CRef, State = #msstate{ index_ets = IndexEts }) ->
1162
1168
% % ets:lookup(FileSummaryEts, File),
1163
1169
State ;
1164
1170
{_Mask , # msg_location { ref_count = RefCount , file = File ,
1165
- total_size = TotalSize }}
1171
+ total_size = TotalSize } = Entry }
1166
1172
when RefCount > 0 ->
1167
1173
% % only update field, otherwise bad interaction with
1168
1174
% % concurrent GC
1169
- Dec = fun () -> index_update_ref_counter (IndexEts , MsgId , - 1 ) end ,
1170
1175
case RefCount of
1171
- % % don 't remove from cur_file_cache_ets here because
1176
+ % % Don 't remove from cur_file_cache_ets here because
1172
1177
% % there may be further writes in the mailbox for the
1173
- % % same msg.
1174
- 1 -> ok = Dec (),
1175
- delete_file_if_empty (
1176
- File , gc_candidate (File ,
1177
- adjust_valid_total_size (
1178
- File , - TotalSize , State )));
1179
- _ -> ok = Dec (),
1180
- gc_candidate (File , State )
1178
+ % % same msg. We will remove 0 ref_counts when rolling
1179
+ % % over to the next write file.
1180
+ 1 when File =:= CurrentFile ->
1181
+ index_update_ref_counter (IndexEts , MsgId , - 1 ),
1182
+ State1 = State # msstate {current_file_removes =
1183
+ [Entry # msg_location {ref_count = 0 }|Removes ]},
1184
+ delete_file_if_empty (
1185
+ File , gc_candidate (File ,
1186
+ adjust_valid_total_size (
1187
+ File , - TotalSize , State1 )));
1188
+ 1 ->
1189
+ index_delete (IndexEts , MsgId ),
1190
+ delete_file_if_empty (
1191
+ File , gc_candidate (File ,
1192
+ adjust_valid_total_size (
1193
+ File , - TotalSize , State )));
1194
+ _ ->
1195
+ index_update_ref_counter (IndexEts , MsgId , - 1 ),
1196
+ gc_candidate (File , State )
1181
1197
end
1182
1198
end .
1183
1199
@@ -1239,7 +1255,9 @@ flush_or_roll_to_new_file(
1239
1255
cur_file_cache_ets = CurFileCacheEts ,
1240
1256
file_size_limit = FileSizeLimit })
1241
1257
when Offset >= FileSizeLimit ->
1242
- State1 = internal_sync (State ),
1258
+ % % Cleanup the index of messages that were removed before rolling over.
1259
+ State0 = cleanup_index_on_roll_over (State ),
1260
+ State1 = internal_sync (State0 ),
1243
1261
ok = writer_close (CurHdl ),
1244
1262
NextFile = CurFile + 1 ,
1245
1263
{ok , NextHdl } = writer_open (Dir , NextFile ),
@@ -1267,6 +1285,8 @@ write_large_message(MsgId, MsgBodyBin,
1267
1285
index_ets = IndexEts ,
1268
1286
file_summary_ets = FileSummaryEts ,
1269
1287
cur_file_cache_ets = CurFileCacheEts }) ->
1288
+ % % Cleanup the index of messages that were removed before rolling over.
1289
+ State1 = cleanup_index_on_roll_over (State0 ),
1270
1290
{LargeMsgFile , LargeMsgHdl } = case CurOffset of
1271
1291
% % We haven't written in the file yet. Use it.
1272
1292
0 ->
@@ -1286,13 +1306,13 @@ write_large_message(MsgId, MsgBodyBin,
1286
1306
ok = index_insert (IndexEts ,
1287
1307
# msg_location { msg_id = MsgId , ref_count = 1 , file = LargeMsgFile ,
1288
1308
offset = 0 , total_size = TotalSize }),
1289
- State1 = case CurFile of
1309
+ State2 = case CurFile of
1290
1310
% % We didn't open a new file. We must update the existing value.
1291
1311
LargeMsgFile ->
1292
1312
[_ ,_ ] = ets :update_counter (FileSummaryEts , LargeMsgFile ,
1293
1313
[{# file_summary .valid_total_size , TotalSize },
1294
1314
{# file_summary .file_size , TotalSize }]),
1295
- State0 ;
1315
+ State1 ;
1296
1316
% % We opened a new file. We can insert it all at once.
1297
1317
% % We must also check whether we need to delete the previous
1298
1318
% % current file, because if there is no valid data this is
@@ -1303,7 +1323,7 @@ write_large_message(MsgId, MsgBodyBin,
1303
1323
valid_total_size = TotalSize ,
1304
1324
file_size = TotalSize ,
1305
1325
locked = false }),
1306
- delete_file_if_empty (CurFile , State0 # msstate { current_file_handle = LargeMsgHdl ,
1326
+ delete_file_if_empty (CurFile , State1 # msstate { current_file_handle = LargeMsgHdl ,
1307
1327
current_file = LargeMsgFile ,
1308
1328
current_file_offset = TotalSize })
1309
1329
end ,
@@ -1318,11 +1338,22 @@ write_large_message(MsgId, MsgBodyBin,
1318
1338
% % Delete messages from the cache that were written to disk.
1319
1339
true = ets :match_delete (CurFileCacheEts , {'_' , '_' , 0 }),
1320
1340
% % Process confirms (this won't flush; we already did) and continue.
1321
- State = internal_sync (State1 ),
1341
+ State = internal_sync (State2 ),
1322
1342
State # msstate { current_file_handle = NextHdl ,
1323
1343
current_file = NextFile ,
1324
1344
current_file_offset = 0 }.
1325
1345
1346
+ cleanup_index_on_roll_over (State = # msstate {
1347
+ index_ets = IndexEts ,
1348
+ current_file_removes = Removes }) ->
1349
+ lists :foreach (fun (Entry ) ->
1350
+ % % We delete objects that have ref_count=0. If a message
1351
+ % % got its ref_count increased, it will not be deleted.
1352
+ % % We thus avoid extra index lookups to check for ref_count.
1353
+ index_delete_object (IndexEts , Entry )
1354
+ end , Removes ),
1355
+ State # msstate {current_file_removes = []}.
1356
+
1326
1357
contains_message (MsgId , From , State = # msstate { index_ets = IndexEts }) ->
1327
1358
MsgLocation = index_lookup_positive_ref_count (IndexEts , MsgId ),
1328
1359
gen_server2 :reply (From , MsgLocation =/= not_found ),
@@ -1643,7 +1674,7 @@ index_update(IndexEts, Obj) ->
1643
1674
ok .
1644
1675
1645
1676
index_update_fields (IndexEts , Key , Updates ) ->
1646
- true = ets :update_element (IndexEts , Key , Updates ),
1677
+ _ = ets :update_element (IndexEts , Key , Updates ),
1647
1678
ok .
1648
1679
1649
1680
index_update_ref_counter (IndexEts , Key , RefCount ) ->
@@ -1967,10 +1998,21 @@ delete_file_if_empty(File, State = #msstate {
1967
1998
% % We do not try to look at messages that are not the last because we do not want to
1968
1999
% % accidentally write over messages that were moved earlier.
1969
2000
1970
- compact_file (File , State = # gc_state { index_ets = IndexEts ,
1971
- file_summary_ets = FileSummaryEts ,
1972
- dir = Dir ,
1973
- msg_store = Server }) ->
2001
+ compact_file (File , State = # gc_state { file_summary_ets = FileSummaryEts }) ->
2002
+ case ets :lookup (FileSummaryEts , File ) of
2003
+ [] ->
2004
+ rabbit_log :debug (" File ~tp has already been deleted; no need to compact" ,
2005
+ [File ]),
2006
+ ok ;
2007
+ [# file_summary {file_size = FileSize }] ->
2008
+ compact_file (File , FileSize , State )
2009
+ end .
2010
+
2011
+ compact_file (File , FileSize ,
2012
+ State = # gc_state { index_ets = IndexEts ,
2013
+ file_summary_ets = FileSummaryEts ,
2014
+ dir = Dir ,
2015
+ msg_store = Server }) ->
1974
2016
% % Get metadata about the file. Will be used to calculate
1975
2017
% % how much data was reclaimed as a result of compaction.
1976
2018
[# file_summary {file_size = FileSize }] = ets :lookup (FileSummaryEts , File ),
@@ -2123,9 +2165,9 @@ truncate_file(File, Size, ThresholdTimestamp, #gc_state{ file_summary_ets = File
2123
2165
2124
2166
-spec delete_file (non_neg_integer (), gc_state ()) -> ok | defer .
2125
2167
2126
- delete_file (File , State = # gc_state { file_summary_ets = FileSummaryEts ,
2127
- file_handles_ets = FileHandlesEts ,
2128
- dir = Dir }) ->
2168
+ delete_file (File , # gc_state { file_summary_ets = FileSummaryEts ,
2169
+ file_handles_ets = FileHandlesEts ,
2170
+ dir = Dir }) ->
2129
2171
case ets :match_object (FileHandlesEts , {{'_' , File }, '_' }, 1 ) of
2130
2172
{[_ |_ ], _Cont } ->
2131
2173
rabbit_log :debug (" Asked to delete file ~p but it has active readers. Deferring." ,
@@ -2134,7 +2176,6 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
2134
2176
_ ->
2135
2177
[# file_summary { valid_total_size = 0 ,
2136
2178
file_size = FileSize }] = ets :lookup (FileSummaryEts , File ),
2137
- [] = scan_and_vacuum_message_file (File , State ),
2138
2179
ok = file :delete (form_filename (Dir , filenum_to_name (File ))),
2139
2180
true = ets :delete (FileSummaryEts , File ),
2140
2181
rabbit_log :debug (" Deleted empty file number ~tp ; reclaimed ~tp bytes" , [File , FileSize ]),
0 commit comments