diff --git a/src/osiris_log.erl b/src/osiris_log.erl
index 7e221b8..f6fbf6d 100644
--- a/src/osiris_log.erl
+++ b/src/osiris_log.erl
@@ -2,7 +2,7 @@
 %% License, v. 2.0. If a copy of the MPL was not distributed with this
 %% file, You can obtain one at https://mozilla.org/MPL/2.0/.
 %%
-%% Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
+%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
 %%
 -module(osiris_log).
@@ -64,7 +64,10 @@
          sorted_index_files/1,
          index_files_unsorted/1,
          make_chunk/7,
-         orphaned_segments/1
+         orphaned_segments/1,
+         read_header0/1,
+         read_ahead_hints/3,
+         update_read/4
         ]).
 
 % maximum size of a segment in bytes
@@ -104,6 +107,7 @@
            _/binary>>).
 
 -define(SKIP_SEARCH_JUMP, 2048).
+-define(READ_AHEAD_LIMIT, 4096).
 
 %% Specification of the Log format.
 %%
@@ -424,7 +428,10 @@
          transport :: transport(),
          chunk_selector :: all | user_data,
          position = 0 :: non_neg_integer(),
-         filter :: undefined | osiris_bloom:mstate()}).
+         filter :: undefined | osiris_bloom:mstate(),
+         last_data_size = undefined :: undefined | non_neg_integer(),
+         filter_size = ?DEFAULT_FILTER_SIZE :: osiris_bloom:filter_size(),
+         read_ahead_data = undefined :: undefined | binary()}).
 -record(write,
         {type = writer :: writer | acceptor,
          segment_size = {?LOG_HEADER_SIZE, 0} :: {non_neg_integer(), non_neg_integer()},
@@ -1389,7 +1396,7 @@ read_header(#?MODULE{cfg = #cfg{}} = State0) ->
         {ok,
          #{num_records := NumRecords,
            next_position := NextPos} =
-             Header,
+             Header, _,
          #?MODULE{mode = #read{next_offset = ChId} = Read} = State} ->
             %% skip data portion
             {ok, Header,
@@ -1436,8 +1443,9 @@ chunk_iterator(State) ->
                      {error, {invalid_chunk_header, term()}}.
 chunk_iterator(#?MODULE{cfg = #cfg{},
                         mode = #read{type = RType,
-                                      chunk_selector = Selector}
-                        } = State0, CreditHint)
+                                      chunk_selector = Selector,
+                                      filter_size = RaFs}} = State0,
+               CreditHint)
   when (is_integer(CreditHint) andalso CreditHint > 0) orelse
        is_atom(CreditHint) ->
     %% reads the next chunk of unparsed chunk data
@@ -1451,15 +1459,20 @@
            data_size := DataSize,
            filter_size := FilterSize,
            position := Pos,
-           next_position := NextPos} = Header,
-         #?MODULE{fd = Fd, mode = #read{next_offset = ChId} = Read} = State1} ->
-            State = State1#?MODULE{mode = Read#read{next_offset = ChId + NumRecords,
-                                                    position = NextPos}},
+           next_position := NextPos} = Header, MaybeData,
+         #?MODULE{fd = Fd, mode = #read{next_offset = ChId} = Read1} = State1} ->
             case needs_handling(RType, Selector, ChType) of
                 true ->
+                    Read = Read1#read{next_offset = ChId + NumRecords,
+                                      position = NextPos,
+                                      last_data_size = DataSize,
+                                      filter_size = read_ahead_fsize(RaFs,
+                                                                     FilterSize)},
+
+                    State = State1#?MODULE{mode = Read},
                     DataPos = Pos + ?HEADER_SIZE_B + FilterSize,
                     Data = iter_read_ahead(Fd, DataPos, ChId, Crc, CreditHint,
-                                           DataSize, NumEntries),
+                                           DataSize, NumEntries, MaybeData),
                     Iterator = #iterator{fd = Fd,
                                          data = Data,
                                          next_offset = ChId,
@@ -1468,6 +1481,9 @@
                     {ok, Header, Iterator, State};
                 false ->
                     %% skip
+                    Read = Read1#read{next_offset = ChId + NumRecords,
+                                      position = NextPos},
+                    State = State1#?MODULE{mode = Read},
                     chunk_iterator(State, CreditHint)
             end;
         Other ->
@@ -1549,7 +1565,7 @@ read_chunk(#?MODULE{cfg = #cfg{}} = State0) ->
            filter_size := FilterSize,
            position := Pos,
            next_position := NextPos,
-           trailer_size := TrailerSize},
+           trailer_size := TrailerSize}, _,
          #?MODULE{fd = Fd, mode = #read{next_offset = ChId} = Read} = State} ->
             ToRead = ?HEADER_SIZE_B + FilterSize + DataSize + TrailerSize,
             {ok, ChData} = file:pread(Fd, Pos, ToRead),
@@ -1649,17 +1665,18 @@ is_valid_chunk_on_disk(SegFile, Pos) ->
                     {error, term()} |
                     {end_of_stream, state()}.
 send_file(Sock, State) ->
-    send_file(Sock, State, fun(_, _) -> ok end).
+    send_file(Sock, State, fun(_, _) -> <<>> end).
 
 -spec send_file(gen_tcp:socket() | ssl:socket(),
                 state(),
-                fun((header_map(), non_neg_integer()) -> term())) ->
+                fun((header_map(), non_neg_integer()) -> binary())) ->
                     {ok, state()} |
                     {error, term()} |
                     {end_of_stream, state()}.
 send_file(Sock,
           #?MODULE{mode = #read{type = RType,
                                 chunk_selector = Selector,
-                                transport = Transport}} = State0,
+                                transport = Transport,
+                                filter_size = RaFs}} = State0,
           Callback) ->
     case catch read_header0(State0) of
         {ok, #{type := ChType,
@@ -1671,45 +1688,62 @@ send_file(Sock,
                position := Pos,
                next_position := NextPos,
                header_data := HeaderData} = Header,
+         MaybeData,
          #?MODULE{fd = Fd,
-                  mode = #read{next_offset = ChId} = Read0} = State1} ->
-            %% read header
-            %% used to write frame headers to socket
-            %% and return the number of bytes to sendfile
-            %% this allow users of this api to send all the data
-            %% or just header and entry data
-            {ToSkip, ToSend} =
-                case RType of
-                    offset ->
-                        select_amount_to_send(Selector, ChType, FilterSize,
-                                              DataSize, TrailerSize);
-                    data ->
-                        {0, FilterSize + DataSize + TrailerSize}
-                end,
+                  mode = #read{next_offset = ChId} = Read1} = State1} ->
-            Read = Read0#read{next_offset = ChId + NumRecords,
-                              position = NextPos},
             %% only sendfile if either the reader is a data reader
             %% or the chunk is a user type (for offset readers)
             case needs_handling(RType, Selector, ChType) of
                 true ->
-                    _ = Callback(Header, ToSend + byte_size(HeaderData)),
-                    case send(Transport, Sock, HeaderData) of
-                        ok ->
-                            case sendfile(Transport, Fd, Sock,
-                                          Pos + ?HEADER_SIZE_B + ToSkip, ToSend) of
+                    Read = Read1#read{next_offset = ChId + NumRecords,
+                                      position = NextPos,
+                                      last_data_size = DataSize,
+                                      filter_size = read_ahead_fsize(RaFs,
+                                                                     FilterSize)},
+                    case MaybeData of
+                        undefined ->
+                            %% read header
+                            %% used to write frame headers to socket
+                            %% and return the number of bytes to sendfile
+                            %% this allows users of this api to send all the data
+                            %% or just header and entry data
+                            {ToSkip, ToSend} =
+                                select_amount_to_send(RType, Selector, ChType,
+                                                      FilterSize, DataSize,
+                                                      TrailerSize),
+
+                            FrameHeader = Callback(Header,
+                                                   ToSend + byte_size(HeaderData)),
+                            case send(Transport, Sock, [FrameHeader, HeaderData]) of
+                                ok ->
+                                    case sendfile(Transport, Fd, Sock,
+                                                  Pos + ?HEADER_SIZE_B + ToSkip, ToSend) of
+                                        ok ->
+                                            State = State1#?MODULE{mode = Read},
+                                            {ok, State};
+                                        Err ->
+                                            %% reset the position to the start of the current
+                                            %% chunk so that subsequent reads won't error
+                                            Err
+                                    end;
+                                Err ->
+                                    Err
+                            end;
+                        Data ->
+                            FrameHeader = Callback(Header,
+                                                   byte_size(Data) + byte_size(HeaderData)),
+                            case send(Transport, Sock, [FrameHeader, HeaderData, Data]) of
                                 ok ->
                                     State = State1#?MODULE{mode = Read},
                                     {ok, State};
                                 Err ->
-                                    %% reset the position to the start of the current
-                                    %% chunk so that subsequent reads won't error
                                     Err
-                            end;
-                        Err ->
-                            Err
+                            end
                     end;
                 false ->
+                    Read = Read1#read{next_offset = ChId + NumRecords,
+                                      position = NextPos},
                     State = State1#?MODULE{mode = Read},
                     %% skip chunk and recurse
                     send_file(Sock, State, Callback)
@@ -1718,6 +1752,11 @@ send_file(Sock,
             Other
     end.
 
+select_amount_to_send(data, _Sel, _ChkType, FilterSize, DataSize, TrailerSize) ->
+    {0, FilterSize + DataSize + TrailerSize};
+select_amount_to_send(offset, Sel, ChkType, FilterSize, DataSize, TrailerSize) ->
+    select_amount_to_send(Sel, ChkType, FilterSize, DataSize, TrailerSize).
+
 %% There could be many more selectors in the future
 select_amount_to_send(user_data, ?CHNK_USER, FilterSize, DataSize, _TrailerSize) ->
     {FilterSize, DataSize};
@@ -2861,138 +2900,255 @@ recover_tracking(Fd, Trk0, Pos0) ->
             Trk0
     end.
 
-read_header0(#?MODULE{cfg = #cfg{directory = Dir,
-                                 shared = Shared,
-                                 counter = CntRef},
-                      mode = #read{next_offset = NextChId0,
-                                   position = Pos,
-                                   filter = Filter} = Read0,
-                      current_file = CurFile,
-                      fd = Fd} =
-                 State) ->
+-spec read_header0(state()) -> {ok, map(), undefined | binary(), state()} |
+                               {end_of_stream, state()}.
+read_header0(State) ->
     %% reads the next header if permitted
     case can_read_next(State) of
         true ->
-            %% optimistically read 64 bytes (small binary) as it may save us
-            %% a syscall reading the filter if the filter is of the default
-            %% 16 byte size
-            case file:pread(Fd, Pos, ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE) of
-                {ok, <<?MAGIC:4/unsigned,
-                       ?VERSION:4/unsigned,
-                       ChType:8/unsigned,
-                       NumEntries:16/unsigned,
-                       NumRecords:32/unsigned,
-                       Timestamp:64/signed,
-                       Epoch:64/unsigned,
-                       NextChId0:64/unsigned,
-                       Crc:32/integer,
-                       DataSize:32/unsigned,
-                       TrailerSize:32/unsigned,
-                       FilterSize:8/unsigned,
-                       _Reserved:24,
-                       MaybeFilter/binary>> = HeaderData0} ->
-                    <<HeaderData:?HEADER_SIZE_B/binary, _/binary>> = HeaderData0,
-                    counters:put(CntRef, ?C_OFFSET, NextChId0 + NumRecords),
-                    counters:add(CntRef, ?C_CHUNKS, 1),
-                    NextPos = Pos + ?HEADER_SIZE_B + FilterSize + DataSize + TrailerSize,
-
-                    ChunkFilter = case MaybeFilter of
-                                      <<F:FilterSize/binary, _/binary>> ->
-                                          %% filter is of default size or 0
-                                          F;
-                                      _ when Filter =/= undefined ->
-                                          %% the filter is larger than default
-                                          case file:pread(Fd, Pos + ?HEADER_SIZE_B,
-                                                          FilterSize) of
-                                              {ok, F} ->
-                                                  F;
-                                              eof ->
-                                                  throw({end_of_stream, State})
-                                          end;
-                                      _ ->
-                                          <<>>
-                                  end,
-
-                    case osiris_bloom:is_match(ChunkFilter, Filter) of
-                        true ->
-                            {ok, #{chunk_id => NextChId0,
-                                   epoch => Epoch,
-                                   type => ChType,
-                                   crc => Crc,
-                                   num_records => NumRecords,
-                                   num_entries => NumEntries,
-                                   timestamp => Timestamp,
-                                   data_size => DataSize,
-                                   trailer_size => TrailerSize,
-                                   header_data => HeaderData,
-                                   filter_size => FilterSize,
-                                   next_position => NextPos,
-                                   position => Pos}, State};
-                        false ->
-                            Read = Read0#read{next_offset = NextChId0 + NumRecords,
-                                              position = NextPos},
-                            read_header0(State#?MODULE{mode = Read});
-                        {retry_with, NewFilter} ->
-                            Read = Read0#read{filter = NewFilter},
-                            read_header0(State#?MODULE{mode = Read})
-                    end;
-                {ok, Bin} when byte_size(Bin) < ?HEADER_SIZE_B ->
-                    %% partial header read
-                    %% this can happen when a replica reader reads ahead
-                    %% optimistically
-                    %% treat as end_of_stream
+            read_header_with_ra(State);
+        false ->
+            {end_of_stream, State}
+    end.
+
+read_header_with_ra(#?MODULE{cfg = #cfg{directory = Dir,
+                                        shared = Shared},
+                             mode = #read{next_offset = NextChId0,
+                                          position = Pos,
+                                          last_data_size = Lds,
+                                          filter_size = FilterSize,
+                                          read_ahead_data = undefined} = Read0,
+                             current_file = CurFile,
+                             fd = Fd} = State) ->
+    ReadAheadSize = read_ahead_size(Lds, FilterSize),
+
+    case file:pread(Fd, Pos, ?HEADER_SIZE_B + ReadAheadSize) of
+        {ok, Bin} when byte_size(Bin) >= ?HEADER_SIZE_B ->
+            parse_header(Bin, State);
+        {ok, Bin} when byte_size(Bin) < ?HEADER_SIZE_B ->
+            %% partial header read
+            %% this can happen when a replica reader reads ahead
+            %% optimistically
+            %% treat as end_of_stream
+            {end_of_stream, State};
+        eof ->
+            FirstOffset = osiris_log_shared:first_chunk_id(Shared),
+            %% open next segment file and start there if it exists
+            NextChId = max(FirstOffset, NextChId0),
+            %% TODO: replace this check with a last chunk id counter
+            %% updated by the writer and replicas
+            SegFile = make_file_name(NextChId, "segment"),
+            case SegFile == CurFile of
+                true ->
+                    %% the new filename is the same as the old one
+                    %% this should only really happen for an empty
+                    %% log but would cause an infinite loop if it does
                     {end_of_stream, State};
-                eof ->
-                    FirstOffset = osiris_log_shared:first_chunk_id(Shared),
-                    %% open next segment file and start there if it exists
-                    NextChId = max(FirstOffset, NextChId0),
-                    %% TODO: replace this check with a last chunk id counter
-                    %% updated by the writer and replicas
-                    SegFile = make_file_name(NextChId, "segment"),
-                    case SegFile == CurFile of
-                        true ->
-                            %% the new filename is the same as the old one
-                            %% this should only really happen for an empty
-                            %% log but would cause an infinite loop if it does
-                            {end_of_stream, State};
-                        false ->
-                            case file:open(filename:join(Dir, SegFile),
-                                           [raw, binary, read]) of
-                                {ok, Fd2} ->
-                                    ok = file:close(Fd),
-                                    Read = Read0#read{next_offset = NextChId,
-                                                      position = ?LOG_HEADER_SIZE},
-                                    read_header0(
-                                      State#?MODULE{current_file = SegFile,
-                                                    fd = Fd2,
-                                                    mode = Read});
-                                {error, enoent} ->
-                                    {end_of_stream, State}
-                            end
-                    end;
-                {ok,
-                 <<?MAGIC:4/unsigned,
-                   ?VERSION:4/unsigned,
-                   _ChType:8/unsigned,
-                   _NumEntries:16/unsigned,
-                   _NumRecords:32/unsigned,
-                   _Timestamp:64/signed,
-                   _Epoch:64/unsigned,
-                   UnexpectedChId:64/unsigned,
-                   _/binary>>} ->
-                    %% TODO: we may need to return the new state here if
-                    %% we've crossed segments
-                    {error, {unexpected_chunk_id, UnexpectedChId, NextChId0}};
-                Invalid ->
-                    {error, {invalid_chunk_header, Invalid}}
+                false ->
+                    case file:open(filename:join(Dir, SegFile),
+                                   [raw, binary, read]) of
+                        {ok, Fd2} ->
+                            ok = file:close(Fd),
+                            Read = Read0#read{next_offset = NextChId,
+                                              position = ?LOG_HEADER_SIZE},
+                            read_header0(
+                              State#?MODULE{current_file = SegFile,
+                                            fd = Fd2,
+                                            mode = Read});
+                        {error, enoent} ->
+                            {end_of_stream, State}
+                    end
+            end;
+        {ok,
+         <<?MAGIC:4/unsigned,
+           ?VERSION:4/unsigned,
+           _ChType:8/unsigned,
+           _NumEntries:16/unsigned,
+           _NumRecords:32/unsigned,
+           _Timestamp:64/signed,
+           _Epoch:64/unsigned,
+           UnexpectedChId:64/unsigned,
+           _/binary>>} ->
+            %% TODO: we may need to return the new state here if
+            %% we've crossed segments
+            {error, {unexpected_chunk_id, UnexpectedChId, NextChId0}};
+        Invalid ->
+            {error, {invalid_chunk_header, Invalid}}
+    end;
+read_header_with_ra(#?MODULE{mode = #read{last_data_size = Lds,
+                                          read_ahead_data = RAD} = Read0} = State) ->
+    case byte_size(RAD) > ?HEADER_SIZE_B + Lds of
+        true ->
+            case parse_header(RAD, State) of
+                need_more_data ->
+                    %% we don't have enough data in memory
+                    Read1 = Read0#read{read_ahead_data = undefined},
+                    read_header_with_ra(State#?MODULE{mode = Read1});
+                Result ->
+                    Result
             end;
         false ->
-            {end_of_stream, State}
+            Read1 = Read0#read{read_ahead_data = undefined},
+            read_header_with_ra(State#?MODULE{mode = Read1})
+    end.
+
+parse_header(<<?MAGIC:4/unsigned,
+               ?VERSION:4/unsigned,
+               ChType:8/unsigned,
+               NumEntries:16/unsigned,
+               NumRecords:32/unsigned,
+               Timestamp:64/signed,
+               Epoch:64/unsigned,
+               NextChId0:64/unsigned,
+               Crc:32/integer,
+               DataSize:32/unsigned,
+               TrailerSize:32/unsigned,
+               FilterSize:8/unsigned,
+               _Reserved:24,
+               MaybeFilterAndRest/binary>> = HeaderData0,
+             #?MODULE{mode = #read{type = RType,
+                                   chunk_selector = Selector,
+                                   next_offset = NextChId0,
+                                   read_ahead_data = RAD,
+                                   filter_size = LFS} = Read0} = State) ->
+    {ToSkip, ToSend} = select_amount_to_send(RType, Selector, ChType,
+                                             FilterSize, DataSize,
+                                             TrailerSize),
+    case byte_size(MaybeFilterAndRest) >= (ToSkip + ToSend) of
+        true ->
+            %% we read everything we needed
+            {ReadAheadData, Content} =
+                case MaybeFilterAndRest of
+                    <<_Skip:ToSkip/binary,
+                      Ctnt:ToSend/binary,
+                      Rest/binary>>
+                      when byte_size(Rest) > ?HEADER_SIZE_B + LFS ->
+                        %% remaining is larger than 64 bytes so worth keeping
+                        %% around
+                        {Rest, Ctnt};
+                    <<_Skip:ToSkip/binary,
+                      Ctnt:ToSend/binary,
+                      _Rest/binary>> ->
+                        {undefined, Ctnt}
+                end,
+            Read1 = Read0#read{read_ahead_data = ReadAheadData},
+            maybe_return_header(State#?MODULE{mode = Read1},
+                                HeaderData0, MaybeFilterAndRest, Content,
+                                ChType, NumEntries, NumRecords, Timestamp,
+                                Epoch, NextChId0, Crc, DataSize, TrailerSize,
+                                FilterSize);
+        false ->
+            case RAD of
+                undefined ->
+                    %% we just read the data, we could not read ahead the whole chunk
+                    %% let's move on to see whether the chunk should be filtered or not
+                    maybe_return_header(State,
+                                        HeaderData0, MaybeFilterAndRest, undefined,
+                                        ChType, NumEntries, NumRecords, Timestamp,
+                                        Epoch, NextChId0, Crc, DataSize, TrailerSize,
+                                        FilterSize);
+                _ ->
+                    %% the data were from a previous read
+                    %% we can ditch them and try to read the chunk ahead
+                    need_more_data
+            end
+    end;
+parse_header(_, _) ->
+    need_more_data.
+
+%% keep the previous value if the current one is 0 (i.e. no filter in the chunk)
+read_ahead_fsize(Previous, 0) ->
+    Previous;
+read_ahead_fsize(_, Current) ->
+    Current.
+
+read_ahead_size(LastDataSize, FilterSize) ->
+    case LastDataSize =/= undefined andalso
+         LastDataSize =< ?READ_AHEAD_LIMIT of
+        true ->
+            %% the previous chunk was small, try to read
+            %% the next chunk fully in one read
+            %% this can save us a system call later
+            FilterSize + ?READ_AHEAD_LIMIT;
+        false ->
+            %% optimistically read the default filter size.
+            %% this amounts to 64 bytes with the header (small binary)
+            %% and it may save us a syscall reading the filter
+            %% if the filter is of the default 16 byte size
+            ?DEFAULT_FILTER_SIZE
+    end.
+
+maybe_return_header(#?MODULE{cfg = #cfg{counter = CntRef},
+                             mode = #read{next_offset = NextChId0,
+                                          position = Pos,
+                                          filter = Filter} = Read0,
+                             fd = Fd} = State, HeaderData0, MaybeFilter, Content,
+                    ChType, NumEntries, NumRecords, Timestamp, Epoch,
+                    NextChId0, Crc, DataSize, TrailerSize, FilterSize) ->
+
+    ChunkFilter = case MaybeFilter of
+                      <<F:FilterSize/binary, _/binary>> ->
+                          %% filter is of default size or 0
+                          F;
+                      _ when Filter =/= undefined ->
+                          %% the filter is larger than default
+                          case file:pread(Fd, Pos + ?HEADER_SIZE_B,
+                                          FilterSize) of
+                              {ok, F} ->
+                                  F;
+                              eof ->
+                                  throw({end_of_stream, State})
+                          end;
+                      _ ->
+                          <<>>
+                  end,
+
+    counters:put(CntRef, ?C_OFFSET, NextChId0 + NumRecords),
+    counters:add(CntRef, ?C_CHUNKS, 1),
+    NextPos = Pos + ?HEADER_SIZE_B + FilterSize + DataSize + TrailerSize,
+
+    case osiris_bloom:is_match(ChunkFilter, Filter) of
+        true ->
+            <<HeaderData:?HEADER_SIZE_B/binary, _/binary>> = HeaderData0,
+            {ok, #{chunk_id => NextChId0,
+                   epoch => Epoch,
+                   type => ChType,
+                   crc => Crc,
+                   num_records => NumRecords,
+                   num_entries => NumEntries,
+                   timestamp => Timestamp,
+                   data_size => DataSize,
+                   trailer_size => TrailerSize,
+                   header_data => HeaderData,
+                   filter_size => FilterSize,
+                   next_position => NextPos,
+                   position => Pos}, Content, State};
+        false ->
+            Read = Read0#read{next_offset = NextChId0 + NumRecords,
+                              position = NextPos},
+            read_header0(State#?MODULE{mode = Read});
+        {retry_with, NewFilter} ->
+            Read = Read0#read{filter = NewFilter},
+            read_header0(State#?MODULE{mode = Read})
+    end.
+
+%% for testing
+-spec read_ahead_hints(state(), non_neg_integer(), osiris_bloom:filter_size()) ->
+    state().
+read_ahead_hints(#?MODULE{mode = R = #read{}} = S, Lds, FilterSize) ->
+    S#?MODULE{mode = R#read{last_data_size = Lds,
+                            filter_size = FilterSize}}.
+
+%% for testing
+-spec update_read(state(), offset(), offset(), non_neg_integer()) -> state().
+update_read(#?MODULE{mode = R0 = #read{}} = S, ChId, NumRecords, Pos) ->
+    R = R0#read{next_offset = ChId + NumRecords,
+                position = Pos},
+    S#?MODULE{mode = R}.
+
 trigger_retention_eval(#?MODULE{cfg = #cfg{name = Name,
                                            directory = Dir,
@@ -3215,16 +3371,21 @@ dump_crc_check(Fd) ->
             dump_crc_check(Fd)
     end.
 
-iter_read_ahead(_Fd, _Pos, _ChunkId, _Crc, 1, _DataSize, _NumEntries) ->
+iter_read_ahead(_Fd, _Pos, _ChunkId, _Crc, _Credit, DataSize, _NumEntries, RAD)
+  when RAD =/= undefined ->
+    <<Data:DataSize/binary, _/binary>> = RAD,
+    Data;
+iter_read_ahead(_Fd, _Pos, _ChunkId, _Crc, 1, _DataSize, _NumEntries, undefined) ->
+    %% FIXME is it about credit = 1 or number of entries = 1?
     %% no point reading ahead if there is only one entry to be read at this
     %% time
     undefined;
-iter_read_ahead(Fd, Pos, ChunkId, Crc, Credit, DataSize, NumEntries)
+iter_read_ahead(Fd, Pos, ChunkId, Crc, Credit, DataSize, NumEntries, undefined)
   when Credit == all orelse NumEntries == 1 ->
     {ok, Data} = file:pread(Fd, Pos, DataSize),
     validate_crc(ChunkId, Crc, Data),
     Data;
-iter_read_ahead(Fd, Pos, _ChunkId, _Crc, Credit0, DataSize, NumEntries) ->
+iter_read_ahead(Fd, Pos, _ChunkId, _Crc, Credit0, DataSize, NumEntries, undefined) ->
     %% read ahead, assumes roughly equal entry sizes which may not be the case
     %% TODO round up to nearest block?
     %% We can only practically validate CRC if we read the whole data
diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl
index 89e18f8..c807a58 100644
--- a/test/osiris_log_SUITE.erl
+++ b/test/osiris_log_SUITE.erl
@@ -38,6 +38,7 @@ all_tests() ->
      subbatch,
      subbatch_compressed,
      iterator_read_chunk,
+     iterator_read_chunk_with_read_ahead,
      iterator_read_chunk_mixed_sizes_with_credit,
      read_chunk_parsed,
      read_chunk_parsed_2,
@@ -92,7 +93,9 @@ all_tests() ->
      init_partial_writes,
      init_with_unexpected_file,
      overview_with_missing_segment,
-     overview_with_missing_index_at_start
+     overview_with_missing_index_at_start,
+     read_header_ahead_offset_reader,
+     read_header_ahead_offset_reader_filter
     ].
 
 groups() ->
@@ -342,14 +345,16 @@ iterator_read_chunk(Config) ->
     EntriesRev = [Batch,
                   <<"ho">>,
                   {<<"filter">>, <<"hi">>}],
-    {ChId, _S1} = write_committed(EntriesRev, S0),
-    {ok, _H, I0, _R2} = osiris_log:chunk_iterator(R1),
+    {ChId, S1} = write_committed(EntriesRev, S0),
+    {ok, _H, I0, R2} = osiris_log:chunk_iterator(R1),
     HoOffs = ChId + 1,
     BatchOffs = ChId + 2,
     {{ChId, <<"hi">>}, I1} = osiris_log:iterator_next(I0),
     {{HoOffs, <<"ho">>}, I2} = osiris_log:iterator_next(I1),
     {{BatchOffs, Batch}, I} = osiris_log:iterator_next(I2),
     ?assertMatch(end_of_chunk, osiris_log:iterator_next(I)),
+    osiris_log:close(R2),
+    osiris_log:close(S1),
     ok.
 
 iterator_read_chunk_mixed_sizes_with_credit(Config) ->
@@ -363,16 +368,79 @@
     EntriesRev = [Big,
                   <<"ho">>,
                   {<<"filter">>, <<"hi">>}],
-    {ChId, _S1} = write_committed(EntriesRev, S0),
-    %% this is a less than ideal case where we have one large and two very
-    %% small entries inthe same batch. The read ahead only
-    {ok, _H, I0, _R2} = osiris_log:chunk_iterator(R1, 2),
+    {ChId, S1} = write_committed(EntriesRev, S0),
+    %% this is a less than ideal case where we have one large and two very
+    %% small entries in the same batch. We read ahead only the 2 first entries.
+    {ok, _H, I0, R2} = osiris_log:chunk_iterator(R1, 2),
     HoOffs = ChId + 1,
     BigOffs = ChId + 2,
    {{ChId, <<"hi">>}, I1} = osiris_log:iterator_next(I0),
     {{HoOffs, <<"ho">>}, I2} = osiris_log:iterator_next(I1),
     {{BigOffs, Big}, I} = osiris_log:iterator_next(I2),
     ?assertMatch(end_of_chunk, osiris_log:iterator_next(I)),
+    osiris_log:close(R2),
+    osiris_log:close(S1),
+    ok.
+
+iterator_read_chunk_with_read_ahead(Config) ->
+    %% the test makes sure reading ahead on header reading does not break
+    %% the iterator
+    RAL = 4096, %% read ahead limit
+    Conf = ?config(osiris_conf, Config),
+    W = osiris_log:init(Conf),
+    Shared = osiris_log:get_shared(W),
+    RConf = Conf#{shared => Shared},
+    {ok, R} = osiris_log:init_offset_reader(0, RConf),
+    Tests =
+    [
+     fun(#{w := W0, r := R0}) ->
+             %% first chunk, there won't be any data size hints in the reader
+             EntriesRev = [<<"hi">>, <<"ho">>],
+             {_, W1} = write_committed(EntriesRev, W0),
+             {ok, H, I0, R1} = osiris_log:chunk_iterator(R0),
+             {{_, <<"ho">>}, I1} = osiris_log:iterator_next(I0),
+             {{_, <<"hi">>}, I2} = osiris_log:iterator_next(I1),
+             ?assertMatch(end_of_chunk, osiris_log:iterator_next(I2)),
+             {H, W1, R1}
+     end,
+     fun(#{w := W0, r := R0}) ->
+             %% this one will be read ahead
+             EntriesRev = [<<"foo">>, <<"bar">>],
+             {_, W1} = write_committed(EntriesRev, W0),
+             {ok, H, I0, R1} = osiris_log:chunk_iterator(R0),
+             {{_, <<"bar">>}, I1} = osiris_log:iterator_next(I0),
+             {{_, <<"foo">>}, I2} = osiris_log:iterator_next(I1),
+             ?assertMatch(end_of_chunk, osiris_log:iterator_next(I2)),
+             {H, W1, R1}
+     end,
+     fun(#{w := W0, r := R0}) ->
+             %% this one will be read ahead
+             E1 = rand:bytes(RAL - 100),
+             EntriesRev = [E1 , <<"aaa">>],
+             {_, W1} = write_committed(EntriesRev, W0),
+             {ok, H, I0, R1} = osiris_log:chunk_iterator(R0),
+             {{_, <<"aaa">>}, I1} = osiris_log:iterator_next(I0),
+             {{_, E1}, I2} = osiris_log:iterator_next(I1),
+             ?assertMatch(end_of_chunk, osiris_log:iterator_next(I2)),
+             {H, W1, R1}
+     end,
+     fun(#{w := W0, r := R0}) ->
+             %% this one is too big to be read ahead
+             E1 = rand:bytes(RAL * 2),
+             EntriesRev = [E1 , <<"aaa">>],
+             {_, W1} = write_committed(EntriesRev, W0),
+             {ok, H, I0, R1} = osiris_log:chunk_iterator(R0),
+             {{_, <<"aaa">>}, I1} = osiris_log:iterator_next(I0),
+             {{_, E1}, I2} = osiris_log:iterator_next(I1),
+             ?assertMatch(end_of_chunk, osiris_log:iterator_next(I2)),
+             {H, W1, R1}
+     end
+    ],
+
+    #{w := Wr1, r := Rd1} = run_read_ahead_tests(Tests, offset,
+                                                 ?DEFAULT_FILTER_SIZE, W, R),
+    osiris_log:close(Rd1),
+    osiris_log:close(Wr1),
     ok.
 
 read_chunk_parsed(Config) ->
@@ -1071,7 +1139,7 @@ accept_chunk_inital_offset(Config) ->
     Conf = ?config(osiris_conf, Config),
-    Ch1 = fake_chunk([<<"blob1">>], ?LINE, 1, 100),
+    Ch1 = fake_chunk_bin([<<"blob1">>], ?LINE, 1, 100),
     F0 = osiris_log:init(Conf#{initial_offset => 100}, acceptor),
     F1 = osiris_log:accept_chunk(Ch1, F0),
@@ -1092,7 +1160,7 @@ accept_chunk_iolist_header_in_first_element(Config) ->
     Conf = ?config(osiris_conf, Config),
-    <> = fake_chunk([{<<"filter">>, <<"blob1">>}],
+    <> = fake_chunk_bin([{<<"filter">>, <<"blob1">>}],
                     ?LINE, 1, 100),
     Ch1 = [Head, Res],
     F0 = osiris_log:init(Conf#{initial_offset => 100}, acceptor),
@@ -1879,8 +1947,237 @@ overview_with_missing_index_at_start(Config) ->
                               filename:join(?config(dir, Config), "*.index")))),
     ok.
+read_header_ahead_offset_reader(Config) ->
+    RAL = 4096, %% read ahead limit
+    Tests =
+    [
+     fun(#{w := W0, r := R0}) ->
+             %% no previous chunk, so not reading ahead
+             %% the messages are large enough to be larger than the default
+             %% filter size which is always read ahead (16 bytes)
+             {_, W1} = write_committed([<<"hiiiiiiiii">>, <<"hooooooo">>], W0),
+             {ok, H, Content, R1} = osiris_log:read_header0(R0),
+             ?assertEqual(undefined, Content),
+             {H, W1, R1}
+     end,
+     fun(#{w := W0, r := R0, fsize := FSize}) ->
+             %% previous chunk too large to read ahead
+             R1 = osiris_log:read_ahead_hints(R0, RAL * 2, FSize),
+             {_, W1} = write_committed([<<"hiiiiiiiii">>, <<"hooooooo">>], W0),
+             {ok, H, Content, R2} = osiris_log:read_header0(R1),
+             ?assertEqual(undefined, Content),
+             {H, W1, R2}
+     end,
+     fun(#{w := W0, r := R0, fsize := FSize}) ->
+             %% trigger reading ahead by setting a small value for the
+             %% last chunk read.
+             %% this setting stays the same for the rest of the test
+             R1 = osiris_log:read_ahead_hints(R0, 1, FSize),
+             Entries = [<<"foo">>, <<"bar">>],
+             {_, W1} = write_committed(Entries, W0),
+             {ok, H, Content, R2} = osiris_log:read_header0(R1),
+             [_, _, D, _] = fake_chunk(Entries, ?LINE, 1, 100),
+             ?assertEqual(iolist_to_binary(D), Content),
+             {H, W1, R2}
+     end,
+     fun(#{w := W0, r := R0}) ->
+             %% chunk data do not fit in the read ahead
+             Entries = [binary:copy(<<"a">>, RAL * 2)],
+             {_, W1} = write_committed(Entries, W0),
+             {ok, H, Content, R1} = osiris_log:read_header0(R0),
+             ?assertEqual(undefined, Content),
+             {H, W1, R1}
+     end,
+     fun(#{w := W0, r := R0, rtype := RType, fsize := FSize}) ->
+             %% chunk with a non-empty filter
+             %% data are small enough, they should be read with the header
+             Entries = [<<"ho">>, {<<"banana">>, <<"hi">>}],
+             {_, W1} = write_committed(Entries, W0),
+             {ok, H, Content, R1} = osiris_log:read_header0(R0),
+             [_, BloomD, ED, _] = fake_chunk(Entries, ?LINE, 1, 100, FSize),
+             Expected = case RType of
+                            data ->
+                                %% we expect the bloom filter data
+                                [BloomD, ED];
+                            offset ->
+                                ED
+                        end,
+             ?assertEqual(iolist_to_binary(Expected), Content),
+             {H, W1, R1}
+     end,
+     fun(#{w := W0, r := R0}) ->
+             Entries1 = [binary:copy(<<"a">>, 2000)],
+             {_, W1} = write_committed(Entries1, W0),
+             Entries2 = [binary:copy(<<"b">>, 2000)],
+             {_, W2} = write_committed(Entries2, W1),
+             %% this one is too big to be read ahead fully
+             Entries3 = [binary:copy(<<"c">>, 5000)],
+             {_, W3} = write_committed(Entries3, W2),
+
+             {ok, H1, Content1, R1} = osiris_log:read_header0(R0),
+             [_, _, D1, _] = fake_chunk(Entries1, ?LINE, 1, 100),
+             ?assertEqual(iolist_to_binary(D1), Content1),
+
+             {ok, H2, Content2, R2} = osiris_log:read_header0(update_read(H1, R1)),
+             [_, _, D2, _] = fake_chunk(Entries2, ?LINE, 1, 100),
+             ?assertEqual(iolist_to_binary(D2), Content2),
+
+             {ok, H3, Content3, R3} = osiris_log:read_header0(update_read(H2, R2)),
+             ?assertEqual(undefined, Content3),
+
+             {H3, W3, R3}
+     end
+    ],
+
+    FilterSizes = [?DEFAULT_FILTER_SIZE, ?DEFAULT_FILTER_SIZE * 2],
+    Conf0 = ?config(osiris_conf, Config),
+    #{dir := Dir0} = Conf0,
+    lists:foreach(fun({FSize, RType}) ->
+                          Dir1 = filename:join(Dir0, io_lib:format("~p~p", [FSize, RType])),
+                          Conf1 = Conf0#{dir => Dir1,
+                                         filter_size => FSize},
+                          Wr0 = osiris_log:init(Conf1),
+                          Shared = osiris_log:get_shared(Wr0),
+                          Conf = Conf1#{shared => Shared},
+                          {ok, Rd0} = init_reader(RType, Conf),
+                          #{w := Wr1, r := Rd1} = run_read_ahead_tests(Tests, RType,
+                                                                       FSize, Wr0, Rd0),
+                          osiris_log:close(Rd1),
+                          osiris_log:close(Wr1)
+                  end, [{FSize, RType} || FSize <- FilterSizes,
+                                          RType <- [data, offset]]),
+    ok.
+
+read_header_ahead_offset_reader_filter(Config) ->
+    RAL = 4096, %% read ahead limit
+    %% we store the entry size on 4 bytes, so we must subtract them from the data size
+    MaxEntrySize = RAL - 4,
+    DFS = ?DEFAULT_FILTER_SIZE,
+    FilterSizes = [DFS, DFS * 2],
+    Conf0 = ?config(osiris_conf, Config),
+    #{dir := Dir0} = Conf0,
+    lists:foreach(
+      fun(FSize) ->
+              Dir1 = filename:join(Dir0, integer_to_list(FSize)),
+              Conf1 = Conf0#{dir => Dir1,
+                             filter_size => FSize},
+              Wr0 = osiris_log:init(Conf1),
+              Shared = osiris_log:get_shared(Wr0),
+              Conf = Conf1#{shared => Shared},
+              {ok, Rd0} = osiris_log:init_offset_reader(first, Conf),
+              %% we start by using the default filter size in the read ahead hints
+              Rd1 = osiris_log:read_ahead_hints(Rd0, 1, DFS),
+              %% compute the max entry size
+              %% (meaning we don't read ahead enough above this entry size)
+              %% first we don't know the actual filter size in the stream,
+              %% so we assume the default filter size
+              %% this "reduces" the max size of data we can read in the case
+              %% of a larger-than-default filter size, because of the extra
+              %% bytes that belong to the filter
+              MES1 = MaxEntrySize - (FSize - DFS),
+              %% then the max entry becomes accurate, whatever the actual filter size
+              MES2 = MaxEntrySize,
+
+              Tests =
+              [
+               fun(#{w := W0, r := R0}) ->
+                       %% data do not fit in the read ahead
+                       EData = binary:copy(<<"a">>, MES1 + 1),
+                       Entries = [{<<"banana">>, EData}],
+                       {_, W1} = write_committed(Entries, W0),
+                       {ok, H, Content, R1} = osiris_log:read_header0(R0),
+                       ?assertEqual(undefined, Content),
+                       {H, W1, R1}
+               end,
+               fun(#{w := W0, r := R0}) ->
+                       %% data exactly fits in the read ahead
+                       EData = binary:copy(<<"a">>, MES1),
+                       Entries = [{<<"banana">>, EData}],
+                       {_, W1} = write_committed(Entries, W0),
+                       {ok, H, Content, R1} = osiris_log:read_header0(R0),
+                       [_, _, D, _] = fake_chunk(Entries, ?LINE, 1, 100),
+                       ?assertEqual(iolist_to_binary(D), Content),
+                       {H, W1, R1}
+               end,
+               fun(#{w := W0, r := R}) ->
+                       %% assume we are now using the correct filter size
+                       %% (this setting stays the same for the next tests)
+                       R0 = osiris_log:read_ahead_hints(R, 1, FSize),
+                       %% data just bigger than the first limit
+                       EData = binary:copy(<<"a">>, MES1 + 1),
+                       Entries = [{<<"banana">>, EData}],
+                       {_, W1} = write_committed(Entries, W0),
+                       {ok, H, Content, R1} = osiris_log:read_header0(R0),
+                       case FSize =:= DFS of
+                           true ->
+                               %% default filter size: still does not fit
+                               ?assertEqual(undefined, Content);
+                           false ->
+                               %% with the correct filter size, we now read
+                               %% a bit further than with the first limit
+                               [_, _, D, _] = fake_chunk(Entries, ?LINE, 1, 100),
+                               ?assertEqual(iolist_to_binary(D), Content)
+                       end,
+                       {H, W1, R1}
+               end,
+               fun(#{w := W0, r := R0}) ->
+                       %% data exactly fits in the read ahead
+                       EData = binary:copy(<<"a">>, MES1),
+                       Entries = [{<<"banana">>, EData}],
+                       {_, W1} = write_committed(Entries, W0),
+                       {ok, H, Content, R1} = osiris_log:read_header0(R0),
+                       [_, _, D, _] = fake_chunk(Entries, ?LINE, 1, 100),
+                       ?assertEqual(iolist_to_binary(D), Content),
+                       {H, W1, R1}
+               end,
+               fun(#{w := W0, r := R0}) ->
+                       %% we use the "new" max entry size
+                       %% data do not fit in the read ahead
+                       EData = binary:copy(<<"a">>, MES2 + 1),
+                       Entries = [{<<"banana">>, EData}],
+                       {_, W1} = write_committed(Entries, W0),
+                       {ok, H, Content, R1} = osiris_log:read_header0(R0),
+                       ?assertEqual(undefined, Content),
+                       {H, W1, R1}
+               end,
+               fun(#{w := W0, r := R0}) ->
+                       %% we use the "new" max entry size
+                       %% data exactly fits in the read ahead
+                       EData = binary:copy(<<"a">>,
+                                           MES2),
+                       Entries = [{<<"banana">>, EData}],
+                       {_, W1} = write_committed(Entries, W0),
+                       {ok, H, Content, R1} = osiris_log:read_header0(R0),
+                       [_, _, D, _] = fake_chunk(Entries, ?LINE, 1, 100),
+                       ?assertEqual(iolist_to_binary(D), Content),
+                       {H, W1, R1}
+               end
+              ],
+              #{w := Wr1, r := Rd2} = run_read_ahead_tests(Tests, offset, FSize,
+                                                           Wr0, Rd1),
+              osiris_log:close(Rd2),
+              osiris_log:close(Wr1)
+      end, FilterSizes),
+    ok.
+
 %% Utility
+init_reader(offset, Conf) ->
+    osiris_log:init_offset_reader(first, Conf);
+init_reader(data, Conf) ->
+    osiris_log:init_data_reader({0, empty}, Conf).
+
+run_read_ahead_tests(Tests, RType, FSize, Wr0, Rd0) ->
+    lists:foldl(fun(F, Acc) ->
+                        {H, W, R0} = F(Acc),
+                        R1 = update_read(H, R0),
+                        #{w => W, r => R1, rtype => RType, fsize => FSize}
+                end, #{w => Wr0, r => Rd0, rtype => RType, fsize => FSize}, Tests).
+
+-spec update_read(map(), osiris_log:state()) -> osiris_log:state().
+update_read(#{chunk_id := ChId,
+              num_records := NumRecords,
+              next_position := NextPos}, R) ->
+    osiris_log:update_read(R, ChId, NumRecords, NextPos).
+
 truncate_at(File, Pos) ->
     {ok, Fd} = file:open(File, [raw, binary, read, write]),
     % truncate the file so that chunk <> is missing and <> is corrupted
@@ -1977,7 +2274,13 @@ delete_dir(Dir) ->
             ok
     end.
 
+fake_chunk_bin(Blobs, Ts, Epoch, NextChId) ->
+    iolist_to_binary(fake_chunk(Blobs, Ts, Epoch, NextChId)).
+
 fake_chunk(Blobs, Ts, Epoch, NextChId) ->
-    iolist_to_binary(
-      element(1,
-              osiris_log:make_chunk(Blobs, <<>>, 0, Ts, Epoch, NextChId, 16))).
+    fake_chunk(Blobs, Ts, Epoch, NextChId, ?DEFAULT_FILTER_SIZE).
+
+fake_chunk(Blobs, Ts, Epoch, NextChId, FSize) ->
+    element(1,
+            osiris_log:make_chunk(Blobs, <<>>, 0, Ts, Epoch, NextChId,
+                                  FSize)).
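
A minimal usage sketch (not part of the patch above) of how the newly exported read-ahead helpers could be driven together. `read_one_chunk_with_hints/1` is a hypothetical function, `Conf` is assumed to be a valid osiris_log configuration map for an existing stream, and the hint values (last data size 1, filter size 16) are illustrative only:

read_one_chunk_with_hints(Conf) ->
    %% sketch only: exercises read_ahead_hints/3, read_header0/1 and
    %% update_read/4, which this patch exports for testing
    Writer = osiris_log:init(Conf),
    Shared = osiris_log:get_shared(Writer),
    {ok, Reader0} = osiris_log:init_offset_reader(first, Conf#{shared => Shared}),
    %% hint that the previous chunk was small so the next header read may pull
    %% the whole chunk body (up to ?READ_AHEAD_LIMIT bytes) in a single pread
    Reader1 = osiris_log:read_ahead_hints(Reader0, 1, 16),
    case osiris_log:read_header0(Reader1) of
        {ok, #{chunk_id := ChId,
               num_records := NumRecords,
               next_position := NextPos} = Header, MaybeData, Reader2} ->
            %% MaybeData is the chunk body when it fitted into the read-ahead,
            %% otherwise undefined and the body is read from disk as before
            Reader3 = osiris_log:update_read(Reader2, ChId, NumRecords, NextPos),
            {Header, MaybeData, Reader3};
        Other ->
            %% {end_of_stream, _} or {error, _}
            Other
    end.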