431
431
chunk_selector :: all | user_data ,
432
432
position = 0 :: non_neg_integer (),
433
433
filter :: undefined | osiris_bloom :mstate (),
434
- last_data_size = undefined :: undefined | non_neg_integer ()}).
434
+ last_data_size = undefined :: undefined | non_neg_integer (),
435
+ read_ahead_data = undefined :: undefined | binary ()}).
435
436
-record (write ,
436
437
{type = writer :: writer | acceptor ,
437
438
segment_size = {? LOG_HEADER_SIZE , 0 } :: {non_neg_integer (), non_neg_integer ()},
@@ -2899,11 +2900,10 @@ read_header0(State) ->
2899
2900
2900
2901
do_read_header (#? MODULE {cfg = # cfg {directory = Dir ,
2901
2902
shared = Shared },
2902
- mode = # read {type = RType ,
2903
- chunk_selector = Selector ,
2904
- next_offset = NextChId0 ,
2903
+ mode = # read {next_offset = NextChId0 ,
2905
2904
position = Pos ,
2906
- last_data_size = Lds } = Read0 ,
2905
+ last_data_size = Lds ,
2906
+ read_ahead_data = undefined } = Read0 ,
2907
2907
current_file = CurFile ,
2908
2908
fd = Fd } = State ) ->
2909
2909
ReadAheadOffset = read_ahead_offset (Lds ),
@@ -2924,26 +2924,17 @@ do_read_header(#?MODULE{cfg = #cfg{directory = Dir,
2924
2924
_Reserved :24 ,
2925
2925
MaybeFilterAndRest /binary >> = HeaderData0 }
2926
2926
when ? CAN_READ_AHEAD (Lds ) ->
2927
- {ToSkip , ToSend } = select_amount_to_send (RType , Selector , ChType ,
2928
- FilterSize , DataSize ,
2929
- TrailerSize ),
2930
- % % summing what we need to skip and send gives us the number
2931
- % % of bytes we need to read, we make sure we read it ahead
2932
- Content = case ReadAheadOffset of
2933
- RAO when RAO >= (ToSkip + ToSend ) ->
2934
- % % we read everything we needed
2935
- <<_Skip :ToSkip /binary ,
2936
- Ctnt :ToSend /binary ,
2937
- _Rest /binary >> = MaybeFilterAndRest ,
2938
- Ctnt ;
2939
- _ ->
2940
- % % we did not read enough, the caller will have to do it
2941
- undefined
2942
- end ,
2943
- maybe_return_header (State , HeaderData0 , MaybeFilterAndRest , Content ,
2944
- ChType , NumEntries , NumRecords , Timestamp ,
2945
- Epoch , NextChId0 , Crc , DataSize , TrailerSize ,
2946
- FilterSize );
2927
+ case read_ahead_chunk (HeaderData0 , State ) of
2928
+ need_more_data ->
2929
+ Read1 = Read0 # read {read_ahead_data = undefined },
2930
+ maybe_return_header (State #? MODULE {mode = Read1 },
2931
+ HeaderData0 , MaybeFilterAndRest , undefined ,
2932
+ ChType , NumEntries , NumRecords , Timestamp ,
2933
+ Epoch , NextChId0 , Crc , DataSize , TrailerSize ,
2934
+ FilterSize );
2935
+ R ->
2936
+ R
2937
+ end ;
2947
2938
{ok , <<? MAGIC :4 /unsigned ,
2948
2939
? VERSION :4 /unsigned ,
2949
2940
ChType :8 /unsigned ,
@@ -3014,8 +3005,64 @@ do_read_header(#?MODULE{cfg = #cfg{directory = Dir,
3014
3005
{error , {unexpected_chunk_id , UnexpectedChId , NextChId0 }};
3015
3006
Invalid ->
3016
3007
{error , {invalid_chunk_header , Invalid }}
3008
+ end ;
3009
+ do_read_header (#? MODULE {mode = # read {read_ahead_data = RAD } = Read0 } = State ) ->
3010
+ case read_ahead_chunk (RAD , State ) of
3011
+ need_more_data ->
3012
+ % % we don't have enough data in memory
3013
+ Read1 = Read0 # read {read_ahead_data = undefined },
3014
+ do_read_header (State #? MODULE {mode = Read1 });
3015
+ R ->
3016
+ R
3017
3017
end .
3018
3018
3019
+ read_ahead_chunk (<<? MAGIC :4 /unsigned ,
3020
+ ? VERSION :4 /unsigned ,
3021
+ ChType :8 /unsigned ,
3022
+ NumEntries :16 /unsigned ,
3023
+ NumRecords :32 /unsigned ,
3024
+ Timestamp :64 /signed ,
3025
+ Epoch :64 /unsigned ,
3026
+ NextChId0 :64 /unsigned ,
3027
+ Crc :32 /integer ,
3028
+ DataSize :32 /unsigned ,
3029
+ TrailerSize :32 /unsigned ,
3030
+ FilterSize :8 /unsigned ,
3031
+ _Reserved :24 ,
3032
+ MaybeFilterAndRest /binary >> = HeaderData0 ,
3033
+ #? MODULE {mode = # read {type = RType ,
3034
+ chunk_selector = Selector ,
3035
+ next_offset = NextChId0 } = Read0 } = State ) ->
3036
+ {ToSkip , ToSend } = select_amount_to_send (RType , Selector , ChType ,
3037
+ FilterSize , DataSize ,
3038
+ TrailerSize ),
3039
+ case byte_size (MaybeFilterAndRest ) of
3040
+ RAS when RAS >= (ToSkip + ToSend ) ->
3041
+ % % we read everything we needed
3042
+ {ReadAheadData , Content } =
3043
+ case MaybeFilterAndRest of
3044
+ <<_Skip :ToSkip /binary ,
3045
+ Ctnt :ToSend /binary ,
3046
+ Rest /binary >>
3047
+ when byte_size (Rest ) > ? HEADER_SIZE_B + ? DEFAULT_FILTER_SIZE ->
3048
+ {Rest , Ctnt };
3049
+ <<_Skip :ToSkip /binary ,
3050
+ Ctnt :ToSend /binary ,
3051
+ _Rest /binary >> ->
3052
+ {undefined , Ctnt }
3053
+ end ,
3054
+ Read1 = Read0 # read {read_ahead_data = ReadAheadData },
3055
+ maybe_return_header (State #? MODULE {mode = Read1 },
3056
+ HeaderData0 , MaybeFilterAndRest , Content ,
3057
+ ChType , NumEntries , NumRecords , Timestamp ,
3058
+ Epoch , NextChId0 , Crc , DataSize , TrailerSize ,
3059
+ FilterSize );
3060
+ _ ->
3061
+ need_more_data
3062
+ end ;
3063
+ read_ahead_chunk (_ , _ ) ->
3064
+ need_more_data .
3065
+
3019
3066
read_ahead_offset (LastDataSize ) ->
3020
3067
case LastDataSize of
3021
3068
LastDataSize when ? CAN_READ_AHEAD (LastDataSize ) ->
0 commit comments