Commit c16bed9

Refactoring
1 parent 19ca741 commit c16bed9

2 files changed: 62 additions, 94 deletions

src/osiris_log.erl

Lines changed: 57 additions & 92 deletions
@@ -108,8 +108,6 @@
 
 -define(SKIP_SEARCH_JUMP, 2048).
 -define(READ_AHEAD_LIMIT, 4096).
--define(CAN_READ_AHEAD(LastDataSize), LastDataSize =/= undefined andalso
-                                      LastDataSize =< ?READ_AHEAD_LIMIT).
 
 %% Specification of the Log format.
 %%
@@ -2895,66 +2893,24 @@ read_header0(State) ->
     %% reads the next header if permitted
     case can_read_next(State) of
         true ->
-            do_read_header(State);
+            read_header_with_ra(State);
         false ->
             {end_of_stream, State}
     end.
 
-do_read_header(#?MODULE{cfg = #cfg{directory = Dir,
-                                   shared = Shared},
-                        mode = #read{next_offset = NextChId0,
-                                     position = Pos,
-                                     last_data_size = Lds,
-                                     read_ahead_data = undefined} = Read0,
-                        current_file = CurFile,
-                        fd = Fd} = State) ->
-    ReadAheadOffset = read_ahead_offset(Lds),
-
-    case file:pread(Fd, Pos, ?HEADER_SIZE_B + ReadAheadOffset) of
-        {ok, <<?MAGIC:4/unsigned,
-               ?VERSION:4/unsigned,
-               ChType:8/unsigned,
-               NumEntries:16/unsigned,
-               NumRecords:32/unsigned,
-               Timestamp:64/signed,
-               Epoch:64/unsigned,
-               NextChId0:64/unsigned,
-               Crc:32/integer,
-               DataSize:32/unsigned,
-               TrailerSize:32/unsigned,
-               FilterSize:8/unsigned,
-               _Reserved:24,
-               MaybeFilterAndRest/binary>> = HeaderData0}
-          when ?CAN_READ_AHEAD(Lds) ->
-            case read_ahead_chunk(HeaderData0, State) of
-                need_more_data ->
-                    Read1 = Read0#read{read_ahead_data = undefined},
-                    maybe_return_header(State#?MODULE{mode = Read1},
-                                        HeaderData0, MaybeFilterAndRest, undefined,
-                                        ChType, NumEntries, NumRecords, Timestamp,
-                                        Epoch, NextChId0, Crc, DataSize, TrailerSize,
-                                        FilterSize);
-                R ->
-                    R
-            end;
-        {ok, <<?MAGIC:4/unsigned,
-               ?VERSION:4/unsigned,
-               ChType:8/unsigned,
-               NumEntries:16/unsigned,
-               NumRecords:32/unsigned,
-               Timestamp:64/signed,
-               Epoch:64/unsigned,
-               NextChId0:64/unsigned,
-               Crc:32/integer,
-               DataSize:32/unsigned,
-               TrailerSize:32/unsigned,
-               FilterSize:8/unsigned,
-               _Reserved:24,
-               MaybeFilter/binary>> = HeaderData0} ->
-            maybe_return_header(State, HeaderData0, MaybeFilter, undefined,
-                                ChType, NumEntries, NumRecords, Timestamp,
-                                Epoch, NextChId0, Crc, DataSize,
-                                TrailerSize, FilterSize);
+read_header_with_ra(#?MODULE{cfg = #cfg{directory = Dir,
+                                        shared = Shared},
+                             mode = #read{next_offset = NextChId0,
+                                          position = Pos,
+                                          last_data_size = Lds,
+                                          read_ahead_data = undefined} = Read0,
+                             current_file = CurFile,
+                             fd = Fd} = State) ->
+    ReadAheadSize = read_ahead_size(Lds),
+
+    case file:pread(Fd, Pos, ?HEADER_SIZE_B + ReadAheadSize) of
+        {ok, Bin} when byte_size(Bin) >= ?HEADER_SIZE_B ->
+            parse_header(Bin, State);
         {ok, Bin} when byte_size(Bin) < ?HEADER_SIZE_B ->
             %% partial header read
             %% this can happen when a replica reader reads ahead
@@ -3008,45 +2964,47 @@ do_read_header(#?MODULE{cfg = #cfg{directory = Dir,
         Invalid ->
             {error, {invalid_chunk_header, Invalid}}
     end;
-do_read_header(#?MODULE{mode = #read{read_ahead_data = RAD} = Read0} = State) ->
-    case read_ahead_chunk(RAD, State) of
+read_header_with_ra(#?MODULE{mode = #read{read_ahead_data = RAD} = Read0} = State) ->
+    case parse_header(RAD, State) of
         need_more_data ->
             %% we don't have enough data in memory
             Read1 = Read0#read{read_ahead_data = undefined},
-            do_read_header(State#?MODULE{mode = Read1});
-        R ->
-            R
+            read_header_with_ra(State#?MODULE{mode = Read1});
+        Result ->
+            Result
     end.
 
-read_ahead_chunk(<<?MAGIC:4/unsigned,
-                   ?VERSION:4/unsigned,
-                   ChType:8/unsigned,
-                   NumEntries:16/unsigned,
-                   NumRecords:32/unsigned,
-                   Timestamp:64/signed,
-                   Epoch:64/unsigned,
-                   NextChId0:64/unsigned,
-                   Crc:32/integer,
-                   DataSize:32/unsigned,
-                   TrailerSize:32/unsigned,
-                   FilterSize:8/unsigned,
-                   _Reserved:24,
-                   MaybeFilterAndRest/binary>> = HeaderData0,
-                 #?MODULE{mode = #read{type = RType,
-                                       chunk_selector = Selector,
-                                       next_offset = NextChId0} = Read0} = State) ->
+parse_header(<<?MAGIC:4/unsigned,
+               ?VERSION:4/unsigned,
+               ChType:8/unsigned,
+               NumEntries:16/unsigned,
+               NumRecords:32/unsigned,
+               Timestamp:64/signed,
+               Epoch:64/unsigned,
+               NextChId0:64/unsigned,
+               Crc:32/integer,
+               DataSize:32/unsigned,
+               TrailerSize:32/unsigned,
+               FilterSize:8/unsigned,
+               _Reserved:24,
+               MaybeFilterAndRest/binary>> = HeaderData0,
+             #?MODULE{mode = #read{type = RType,
+                                   chunk_selector = Selector,
+                                   next_offset = NextChId0} = Read0} = State) ->
     {ToSkip, ToSend} = select_amount_to_send(RType, Selector, ChType,
-                                             FilterSize, DataSize,
-                                             TrailerSize),
-    case byte_size(MaybeFilterAndRest) of
-        RAS when RAS >= (ToSkip + ToSend) ->
+                                             FilterSize, DataSize,
+                                             TrailerSize),
+    case byte_size(MaybeFilterAndRest) >= (ToSkip + ToSend) of
+        true ->
             %% we read everything we needed
            {ReadAheadData, Content} =
                 case MaybeFilterAndRest of
                     <<_Skip:ToSkip/binary,
                       Ctnt:ToSend/binary,
                       Rest/binary>>
-                      when byte_size(Rest) > ?HEADER_SIZE_B+ ?DEFAULT_FILTER_SIZE ->
+                      when byte_size(Rest) > ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE ->
+                        %% remainder is larger than 64 bytes so worth keeping
+                        %% around
                         {Rest, Ctnt};
                     <<_Skip:ToSkip/binary,
                       Ctnt:ToSend/binary,
@@ -3059,18 +3017,25 @@ read_ahead_chunk(<<?MAGIC:4/unsigned,
                                 ChType, NumEntries, NumRecords, Timestamp,
                                 Epoch, NextChId0, Crc, DataSize, TrailerSize,
                                 FilterSize);
-        _ ->
-            need_more_data
+        false ->
+            %% having to throw away the read ahead data here
+            Read1 = Read0#read{read_ahead_data = undefined},
+            maybe_return_header(State#?MODULE{mode = Read1},
+                                HeaderData0, MaybeFilterAndRest, undefined,
+                                ChType, NumEntries, NumRecords, Timestamp,
+                                Epoch, NextChId0, Crc, DataSize, TrailerSize,
+                                FilterSize)
     end;
-read_ahead_chunk(_, _) ->
+parse_header(_, _) ->
    need_more_data.
 
-read_ahead_offset(LastDataSize) ->
-    case LastDataSize of
-        LastDataSize when ?CAN_READ_AHEAD(LastDataSize) ->
+read_ahead_size(LastDataSize) ->
+    case LastDataSize =/= undefined andalso
+         LastDataSize =< ?READ_AHEAD_LIMIT of
+        true ->
             %% the previous chunk was small, try to read
             %% the next chunk fully in one read
-            %% this can save us the sendfile call later
+            %% this can save us a system call later
            ?DEFAULT_FILTER_SIZE + ?READ_AHEAD_LIMIT;
         _ ->
            %% optimistically read the default filter size.
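
The read-ahead idea used in the hunks above: fetch the chunk header plus a bounded number of extra bytes in a single pread, and when those extra bytes already cover the chunk body, serve it from memory instead of issuing another system call later. A minimal, self-contained sketch of that pattern follows; it is not the osiris_log API, and the ?HEADER_SIZE_B value here is an assumption made for the sketch (?READ_AHEAD_LIMIT mirrors the define in the first hunk).

%% Sketch only: single-pread header fetch with bounded read-ahead.
-module(read_ahead_sketch).
-export([read_header/2]).

-define(HEADER_SIZE_B, 64).       %% assumed header size for this sketch
-define(READ_AHEAD_LIMIT, 4096).  %% same limit as the define above

read_header(Fd, Pos) ->
    %% one pread fetches the header and up to ?READ_AHEAD_LIMIT extra bytes
    case file:pread(Fd, Pos, ?HEADER_SIZE_B + ?READ_AHEAD_LIMIT) of
        {ok, <<Header:?HEADER_SIZE_B/binary, ReadAhead/binary>>} ->
            %% ReadAhead may already contain the chunk body, saving a
            %% second read (or sendfile) when the chunk is served
            {ok, Header, ReadAhead};
        {ok, Partial} when byte_size(Partial) < ?HEADER_SIZE_B ->
            %% partial header, e.g. a reader racing ahead of the writer
            {error, partial_header};
        eof ->
            {error, eof};
        {error, _} = Err ->
            Err
    end.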

test/osiris_log_SUITE.erl

Lines changed: 5 additions & 2 deletions
@@ -1887,15 +1887,18 @@ read_header_ahead_offset_reader(Config) ->
     [
      fun(#{w := W0, r := R0}) ->
              %% no previous chunk, so not reading ahead
-             {_, W1} = write_committed([<<"hi">>, <<"ho">>], W0),
+             %% the messages are large enough to be larger than the default
+             %% filter size which is always read ahead (16 bytes)
+             {_, W1} = write_committed([<<"hiiiiiiiii">>, <<"hooooooo">>], W0),
+             ct:pal("R0 ~p", [R0]),
              {ok, H, Content, R1} = osiris_log:read_header0(R0),
              ?assertEqual(undefined, Content),
              {H, W1, R1}
      end,
      fun(#{w := W0, r := R0}) ->
              %% previous chunk too large to read ahead
              R1 = osiris_log:last_data_size(R0, RAL * 2),
-             {_, W1} = write_committed([<<"hi">>, <<"ho">>], W0),
+             {_, W1} = write_committed([<<"hiiiiiiiii">>, <<"hooooooo">>], W0),
              {ok, H, Content, R2} = osiris_log:read_header0(R1),
              ?assertEqual(undefined, Content),
              {H, W1, R2}
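
Why the test payloads were enlarged: per the comment in the first case, a default filter area of 16 bytes is always read ahead together with the header, so a chunk body small enough to fit inside it would already be returned from memory and Content would not be undefined. A rough sketch of that size relationship, purely illustrative (the helper name is made up; only the 16-byte figure comes from the comment above):

%% Sketch only, not part of the suite.
-define(DEFAULT_FILTER_SIZE, 16).  %% per the test comment above

%% true when the bytes read beyond the header already cover the chunk body,
%% i.e. the reader could return the content straight from memory
body_served_from_read_ahead(BodySize, ExtraReadAhead) ->
    BodySize =< ?DEFAULT_FILTER_SIZE + ExtraReadAhead.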
