108
108
109
109
-define (SKIP_SEARCH_JUMP , 2048 ).
110
110
-define (READ_AHEAD_LIMIT , 4096 ).
111
- -define (CAN_READ_AHEAD (LastDataSize ), LastDataSize =/= undefined andalso
112
- LastDataSize =< ? READ_AHEAD_LIMIT ).
113
111
114
112
% % Specification of the Log format.
115
113
% %
@@ -2895,66 +2893,24 @@ read_header0(State) ->
2895
2893
% % reads the next header if permitted
2896
2894
case can_read_next (State ) of
2897
2895
true ->
2898
- do_read_header (State );
2896
+ read_header_with_ra (State );
2899
2897
false ->
2900
2898
{end_of_stream , State }
2901
2899
end .
2902
2900
2903
- do_read_header (#? MODULE {cfg = # cfg {directory = Dir ,
2904
- shared = Shared },
2905
- mode = # read {next_offset = NextChId0 ,
2906
- position = Pos ,
2907
- last_data_size = Lds ,
2908
- read_ahead_data = undefined } = Read0 ,
2909
- current_file = CurFile ,
2910
- fd = Fd } = State ) ->
2911
- ReadAheadOffset = read_ahead_offset (Lds ),
2912
-
2913
- case file :pread (Fd , Pos , ? HEADER_SIZE_B + ReadAheadOffset ) of
2914
- {ok , <<? MAGIC :4 /unsigned ,
2915
- ? VERSION :4 /unsigned ,
2916
- ChType :8 /unsigned ,
2917
- NumEntries :16 /unsigned ,
2918
- NumRecords :32 /unsigned ,
2919
- Timestamp :64 /signed ,
2920
- Epoch :64 /unsigned ,
2921
- NextChId0 :64 /unsigned ,
2922
- Crc :32 /integer ,
2923
- DataSize :32 /unsigned ,
2924
- TrailerSize :32 /unsigned ,
2925
- FilterSize :8 /unsigned ,
2926
- _Reserved :24 ,
2927
- MaybeFilterAndRest /binary >> = HeaderData0 }
2928
- when ? CAN_READ_AHEAD (Lds ) ->
2929
- case read_ahead_chunk (HeaderData0 , State ) of
2930
- need_more_data ->
2931
- Read1 = Read0 # read {read_ahead_data = undefined },
2932
- maybe_return_header (State #? MODULE {mode = Read1 },
2933
- HeaderData0 , MaybeFilterAndRest , undefined ,
2934
- ChType , NumEntries , NumRecords , Timestamp ,
2935
- Epoch , NextChId0 , Crc , DataSize , TrailerSize ,
2936
- FilterSize );
2937
- R ->
2938
- R
2939
- end ;
2940
- {ok , <<? MAGIC :4 /unsigned ,
2941
- ? VERSION :4 /unsigned ,
2942
- ChType :8 /unsigned ,
2943
- NumEntries :16 /unsigned ,
2944
- NumRecords :32 /unsigned ,
2945
- Timestamp :64 /signed ,
2946
- Epoch :64 /unsigned ,
2947
- NextChId0 :64 /unsigned ,
2948
- Crc :32 /integer ,
2949
- DataSize :32 /unsigned ,
2950
- TrailerSize :32 /unsigned ,
2951
- FilterSize :8 /unsigned ,
2952
- _Reserved :24 ,
2953
- MaybeFilter /binary >> = HeaderData0 } ->
2954
- maybe_return_header (State , HeaderData0 , MaybeFilter , undefined ,
2955
- ChType , NumEntries , NumRecords , Timestamp ,
2956
- Epoch , NextChId0 , Crc , DataSize ,
2957
- TrailerSize , FilterSize );
2901
+ read_header_with_ra (#? MODULE {cfg = # cfg {directory = Dir ,
2902
+ shared = Shared },
2903
+ mode = # read {next_offset = NextChId0 ,
2904
+ position = Pos ,
2905
+ last_data_size = Lds ,
2906
+ read_ahead_data = undefined } = Read0 ,
2907
+ current_file = CurFile ,
2908
+ fd = Fd } = State ) ->
2909
+ ReadAheadSize = read_ahead_size (Lds ),
2910
+
2911
+ case file :pread (Fd , Pos , ? HEADER_SIZE_B + ReadAheadSize ) of
2912
+ {ok , Bin } when byte_size (Bin ) >= ? HEADER_SIZE_B ->
2913
+ parse_header (Bin , State );
2958
2914
{ok , Bin } when byte_size (Bin ) < ? HEADER_SIZE_B ->
2959
2915
% % partial header read
2960
2916
% % this can happen when a replica reader reads ahead
@@ -3008,45 +2964,55 @@ do_read_header(#?MODULE{cfg = #cfg{directory = Dir,
3008
2964
Invalid ->
3009
2965
{error , {invalid_chunk_header , Invalid }}
3010
2966
end ;
3011
- do_read_header (#? MODULE {mode = # read {read_ahead_data = RAD } = Read0 } = State ) ->
3012
- case read_ahead_chunk (RAD , State ) of
3013
- need_more_data ->
3014
- % % we don't have enough data in memory
2967
+ read_header_with_ra (#? MODULE {mode = # read {last_data_size = Lds ,
2968
+ read_ahead_data = RAD } = Read0 } = State ) ->
2969
+ case byte_size (RAD ) > ? HEADER_SIZE_B + Lds of
2970
+ true ->
2971
+ case parse_header (RAD , State ) of
2972
+ need_more_data ->
2973
+ % % we don't have enough data in memory
2974
+ Read1 = Read0 # read {read_ahead_data = undefined },
2975
+ read_header_with_ra (State #? MODULE {mode = Read1 });
2976
+ Result ->
2977
+ Result
2978
+ end ;
2979
+ false ->
3015
2980
Read1 = Read0 # read {read_ahead_data = undefined },
3016
- do_read_header (State #? MODULE {mode = Read1 });
3017
- R ->
3018
- R
2981
+ read_header_with_ra (State #? MODULE {mode = Read1 })
3019
2982
end .
3020
2983
3021
- read_ahead_chunk (<<? MAGIC :4 /unsigned ,
3022
- ? VERSION :4 /unsigned ,
3023
- ChType :8 /unsigned ,
3024
- NumEntries :16 /unsigned ,
3025
- NumRecords :32 /unsigned ,
3026
- Timestamp :64 /signed ,
3027
- Epoch :64 /unsigned ,
3028
- NextChId0 :64 /unsigned ,
3029
- Crc :32 /integer ,
3030
- DataSize :32 /unsigned ,
3031
- TrailerSize :32 /unsigned ,
3032
- FilterSize :8 /unsigned ,
3033
- _Reserved :24 ,
3034
- MaybeFilterAndRest /binary >> = HeaderData0 ,
3035
- #? MODULE {mode = # read {type = RType ,
3036
- chunk_selector = Selector ,
3037
- next_offset = NextChId0 } = Read0 } = State ) ->
2984
+
2985
+ parse_header (<<? MAGIC :4 /unsigned ,
2986
+ ? VERSION :4 /unsigned ,
2987
+ ChType :8 /unsigned ,
2988
+ NumEntries :16 /unsigned ,
2989
+ NumRecords :32 /unsigned ,
2990
+ Timestamp :64 /signed ,
2991
+ Epoch :64 /unsigned ,
2992
+ NextChId0 :64 /unsigned ,
2993
+ Crc :32 /integer ,
2994
+ DataSize :32 /unsigned ,
2995
+ TrailerSize :32 /unsigned ,
2996
+ FilterSize :8 /unsigned ,
2997
+ _Reserved :24 ,
2998
+ MaybeFilterAndRest /binary >> = HeaderData0 ,
2999
+ #? MODULE {mode = # read {type = RType ,
3000
+ chunk_selector = Selector ,
3001
+ next_offset = NextChId0 } = Read0 } = State ) ->
3038
3002
{ToSkip , ToSend } = select_amount_to_send (RType , Selector , ChType ,
3039
- FilterSize , DataSize ,
3040
- TrailerSize ),
3041
- case byte_size (MaybeFilterAndRest ) of
3042
- RAS when RAS >= ( ToSkip + ToSend ) ->
3003
+ FilterSize , DataSize ,
3004
+ TrailerSize ),
3005
+ case byte_size (MaybeFilterAndRest ) >= ( ToSkip + ToSend ) of
3006
+ true ->
3043
3007
% % we read everything we needed
3044
3008
{ReadAheadData , Content } =
3045
3009
case MaybeFilterAndRest of
3046
3010
<<_Skip :ToSkip /binary ,
3047
3011
Ctnt :ToSend /binary ,
3048
3012
Rest /binary >>
3049
- when byte_size (Rest ) > ? HEADER_SIZE_B + ? DEFAULT_FILTER_SIZE ->
3013
+ when byte_size (Rest ) > ? HEADER_SIZE_B + ? DEFAULT_FILTER_SIZE ->
3014
+ % % remained is larger than 64 bytes so worth keeping
3015
+ % % around
3050
3016
{Rest , Ctnt };
3051
3017
<<_Skip :ToSkip /binary ,
3052
3018
Ctnt :ToSend /binary ,
@@ -3059,20 +3025,27 @@ read_ahead_chunk(<<?MAGIC:4/unsigned,
3059
3025
ChType , NumEntries , NumRecords , Timestamp ,
3060
3026
Epoch , NextChId0 , Crc , DataSize , TrailerSize ,
3061
3027
FilterSize );
3062
- _ ->
3063
- need_more_data
3028
+ false ->
3029
+ % % having to throw away the read ahead data here
3030
+ Read1 = Read0 # read {read_ahead_data = undefined },
3031
+ maybe_return_header (State #? MODULE {mode = Read1 },
3032
+ HeaderData0 , MaybeFilterAndRest , undefined ,
3033
+ ChType , NumEntries , NumRecords , Timestamp ,
3034
+ Epoch , NextChId0 , Crc , DataSize , TrailerSize ,
3035
+ FilterSize )
3064
3036
end ;
3065
- read_ahead_chunk (_ , _ ) ->
3037
+ parse_header (_ , _ ) ->
3066
3038
need_more_data .
3067
3039
3068
- read_ahead_offset (LastDataSize ) ->
3069
- case LastDataSize of
3070
- LastDataSize when ? CAN_READ_AHEAD (LastDataSize ) ->
3040
+ read_ahead_size (LastDataSize ) ->
3041
+ case LastDataSize =/= undefined andalso
3042
+ LastDataSize =< ? READ_AHEAD_LIMIT of
3043
+ true ->
3071
3044
% % the previous chunk was small, try to read
3072
3045
% % the next chunk fully in one read
3073
- % % this can save us the sendfile call later
3046
+ % % this can save us a system call later
3074
3047
? DEFAULT_FILTER_SIZE + ? READ_AHEAD_LIMIT ;
3075
- _ ->
3048
+ false ->
3076
3049
% % optimistically read the default filter size.
3077
3050
% % this amounts to 64 bytes with the header (small binary)
3078
3051
% % and it may save us a syscall reading the filter
@@ -3087,7 +3060,6 @@ maybe_return_header(#?MODULE{cfg = #cfg{counter = CntRef},
3087
3060
fd = Fd } = State , HeaderData0 , MaybeFilter , Content ,
3088
3061
ChType , NumEntries , NumRecords , Timestamp , Epoch ,
3089
3062
NextChId0 , Crc , DataSize , TrailerSize , FilterSize ) ->
3090
- <<HeaderData :? HEADER_SIZE_B /binary , _ /binary >> = HeaderData0 ,
3091
3063
3092
3064
ChunkFilter = case MaybeFilter of
3093
3065
<<F :FilterSize /binary , _ /binary >> ->
@@ -3112,6 +3084,7 @@ maybe_return_header(#?MODULE{cfg = #cfg{counter = CntRef},
3112
3084
3113
3085
case osiris_bloom :is_match (ChunkFilter , Filter ) of
3114
3086
true ->
3087
+ <<HeaderData :? HEADER_SIZE_B /binary , _ /binary >> = HeaderData0 ,
3115
3088
{ok , #{chunk_id => NextChId0 ,
3116
3089
epoch => Epoch ,
3117
3090
type => ChType ,
0 commit comments