From 7e4dc5aabc736801441bdd36865c9b20cd771648 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Fri, 4 Jul 2025 07:57:37 +0000 Subject: [PATCH 1/7] Update send_file callback to save a system call The callback is used in RabbitMQ to send the frame header. This commit changes the callback to expect a binary result and to send it along the chunk header. This saves a system call and improves performance with small chunks. --- src/osiris_log.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 7e221b8..e1346c1 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -1649,10 +1649,10 @@ is_valid_chunk_on_disk(SegFile, Pos) -> {error, term()} | {end_of_stream, state()}. send_file(Sock, State) -> - send_file(Sock, State, fun(_, _) -> ok end). + send_file(Sock, State, fun(_, _) -> <<>> end). -spec send_file(gen_tcp:socket() | ssl:socket(), state(), - fun((header_map(), non_neg_integer()) -> term())) -> + fun((header_map(), non_neg_integer()) -> binary())) -> {ok, state()} | {error, term()} | {end_of_stream, state()}. @@ -1693,8 +1693,8 @@ send_file(Sock, %% or the chunk is a user type (for offset readers) case needs_handling(RType, Selector, ChType) of true -> - _ = Callback(Header, ToSend + byte_size(HeaderData)), - case send(Transport, Sock, HeaderData) of + FrameHeader = Callback(Header, ToSend + byte_size(HeaderData)), + case send(Transport, Sock, [FrameHeader, HeaderData]) of ok -> case sendfile(Transport, Fd, Sock, Pos + ?HEADER_SIZE_B + ToSkip, ToSend) of From 28535e9ceafeb52f56c396a61bed2c0e34231327 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Tue, 8 Jul 2025 15:27:06 +0000 Subject: [PATCH 2/7] Read ahead chunk body if previous chunk was small enough Reading a chunk header then using the sendfile syscall to send the chunk data is not optimal for small chunks, as it amounts to 2 system calls for a small amount of bytes. This commit uses the size of the last chunk to try to read the whole chunk when the library reads the chunk header. If the last chunk data size was under 4096 bytes, the library does not only read the chunk header, but reads ahead to try to read the whole and thus to save the sendfile call later. This optimization should improve the read throughput for streams with small chunks, e.g. streams with 1-message chunks. --- src/osiris_log.erl | 411 ++++++++++++++++++++++++-------------- test/osiris_log_SUITE.erl | 166 ++++++++++++++- 2 files changed, 416 insertions(+), 161 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index e1346c1..4dfb03e 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -64,7 +64,10 @@ sorted_index_files/1, index_files_unsorted/1, make_chunk/7, - orphaned_segments/1 + orphaned_segments/1, + read_header0/1, + last_data_size/2, + update_read/4 ]). % maximum size of a segment in bytes @@ -104,6 +107,9 @@ _/binary>>). -define(SKIP_SEARCH_JUMP, 2048). +-define(READ_AHEAD_LIMIT, 4096). +-define(CAN_READ_AHEAD(LastDataSize), LastDataSize =/= undefined andalso + LastDataSize =< ?READ_AHEAD_LIMIT). %% Specification of the Log format. %% @@ -424,7 +430,8 @@ transport :: transport(), chunk_selector :: all | user_data, position = 0 :: non_neg_integer(), - filter :: undefined | osiris_bloom:mstate()}). + filter :: undefined | osiris_bloom:mstate(), + last_data_size = undefined :: undefined | non_neg_integer()}). 
-record(write, {type = writer :: writer | acceptor, segment_size = {?LOG_HEADER_SIZE, 0} :: {non_neg_integer(), non_neg_integer()}, @@ -1389,7 +1396,7 @@ read_header(#?MODULE{cfg = #cfg{}} = State0) -> {ok, #{num_records := NumRecords, next_position := NextPos} = - Header, + Header, _, #?MODULE{mode = #read{next_offset = ChId} = Read} = State} -> %% skip data portion {ok, Header, @@ -1451,7 +1458,7 @@ chunk_iterator(#?MODULE{cfg = #cfg{}, data_size := DataSize, filter_size := FilterSize, position := Pos, - next_position := NextPos} = Header, + next_position := NextPos} = Header, _, #?MODULE{fd = Fd, mode = #read{next_offset = ChId} = Read} = State1} -> State = State1#?MODULE{mode = Read#read{next_offset = ChId + NumRecords, position = NextPos}}, @@ -1549,7 +1556,7 @@ read_chunk(#?MODULE{cfg = #cfg{}} = State0) -> filter_size := FilterSize, position := Pos, next_position := NextPos, - trailer_size := TrailerSize}, + trailer_size := TrailerSize}, _, #?MODULE{fd = Fd, mode = #read{next_offset = ChId} = Read} = State} -> ToRead = ?HEADER_SIZE_B + FilterSize + DataSize + TrailerSize, {ok, ChData} = file:pread(Fd, Pos, ToRead), @@ -1671,45 +1678,60 @@ send_file(Sock, position := Pos, next_position := NextPos, header_data := HeaderData} = Header, + MaybeData, #?MODULE{fd = Fd, mode = #read{next_offset = ChId} = Read0} = State1} -> - %% read header - %% used to write frame headers to socket - %% and return the number of bytes to sendfile - %% this allow users of this api to send all the data - %% or just header and entry data - {ToSkip, ToSend} = - case RType of - offset -> - select_amount_to_send(Selector, ChType, FilterSize, - DataSize, TrailerSize); - data -> - {0, FilterSize + DataSize + TrailerSize} - end, - - Read = Read0#read{next_offset = ChId + NumRecords, - position = NextPos}, + %% only sendfile if either the reader is a data reader %% or the chunk is a user type (for offset readers) case needs_handling(RType, Selector, ChType) of true -> - FrameHeader = Callback(Header, ToSend + byte_size(HeaderData)), - case send(Transport, Sock, [FrameHeader, HeaderData]) of - ok -> - case sendfile(Transport, Fd, Sock, - Pos + ?HEADER_SIZE_B + ToSkip, ToSend) of + Read = Read0#read{next_offset = ChId + NumRecords, + position = NextPos, + last_data_size = DataSize}, + case MaybeData of + undefined -> + %% read header + %% used to write frame headers to socket + %% and return the number of bytes to sendfile + %% this allow users of this api to send all the data + %% or just header and entry data + {ToSkip, ToSend} = + select_amount_to_send(RType, Selector, ChType, + FilterSize, DataSize, + TrailerSize), + + FrameHeader = Callback(Header, + ToSend + byte_size(HeaderData)), + case send(Transport, Sock, [FrameHeader, HeaderData]) of + ok -> + case sendfile(Transport, Fd, Sock, + Pos + ?HEADER_SIZE_B + ToSkip, ToSend) of + ok -> + State = State1#?MODULE{mode = Read}, + {ok, State}; + Err -> + %% reset the position to the start of the current + %% chunk so that subsequent reads won't error + Err + end; + Err -> + Err + end; + Data -> + FrameHeader = Callback(Header, + byte_size(Data) + byte_size(HeaderData)), + case send(Transport, Sock, [FrameHeader, HeaderData, Data]) of ok -> State = State1#?MODULE{mode = Read}, {ok, State}; Err -> - %% reset the position to the start of the current - %% chunk so that subsequent reads won't error Err - end; - Err -> - Err + end end; false -> + Read = Read0#read{next_offset = ChId + NumRecords, + position = NextPos}, State = State1#?MODULE{mode = Read}, %% skip chunk and 
recurse send_file(Sock, State, Callback) @@ -1718,6 +1740,11 @@ send_file(Sock, Other end. +select_amount_to_send(data, _Sel, _ChkType, FilterSize, DataSize, TrailerSize) -> + {0, FilterSize + DataSize + TrailerSize}; +select_amount_to_send(offset, Sel, ChkType, FilterSize, DataSize, TrailerSize) -> + select_amount_to_send(Sel, ChkType, FilterSize, DataSize, TrailerSize). + %% There could be many more selectors in the future select_amount_to_send(user_data, ?CHNK_USER, FilterSize, DataSize, _TrailerSize) -> {FilterSize, DataSize}; @@ -2861,138 +2888,212 @@ recover_tracking(Fd, Trk0, Pos0) -> Trk0 end. -read_header0(#?MODULE{cfg = #cfg{directory = Dir, - shared = Shared, - counter = CntRef}, - mode = #read{next_offset = NextChId0, - position = Pos, - filter = Filter} = Read0, - current_file = CurFile, - fd = Fd} = - State) -> +read_header0(State) -> %% reads the next header if permitted case can_read_next(State) of true -> - %% optimistically read 64 bytes (small binary) as it may save us - %% a syscall reading the filter if the filter is of the default - %% 16 byte size - case file:pread(Fd, Pos, ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE) of - {ok, <> = HeaderData0} -> - <> = HeaderData0, - counters:put(CntRef, ?C_OFFSET, NextChId0 + NumRecords), - counters:add(CntRef, ?C_CHUNKS, 1), - NextPos = Pos + ?HEADER_SIZE_B + FilterSize + DataSize + TrailerSize, - - ChunkFilter = case MaybeFilter of - <> -> - %% filter is of default size or 0 - F; - _ when Filter =/= undefined -> - %% the filter is larger than default - case file:pread(Fd, Pos + ?HEADER_SIZE_B, - FilterSize) of - {ok, F} -> - F; - eof -> - throw({end_of_stream, State}) - end; - _ -> - <<>> - end, - - case osiris_bloom:is_match(ChunkFilter, Filter) of - true -> - {ok, #{chunk_id => NextChId0, - epoch => Epoch, - type => ChType, - crc => Crc, - num_records => NumRecords, - num_entries => NumEntries, - timestamp => Timestamp, - data_size => DataSize, - trailer_size => TrailerSize, - header_data => HeaderData, - filter_size => FilterSize, - next_position => NextPos, - position => Pos}, State}; - false -> - Read = Read0#read{next_offset = NextChId0 + NumRecords, - position = NextPos}, - read_header0(State#?MODULE{mode = Read}); - {retry_with, NewFilter} -> - Read = Read0#read{filter = NewFilter}, - read_header0(State#?MODULE{mode = Read}) - end; - {ok, Bin} when byte_size(Bin) < ?HEADER_SIZE_B -> - %% partial header read - %% this can happen when a replica reader reads ahead - %% optimistically - %% treat as end_of_stream + do_read_header(State); + false -> + {end_of_stream, State} + end. 
+ +do_read_header(#?MODULE{cfg = #cfg{directory = Dir, + shared = Shared}, + mode = #read{type = RType, + chunk_selector = Selector, + next_offset = NextChId0, + position = Pos, + last_data_size = Lds} = Read0, + current_file = CurFile, + fd = Fd} = State) -> + ReadAheadOffset = read_ahead_offset(Lds), + + case file:pread(Fd, Pos, ?HEADER_SIZE_B + ReadAheadOffset) of + {ok, <> = HeaderData0} + when ?CAN_READ_AHEAD(Lds) -> + {ToSkip, ToSend} = select_amount_to_send(RType, Selector, ChType, + FilterSize, DataSize, + TrailerSize), + %% summing what we need to skip and send gives us the number + %% of bytes we need to read, we make sure we read it ahead + Content = case ReadAheadOffset of + RAO when RAO >= (ToSkip + ToSend) -> + %% we read everything we needed + <<_Skip:ToSkip/binary, + Ctnt:ToSend/binary, + _Rest/binary>> = MaybeFilterAndRest, + Ctnt; + _ -> + %% we did not read enough, the caller will have to do it + undefined + end, + maybe_return_header(State, HeaderData0, MaybeFilterAndRest, Content, + ChType, NumEntries, NumRecords, Timestamp, + Epoch, NextChId0, Crc, DataSize, TrailerSize, + FilterSize); + {ok, <> = HeaderData0} -> + maybe_return_header(State, HeaderData0, MaybeFilter, undefined, + ChType, NumEntries, NumRecords, Timestamp, + Epoch, NextChId0, Crc, DataSize, + TrailerSize, FilterSize); + {ok, Bin} when byte_size(Bin) < ?HEADER_SIZE_B -> + %% partial header read + %% this can happen when a replica reader reads ahead + %% optimistically + %% treat as end_of_stream + {end_of_stream, State}; + eof -> + FirstOffset = osiris_log_shared:first_chunk_id(Shared), + %% open next segment file and start there if it exists + NextChId = max(FirstOffset, NextChId0), + %% TODO: replace this check with a last chunk id counter + %% updated by the writer and replicas + SegFile = make_file_name(NextChId, "segment"), + case SegFile == CurFile of + true -> + %% the new filename is the same as the old one + %% this should only really happen for an empty + %% log but would cause an infinite loop if it does {end_of_stream, State}; - eof -> - FirstOffset = osiris_log_shared:first_chunk_id(Shared), - %% open next segment file and start there if it exists - NextChId = max(FirstOffset, NextChId0), - %% TODO: replace this check with a last chunk id counter - %% updated by the writer and replicas - SegFile = make_file_name(NextChId, "segment"), - case SegFile == CurFile of - true -> - %% the new filename is the same as the old one - %% this should only really happen for an empty - %% log but would cause an infinite loop if it does - {end_of_stream, State}; - false -> - case file:open(filename:join(Dir, SegFile), - [raw, binary, read]) of - {ok, Fd2} -> - ok = file:close(Fd), - Read = Read0#read{next_offset = NextChId, - position = ?LOG_HEADER_SIZE}, - read_header0( - State#?MODULE{current_file = SegFile, - fd = Fd2, - mode = Read}); - {error, enoent} -> - {end_of_stream, State} - end - end; - {ok, - <>} -> - %% TODO: we may need to return the new state here if - %% we've crossed segments - {error, {unexpected_chunk_id, UnexpectedChId, NextChId0}}; - Invalid -> - {error, {invalid_chunk_header, Invalid}} + false -> + case file:open(filename:join(Dir, SegFile), + [raw, binary, read]) of + {ok, Fd2} -> + ok = file:close(Fd), + Read = Read0#read{next_offset = NextChId, + position = ?LOG_HEADER_SIZE}, + read_header0( + State#?MODULE{current_file = SegFile, + fd = Fd2, + mode = Read}); + {error, enoent} -> + {end_of_stream, State} + end end; + {ok, + <>} -> + %% TODO: we may need to return the new state here 
if + %% we've crossed segments + {error, {unexpected_chunk_id, UnexpectedChId, NextChId0}}; + Invalid -> + {error, {invalid_chunk_header, Invalid}} + end. + +read_ahead_offset(LastDataSize) -> + case LastDataSize of + LastDataSize when ?CAN_READ_AHEAD(LastDataSize) -> + %% the previous chunk was small, try to read + %% the next chunk fully in one read + %% this can save us the sendfile call later + ?DEFAULT_FILTER_SIZE + ?READ_AHEAD_LIMIT; + _ -> + %% optimistically read the default filter size. + %% this amounts to 64 bytes with the header (small binary) + %% and it may save us a syscall reading the filter + %% if the filter is of the default 16 byte size + ?DEFAULT_FILTER_SIZE + end. + +maybe_return_header(#?MODULE{cfg = #cfg{counter = CntRef}, + mode = #read{next_offset = NextChId0, + position = Pos, + filter = Filter} = Read0, + fd = Fd} = State, HeaderData0, MaybeFilter, Content, + ChType, NumEntries, NumRecords, Timestamp, Epoch, + NextChId0, Crc, DataSize, TrailerSize, FilterSize) -> + <> = HeaderData0, + + ChunkFilter = case MaybeFilter of + <> -> + %% filter is of default size or 0 + F; + _ when Filter =/= undefined -> + %% the filter is larger than default + case file:pread(Fd, Pos + ?HEADER_SIZE_B, + FilterSize) of + {ok, F} -> + F; + eof -> + throw({end_of_stream, State}) + end; + _ -> + <<>> + end, + + counters:put(CntRef, ?C_OFFSET, NextChId0 + NumRecords), + counters:add(CntRef, ?C_CHUNKS, 1), + NextPos = Pos + ?HEADER_SIZE_B + FilterSize + DataSize + TrailerSize, + + case osiris_bloom:is_match(ChunkFilter, Filter) of + true -> + {ok, #{chunk_id => NextChId0, + epoch => Epoch, + type => ChType, + crc => Crc, + num_records => NumRecords, + num_entries => NumEntries, + timestamp => Timestamp, + data_size => DataSize, + trailer_size => TrailerSize, + header_data => HeaderData, + filter_size => FilterSize, + next_position => NextPos, + position => Pos}, Content, State}; false -> - {end_of_stream, State} + Read = Read0#read{next_offset = NextChId0 + NumRecords, + position = NextPos}, + read_header0(State#?MODULE{mode = Read}); + {retry_with, NewFilter} -> + Read = Read0#read{filter = NewFilter}, + read_header0(State#?MODULE{mode = Read}) end. +-spec last_data_size(state(), non_neg_integer()) -> state(). +last_data_size(#?MODULE{mode = R = #read{}} = S, Lds) -> + S#?MODULE{mode = R#read{last_data_size = Lds}}. + +update_read(#?MODULE{mode = R0} = S, ChId, NumRecords, Pos) -> + R = R0#read{next_offset = ChId + NumRecords, + position = Pos}, + S#?MODULE{mode = R}. + trigger_retention_eval(#?MODULE{cfg = #cfg{name = Name, directory = Dir, diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl index 89e18f8..d9f98e8 100644 --- a/test/osiris_log_SUITE.erl +++ b/test/osiris_log_SUITE.erl @@ -92,7 +92,9 @@ all_tests() -> init_partial_writes, init_with_unexpected_file, overview_with_missing_segment, - overview_with_missing_index_at_start + overview_with_missing_index_at_start, + read_header_ahead_offset_reader, + read_header_ahead_offset_reader_filter ]. 
groups() -> @@ -1071,7 +1073,7 @@ accept_chunk_inital_offset(Config) -> Conf = ?config(osiris_conf, Config), - Ch1 = fake_chunk([<<"blob1">>], ?LINE, 1, 100), + Ch1 = fake_chunk_bin([<<"blob1">>], ?LINE, 1, 100), F0 = osiris_log:init(Conf#{initial_offset => 100}, acceptor), F1 = osiris_log:accept_chunk(Ch1, F0), @@ -1092,7 +1094,7 @@ accept_chunk_iolist_header_in_first_element(Config) -> Conf = ?config(osiris_conf, Config), - <> = fake_chunk([{<<"filter">>, <<"blob1">>}], + <> = fake_chunk_bin([{<<"filter">>, <<"blob1">>}], ?LINE, 1, 100), Ch1 = [Head, Res], F0 = osiris_log:init(Conf#{initial_offset => 100}, acceptor), @@ -1879,8 +1881,154 @@ overview_with_missing_index_at_start(Config) -> filename:join(?config(dir, Config), "*.index")))), ok. +read_header_ahead_offset_reader(Config) -> + RAL = 4096, %% read ahead limit + Tests = + [ + fun(#{w := W0, r := R0}) -> + %% no previous chunk, so not reading ahead + {_, W1} = write_committed([<<"hi">>, <<"ho">>], W0), + {ok, H, Content, R1} = osiris_log:read_header0(R0), + ?assertEqual(undefined, Content), + {H, W1, R1} + end, + fun(#{w := W0, r := R0}) -> + %% previous chunk too large to read ahead + R1 = osiris_log:last_data_size(R0, RAL * 2), + {_, W1} = write_committed([<<"hi">>, <<"ho">>], W0), + {ok, H, Content, R2} = osiris_log:read_header0(R1), + ?assertEqual(undefined, Content), + {H, W1, R2} + end, + fun(#{w := W0, r := R0}) -> + %% trigger reading ahead by setting a small value for the + %% last chunk read. + %% this setting stays the same for the rest of the test + R1 = osiris_log:last_data_size(R0, 1), + Entries = [<<"foo">>, <<"bar">>], + {_, W1} = write_committed(Entries, W0), + {ok, H, Content, R2} = osiris_log:read_header0(R1), + [_, _, D, _] = fake_chunk(Entries, ?LINE, 1, 100), + ?assertEqual(iolist_to_binary(D), Content), + {H, W1, R2} + end, + fun(#{w := W0, r := R0}) -> + %% chunk data do not fit in the read ahead + Entries = [binary:copy(<<"a">>, RAL * 2)], + {_, W1} = write_committed(Entries, W0), + {ok, H, Content, R1} = osiris_log:read_header0(R0), + ?assertEqual(undefined, Content), + {H, W1, R1} + end, + fun(#{w := W0, r := R0, rtype := RType, fsize := FSize}) -> + %% chunk with a non-empty filter + %% data are small enough, they should be read with the header + Entries = [<<"ho">>, {<<"banana">>, <<"hi">>}], + {_, W1} = write_committed(Entries, W0), + {ok, H, Content, R1} = osiris_log:read_header0(R0), + [_, BloomD, ED, _] = fake_chunk(Entries, ?LINE, 1, 100, FSize), + Expected = case RType of + data -> + %% we expect the bloom filter data + [BloomD, ED]; + offset -> + ED + end, + ?assertEqual(iolist_to_binary(Expected), Content), + {H, W1, R1} + end + ], + + FilterSizes = [?DEFAULT_FILTER_SIZE, ?DEFAULT_FILTER_SIZE * 2], + Conf0 = ?config(osiris_conf, Config), + #{dir := Dir0} = Conf0, + lists:foreach(fun({FSize, RType}) -> + Dir1 = filename:join(Dir0, io_lib:format("~p~p", [FSize, RType])), + Conf1 = Conf0#{dir => Dir1, + filter_size => FSize}, + Wr0 = osiris_log:init(Conf1), + Shared = osiris_log:get_shared(Wr0), + Conf = Conf1#{shared => Shared}, + {ok, Rd0} = init_reader(RType, Conf), + #{w := Wr1, r := Rd1} = run_read_ahead_tests(Tests, RType, + FSize, Wr0, Rd0), + osiris_log:close(Rd1), + osiris_log:close(Wr1) + end, [{FSize, RType} || FSize <- FilterSizes, RType <- [offset, data]]), + ok. 
+ +read_header_ahead_offset_reader_filter(Config) -> + RAL = 4096, %% read ahead limit + %% we store the entry size on 4 bytes, so we must substract them from the data size + MaxEntrySize = RAL - 4, + DFS = ?DEFAULT_FILTER_SIZE, + FilterSizes = [DFS, DFS * 2], + Conf0 = ?config(osiris_conf, Config), + #{dir := Dir0} = Conf0, + lists:foreach( + fun(FSize) -> + Dir1 = filename:join(Dir0, integer_to_list(FSize)), + Conf1 = Conf0#{dir => Dir1, + filter_size => FSize}, + Wr0 = osiris_log:init(Conf1), + Shared = osiris_log:get_shared(Wr0), + Conf = Conf1#{shared => Shared}, + {ok, Rd0} = osiris_log:init_offset_reader(first, Conf), + Rd1 = osiris_log:last_data_size(Rd0, 1), + %% we always read ahead the default filter size. + %% with a larger-than-default filter, we must consider + %% the extra bytes that belong to the filter, + %% that is (actual filter size) - (default filter size) + %% this reduces the max entry size we can read ahead + MES = MaxEntrySize - (FSize - DFS), + + Tests = + [ + fun(#{w := W0, r := R0}) -> + %% chunk with a non-empty filter + %% data do not fit in the read ahead + EData = binary:copy(<<"a">>, MES + 1), + Entries = [{<<"banana">>, EData}], + {_, W1} = write_committed(Entries, W0), + {ok, H, Content, R1} = osiris_log:read_header0(R0), + ?assertEqual(undefined, Content), + {H, W1, R1} + end, + fun(#{w := W0, r := R0}) -> + %% chunk with a non-empty filter + %% data exactly fits in the read ahead + EData = binary:copy(<<"a">>, MES), + Entries = [{<<"banana">>, EData}], + {_, W1} = write_committed(Entries, W0), + {ok, H, Content, R1} = osiris_log:read_header0(R0), + [_, _, D, _] = fake_chunk(Entries, ?LINE, 1, 100), + ?assertEqual(iolist_to_binary(D), Content), + {H, W1, R1} + end + ], + #{w := Wr1, r := Rd2} = run_read_ahead_tests(Tests, offset, FSize, + Wr0, Rd1), + osiris_log:close(Rd2), + osiris_log:close(Wr1) + end, FilterSizes), + ok. + %% Utility +init_reader(offset, Conf) -> + osiris_log:init_offset_reader(first, Conf); +init_reader(data, Conf) -> + osiris_log:init_data_reader({0, empty}, Conf). + +run_read_ahead_tests(Tests, RType, FSize, Wr0, Rd0) -> + lists:foldl(fun(F, Acc) -> + {#{chunk_id := ChId, + num_records := NumRecords, + next_position := NextPos}, W, R0} = F(Acc), + R1 = osiris_log:update_read(R0, ChId, NumRecords, NextPos), + #{w => W, r => R1, rtype => RType, fsize => FSize} + end, #{w => Wr0, r => Rd0, rtype => RType, fsize => FSize}, Tests). + truncate_at(File, Pos) -> {ok, Fd} = file:open(File, [raw, binary, read, write]), % truncate the file so that chunk <> is missing and <> is corrupted @@ -1977,7 +2125,13 @@ delete_dir(Dir) -> ok end. +fake_chunk_bin(Blobs, Ts, Epoch, NextChId) -> + iolist_to_binary(fake_chunk(Blobs, Ts, Epoch, NextChId)). + fake_chunk(Blobs, Ts, Epoch, NextChId) -> - iolist_to_binary( - element(1, - osiris_log:make_chunk(Blobs, <<>>, 0, Ts, Epoch, NextChId, 16))). + fake_chunk(Blobs, Ts, Epoch, NextChId, ?DEFAULT_FILTER_SIZE). + +fake_chunk(Blobs, Ts, Epoch, NextChId, FSize) -> + element(1, + osiris_log:make_chunk(Blobs, <<>>, 0, Ts, Epoch, NextChId, + FSize)). 
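For context on the PATCH 1/7 change, here is a minimal caller-side sketch of the new send_file/3 contract. The callback now returns a binary (the frame header) rather than ok, and osiris_log writes that binary together with the chunk header data, saving one send per chunk. The function name send_one_chunk and the 5-byte frame layout are illustrative assumptions, not osiris APIs.

%% Sketch only: how a caller might use the binary-returning callback.
%% `Sock' is a connected gen_tcp/ssl socket, `LogState0' an osiris_log
%% reader state; the frame layout below is made up for the example
%% (RabbitMQ would build its stream-protocol frame header here).
send_one_chunk(Sock, LogState0) ->
    FrameHeaderFun =
        fun(_ChunkHeader, Size) ->
                %% Size = chunk header bytes + chunk payload bytes that
                %% osiris_log is about to write after this frame header
                <<Size:32/unsigned, 1:8>>
        end,
    case osiris_log:send_file(Sock, LogState0, FrameHeaderFun) of
        {ok, LogState} -> {ok, LogState};
        {end_of_stream, LogState} -> {stop, LogState};
        {error, _} = Err -> Err
    end.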
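To make the PATCH 2/7 read-ahead decision concrete, the single pread issued when reading a header is sized roughly as below. This is a simplified restatement assuming the constants used in the patch (48-byte header, 16-byte default filter, 4096-byte ?READ_AHEAD_LIMIT), not an addition to the module.

%% Simplified restatement of the read-ahead sizing (illustration only).
pread_size(LastDataSize) when is_integer(LastDataSize),
                              LastDataSize =< 4096 ->
    %% the previous chunk was small: read header + default filter + up to
    %% 4096 bytes of chunk data in one call, hoping to skip sendfile later
    48 + 16 + 4096;
pread_size(_) ->
    %% no hint yet (or the previous chunk was large): read only the header
    %% plus a default-sized filter, i.e. 64 bytes
    48 + 16.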
From cf22d92ba0e40bea5402913fcee8134213a624bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 28 Jul 2025 13:44:20 +0200 Subject: [PATCH 3/7] Re-use read ahead data if possible --- src/osiris_log.erl | 97 +++++++++++++++++++++++++++++---------- test/osiris_log_SUITE.erl | 26 +++++++++-- 2 files changed, 94 insertions(+), 29 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 4dfb03e..6c067be 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -431,7 +431,8 @@ chunk_selector :: all | user_data, position = 0 :: non_neg_integer(), filter :: undefined | osiris_bloom:mstate(), - last_data_size = undefined :: undefined | non_neg_integer()}). + last_data_size = undefined :: undefined | non_neg_integer(), + read_ahead_data = undefined :: undefined | binary()}). -record(write, {type = writer :: writer | acceptor, segment_size = {?LOG_HEADER_SIZE, 0} :: {non_neg_integer(), non_neg_integer()}, @@ -2899,11 +2900,10 @@ read_header0(State) -> do_read_header(#?MODULE{cfg = #cfg{directory = Dir, shared = Shared}, - mode = #read{type = RType, - chunk_selector = Selector, - next_offset = NextChId0, + mode = #read{next_offset = NextChId0, position = Pos, - last_data_size = Lds} = Read0, + last_data_size = Lds, + read_ahead_data = undefined} = Read0, current_file = CurFile, fd = Fd} = State) -> ReadAheadOffset = read_ahead_offset(Lds), @@ -2924,26 +2924,17 @@ do_read_header(#?MODULE{cfg = #cfg{directory = Dir, _Reserved:24, MaybeFilterAndRest/binary>> = HeaderData0} when ?CAN_READ_AHEAD(Lds) -> - {ToSkip, ToSend} = select_amount_to_send(RType, Selector, ChType, - FilterSize, DataSize, - TrailerSize), - %% summing what we need to skip and send gives us the number - %% of bytes we need to read, we make sure we read it ahead - Content = case ReadAheadOffset of - RAO when RAO >= (ToSkip + ToSend) -> - %% we read everything we needed - <<_Skip:ToSkip/binary, - Ctnt:ToSend/binary, - _Rest/binary>> = MaybeFilterAndRest, - Ctnt; - _ -> - %% we did not read enough, the caller will have to do it - undefined - end, - maybe_return_header(State, HeaderData0, MaybeFilterAndRest, Content, - ChType, NumEntries, NumRecords, Timestamp, - Epoch, NextChId0, Crc, DataSize, TrailerSize, - FilterSize); + case read_ahead_chunk(HeaderData0, State) of + need_more_data -> + Read1 = Read0#read{read_ahead_data = undefined}, + maybe_return_header(State#?MODULE{mode = Read1}, + HeaderData0, MaybeFilterAndRest, undefined, + ChType, NumEntries, NumRecords, Timestamp, + Epoch, NextChId0, Crc, DataSize, TrailerSize, + FilterSize); + R -> + R + end; {ok, < {error, {invalid_chunk_header, Invalid}} + end; +do_read_header(#?MODULE{mode = #read{read_ahead_data = RAD} = Read0} = State) -> + case read_ahead_chunk(RAD, State) of + need_more_data -> + %% we don't have enough data in memory + Read1 = Read0#read{read_ahead_data = undefined}, + do_read_header(State#?MODULE{mode = Read1}); + R -> + R end. 
+read_ahead_chunk(<> = HeaderData0, + #?MODULE{mode = #read{type = RType, + chunk_selector = Selector, + next_offset = NextChId0} = Read0} = State) -> + {ToSkip, ToSend} = select_amount_to_send(RType, Selector, ChType, + FilterSize, DataSize, + TrailerSize), + case byte_size(MaybeFilterAndRest) of + RAS when RAS >= (ToSkip + ToSend) -> + %% we read everything we needed + {ReadAheadData, Content} = + case MaybeFilterAndRest of + <<_Skip:ToSkip/binary, + Ctnt:ToSend/binary, + Rest/binary>> + when byte_size(Rest) > ?HEADER_SIZE_B+ ?DEFAULT_FILTER_SIZE -> + {Rest, Ctnt}; + <<_Skip:ToSkip/binary, + Ctnt:ToSend/binary, + _Rest/binary>> -> + {undefined, Ctnt} + end, + Read1 = Read0#read{read_ahead_data = ReadAheadData}, + maybe_return_header(State#?MODULE{mode = Read1}, + HeaderData0, MaybeFilterAndRest, Content, + ChType, NumEntries, NumRecords, Timestamp, + Epoch, NextChId0, Crc, DataSize, TrailerSize, + FilterSize); + _ -> + need_more_data + end; +read_ahead_chunk(_, _) -> + need_more_data. + read_ahead_offset(LastDataSize) -> case LastDataSize of LastDataSize when ?CAN_READ_AHEAD(LastDataSize) -> diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl index d9f98e8..8d1615f 100644 --- a/test/osiris_log_SUITE.erl +++ b/test/osiris_log_SUITE.erl @@ -1936,6 +1936,21 @@ read_header_ahead_offset_reader(Config) -> end, ?assertEqual(iolist_to_binary(Expected), Content), {H, W1, R1} + end, + fun(#{w := W0, r := R0}) -> + Entries1 = [binary:copy(<<"a">>, 16)], + {_, W1} = write_committed(Entries1, W0), + Entries2 = [binary:copy(<<"b">>, 32)], + {_, W2} = write_committed(Entries2, W1), + + {ok, H1, Content1, R1} = osiris_log:read_header0(R0), + [_, _, D1, _] = fake_chunk(Entries1, ?LINE, 1, 100), + ?assertEqual(iolist_to_binary(D1), Content1), + + {ok, H2, Content2, R2} = osiris_log:read_header0(update_read(H1, R1)), + [_, _, D2, _] = fake_chunk(Entries2, ?LINE, 1, 100), + ?assertEqual(iolist_to_binary(D2), Content2), + {H2, W2, R2} end ], @@ -2022,13 +2037,16 @@ init_reader(data, Conf) -> run_read_ahead_tests(Tests, RType, FSize, Wr0, Rd0) -> lists:foldl(fun(F, Acc) -> - {#{chunk_id := ChId, - num_records := NumRecords, - next_position := NextPos}, W, R0} = F(Acc), - R1 = osiris_log:update_read(R0, ChId, NumRecords, NextPos), + {H, W, R0} = F(Acc), + R1 = update_read(H, R0), #{w => W, r => R1, rtype => RType, fsize => FSize} end, #{w => Wr0, r => Rd0, rtype => RType, fsize => FSize}, Tests). +update_read(#{chunk_id := ChId, + num_records := NumRecords, + next_position := NextPos}, R) -> + osiris_log:update_read(R, ChId, NumRecords, NextPos). + truncate_at(File, Pos) -> {ok, Fd} = file:open(File, [raw, binary, read, write]), % truncate the file so that chunk <> is missing and <> is corrupted From dbc6bd06dd0802cc7ddf07e645ff071344a44377 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 28 Jul 2025 14:26:37 +0200 Subject: [PATCH 4/7] Fix dialyzer warning --- src/osiris_log.erl | 5 ++++- test/osiris_log_SUITE.erl | 1 + 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 6c067be..6370b96 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -2889,6 +2889,8 @@ recover_tracking(Fd, Trk0, Pos0) -> Trk0 end. +-spec read_header0(state()) -> {ok, map(), undefined | binary(), state()} | + {end_of_stream, state()}. 
read_header0(State) -> %% reads the next header if permitted case can_read_next(State) of @@ -3136,7 +3138,8 @@ maybe_return_header(#?MODULE{cfg = #cfg{counter = CntRef}, last_data_size(#?MODULE{mode = R = #read{}} = S, Lds) -> S#?MODULE{mode = R#read{last_data_size = Lds}}. -update_read(#?MODULE{mode = R0} = S, ChId, NumRecords, Pos) -> +-spec update_read(state(), offset(), offset(), non_neg_integer()) -> state(). +update_read(#?MODULE{mode = R0 = #read{}} = S, ChId, NumRecords, Pos) -> R = R0#read{next_offset = ChId + NumRecords, position = Pos}, S#?MODULE{mode = R}. diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl index 8d1615f..bcfe9b0 100644 --- a/test/osiris_log_SUITE.erl +++ b/test/osiris_log_SUITE.erl @@ -2042,6 +2042,7 @@ run_read_ahead_tests(Tests, RType, FSize, Wr0, Rd0) -> #{w => W, r => R1, rtype => RType, fsize => FSize} end, #{w => Wr0, r => Rd0, rtype => RType, fsize => FSize}, Tests). +-spec update_read(map(), osiris_log:state()) -> osiris_log:state(). update_read(#{chunk_id := ChId, num_records := NumRecords, next_position := NextPos}, R) -> From 43bbf410cb0ad5c33ad62042ba01b6f5d57c0762 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Tue, 29 Jul 2025 13:16:06 +0100 Subject: [PATCH 5/7] Refactoring --- src/osiris_log.erl | 165 ++++++++++++++++---------------------- test/osiris_log_SUITE.erl | 7 +- 2 files changed, 74 insertions(+), 98 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 6370b96..134f0c1 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -108,8 +108,6 @@ -define(SKIP_SEARCH_JUMP, 2048). -define(READ_AHEAD_LIMIT, 4096). --define(CAN_READ_AHEAD(LastDataSize), LastDataSize =/= undefined andalso - LastDataSize =< ?READ_AHEAD_LIMIT). %% Specification of the Log format. %% @@ -2895,66 +2893,24 @@ read_header0(State) -> %% reads the next header if permitted case can_read_next(State) of true -> - do_read_header(State); + read_header_with_ra(State); false -> {end_of_stream, State} end. 
-do_read_header(#?MODULE{cfg = #cfg{directory = Dir, - shared = Shared}, - mode = #read{next_offset = NextChId0, - position = Pos, - last_data_size = Lds, - read_ahead_data = undefined} = Read0, - current_file = CurFile, - fd = Fd} = State) -> - ReadAheadOffset = read_ahead_offset(Lds), - - case file:pread(Fd, Pos, ?HEADER_SIZE_B + ReadAheadOffset) of - {ok, <> = HeaderData0} - when ?CAN_READ_AHEAD(Lds) -> - case read_ahead_chunk(HeaderData0, State) of - need_more_data -> - Read1 = Read0#read{read_ahead_data = undefined}, - maybe_return_header(State#?MODULE{mode = Read1}, - HeaderData0, MaybeFilterAndRest, undefined, - ChType, NumEntries, NumRecords, Timestamp, - Epoch, NextChId0, Crc, DataSize, TrailerSize, - FilterSize); - R -> - R - end; - {ok, <> = HeaderData0} -> - maybe_return_header(State, HeaderData0, MaybeFilter, undefined, - ChType, NumEntries, NumRecords, Timestamp, - Epoch, NextChId0, Crc, DataSize, - TrailerSize, FilterSize); +read_header_with_ra(#?MODULE{cfg = #cfg{directory = Dir, + shared = Shared}, + mode = #read{next_offset = NextChId0, + position = Pos, + last_data_size = Lds, + read_ahead_data = undefined} = Read0, + current_file = CurFile, + fd = Fd} = State) -> + ReadAheadSize = read_ahead_size(Lds), + + case file:pread(Fd, Pos, ?HEADER_SIZE_B + ReadAheadSize) of + {ok, Bin} when byte_size(Bin) >= ?HEADER_SIZE_B -> + parse_header(Bin, State); {ok, Bin} when byte_size(Bin) < ?HEADER_SIZE_B -> %% partial header read %% this can happen when a replica reader reads ahead @@ -3008,45 +2964,55 @@ do_read_header(#?MODULE{cfg = #cfg{directory = Dir, Invalid -> {error, {invalid_chunk_header, Invalid}} end; -do_read_header(#?MODULE{mode = #read{read_ahead_data = RAD} = Read0} = State) -> - case read_ahead_chunk(RAD, State) of - need_more_data -> - %% we don't have enough data in memory +read_header_with_ra(#?MODULE{mode = #read{last_data_size = Lds, + read_ahead_data = RAD} = Read0} = State) -> + case byte_size(RAD) > ?HEADER_SIZE_B + Lds of + true -> + case parse_header(RAD, State) of + need_more_data -> + %% we don't have enough data in memory + Read1 = Read0#read{read_ahead_data = undefined}, + read_header_with_ra(State#?MODULE{mode = Read1}); + Result -> + Result + end; + false -> Read1 = Read0#read{read_ahead_data = undefined}, - do_read_header(State#?MODULE{mode = Read1}); - R -> - R + read_header_with_ra(State#?MODULE{mode = Read1}) end. 
-read_ahead_chunk(<> = HeaderData0, - #?MODULE{mode = #read{type = RType, - chunk_selector = Selector, - next_offset = NextChId0} = Read0} = State) -> + +parse_header(<> = HeaderData0, + #?MODULE{mode = #read{type = RType, + chunk_selector = Selector, + next_offset = NextChId0} = Read0} = State) -> {ToSkip, ToSend} = select_amount_to_send(RType, Selector, ChType, - FilterSize, DataSize, - TrailerSize), - case byte_size(MaybeFilterAndRest) of - RAS when RAS >= (ToSkip + ToSend) -> + FilterSize, DataSize, + TrailerSize), + case byte_size(MaybeFilterAndRest) >= (ToSkip + ToSend) of + true -> %% we read everything we needed {ReadAheadData, Content} = case MaybeFilterAndRest of <<_Skip:ToSkip/binary, Ctnt:ToSend/binary, Rest/binary>> - when byte_size(Rest) > ?HEADER_SIZE_B+ ?DEFAULT_FILTER_SIZE -> + when byte_size(Rest) > ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE -> + %% remained is larger than 64 bytes so worth keeping + %% around {Rest, Ctnt}; <<_Skip:ToSkip/binary, Ctnt:ToSend/binary, @@ -3059,20 +3025,27 @@ read_ahead_chunk(< - need_more_data + false -> + %% having to throw away the read ahead data here + Read1 = Read0#read{read_ahead_data = undefined}, + maybe_return_header(State#?MODULE{mode = Read1}, + HeaderData0, MaybeFilterAndRest, undefined, + ChType, NumEntries, NumRecords, Timestamp, + Epoch, NextChId0, Crc, DataSize, TrailerSize, + FilterSize) end; -read_ahead_chunk(_, _) -> +parse_header(_, _) -> need_more_data. -read_ahead_offset(LastDataSize) -> - case LastDataSize of - LastDataSize when ?CAN_READ_AHEAD(LastDataSize) -> +read_ahead_size(LastDataSize) -> + case LastDataSize =/= undefined andalso + LastDataSize =< ?READ_AHEAD_LIMIT of + true -> %% the previous chunk was small, try to read %% the next chunk fully in one read - %% this can save us the sendfile call later + %% this can save us a system call later ?DEFAULT_FILTER_SIZE + ?READ_AHEAD_LIMIT; - _ -> + false -> %% optimistically read the default filter size. 
%% this amounts to 64 bytes with the header (small binary) %% and it may save us a syscall reading the filter @@ -3087,7 +3060,6 @@ maybe_return_header(#?MODULE{cfg = #cfg{counter = CntRef}, fd = Fd} = State, HeaderData0, MaybeFilter, Content, ChType, NumEntries, NumRecords, Timestamp, Epoch, NextChId0, Crc, DataSize, TrailerSize, FilterSize) -> - <> = HeaderData0, ChunkFilter = case MaybeFilter of <> -> @@ -3112,6 +3084,7 @@ maybe_return_header(#?MODULE{cfg = #cfg{counter = CntRef}, case osiris_bloom:is_match(ChunkFilter, Filter) of true -> + <> = HeaderData0, {ok, #{chunk_id => NextChId0, epoch => Epoch, type => ChType, diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl index bcfe9b0..97bd4fa 100644 --- a/test/osiris_log_SUITE.erl +++ b/test/osiris_log_SUITE.erl @@ -1887,7 +1887,10 @@ read_header_ahead_offset_reader(Config) -> [ fun(#{w := W0, r := R0}) -> %% no previous chunk, so not reading ahead - {_, W1} = write_committed([<<"hi">>, <<"ho">>], W0), + %% the messages are large enough to be larger than the default + %% filter size which is always read ahead (16 bytes) + {_, W1} = write_committed([<<"hiiiiiiiii">>, <<"hooooooo">>], W0), + ct:pal("R0 ~p", [R0]), {ok, H, Content, R1} = osiris_log:read_header0(R0), ?assertEqual(undefined, Content), {H, W1, R1} @@ -1895,7 +1898,7 @@ read_header_ahead_offset_reader(Config) -> fun(#{w := W0, r := R0}) -> %% previous chunk too large to read ahead R1 = osiris_log:last_data_size(R0, RAL * 2), - {_, W1} = write_committed([<<"hi">>, <<"ho">>], W0), + {_, W1} = write_committed([<<"hiiiiiiiii">>, <<"hooooooo">>], W0), {ok, H, Content, R2} = osiris_log:read_header0(R1), ?assertEqual(undefined, Content), {H, W1, R2} From 3ec4665d01cf807469dc64c3cae90a2a6b6c0d99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Fri, 1 Aug 2025 09:27:56 +0000 Subject: [PATCH 6/7] Use actual filter size in read ahead Start with default filter size and use actual size after non-zero value has been read. Attempt to read ahead from parse_header instead of returning the header and no data. The decision is based on the availability of previous read ahead data. --- src/osiris_log.erl | 63 ++++++++++++++++-------- test/osiris_log_SUITE.erl | 101 ++++++++++++++++++++++++++++++-------- 2 files changed, 124 insertions(+), 40 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 134f0c1..dd6560e 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -66,7 +66,7 @@ make_chunk/7, orphaned_segments/1, read_header0/1, - last_data_size/2, + read_ahead_hints/3, update_read/4 ]). @@ -430,6 +430,7 @@ position = 0 :: non_neg_integer(), filter :: undefined | osiris_bloom:mstate(), last_data_size = undefined :: undefined | non_neg_integer(), + filter_size = ?DEFAULT_FILTER_SIZE :: osiris_bloom:filter_size(), read_ahead_data = undefined :: undefined | binary()}). 
-record(write, {type = writer :: writer | acceptor, @@ -1665,7 +1666,8 @@ send_file(Sock, State) -> send_file(Sock, #?MODULE{mode = #read{type = RType, chunk_selector = Selector, - transport = Transport}} = State0, + transport = Transport, + filter_size = RaFs}} = State0, Callback) -> case catch read_header0(State0) of {ok, #{type := ChType, @@ -1687,7 +1689,9 @@ send_file(Sock, true -> Read = Read0#read{next_offset = ChId + NumRecords, position = NextPos, - last_data_size = DataSize}, + last_data_size = DataSize, + filter_size = read_ahead_fsize(RaFs, + FilterSize)}, case MaybeData of undefined -> %% read header @@ -2903,10 +2907,11 @@ read_header_with_ra(#?MODULE{cfg = #cfg{directory = Dir, mode = #read{next_offset = NextChId0, position = Pos, last_data_size = Lds, + filter_size = FilterSize, read_ahead_data = undefined} = Read0, current_file = CurFile, fd = Fd} = State) -> - ReadAheadSize = read_ahead_size(Lds), + ReadAheadSize = read_ahead_size(Lds, FilterSize), case file:pread(Fd, Pos, ?HEADER_SIZE_B + ReadAheadSize) of {ok, Bin} when byte_size(Bin) >= ?HEADER_SIZE_B -> @@ -2981,7 +2986,6 @@ read_header_with_ra(#?MODULE{mode = #read{last_data_size = Lds, read_header_with_ra(State#?MODULE{mode = Read1}) end. - parse_header(<> = HeaderData0, #?MODULE{mode = #read{type = RType, chunk_selector = Selector, - next_offset = NextChId0} = Read0} = State) -> + next_offset = NextChId0, + read_ahead_data = RAD, + filter_size = LFS} = Read0} = State) -> {ToSkip, ToSend} = select_amount_to_send(RType, Selector, ChType, FilterSize, DataSize, TrailerSize), @@ -3010,8 +3016,8 @@ parse_header(<> - when byte_size(Rest) > ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE -> - %% remained is larger than 64 bytes so worth keeping + when byte_size(Rest) > ?HEADER_SIZE_B + LFS -> + %% remaining is larger than 64 bytes so worth keeping %% around {Rest, Ctnt}; <<_Skip:ToSkip/binary, @@ -3026,25 +3032,38 @@ parse_header(< - %% having to throw away the read ahead data here - Read1 = Read0#read{read_ahead_data = undefined}, - maybe_return_header(State#?MODULE{mode = Read1}, - HeaderData0, MaybeFilterAndRest, undefined, - ChType, NumEntries, NumRecords, Timestamp, - Epoch, NextChId0, Crc, DataSize, TrailerSize, - FilterSize) + case RAD of + undefined -> + %% we just read the data, we could not read ahead the whole chunk + %% let's move on to see whether the chunk should be filtered or not + maybe_return_header(State, + HeaderData0, MaybeFilterAndRest, undefined, + ChType, NumEntries, NumRecords, Timestamp, + Epoch, NextChId0, Crc, DataSize, TrailerSize, + FilterSize); + _ -> + %% the data were from a previous read + %% we can ditch them and try to read the chunk ahead + need_more_data + end end; parse_header(_, _) -> need_more_data. -read_ahead_size(LastDataSize) -> +%% keep the previous value if the current one is 0 (i.e. no filter in the chunk) +read_ahead_fsize(Previous, 0) -> + Previous; +read_ahead_fsize(_, Current) -> + Current. + +read_ahead_size(LastDataSize, FilterSize) -> case LastDataSize =/= undefined andalso LastDataSize =< ?READ_AHEAD_LIMIT of true -> %% the previous chunk was small, try to read %% the next chunk fully in one read %% this can save us a system call later - ?DEFAULT_FILTER_SIZE + ?READ_AHEAD_LIMIT; + FilterSize + ?READ_AHEAD_LIMIT; false -> %% optimistically read the default filter size. %% this amounts to 64 bytes with the header (small binary) @@ -3107,10 +3126,14 @@ maybe_return_header(#?MODULE{cfg = #cfg{counter = CntRef}, read_header0(State#?MODULE{mode = Read}) end. 
--spec last_data_size(state(), non_neg_integer()) -> state(). -last_data_size(#?MODULE{mode = R = #read{}} = S, Lds) -> - S#?MODULE{mode = R#read{last_data_size = Lds}}. +%% for testing +-spec read_ahead_hints(state(), non_neg_integer(), osiris_bloom:filter_size()) -> + state(). +read_ahead_hints(#?MODULE{mode = R = #read{}} = S, Lds, FilterSize) -> + S#?MODULE{mode = R#read{last_data_size = Lds, + filter_size = FilterSize}}. +%% for testing -spec update_read(state(), offset(), offset(), non_neg_integer()) -> state(). update_read(#?MODULE{mode = R0 = #read{}} = S, ChId, NumRecords, Pos) -> R = R0#read{next_offset = ChId + NumRecords, diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl index 97bd4fa..f887079 100644 --- a/test/osiris_log_SUITE.erl +++ b/test/osiris_log_SUITE.erl @@ -1890,24 +1890,23 @@ read_header_ahead_offset_reader(Config) -> %% the messages are large enough to be larger than the default %% filter size which is always read ahead (16 bytes) {_, W1} = write_committed([<<"hiiiiiiiii">>, <<"hooooooo">>], W0), - ct:pal("R0 ~p", [R0]), {ok, H, Content, R1} = osiris_log:read_header0(R0), ?assertEqual(undefined, Content), {H, W1, R1} end, - fun(#{w := W0, r := R0}) -> + fun(#{w := W0, r := R0, fsize := FSize}) -> %% previous chunk too large to read ahead - R1 = osiris_log:last_data_size(R0, RAL * 2), + R1 = osiris_log:read_ahead_hints(R0, RAL * 2, FSize), {_, W1} = write_committed([<<"hiiiiiiiii">>, <<"hooooooo">>], W0), {ok, H, Content, R2} = osiris_log:read_header0(R1), ?assertEqual(undefined, Content), {H, W1, R2} end, - fun(#{w := W0, r := R0}) -> + fun(#{w := W0, r := R0, fsize := FSize}) -> %% trigger reading ahead by setting a small value for the %% last chunk read. %% this setting stays the same for the rest of the test - R1 = osiris_log:last_data_size(R0, 1), + R1 = osiris_log:read_ahead_hints(R0, 1, FSize), Entries = [<<"foo">>, <<"bar">>], {_, W1} = write_committed(Entries, W0), {ok, H, Content, R2} = osiris_log:read_header0(R1), @@ -1941,10 +1940,13 @@ read_header_ahead_offset_reader(Config) -> {H, W1, R1} end, fun(#{w := W0, r := R0}) -> - Entries1 = [binary:copy(<<"a">>, 16)], + Entries1 = [binary:copy(<<"a">>, 2000)], {_, W1} = write_committed(Entries1, W0), - Entries2 = [binary:copy(<<"b">>, 32)], + Entries2 = [binary:copy(<<"b">>, 2000)], {_, W2} = write_committed(Entries2, W1), + %% this one is too big to be read ahead fully + Entries3 = [binary:copy(<<"c">>, 5000)], + {_, W3} = write_committed(Entries3, W2), {ok, H1, Content1, R1} = osiris_log:read_header0(R0), [_, _, D1, _] = fake_chunk(Entries1, ?LINE, 1, 100), @@ -1953,7 +1955,11 @@ read_header_ahead_offset_reader(Config) -> {ok, H2, Content2, R2} = osiris_log:read_header0(update_read(H1, R1)), [_, _, D2, _] = fake_chunk(Entries2, ?LINE, 1, 100), ?assertEqual(iolist_to_binary(D2), Content2), - {H2, W2, R2} + + {ok, H3, Content3, R3} = osiris_log:read_header0(update_read(H2, R2)), + ?assertEqual(undefined, Content3), + + {H3, W3, R3} end ], @@ -1972,7 +1978,7 @@ read_header_ahead_offset_reader(Config) -> FSize, Wr0, Rd0), osiris_log:close(Rd1), osiris_log:close(Wr1) - end, [{FSize, RType} || FSize <- FilterSizes, RType <- [offset, data]]), + end, [{FSize, RType} || FSize <- FilterSizes, RType <- [data, offset]]), ok. 
read_header_ahead_offset_reader_filter(Config) -> @@ -1992,20 +1998,75 @@ read_header_ahead_offset_reader_filter(Config) -> Shared = osiris_log:get_shared(Wr0), Conf = Conf1#{shared => Shared}, {ok, Rd0} = osiris_log:init_offset_reader(first, Conf), - Rd1 = osiris_log:last_data_size(Rd0, 1), - %% we always read ahead the default filter size. - %% with a larger-than-default filter, we must consider - %% the extra bytes that belong to the filter, - %% that is (actual filter size) - (default filter size) - %% this reduces the max entry size we can read ahead - MES = MaxEntrySize - (FSize - DFS), + %% we start by using the default filter size in the read ahead hints + Rd1 = osiris_log:read_ahead_hints(Rd0, 1, DFS), + %% compute the max entry size + %% (meaning we don't read ahead enough above this entry size) + %% first we don't know the actual filter size in the stream, + %% so we assume the default filter size + %% this "reduces" the max size of data we can read in the case + %% of a larger-than-default filter size, because of the extra + %% bytes that belong to the filter + MES1 = MaxEntrySize - (FSize - DFS), + %% then the max entry becomes accurate, whatever the actual filter size + MES2 = MaxEntrySize, Tests = [ fun(#{w := W0, r := R0}) -> - %% chunk with a non-empty filter %% data do not fit in the read ahead - EData = binary:copy(<<"a">>, MES + 1), + EData = binary:copy(<<"a">>, MES1 + 1), + Entries = [{<<"banana">>, EData}], + {_, W1} = write_committed(Entries, W0), + {ok, H, Content, R1} = osiris_log:read_header0(R0), + ?assertEqual(undefined, Content), + {H, W1, R1} + end, + fun(#{w := W0, r := R0}) -> + %% data exactly fits in the read ahead + EData = binary:copy(<<"a">>, MES1), + Entries = [{<<"banana">>, EData}], + {_, W1} = write_committed(Entries, W0), + {ok, H, Content, R1} = osiris_log:read_header0(R0), + [_, _, D, _] = fake_chunk(Entries, ?LINE, 1, 100), + ?assertEqual(iolist_to_binary(D), Content), + {H, W1, R1} + end, + fun(#{w := W0, r := R}) -> + %% assume we are now using the correct filter size + %% (this setting stays the same for the next tests) + R0 = osiris_log:read_ahead_hints(R, 1, FSize), + %% data just bigger than the first limit + EData = binary:copy(<<"a">>, MES1 + 1), + Entries = [{<<"banana">>, EData}], + {_, W1} = write_committed(Entries, W0), + {ok, H, Content, R1} = osiris_log:read_header0(R0), + case FSize =:= DFS of + true -> + %% default filter size: still does not fit + ?assertEqual(undefined, Content); + false -> + %% with the correct filter size, we now read + %% a bit further than with the first limit + [_, _, D, _] = fake_chunk(Entries, ?LINE, 1, 100), + ?assertEqual(iolist_to_binary(D), Content) + end, + {H, W1, R1} + end, + fun(#{w := W0, r := R0}) -> + %% data exactly fits in the read ahead + EData = binary:copy(<<"a">>, MES1), + Entries = [{<<"banana">>, EData}], + {_, W1} = write_committed(Entries, W0), + {ok, H, Content, R1} = osiris_log:read_header0(R0), + [_, _, D, _] = fake_chunk(Entries, ?LINE, 1, 100), + ?assertEqual(iolist_to_binary(D), Content), + {H, W1, R1} + end, + fun(#{w := W0, r := R0}) -> + %% we use the "new" max entry size + %% data do not fit in the read ahead + EData = binary:copy(<<"a">>, MES2 + 1), Entries = [{<<"banana">>, EData}], {_, W1} = write_committed(Entries, W0), {ok, H, Content, R1} = osiris_log:read_header0(R0), @@ -2013,9 +2074,9 @@ read_header_ahead_offset_reader_filter(Config) -> {H, W1, R1} end, fun(#{w := W0, r := R0}) -> - %% chunk with a non-empty filter + %% we use the "new" max entry size %% data 
exactly fits in the read ahead - EData = binary:copy(<<"a">>, MES), + EData = binary:copy(<<"a">>, MES2), Entries = [{<<"banana">>, EData}], {_, W1} = write_committed(Entries, W0), {ok, H, Content, R1} = osiris_log:read_header0(R0), From b561e82a2a96620ca7214614a66af15219426772 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 4 Aug 2025 14:46:12 +0000 Subject: [PATCH 7/7] Use read ahead data in chunk iterator if available --- src/osiris_log.erl | 42 ++++++++++++++-------- test/osiris_log_SUITE.erl | 76 ++++++++++++++++++++++++++++++++++++--- 2 files changed, 99 insertions(+), 19 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index dd6560e..f6fbf6d 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -2,7 +2,7 @@ %% License, v. 2.0. If a copy of the MPL was not distributed with this %% file, You can obtain one at https://mozilla.org/MPL/2.0/. %% -%% Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. %% -module(osiris_log). @@ -1443,8 +1443,9 @@ chunk_iterator(State) -> {error, {invalid_chunk_header, term()}}. chunk_iterator(#?MODULE{cfg = #cfg{}, mode = #read{type = RType, - chunk_selector = Selector} - } = State0, CreditHint) + chunk_selector = Selector, + filter_size = RaFs}} = State0, + CreditHint) when (is_integer(CreditHint) andalso CreditHint > 0) orelse is_atom(CreditHint) -> %% reads the next chunk of unparsed chunk data @@ -1458,15 +1459,20 @@ chunk_iterator(#?MODULE{cfg = #cfg{}, data_size := DataSize, filter_size := FilterSize, position := Pos, - next_position := NextPos} = Header, _, - #?MODULE{fd = Fd, mode = #read{next_offset = ChId} = Read} = State1} -> - State = State1#?MODULE{mode = Read#read{next_offset = ChId + NumRecords, - position = NextPos}}, + next_position := NextPos} = Header, MaybeData, + #?MODULE{fd = Fd, mode = #read{next_offset = ChId} = Read1} = State1} -> case needs_handling(RType, Selector, ChType) of true -> + Read = Read1#read{next_offset = ChId + NumRecords, + position = NextPos, + last_data_size = DataSize, + filter_size = read_ahead_fsize(RaFs, + FilterSize)}, + + State = State1#?MODULE{mode = Read}, DataPos = Pos + ?HEADER_SIZE_B + FilterSize, Data = iter_read_ahead(Fd, DataPos, ChId, Crc, CreditHint, - DataSize, NumEntries), + DataSize, NumEntries, MaybeData), Iterator = #iterator{fd = Fd, data = Data, next_offset = ChId, @@ -1475,6 +1481,9 @@ chunk_iterator(#?MODULE{cfg = #cfg{}, {ok, Header, Iterator, State}; false -> %% skip + Read = Read1#read{next_offset = ChId + NumRecords, + position = NextPos}, + State = State1#?MODULE{mode = Read}, chunk_iterator(State, CreditHint) end; Other -> @@ -1681,13 +1690,13 @@ send_file(Sock, header_data := HeaderData} = Header, MaybeData, #?MODULE{fd = Fd, - mode = #read{next_offset = ChId} = Read0} = State1} -> + mode = #read{next_offset = ChId} = Read1} = State1} -> %% only sendfile if either the reader is a data reader %% or the chunk is a user type (for offset readers) case needs_handling(RType, Selector, ChType) of true -> - Read = Read0#read{next_offset = ChId + NumRecords, + Read = Read1#read{next_offset = ChId + NumRecords, position = NextPos, last_data_size = DataSize, filter_size = read_ahead_fsize(RaFs, @@ -1733,7 +1742,7 @@ send_file(Sock, end end; false -> - Read = Read0#read{next_offset = ChId + NumRecords, 
+ Read = Read1#read{next_offset = ChId + NumRecords, position = NextPos}, State = State1#?MODULE{mode = Read}, %% skip chunk and recurse @@ -3362,16 +3371,21 @@ dump_crc_check(Fd) -> dump_crc_check(Fd) end. -iter_read_ahead(_Fd, _Pos, _ChunkId, _Crc, 1, _DataSize, _NumEntries) -> +iter_read_ahead(_Fd, _Pos, _ChunkId, _Crc, _Credit, DataSize, _NumEntries, RAD) + when RAD =/= undefined -> + <> = RAD, + Data; +iter_read_ahead(_Fd, _Pos, _ChunkId, _Crc, 1, _DataSize, _NumEntries, undefined) -> + %% FIXME is it about credit = 1 or number of entries = 1? %% no point reading ahead if there is only one entry to be read at this %% time undefined; -iter_read_ahead(Fd, Pos, ChunkId, Crc, Credit, DataSize, NumEntries) +iter_read_ahead(Fd, Pos, ChunkId, Crc, Credit, DataSize, NumEntries, undefined) when Credit == all orelse NumEntries == 1 -> {ok, Data} = file:pread(Fd, Pos, DataSize), validate_crc(ChunkId, Crc, Data), Data; -iter_read_ahead(Fd, Pos, _ChunkId, _Crc, Credit0, DataSize, NumEntries) -> +iter_read_ahead(Fd, Pos, _ChunkId, _Crc, Credit0, DataSize, NumEntries, undefined) -> %% read ahead, assumes roughly equal entry sizes which may not be the case %% TODO round up to nearest block? %% We can only practically validate CRC if we read the whole data diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl index f887079..c807a58 100644 --- a/test/osiris_log_SUITE.erl +++ b/test/osiris_log_SUITE.erl @@ -38,6 +38,7 @@ all_tests() -> subbatch, subbatch_compressed, iterator_read_chunk, + iterator_read_chunk_with_read_ahead, iterator_read_chunk_mixed_sizes_with_credit, read_chunk_parsed, read_chunk_parsed_2, @@ -344,14 +345,16 @@ iterator_read_chunk(Config) -> EntriesRev = [Batch, <<"ho">>, {<<"filter">>, <<"hi">>}], - {ChId, _S1} = write_committed(EntriesRev, S0), - {ok, _H, I0, _R2} = osiris_log:chunk_iterator(R1), + {ChId, S1} = write_committed(EntriesRev, S0), + {ok, _H, I0, R2} = osiris_log:chunk_iterator(R1), HoOffs = ChId + 1, BatchOffs = ChId + 2, {{ChId, <<"hi">>}, I1} = osiris_log:iterator_next(I0), {{HoOffs, <<"ho">>}, I2} = osiris_log:iterator_next(I1), {{BatchOffs, Batch}, I} = osiris_log:iterator_next(I2), ?assertMatch(end_of_chunk, osiris_log:iterator_next(I)), + osiris_log:close(R2), + osiris_log:close(S1), ok. iterator_read_chunk_mixed_sizes_with_credit(Config) -> @@ -365,16 +368,79 @@ iterator_read_chunk_mixed_sizes_with_credit(Config) -> EntriesRev = [Big, <<"ho">>, {<<"filter">>, <<"hi">>}], - {ChId, _S1} = write_committed(EntriesRev, S0), + {ChId, S1} = write_committed(EntriesRev, S0), %% this is a less than ideal case where we have one large and two very - %% small entries inthe same batch. The read ahead only - {ok, _H, I0, _R2} = osiris_log:chunk_iterator(R1, 2), + %% small entries in the same batch. We read ahead only the 2 first entries. + {ok, _H, I0, R2} = osiris_log:chunk_iterator(R1, 2), HoOffs = ChId + 1, BigOffs = ChId + 2, {{ChId, <<"hi">>}, I1} = osiris_log:iterator_next(I0), {{HoOffs, <<"ho">>}, I2} = osiris_log:iterator_next(I1), {{BigOffs, Big}, I} = osiris_log:iterator_next(I2), ?assertMatch(end_of_chunk, osiris_log:iterator_next(I)), + osiris_log:close(R2), + osiris_log:close(S1), + ok. 
+ +iterator_read_chunk_with_read_ahead(Config) -> + %% the test makes sure reading ahead on header reading does not break + %% the iterator + RAL = 4096, %% read ahead limit + Conf = ?config(osiris_conf, Config), + W = osiris_log:init(Conf), + Shared = osiris_log:get_shared(W), + RConf = Conf#{shared => Shared}, + {ok, R} = osiris_log:init_offset_reader(0, RConf), + Tests = + [ + fun(#{w := W0, r := R0}) -> + %% first chunk, there won't be any data size hints in the reader + EntriesRev = [<<"hi">>, <<"ho">>], + {_, W1} = write_committed(EntriesRev, W0), + {ok, H, I0, R1} = osiris_log:chunk_iterator(R0), + {{_, <<"ho">>}, I1} = osiris_log:iterator_next(I0), + {{_, <<"hi">>}, I2} = osiris_log:iterator_next(I1), + ?assertMatch(end_of_chunk, osiris_log:iterator_next(I2)), + {H, W1, R1} + end, + fun(#{w := W0, r := R0}) -> + %% this one will be read ahead + EntriesRev = [<<"foo">>, <<"bar">>], + {_, W1} = write_committed(EntriesRev, W0), + {ok, H, I0, R1} = osiris_log:chunk_iterator(R0), + {{_, <<"bar">>}, I1} = osiris_log:iterator_next(I0), + {{_, <<"foo">>}, I2} = osiris_log:iterator_next(I1), + ?assertMatch(end_of_chunk, osiris_log:iterator_next(I2)), + {H, W1, R1} + end, + fun(#{w := W0, r := R0}) -> + %% this one will be read ahead + E1 = rand:bytes(RAL - 100), + EntriesRev = [E1 , <<"aaa">>], + {_, W1} = write_committed(EntriesRev, W0), + {ok, H, I0, R1} = osiris_log:chunk_iterator(R0), + {{_, <<"aaa">>}, I1} = osiris_log:iterator_next(I0), + {{_, E1}, I2} = osiris_log:iterator_next(I1), + ?assertMatch(end_of_chunk, osiris_log:iterator_next(I2)), + {H, W1, R1} + end, + fun(#{w := W0, r := R0}) -> + %% this one is too big to be read ahead + E1 = rand:bytes(RAL * 2), + EntriesRev = [E1 , <<"aaa">>], + {_, W1} = write_committed(EntriesRev, W0), + {ok, H, I0, R1} = osiris_log:chunk_iterator(R0), + {{_, <<"aaa">>}, I1} = osiris_log:iterator_next(I0), + {{_, E1}, I2} = osiris_log:iterator_next(I1), + ?assertMatch(end_of_chunk, osiris_log:iterator_next(I2)), + {H, W1, R1} + end + ], + + #{w := Wr1, r := Rd1} = run_read_ahead_tests(Tests, offset, + ?DEFAULT_FILTER_SIZE, W, R), + osiris_log:close(Rd1), + osiris_log:close(Wr1), ok. read_chunk_parsed(Config) ->
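A closing note on the size limits these tests exercise: the numbers below restate the arithmetic from read_header_ahead_offset_reader_filter/1 under the patch's constants (4096-byte read-ahead limit, 4-byte per-entry size prefix, 16-byte default filter), with 32 bytes taken as an example larger filter size. They are a worked illustration, not new test code.

%% Worked numbers for the read-ahead limits (illustration only).
max_entry_size_examples() ->
    MaxEntrySize = 4096 - 4,          %% 4092: payload that fits with a
                                      %% default-size (16 byte) filter
    MES1 = MaxEntrySize - (32 - 16),  %% 4076: stream uses a 32-byte filter
                                      %% but the reader still assumes 16
    MES2 = MaxEntrySize,              %% 4092 again, once the reader has
                                      %% learned the actual filter size
    {MaxEntrySize, MES1, MES2}.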