From 674ae5743edaa88fb166b6bb30e9cdf8b12aa89a Mon Sep 17 00:00:00 2001 From: Simon Unge Date: Thu, 13 Mar 2025 21:06:27 +0000 Subject: [PATCH 1/9] Optional io device to open files --- src/osiris_log.erl | 94 ++++++++++++++++++++++++++++------------------ 1 file changed, 58 insertions(+), 36 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 5cfb238..787f041 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -105,6 +105,8 @@ -define(SKIP_SEARCH_JUMP, 2048). +-define(DEFAULT_IO_MODULE, file). + %% Specification of the Log format. %% %% Notes: @@ -577,14 +579,14 @@ init(#{dir := Dir, ?DEBUG_(Name, " next offset ~b first offset ~b", [element(1, TailInfo), FstChId]), - {ok, SegFd} = open(Filename, ?FILE_OPTS_WRITE), + {ok, SegFd} = open_seg(Filename, ?FILE_OPTS_WRITE), {ok, Size} = file:position(SegFd, Size), %% maybe_fix_corrupted_files has truncated the index to the last %% record pointing %% at a valid chunk we can now truncate the segment to size in %% case there is trailing data ok = file:truncate(SegFd), - {ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE), + {ok, IdxFd} = open_idx(IdxFilename, ?FILE_OPTS_WRITE), {ok, IdxEof} = file:position(IdxFd, eof), NumChunks = (IdxEof - ?IDX_HEADER_SIZE) div ?INDEX_RECORD_SIZE_B, #?MODULE{cfg = Cfg, @@ -600,8 +602,8 @@ init(#{dir := Dir, index = IdxFilename, last = undefined}, _} -> %% the empty log case - {ok, SegFd} = open(Filename, ?FILE_OPTS_WRITE), - {ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE), + {ok, SegFd} = open_seg(Filename, ?FILE_OPTS_WRITE), + {ok, IdxFd} = open_idx(IdxFilename, ?FILE_OPTS_WRITE), {ok, _} = file:position(SegFd, ?LOG_HEADER_SIZE), counters:put(Cnt, ?C_SEGMENTS, 1), %% the segment could potentially have trailing data here so we'll @@ -641,7 +643,7 @@ maybe_fix_corrupted_files([IdxFile]) -> true -> % the only index doesn't contain a single valid record % make sure it has a valid header - {ok, IdxFd} = file:open(IdxFile, ?FILE_OPTS_WRITE), + {ok, IdxFd} = open_idx(IdxFile, ?FILE_OPTS_WRITE), ok = file:write(IdxFd, ?IDX_HEADER), ok = file:close(IdxFd); false -> @@ -651,7 +653,7 @@ maybe_fix_corrupted_files([IdxFile]) -> true -> % the only segment doesn't contain a single valid chunk % make sure it has a valid header - {ok, SegFd} = file:open(SegFile, ?FILE_OPTS_WRITE), + {ok, SegFd} = open_seg(SegFile, ?FILE_OPTS_WRITE), ok = file:write(SegFd, ?LOG_HEADER), ok = file:close(SegFd); false -> @@ -698,7 +700,7 @@ truncate_invalid_idx_records(IdxFile, SegSize) -> % add an option to perform a full segment scan and reconstruct % the index for the valid chunks. 
SegFile = segment_from_index_file(IdxFile), - {ok, IdxFd} = open(IdxFile, [raw, binary, write, read]), + {ok, IdxFd} = open_idx(IdxFile, [raw, binary, write, read]), {ok, Pos} = position_at_idx_record_boundary(IdxFd, eof), ok = skip_invalid_idx_records(IdxFd, SegFile, SegSize, Pos), ok = file:truncate(IdxFd), @@ -716,7 +718,7 @@ skip_invalid_idx_records(IdxFd, SegFile, SegSize, Pos) -> {ok, ?IDX_MATCH(_, _, ChunkPos)} -> % a non-zero index record case ChunkPos < SegSize andalso - is_valid_chunk_on_disk(SegFile, ChunkPos) of + is_valid_chunk_on_disk(SegFile, ChunkPos) of true -> ok; false -> @@ -955,9 +957,9 @@ truncate_to(Name, RemoteRange, [{E, ChId} | NextEOs], IdxFiles) -> %% the Chunk id was found and has the right epoch %% lets truncate to this point %% FilePos could be eof here which means the next offset - {ok, Fd} = file:open(File, [read, write, binary, raw]), + {ok, Fd} = open_seg(File, [read, write, binary, raw]), _ = file:advise(Fd, 0, 0, random), - {ok, IdxFd} = file:open(IdxFile, [read, write, binary, raw]), + {ok, IdxFd} = open_idx(IdxFile, [read, write, binary, raw]), NextPos = next_chunk_pos(Fd, Pos), {ok, _} = file:position(Fd, NextPos), @@ -1046,7 +1048,7 @@ init_data_reader_at(ChunkId, FilePos, File, #{dir := Dir, name := Name, shared := Shared, readers_counter_fun := CountersFun} = Config) -> - case file:open(File, [raw, binary, read]) of + case open_seg(File, [raw, binary, read]) of {ok, Fd} -> Cnt = make_counter(Config), counters:put(Cnt, ?C_OFFSET, ChunkId - 1), @@ -1156,7 +1158,7 @@ init_offset_reader0({timestamp, Ts}, #{} = Conf) -> SegmentFile = segment_from_index_file(IdxFile), open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf); {first_in, IdxFile} -> - {ok, Fd} = file:open(IdxFile, [raw, binary, read]), + {ok, Fd} = open_seg(IdxFile, [raw, binary, read]), {ok, ?IDX_MATCH(ChunkId, _, FilePos)} = first_idx_record(Fd), SegmentFile = segment_from_index_file(IdxFile), open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf); @@ -1267,7 +1269,7 @@ open_offset_reader_at(SegmentFile, NextChunkId, FilePos, readers_counter_fun := ReaderCounterFun, options := Options} = Conf) -> - {ok, Fd} = open(SegmentFile, [raw, binary, read]), + {ok, Fd} = open_seg(SegmentFile, [raw, binary, read]), Cnt = make_counter(Conf), ReaderCounterFun(1), FilterMatcher = case Options of @@ -1307,7 +1309,7 @@ last_user_chunk_id0(_, []) -> not_found; last_user_chunk_id0(Name, [IdxFile | Rest]) -> %% Do not read-ahead since we read the index file backwards chunk by chunk. - {ok, IdxFd} = open(IdxFile, [read, raw, binary]), + {ok, IdxFd} = open_idx(IdxFile, [read, raw, binary]), {ok, EofPos} = position_at_idx_record_boundary(IdxFd, eof), Last = last_user_chunk_id_in_index(EofPos - ?INDEX_RECORD_SIZE_B, IdxFd), _ = file:close(IdxFd), @@ -1606,7 +1608,7 @@ iter_all_records({X, I}, Acc0) -> is_valid_chunk_on_disk(SegFile, Pos) -> %% read a chunk from a specified location in the segment %% then checks the CRC - case open(SegFile, [read, raw, binary]) of + case open_seg(SegFile, [read, raw, binary]) of {ok, SegFd} -> IsValid = case file:pread(SegFd, Pos, ?HEADER_SIZE_B) of {ok, @@ -1896,7 +1898,7 @@ last_idx_record(IdxFd) -> nth_last_idx_record(IdxFd, 1). nth_last_idx_record(IdxFile, N) when ?IS_STRING(IdxFile) -> - {ok, IdxFd} = open(IdxFile, [read, raw, binary]), + {ok, IdxFd} = open_idx(IdxFile, [read, raw, binary]), IdxRecord = nth_last_idx_record(IdxFd, N), _ = file:close(IdxFd), IdxRecord; @@ -1909,7 +1911,7 @@ nth_last_idx_record(IdxFd, N) -> end. 
last_valid_idx_record(IdxFile) -> - {ok, IdxFd} = open(IdxFile, [read, raw, binary]), + {ok, IdxFd} = open_idx(IdxFile, [read, raw, binary]), case position_at_idx_record_boundary(IdxFd, eof) of {ok, Pos} -> SegFile = segment_from_index_file(IdxFile), @@ -1954,7 +1956,7 @@ position_at_idx_record_boundary(IdxFd, At) -> end. build_segment_info(SegFile, LastChunkPos, IdxFile) -> - {ok, Fd} = open(SegFile, [read, binary, raw]), + {ok, Fd} = open_seg(SegFile, [read, binary, raw]), %% we don't want to read blocks into page cache we are unlikely to need _ = file:advise(Fd, 0, 0, random), case file:pread(Fd, ?LOG_HEADER_SIZE, ?HEADER_SIZE_B) of @@ -2183,7 +2185,9 @@ eval_max_bytes([IdxFile | Rest], Limit, Acc) -> end. file_size(Path) -> - case prim_file:read_file_info(Path) of + IOMod = application:get_env(osiris, io_segment_module, + prim_file), + case IOMod:read_file_info(Path) of {ok, #file_info{size = Size}} -> Size; {error, enoent} -> @@ -2214,7 +2218,7 @@ last_epoch_chunk_ids0([], undefined) -> %% the empty stream []; last_epoch_chunk_ids0([IdxFile | _] = Files, undefined) -> - {ok, Fd} = open(IdxFile, [read, raw, binary]), + {ok, Fd} = open_idx(IdxFile, [read, raw, binary]), case first_idx_record(Fd) of {ok, ?IDX_MATCH(FstChId, FstEpoch, _)} -> ok = file:close(Fd), @@ -2228,7 +2232,7 @@ last_epoch_chunk_ids0([IdxFile | Rem], {PrevE, _PrevChId, EOs} = Acc0) -> case last_valid_idx_record(IdxFile) of {ok, ?IDX_MATCH(_LstChId, LstEpoch, _)} when LstEpoch > PrevE -> - {ok, Fd} = open(IdxFile, [read, raw, binary]), + {ok, Fd} = open_idx(IdxFile, [read, raw, binary]), Acc = idx_skip_search(Fd, ?IDX_HEADER_SIZE, fun leo_search_fun/3, Acc0), @@ -2449,6 +2453,13 @@ max_segment_size_reached( sendfile(_Transport, _Fd, _Sock, _Pos, 0) -> ok; +sendfile(tcp = _Transport, Fd, Sock, Pos, ToSend) when is_pid(Fd) -> + case file:pread(Fd, Pos, ToSend) of + {ok, Data} -> + gen_tcp:send(Sock, Data); + {error, _} = Err -> + Err + end; sendfile(tcp = Transport, Fd, Sock, Pos, ToSend) -> case file:sendfile(Fd, Sock, Pos, ToSend, []) of {ok, 0} -> @@ -2473,7 +2484,7 @@ send(ssl, Sock, Data) -> ssl:send(Sock, Data). last_timestamp_in_index_file(IdxFile) -> - case file:open(IdxFile, [raw, binary, read]) of + case open_idx(IdxFile, [raw, binary, read]) of {ok, IdxFd} -> case last_idx_record(IdxFd) of {ok, <<_O:64/unsigned, @@ -2492,7 +2503,7 @@ last_timestamp_in_index_file(IdxFile) -> end. first_timestamp_from_index_files([IdxFile | _]) -> - case file:open(IdxFile, [raw, binary, read]) of + case open_idx(IdxFile, [raw, binary, read]) of {ok, IdxFd} -> case first_idx_record(IdxFd) of {ok, <<_FstO:64/unsigned, @@ -2524,14 +2535,14 @@ chunk_id_range_from_idx_files(Files) -> end. 
chunk_id_range_from_idx_files(FstIdxFile, LstIdxFile) -> - {ok, LstFd} = open(LstIdxFile, [read, raw, binary]), + {ok, LstFd} = open_idx(LstIdxFile, [read, raw, binary]), case position_at_idx_record_boundary(LstFd, eof) of {ok, Pos} -> case file:pread(LstFd, Pos - ?INDEX_RECORD_SIZE_B, ?INDEX_RECORD_SIZE_B) of {ok, ?IDX_MATCH(LstChId, _, _)} -> ok = file:close(LstFd), - {ok, FstFd} = open(FstIdxFile, [read, raw, binary]), + {ok, FstFd} = open_idx(FstIdxFile, [read, raw, binary]), case file:pread(FstFd, ?IDX_HEADER_SIZE, ?INDEX_RECORD_SIZE_B) of {ok, ?IDX_MATCH(FstChId, _, _)} -> @@ -2649,11 +2660,11 @@ open_new_segment(#?MODULE{cfg = #cfg{name = Name, IdxFilename = make_file_name(NextOffset, "index"), ?DEBUG_(Name, "~ts", [Filename]), {ok, IdxFd} = - file:open( + open_idx( filename:join(Dir, IdxFilename), ?FILE_OPTS_WRITE), ok = file:write(IdxFd, ?IDX_HEADER), {ok, Fd} = - file:open( + open_seg( filename:join(Dir, Filename), ?FILE_OPTS_WRITE), ok = file:write(Fd, ?LOG_HEADER), %% we always move to the end of the file @@ -2668,7 +2679,7 @@ open_new_segment(#?MODULE{cfg = #cfg{name = Name, mode = Write#write{segment_size = {?LOG_HEADER_SIZE, 0}}}. open_index_read(File) -> - {ok, Fd} = open(File, [read, raw, binary, read_ahead]), + {ok, Fd} = open_idx(File, [read, raw, binary, read_ahead]), %% We can't use the assertion that index header is correct because of a %% race condition between opening the file and writing the header %% It seems to happen when retention policies are applied @@ -2686,7 +2697,7 @@ offset_idx_scan(Name, Offset, #seg_info{index = IndexFile} = SegmentInfo) -> true -> offset_out_of_range; false -> - {ok, IdxFd} = open(IndexFile, + {ok, IdxFd} = open_idx(IndexFile, [read, raw, binary]), _ = file:advise(IdxFd, 0, 0, random), {Offset, SearchResult} = @@ -2719,8 +2730,21 @@ throw_missing({error, enoent}) -> throw_missing(Any) -> Any. -open(File, Options) -> - throw_missing(file:open(File, Options)). +open_idx(File, Options) -> + IOMod = application:get_env(osiris, io_idx_module, + ?DEFAULT_IO_MODULE), + open(IOMod, File, Options). +open_seg(File, Options) -> + IOMod = application:get_env(osiris, io_segment_module, + ?DEFAULT_IO_MODULE), + open_seg(IOMod, File, Options). +open_seg(IOMod, File, Options) -> + open(IOMod, File, Options). + +%% open(File, Options) -> +%% open(file, File, Options). +open(Mod, File, Options) -> + throw_missing(Mod:open(File, Options)). 
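The module selection above is driven entirely by application environment keys. As a hedged usage sketch (not part of the patch itself), pointing osiris at an alternative IO module could look like the lines below, where my_io_mod is a hypothetical module exporting the same open/2, pread/3, read_file_info/1, ... signatures and return values as file:

    %% hypothetical configuration, e.g. from sys.config or a remote shell
    ok = application:set_env(osiris, io_segment_module, my_io_mod),
    ok = application:set_env(osiris, io_idx_module, my_io_mod).
    %% Left unset, both keys fall back to ?DEFAULT_IO_MODULE (file);
    %% file_size/1 above defaults io_segment_module to prim_file instead.
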
chunk_location_for_timestamp(Idx, Ts) -> %% TODO: optimise using skip search approach @@ -2806,7 +2830,7 @@ recover_tracking(#?MODULE{cfg = #cfg{directory = Dir, %% we need to open a new file handle here as we cannot use the one that is %% being used for appending to the segment as pread _may_ move the file %% position on some systems (such as windows) - {ok, Fd} = open(filename:join(Dir, File), [read, raw, binary]), + {ok, Fd} = open_seg(filename:join(Dir, File), [read, raw, binary]), _ = file:advise(Fd, 0, 0, random), %% TODO: if the first chunk in the segment isn't a tracking snapshot and %% there are prior segments we could scan at least two segments increasing @@ -2895,7 +2919,6 @@ read_header0(#?MODULE{cfg = #cfg{directory = Dir, counters:put(CntRef, ?C_OFFSET, NextChId0 + NumRecords), counters:add(CntRef, ?C_CHUNKS, 1), NextPos = Pos + ?HEADER_SIZE_B + FilterSize + DataSize + TrailerSize, - ChunkFilter = case MaybeFilter of <> -> %% filter is of default size or 0 @@ -2956,7 +2979,7 @@ read_header0(#?MODULE{cfg = #cfg{directory = Dir, %% log but would cause an infinite loop if it does {end_of_stream, State}; false -> - case file:open(filename:join(Dir, SegFile), + case open_seg(filename:join(Dir, SegFile), [raw, binary, read]) of {ok, Fd2} -> ok = file:close(Fd), @@ -3029,7 +3052,7 @@ index_file_first_offset(IdxFile) when is_binary(IdxFile) -> binary_to_integer(filename:basename(IdxFile, <<".index">>)). first_last_timestamps(IdxFile) -> - case file:open(IdxFile, [raw, read, binary]) of + case open_idx(IdxFile, [raw, read, binary]) of {ok, Fd} -> _ = file:advise(Fd, 0, 0, random), case first_idx_record(Fd) of @@ -3117,7 +3140,6 @@ close_fd(Fd) -> _ = file:close(Fd), ok. - dump_init(File) -> {ok, Fd} = file:open(File, [raw, binary, read]), {ok, <<"OSIL", _V:4/binary>> } = file:read(Fd, ?LOG_HEADER_SIZE), From a740adc5599faa2c71cb8383c5736b672e8df78e Mon Sep 17 00:00:00 2001 From: Simon Unge Date: Fri, 16 May 2025 17:25:11 +0000 Subject: [PATCH 2/9] Add file read behaviour --- src/osiris_file.erl | 72 ++++++++++++++++++++++++++++ src/osiris_file_default.erl | 71 ++++++++++++++++++++++++++++ src/osiris_log.erl | 94 ++++++++++++++----------------------- 3 files changed, 179 insertions(+), 58 deletions(-) create mode 100644 src/osiris_file.erl create mode 100644 src/osiris_file_default.erl diff --git a/src/osiris_file.erl b/src/osiris_file.erl new file mode 100644 index 0000000..303244f --- /dev/null +++ b/src/osiris_file.erl @@ -0,0 +1,72 @@ +-module(osiris_file). + +%% -export([ +%% advise/4, +%% close/1, +%% copy/2, +%% del_dir/1, +%% delete/1, +%% ensure_dir/1, +%% list_dir/1, +%% make_dir/1, +%% open/2, +%% position/2, +%% pread/3, +%% read/2, +%% read_file_info/1, +%% sendfile/5, +%% truncate/1, +%% write/2 +%% ]). + +-callback advise(Handle :: term(), Offset :: non_neg_integer(), + Length :: non_neg_integer(), Advise :: term()) -> + ok | {error, Reason :: term()}. + +-callback close(Handle :: term()) -> + ok | {error, Reason :: term()}. + +-callback copy(Source :: file:filename_all(), Destination :: file:filename_all()) -> + {ok, BytesCopied :: non_neg_integer()} | {error, Reason :: term()}. + +-callback del_dir(Dir :: file:filename_all()) -> + ok | {error, Reason :: term()}. + +-callback delete(File :: file:filename_all()) -> + ok | {error, Reason :: term()}. + +-callback make_dir(Dir :: file:filename_all()) -> + ok | {error, Reason :: term()}. + +-callback ensure_dir(Dir :: file:filename_all()) -> + ok | {error, Reason :: term()}. 
+ +-callback list_dir(Dir :: file:filename_all()) -> + {ok, [file:filename()]} | {error, Reason :: term()}. + +-callback open(File :: file:filename_all(), Options :: list()) -> + {ok, Handle :: term()} | {error, Reason :: term()}. + +-callback position(Handle :: term(), Position :: file:position()) -> + {ok, NewPosition :: non_neg_integer()} | {error, Reason :: term()}. + +-callback pread(Handle :: term(), Position :: non_neg_integer(), Size :: non_neg_integer()) -> + {ok, Data :: binary()} | eof | {error, Reason :: term()}. + +-callback read(Handle :: term(), Size :: non_neg_integer()) -> + {ok, Data :: binary()} | eof | {error, Reason :: term()}. + +-callback read_file_info(File :: file:filename_all()) -> + {ok, file:file_info()} | {error, Reason :: term()}. + +-callback sendfile(Handle :: term(), Socket :: term(), + Offset :: non_neg_integer(), Length :: non_neg_integer(), + Options :: list()) -> + {ok, BytesSent :: non_neg_integer()} | {error, Reason :: term()}. + +-callback truncate(Handle :: term()) -> + ok | {error, Reason :: term()}. + +%% Might not be needed. +-callback write(Handle :: term(), Data :: iodata()) -> + ok | {error, Reason :: term()}. diff --git a/src/osiris_file_default.erl b/src/osiris_file_default.erl new file mode 100644 index 0000000..c4a9de2 --- /dev/null +++ b/src/osiris_file_default.erl @@ -0,0 +1,71 @@ +-module(osiris_file_default). +-behaviour(osiris_file). + +%% Standard file operations +-export([ + advise/4, + close/1, + copy/2, + del_dir/1, + delete/1, + ensure_dir/1, + list_dir/1, + make_dir/1, + open/2, + position/2, + pread/3, + read/2, + read_file_info/1, + sendfile/5, + truncate/1, + write/2 + ]). + +%% Simple wrappers around file module +advise(Handle, Offset, Length, Advise) -> + file:advise(Handle, Offset, Length, Advise). + +close(Handle) -> + file:close(Handle). + +copy(Source, Destination) -> + file:copy(Source, Destination). + +del_dir(Dir) -> + file:del_dir(Dir). + +delete(File) -> + prim_file:delete(File). + +ensure_dir(Dir) -> + filelib:ensure_dir(Dir). + +list_dir(Dir) -> + prim_file:list_dir(Dir). + +make_dir(Dir) -> + file:make_dir(Dir). + +open(File, Options) -> + file:open(File, Options). + +position(Handle, Position) -> + file:position(Handle, Position). + +pread(Handle, Position, Size) -> + file:pread(Handle, Position, Size). + +read(Handle, Size) -> + file:read(Handle, Size). + +read_file_info(File) -> + prim_file:read_file_info(File). + +sendfile(Handle, Socket, Offset, Length, Options) -> + file:sendfile(Handle, Socket, Offset, Length, Options). + +truncate(Handle) -> + file:truncate(Handle). + +write(Handle, Data) -> + file:write(Handle, Data). diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 787f041..5cfb238 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -105,8 +105,6 @@ -define(SKIP_SEARCH_JUMP, 2048). --define(DEFAULT_IO_MODULE, file). - %% Specification of the Log format. 
%% %% Notes: @@ -579,14 +577,14 @@ init(#{dir := Dir, ?DEBUG_(Name, " next offset ~b first offset ~b", [element(1, TailInfo), FstChId]), - {ok, SegFd} = open_seg(Filename, ?FILE_OPTS_WRITE), + {ok, SegFd} = open(Filename, ?FILE_OPTS_WRITE), {ok, Size} = file:position(SegFd, Size), %% maybe_fix_corrupted_files has truncated the index to the last %% record pointing %% at a valid chunk we can now truncate the segment to size in %% case there is trailing data ok = file:truncate(SegFd), - {ok, IdxFd} = open_idx(IdxFilename, ?FILE_OPTS_WRITE), + {ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE), {ok, IdxEof} = file:position(IdxFd, eof), NumChunks = (IdxEof - ?IDX_HEADER_SIZE) div ?INDEX_RECORD_SIZE_B, #?MODULE{cfg = Cfg, @@ -602,8 +600,8 @@ init(#{dir := Dir, index = IdxFilename, last = undefined}, _} -> %% the empty log case - {ok, SegFd} = open_seg(Filename, ?FILE_OPTS_WRITE), - {ok, IdxFd} = open_idx(IdxFilename, ?FILE_OPTS_WRITE), + {ok, SegFd} = open(Filename, ?FILE_OPTS_WRITE), + {ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE), {ok, _} = file:position(SegFd, ?LOG_HEADER_SIZE), counters:put(Cnt, ?C_SEGMENTS, 1), %% the segment could potentially have trailing data here so we'll @@ -643,7 +641,7 @@ maybe_fix_corrupted_files([IdxFile]) -> true -> % the only index doesn't contain a single valid record % make sure it has a valid header - {ok, IdxFd} = open_idx(IdxFile, ?FILE_OPTS_WRITE), + {ok, IdxFd} = file:open(IdxFile, ?FILE_OPTS_WRITE), ok = file:write(IdxFd, ?IDX_HEADER), ok = file:close(IdxFd); false -> @@ -653,7 +651,7 @@ maybe_fix_corrupted_files([IdxFile]) -> true -> % the only segment doesn't contain a single valid chunk % make sure it has a valid header - {ok, SegFd} = open_seg(SegFile, ?FILE_OPTS_WRITE), + {ok, SegFd} = file:open(SegFile, ?FILE_OPTS_WRITE), ok = file:write(SegFd, ?LOG_HEADER), ok = file:close(SegFd); false -> @@ -700,7 +698,7 @@ truncate_invalid_idx_records(IdxFile, SegSize) -> % add an option to perform a full segment scan and reconstruct % the index for the valid chunks. 
SegFile = segment_from_index_file(IdxFile), - {ok, IdxFd} = open_idx(IdxFile, [raw, binary, write, read]), + {ok, IdxFd} = open(IdxFile, [raw, binary, write, read]), {ok, Pos} = position_at_idx_record_boundary(IdxFd, eof), ok = skip_invalid_idx_records(IdxFd, SegFile, SegSize, Pos), ok = file:truncate(IdxFd), @@ -718,7 +716,7 @@ skip_invalid_idx_records(IdxFd, SegFile, SegSize, Pos) -> {ok, ?IDX_MATCH(_, _, ChunkPos)} -> % a non-zero index record case ChunkPos < SegSize andalso - is_valid_chunk_on_disk(SegFile, ChunkPos) of + is_valid_chunk_on_disk(SegFile, ChunkPos) of true -> ok; false -> @@ -957,9 +955,9 @@ truncate_to(Name, RemoteRange, [{E, ChId} | NextEOs], IdxFiles) -> %% the Chunk id was found and has the right epoch %% lets truncate to this point %% FilePos could be eof here which means the next offset - {ok, Fd} = open_seg(File, [read, write, binary, raw]), + {ok, Fd} = file:open(File, [read, write, binary, raw]), _ = file:advise(Fd, 0, 0, random), - {ok, IdxFd} = open_idx(IdxFile, [read, write, binary, raw]), + {ok, IdxFd} = file:open(IdxFile, [read, write, binary, raw]), NextPos = next_chunk_pos(Fd, Pos), {ok, _} = file:position(Fd, NextPos), @@ -1048,7 +1046,7 @@ init_data_reader_at(ChunkId, FilePos, File, #{dir := Dir, name := Name, shared := Shared, readers_counter_fun := CountersFun} = Config) -> - case open_seg(File, [raw, binary, read]) of + case file:open(File, [raw, binary, read]) of {ok, Fd} -> Cnt = make_counter(Config), counters:put(Cnt, ?C_OFFSET, ChunkId - 1), @@ -1158,7 +1156,7 @@ init_offset_reader0({timestamp, Ts}, #{} = Conf) -> SegmentFile = segment_from_index_file(IdxFile), open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf); {first_in, IdxFile} -> - {ok, Fd} = open_seg(IdxFile, [raw, binary, read]), + {ok, Fd} = file:open(IdxFile, [raw, binary, read]), {ok, ?IDX_MATCH(ChunkId, _, FilePos)} = first_idx_record(Fd), SegmentFile = segment_from_index_file(IdxFile), open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf); @@ -1269,7 +1267,7 @@ open_offset_reader_at(SegmentFile, NextChunkId, FilePos, readers_counter_fun := ReaderCounterFun, options := Options} = Conf) -> - {ok, Fd} = open_seg(SegmentFile, [raw, binary, read]), + {ok, Fd} = open(SegmentFile, [raw, binary, read]), Cnt = make_counter(Conf), ReaderCounterFun(1), FilterMatcher = case Options of @@ -1309,7 +1307,7 @@ last_user_chunk_id0(_, []) -> not_found; last_user_chunk_id0(Name, [IdxFile | Rest]) -> %% Do not read-ahead since we read the index file backwards chunk by chunk. - {ok, IdxFd} = open_idx(IdxFile, [read, raw, binary]), + {ok, IdxFd} = open(IdxFile, [read, raw, binary]), {ok, EofPos} = position_at_idx_record_boundary(IdxFd, eof), Last = last_user_chunk_id_in_index(EofPos - ?INDEX_RECORD_SIZE_B, IdxFd), _ = file:close(IdxFd), @@ -1608,7 +1606,7 @@ iter_all_records({X, I}, Acc0) -> is_valid_chunk_on_disk(SegFile, Pos) -> %% read a chunk from a specified location in the segment %% then checks the CRC - case open_seg(SegFile, [read, raw, binary]) of + case open(SegFile, [read, raw, binary]) of {ok, SegFd} -> IsValid = case file:pread(SegFd, Pos, ?HEADER_SIZE_B) of {ok, @@ -1898,7 +1896,7 @@ last_idx_record(IdxFd) -> nth_last_idx_record(IdxFd, 1). nth_last_idx_record(IdxFile, N) when ?IS_STRING(IdxFile) -> - {ok, IdxFd} = open_idx(IdxFile, [read, raw, binary]), + {ok, IdxFd} = open(IdxFile, [read, raw, binary]), IdxRecord = nth_last_idx_record(IdxFd, N), _ = file:close(IdxFd), IdxRecord; @@ -1911,7 +1909,7 @@ nth_last_idx_record(IdxFd, N) -> end. 
last_valid_idx_record(IdxFile) -> - {ok, IdxFd} = open_idx(IdxFile, [read, raw, binary]), + {ok, IdxFd} = open(IdxFile, [read, raw, binary]), case position_at_idx_record_boundary(IdxFd, eof) of {ok, Pos} -> SegFile = segment_from_index_file(IdxFile), @@ -1956,7 +1954,7 @@ position_at_idx_record_boundary(IdxFd, At) -> end. build_segment_info(SegFile, LastChunkPos, IdxFile) -> - {ok, Fd} = open_seg(SegFile, [read, binary, raw]), + {ok, Fd} = open(SegFile, [read, binary, raw]), %% we don't want to read blocks into page cache we are unlikely to need _ = file:advise(Fd, 0, 0, random), case file:pread(Fd, ?LOG_HEADER_SIZE, ?HEADER_SIZE_B) of @@ -2185,9 +2183,7 @@ eval_max_bytes([IdxFile | Rest], Limit, Acc) -> end. file_size(Path) -> - IOMod = application:get_env(osiris, io_segment_module, - prim_file), - case IOMod:read_file_info(Path) of + case prim_file:read_file_info(Path) of {ok, #file_info{size = Size}} -> Size; {error, enoent} -> @@ -2218,7 +2214,7 @@ last_epoch_chunk_ids0([], undefined) -> %% the empty stream []; last_epoch_chunk_ids0([IdxFile | _] = Files, undefined) -> - {ok, Fd} = open_idx(IdxFile, [read, raw, binary]), + {ok, Fd} = open(IdxFile, [read, raw, binary]), case first_idx_record(Fd) of {ok, ?IDX_MATCH(FstChId, FstEpoch, _)} -> ok = file:close(Fd), @@ -2232,7 +2228,7 @@ last_epoch_chunk_ids0([IdxFile | Rem], {PrevE, _PrevChId, EOs} = Acc0) -> case last_valid_idx_record(IdxFile) of {ok, ?IDX_MATCH(_LstChId, LstEpoch, _)} when LstEpoch > PrevE -> - {ok, Fd} = open_idx(IdxFile, [read, raw, binary]), + {ok, Fd} = open(IdxFile, [read, raw, binary]), Acc = idx_skip_search(Fd, ?IDX_HEADER_SIZE, fun leo_search_fun/3, Acc0), @@ -2453,13 +2449,6 @@ max_segment_size_reached( sendfile(_Transport, _Fd, _Sock, _Pos, 0) -> ok; -sendfile(tcp = _Transport, Fd, Sock, Pos, ToSend) when is_pid(Fd) -> - case file:pread(Fd, Pos, ToSend) of - {ok, Data} -> - gen_tcp:send(Sock, Data); - {error, _} = Err -> - Err - end; sendfile(tcp = Transport, Fd, Sock, Pos, ToSend) -> case file:sendfile(Fd, Sock, Pos, ToSend, []) of {ok, 0} -> @@ -2484,7 +2473,7 @@ send(ssl, Sock, Data) -> ssl:send(Sock, Data). last_timestamp_in_index_file(IdxFile) -> - case open_idx(IdxFile, [raw, binary, read]) of + case file:open(IdxFile, [raw, binary, read]) of {ok, IdxFd} -> case last_idx_record(IdxFd) of {ok, <<_O:64/unsigned, @@ -2503,7 +2492,7 @@ last_timestamp_in_index_file(IdxFile) -> end. first_timestamp_from_index_files([IdxFile | _]) -> - case open_idx(IdxFile, [raw, binary, read]) of + case file:open(IdxFile, [raw, binary, read]) of {ok, IdxFd} -> case first_idx_record(IdxFd) of {ok, <<_FstO:64/unsigned, @@ -2535,14 +2524,14 @@ chunk_id_range_from_idx_files(Files) -> end. 
chunk_id_range_from_idx_files(FstIdxFile, LstIdxFile) -> - {ok, LstFd} = open_idx(LstIdxFile, [read, raw, binary]), + {ok, LstFd} = open(LstIdxFile, [read, raw, binary]), case position_at_idx_record_boundary(LstFd, eof) of {ok, Pos} -> case file:pread(LstFd, Pos - ?INDEX_RECORD_SIZE_B, ?INDEX_RECORD_SIZE_B) of {ok, ?IDX_MATCH(LstChId, _, _)} -> ok = file:close(LstFd), - {ok, FstFd} = open_idx(FstIdxFile, [read, raw, binary]), + {ok, FstFd} = open(FstIdxFile, [read, raw, binary]), case file:pread(FstFd, ?IDX_HEADER_SIZE, ?INDEX_RECORD_SIZE_B) of {ok, ?IDX_MATCH(FstChId, _, _)} -> @@ -2660,11 +2649,11 @@ open_new_segment(#?MODULE{cfg = #cfg{name = Name, IdxFilename = make_file_name(NextOffset, "index"), ?DEBUG_(Name, "~ts", [Filename]), {ok, IdxFd} = - open_idx( + file:open( filename:join(Dir, IdxFilename), ?FILE_OPTS_WRITE), ok = file:write(IdxFd, ?IDX_HEADER), {ok, Fd} = - open_seg( + file:open( filename:join(Dir, Filename), ?FILE_OPTS_WRITE), ok = file:write(Fd, ?LOG_HEADER), %% we always move to the end of the file @@ -2679,7 +2668,7 @@ open_new_segment(#?MODULE{cfg = #cfg{name = Name, mode = Write#write{segment_size = {?LOG_HEADER_SIZE, 0}}}. open_index_read(File) -> - {ok, Fd} = open_idx(File, [read, raw, binary, read_ahead]), + {ok, Fd} = open(File, [read, raw, binary, read_ahead]), %% We can't use the assertion that index header is correct because of a %% race condition between opening the file and writing the header %% It seems to happen when retention policies are applied @@ -2697,7 +2686,7 @@ offset_idx_scan(Name, Offset, #seg_info{index = IndexFile} = SegmentInfo) -> true -> offset_out_of_range; false -> - {ok, IdxFd} = open_idx(IndexFile, + {ok, IdxFd} = open(IndexFile, [read, raw, binary]), _ = file:advise(IdxFd, 0, 0, random), {Offset, SearchResult} = @@ -2730,21 +2719,8 @@ throw_missing({error, enoent}) -> throw_missing(Any) -> Any. -open_idx(File, Options) -> - IOMod = application:get_env(osiris, io_idx_module, - ?DEFAULT_IO_MODULE), - open(IOMod, File, Options). -open_seg(File, Options) -> - IOMod = application:get_env(osiris, io_segment_module, - ?DEFAULT_IO_MODULE), - open_seg(IOMod, File, Options). -open_seg(IOMod, File, Options) -> - open(IOMod, File, Options). - -%% open(File, Options) -> -%% open(file, File, Options). -open(Mod, File, Options) -> - throw_missing(Mod:open(File, Options)). +open(File, Options) -> + throw_missing(file:open(File, Options)). 
chunk_location_for_timestamp(Idx, Ts) -> %% TODO: optimise using skip search approach @@ -2830,7 +2806,7 @@ recover_tracking(#?MODULE{cfg = #cfg{directory = Dir, %% we need to open a new file handle here as we cannot use the one that is %% being used for appending to the segment as pread _may_ move the file %% position on some systems (such as windows) - {ok, Fd} = open_seg(filename:join(Dir, File), [read, raw, binary]), + {ok, Fd} = open(filename:join(Dir, File), [read, raw, binary]), _ = file:advise(Fd, 0, 0, random), %% TODO: if the first chunk in the segment isn't a tracking snapshot and %% there are prior segments we could scan at least two segments increasing @@ -2919,6 +2895,7 @@ read_header0(#?MODULE{cfg = #cfg{directory = Dir, counters:put(CntRef, ?C_OFFSET, NextChId0 + NumRecords), counters:add(CntRef, ?C_CHUNKS, 1), NextPos = Pos + ?HEADER_SIZE_B + FilterSize + DataSize + TrailerSize, + ChunkFilter = case MaybeFilter of <> -> %% filter is of default size or 0 @@ -2979,7 +2956,7 @@ read_header0(#?MODULE{cfg = #cfg{directory = Dir, %% log but would cause an infinite loop if it does {end_of_stream, State}; false -> - case open_seg(filename:join(Dir, SegFile), + case file:open(filename:join(Dir, SegFile), [raw, binary, read]) of {ok, Fd2} -> ok = file:close(Fd), @@ -3052,7 +3029,7 @@ index_file_first_offset(IdxFile) when is_binary(IdxFile) -> binary_to_integer(filename:basename(IdxFile, <<".index">>)). first_last_timestamps(IdxFile) -> - case open_idx(IdxFile, [raw, read, binary]) of + case file:open(IdxFile, [raw, read, binary]) of {ok, Fd} -> _ = file:advise(Fd, 0, 0, random), case first_idx_record(Fd) of @@ -3140,6 +3117,7 @@ close_fd(Fd) -> _ = file:close(Fd), ok. + dump_init(File) -> {ok, Fd} = file:open(File, [raw, binary, read]), {ok, <<"OSIL", _V:4/binary>> } = file:read(Fd, ?LOG_HEADER_SIZE), From 56a5bc86372f251add2ca09bad8457d06697fa9d Mon Sep 17 00:00:00 2001 From: Simon Unge Date: Sun, 18 May 2025 04:32:21 +0000 Subject: [PATCH 3/9] further abstraction --- src/osiris_file.erl | 135 ++++++++++++++++++++---- src/osiris_log.erl | 243 ++++++++++++++++++++++---------------------- 2 files changed, 238 insertions(+), 140 deletions(-) diff --git a/src/osiris_file.erl b/src/osiris_file.erl index 303244f..b69ae78 100644 --- a/src/osiris_file.erl +++ b/src/osiris_file.erl @@ -1,23 +1,23 @@ -module(osiris_file). -%% -export([ -%% advise/4, -%% close/1, -%% copy/2, -%% del_dir/1, -%% delete/1, -%% ensure_dir/1, -%% list_dir/1, -%% make_dir/1, -%% open/2, -%% position/2, -%% pread/3, -%% read/2, -%% read_file_info/1, -%% sendfile/5, -%% truncate/1, -%% write/2 -%% ]). +-export([ + advise/4, + close/1, + copy/2, + del_dir/1, + delete/1, + ensure_dir/1, + list_dir/1, + make_dir/1, + open/2, + position/2, + pread/3, + read/2, + read_file_info/1, + sendfile/5, + truncate/1, + write/2 + ]). -callback advise(Handle :: term(), Offset :: non_neg_integer(), Length :: non_neg_integer(), Advise :: term()) -> @@ -67,6 +67,103 @@ -callback truncate(Handle :: term()) -> ok | {error, Reason :: term()}. -%% Might not be needed. -callback write(Handle :: term(), Data :: iodata()) -> ok | {error, Reason :: term()}. + +-optional_callbacks([write/2]). + +advise({Mod, Handle}, Offset, Length, Advise) -> + Mod:advise(Handle, Offset, Length, Advise); +advise(Handle, Offset, Length, Advise) -> + file:advise(Handle, Offset, Length, Advise). + +close({Mod, Handle}) -> + Mod:close(Handle); +close(Handle) -> + file:close(Handle). + + +%% TODO +copy(Source, Destination) -> + file:copy(Source, Destination). 
+%% TODO +del_dir(Dir) -> + file:del_dir(Dir). + +%% TODO +delete(File) -> + prim_file:delete(File). + +%% TODO +ensure_dir(Dir) -> + filelib:ensure_dir(Dir). + +%% TODO +list_dir(Dir) -> + prim_file:list_dir(Dir). + +%% TODO +make_dir(Dir) -> + file:make_dir(Dir). + +open(File, Options) -> + case lists:member(write, Options) of + true -> + %% We do not use tiered storage for writes + file:open(File, Options); + false -> + %% Here we will get the correct Mod based on config/manifest file etc. + Mod = get_mod(), + {ok, Fd} = Mod:open(File, Options), + {ok, {Mod, Fd}} + end. + + +position({Mod, Handle}, Position) -> + Mod:position(Handle, Position); +position(Handle, Position) -> + file:position(Handle, Position). + + +pread({Mod, Handle}, Position, Size) -> + Mod:pread(Handle, Position, Size); +pread(Handle, Position, Size) -> + file:pread(Handle, Position, Size). + + +read({Mod, Handle}, Size) -> + Mod:read(Handle, Size); +read(Handle, Size) -> + file:read(Handle, Size). + + +%% Todo +read_file_info(File) -> + prim_file:read_file_info(File). + +sendfile({Mod, Handle}, Socket, Offset, Length, Options) -> + Mod:sendfile(Handle, Socket, Offset, Length, Options); +sendfile(Handle, Socket, Offset, Length, Options) -> + file:sendfile(Handle, Socket, Offset, Length, Options). + +truncate({Mod, Handle}) -> + Mod:truncate(Handle); +truncate(Handle) -> + file:truncate(Handle). + + +write({Mod, Handle}, Data) -> + Mod:write(Handle, Data); +write(Handle, Data) -> + file:write(Handle, Data). + +%% try_write(Mod, Handle, Data) -> +%% case erlang:function_exported(Mod, write, 2) of +%% true -> +%% Mod:write(Handle, Data); +%% false -> +%% file:write(Handle, Data) +%% end. + +get_mod() -> + file. diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 5cfb238..acb1170 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -578,14 +578,14 @@ init(#{dir := Dir, [element(1, TailInfo), FstChId]), {ok, SegFd} = open(Filename, ?FILE_OPTS_WRITE), - {ok, Size} = file:position(SegFd, Size), + {ok, Size} = osiris_file:position(SegFd, Size), %% maybe_fix_corrupted_files has truncated the index to the last %% record pointing %% at a valid chunk we can now truncate the segment to size in %% case there is trailing data - ok = file:truncate(SegFd), + ok = osiris_file:truncate(SegFd), {ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE), - {ok, IdxEof} = file:position(IdxFd, eof), + {ok, IdxEof} = osiris_file:position(IdxFd, eof), NumChunks = (IdxEof - ?IDX_HEADER_SIZE) div ?INDEX_RECORD_SIZE_B, #?MODULE{cfg = Cfg, mode = @@ -602,13 +602,13 @@ init(#{dir := Dir, %% the empty log case {ok, SegFd} = open(Filename, ?FILE_OPTS_WRITE), {ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE), - {ok, _} = file:position(SegFd, ?LOG_HEADER_SIZE), + {ok, _} = osiris_file:position(SegFd, ?LOG_HEADER_SIZE), counters:put(Cnt, ?C_SEGMENTS, 1), %% the segment could potentially have trailing data here so we'll %% do a truncate just in case. 
The index would have been truncated %% earlier - ok = file:truncate(SegFd), - {ok, _} = file:position(IdxFd, ?IDX_HEADER_SIZE), + ok = osiris_file:truncate(SegFd), + {ok, _} = osiris_file:position(IdxFd, ?IDX_HEADER_SIZE), osiris_log_shared:set_first_chunk_id(Shared, DefaultNextOffset - 1), osiris_log_shared:set_last_chunk_id(Shared, DefaultNextOffset - 1), #?MODULE{cfg = Cfg, @@ -641,9 +641,9 @@ maybe_fix_corrupted_files([IdxFile]) -> true -> % the only index doesn't contain a single valid record % make sure it has a valid header - {ok, IdxFd} = file:open(IdxFile, ?FILE_OPTS_WRITE), - ok = file:write(IdxFd, ?IDX_HEADER), - ok = file:close(IdxFd); + {ok, IdxFd} = osiris_file:open(IdxFile, ?FILE_OPTS_WRITE), + ok = osiris_file:write(IdxFd, ?IDX_HEADER), + ok = osiris_file:close(IdxFd); false -> ok end, @@ -651,9 +651,9 @@ maybe_fix_corrupted_files([IdxFile]) -> true -> % the only segment doesn't contain a single valid chunk % make sure it has a valid header - {ok, SegFd} = file:open(SegFile, ?FILE_OPTS_WRITE), - ok = file:write(SegFd, ?LOG_HEADER), - ok = file:close(SegFd); + {ok, SegFd} = osiris_file:open(SegFile, ?FILE_OPTS_WRITE), + ok = osiris_file:write(SegFd, ?LOG_HEADER), + ok = osiris_file:close(SegFd); false -> ok end; @@ -701,14 +701,14 @@ truncate_invalid_idx_records(IdxFile, SegSize) -> {ok, IdxFd} = open(IdxFile, [raw, binary, write, read]), {ok, Pos} = position_at_idx_record_boundary(IdxFd, eof), ok = skip_invalid_idx_records(IdxFd, SegFile, SegSize, Pos), - ok = file:truncate(IdxFd), - file:close(IdxFd). + ok = osiris_file:truncate(IdxFd), + osiris_file:close(IdxFd). skip_invalid_idx_records(IdxFd, SegFile, SegSize, Pos) -> case Pos >= ?IDX_HEADER_SIZE + ?INDEX_RECORD_SIZE_B of true -> - {ok, _} = file:position(IdxFd, Pos - ?INDEX_RECORD_SIZE_B), - case file:read(IdxFd, ?INDEX_RECORD_SIZE_B) of + {ok, _} = osiris_file:position(IdxFd, Pos - ?INDEX_RECORD_SIZE_B), + case osiris_file:read(IdxFd, ?INDEX_RECORD_SIZE_B) of {ok, ?ZERO_IDX_MATCH(_)} -> % trailing zeros found skip_invalid_idx_records(IdxFd, SegFile, SegSize, @@ -729,7 +729,7 @@ skip_invalid_idx_records(IdxFd, SegFile, SegSize, Pos) -> end; false -> %% TODO should we validate the correctness of index/segment headers? - {ok, _} = file:position(IdxFd, ?IDX_HEADER_SIZE), + {ok, _} = osiris_file:position(IdxFd, ?IDX_HEADER_SIZE), ok end. @@ -881,15 +881,15 @@ chunk_id_index_scan(IdxFile, ChunkId) chunk_id_index_scan0(Fd, ChunkId). chunk_id_index_scan0(Fd, ChunkId) -> - case file:read(Fd, ?INDEX_RECORD_SIZE_B) of + case osiris_file:read(Fd, ?INDEX_RECORD_SIZE_B) of {ok, ?IDX_MATCH(ChunkId, Epoch, FilePos)} -> - {ok, IdxPos} = file:position(Fd, cur), - ok = file:close(Fd), + {ok, IdxPos} = osiris_file:position(Fd, cur), + ok = osiris_file:close(Fd), {ChunkId, Epoch, FilePos, IdxPos - ?INDEX_RECORD_SIZE_B}; {ok, _} -> chunk_id_index_scan0(Fd, ChunkId); eof -> - ok = file:close(Fd), + ok = osiris_file:close(Fd), eof end. 
@@ -955,18 +955,18 @@ truncate_to(Name, RemoteRange, [{E, ChId} | NextEOs], IdxFiles) -> %% the Chunk id was found and has the right epoch %% lets truncate to this point %% FilePos could be eof here which means the next offset - {ok, Fd} = file:open(File, [read, write, binary, raw]), - _ = file:advise(Fd, 0, 0, random), - {ok, IdxFd} = file:open(IdxFile, [read, write, binary, raw]), + {ok, Fd} = osiris_file:open(File, [read, write, binary, raw]), + _ = osiris_file:advise(Fd, 0, 0, random), + {ok, IdxFd} = osiris_file:open(IdxFile, [read, write, binary, raw]), NextPos = next_chunk_pos(Fd, Pos), - {ok, _} = file:position(Fd, NextPos), - ok = file:truncate(Fd), + {ok, _} = osiris_file:position(Fd, NextPos), + ok = osiris_file:truncate(Fd), - {ok, _} = file:position(IdxFd, IdxPos + ?INDEX_RECORD_SIZE_B), - ok = file:truncate(IdxFd), - ok = file:close(Fd), - ok = file:close(IdxFd), + {ok, _} = osiris_file:position(IdxFd, IdxPos + ?INDEX_RECORD_SIZE_B), + ok = osiris_file:truncate(IdxFd), + ok = osiris_file:close(Fd), + ok = osiris_file:close(IdxFd), %% delete all segments with a first offset larger then ChId %% and return the remainder lists:filter( @@ -1046,7 +1046,7 @@ init_data_reader_at(ChunkId, FilePos, File, #{dir := Dir, name := Name, shared := Shared, readers_counter_fun := CountersFun} = Config) -> - case file:open(File, [raw, binary, read]) of + case osiris_file:open(File, [raw, binary, read]) of {ok, Fd} -> Cnt = make_counter(Config), counters:put(Cnt, ?C_OFFSET, ChunkId - 1), @@ -1156,7 +1156,7 @@ init_offset_reader0({timestamp, Ts}, #{} = Conf) -> SegmentFile = segment_from_index_file(IdxFile), open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf); {first_in, IdxFile} -> - {ok, Fd} = file:open(IdxFile, [raw, binary, read]), + {ok, Fd} = osiris_file:open(IdxFile, [raw, binary, read]), {ok, ?IDX_MATCH(ChunkId, _, FilePos)} = first_idx_record(Fd), SegmentFile = segment_from_index_file(IdxFile), open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf); @@ -1310,7 +1310,7 @@ last_user_chunk_id0(Name, [IdxFile | Rest]) -> {ok, IdxFd} = open(IdxFile, [read, raw, binary]), {ok, EofPos} = position_at_idx_record_boundary(IdxFd, eof), Last = last_user_chunk_id_in_index(EofPos - ?INDEX_RECORD_SIZE_B, IdxFd), - _ = file:close(IdxFd), + _ = osiris_file:close(IdxFd), case Last of {ok, Id, Pos} -> {Id, Pos, IdxFile}; @@ -1322,7 +1322,7 @@ last_user_chunk_id0(Name, [IdxFile | Rest]) -> %% Searches the index file backwards for the chunk id of the last user chunk. last_user_chunk_id_in_index(NextPos, IdxFd) -> - case file:pread(IdxFd, NextPos, ?INDEX_RECORD_SIZE_B) of + case osiris_file:pread(IdxFd, NextPos, ?INDEX_RECORD_SIZE_B) of {ok, <>} = - file:pread(Fd, Pos + ?REC_HDR_SZ_SIMPLE_B, + osiris_file:pread(Fd, Pos + ?REC_HDR_SZ_SIMPLE_B, Len + ?ITER_READ_AHEAD_B), {Record0, Rem1} end, @@ -1516,7 +1516,7 @@ iterator_next(#iterator{fd = Fd, %% not enough in Rem0 to read the entire record %% so we need to read it from disk {ok, <>} = - file:pread(Fd, Pos + ?REC_HDR_SZ_SUBBATCH_B, + osiris_file:pread(Fd, Pos + ?REC_HDR_SZ_SUBBATCH_B, Len + ?ITER_READ_AHEAD_B), {Record0, Rem1} end, @@ -1528,7 +1528,7 @@ iterator_next(#iterator{fd = Fd, {{NextOffs, Record}, I}; iterator_next(#iterator{fd = Fd, next_record_pos = Pos} = I) -> - {ok, Data} = file:pread(Fd, Pos, ?ITER_READ_AHEAD_B), + {ok, Data} = osiris_file:pread(Fd, Pos, ?ITER_READ_AHEAD_B), iterator_next(I#iterator{data = Data}). 
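The read paths above now go through osiris_file rather than file. A rough usage sketch of the resulting handle flow, assuming the open/2 introduced in this patch (which wraps read-only handles as {Mod, Fd} tuples while write handles stay plain io devices); the file name and sizes are made up:

    {ok, Handle} = osiris_file:open("00000000000000000000.segment",
                                    [raw, binary, read]),
    %% Handle is {Mod, Fd}; pread/3, read/2 and close/1 dispatch to Mod
    {ok, <<"OSIL", _/binary>>} = osiris_file:pread(Handle, 0, 8),
    ok = osiris_file:close(Handle).
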
-spec read_chunk(state()) -> @@ -1552,7 +1552,7 @@ read_chunk(#?MODULE{cfg = #cfg{}} = State0) -> trailer_size := TrailerSize}, #?MODULE{fd = Fd, mode = #read{next_offset = ChId} = Read} = State} -> ToRead = ?HEADER_SIZE_B + FilterSize + DataSize + TrailerSize, - {ok, ChData} = file:pread(Fd, Pos, ToRead), + {ok, ChData} = osiris_file:pread(Fd, Pos, ToRead), <<_:?HEADER_SIZE_B/binary, _:FilterSize/binary, RecordData:DataSize/binary, @@ -1608,7 +1608,7 @@ is_valid_chunk_on_disk(SegFile, Pos) -> %% then checks the CRC case open(SegFile, [read, raw, binary]) of {ok, SegFd} -> - IsValid = case file:pread(SegFd, Pos, ?HEADER_SIZE_B) of + IsValid = case osiris_file:pread(SegFd, Pos, ?HEADER_SIZE_B) of {ok, < FilterSize:8/unsigned, _Reserved:24>>} -> DataPos = Pos + FilterSize + ?HEADER_SIZE_B, - case file:pread(SegFd, DataPos, DataSize) of + case osiris_file:pread(SegFd, DataPos, DataSize) of {ok, Data} -> case erlang:crc32(Data) of Crc -> @@ -1638,7 +1638,7 @@ is_valid_chunk_on_disk(SegFile, Pos) -> _ -> false end, - _ = file:close(SegFd), + _ = osiris_file:close(SegFd), IsValid; _Err -> false @@ -1787,7 +1787,7 @@ next_chunk_pos(Fd, Pos) -> Size:32/unsigned, TSize:32/unsigned, FSize:8/unsigned, - _Reserved:24>>} = file:pread(Fd, Pos, ?HEADER_SIZE_B), + _Reserved:24>>} = osiris_file:pread(Fd, Pos, ?HEADER_SIZE_B), Pos + ?HEADER_SIZE_B + FSize + Size + TSize. @@ -1898,12 +1898,12 @@ last_idx_record(IdxFd) -> nth_last_idx_record(IdxFile, N) when ?IS_STRING(IdxFile) -> {ok, IdxFd} = open(IdxFile, [read, raw, binary]), IdxRecord = nth_last_idx_record(IdxFd, N), - _ = file:close(IdxFd), + _ = osiris_file:close(IdxFd), IdxRecord; nth_last_idx_record(IdxFd, N) -> case position_at_idx_record_boundary(IdxFd, {eof, -?INDEX_RECORD_SIZE_B * N}) of {ok, _} -> - file:read(IdxFd, ?INDEX_RECORD_SIZE_B); + osiris_file:read(IdxFd, ?INDEX_RECORD_SIZE_B); Err -> Err end. @@ -1915,17 +1915,17 @@ last_valid_idx_record(IdxFile) -> SegFile = segment_from_index_file(IdxFile), SegSize = file_size(SegFile), ok = skip_invalid_idx_records(IdxFd, SegFile, SegSize, Pos), - case file:position(IdxFd, {cur, -?INDEX_RECORD_SIZE_B}) of + case osiris_file:position(IdxFd, {cur, -?INDEX_RECORD_SIZE_B}) of {ok, _} -> - IdxRecord = file:read(IdxFd, ?INDEX_RECORD_SIZE_B), - _ = file:close(IdxFd), + IdxRecord = osiris_file:read(IdxFd, ?INDEX_RECORD_SIZE_B), + _ = osiris_file:close(IdxFd), IdxRecord; _ -> - _ = file:close(IdxFd), + _ = osiris_file:close(IdxFd), undefined end; Err -> - _ = file:close(IdxFd), + _ = osiris_file:close(IdxFd), Err end. @@ -1933,7 +1933,7 @@ first_idx_record(IdxFd) -> idx_read_at(IdxFd, ?IDX_HEADER_SIZE). idx_read_at(Fd, Pos) when is_integer(Pos) -> - case file:pread(Fd, Pos, ?INDEX_RECORD_SIZE_B) of + case osiris_file:pread(Fd, Pos, ?INDEX_RECORD_SIZE_B) of {ok, ?ZERO_IDX_MATCH(_)} -> {error, empty_idx_record}; Ret -> @@ -1944,11 +1944,11 @@ idx_read_at(Fd, Pos) when is_integer(Pos) -> %% in the middle of a record being written concurrently. If that happens, we need to re-position at the nearest %% record boundry. See https://github.com/rabbitmq/osiris/issues/73 position_at_idx_record_boundary(IdxFd, At) -> - case file:position(IdxFd, At) of + case osiris_file:position(IdxFd, At) of {ok, Pos} -> case (Pos - ?IDX_HEADER_SIZE) rem ?INDEX_RECORD_SIZE_B of 0 -> {ok, Pos}; - N -> file:position(IdxFd, {cur, -N}) + N -> osiris_file:position(IdxFd, {cur, -N}) end; Error -> Error end. 
@@ -1956,10 +1956,10 @@ position_at_idx_record_boundary(IdxFd, At) -> build_segment_info(SegFile, LastChunkPos, IdxFile) -> {ok, Fd} = open(SegFile, [read, binary, raw]), %% we don't want to read blocks into page cache we are unlikely to need - _ = file:advise(Fd, 0, 0, random), - case file:pread(Fd, ?LOG_HEADER_SIZE, ?HEADER_SIZE_B) of + _ = osiris_file:advise(Fd, 0, 0, random), + case osiris_file:pread(Fd, ?LOG_HEADER_SIZE, ?HEADER_SIZE_B) of eof -> - _ = file:close(Fd), + _ = osiris_file:close(Fd), eof; {ok, < FirstFSize:8/unsigned, FirstTSize:24/unsigned, _/binary>>} -> - case file:pread(Fd, LastChunkPos, ?HEADER_SIZE_B) of + case osiris_file:pread(Fd, LastChunkPos, ?HEADER_SIZE_B) of {ok, < Size = LastChunkPos + ?HEADER_SIZE_B + LastChunkSize, %% TODO: this file:position/2 all has no actual function and %% is only used to emit a debug log. Remove? - {ok, Eof} = file:position(Fd, eof), + {ok, Eof} = osiris_file:position(Fd, eof), ?DEBUG_IF("~s: segment ~ts has trailing data ~w ~w", [?MODULE, filename:basename(SegFile), Size, Eof], Size =/= Eof), - _ = file:close(Fd), + _ = osiris_file:close(Fd), FstChInfo = #chunk_info{epoch = FirstEpoch, timestamp = FirstTs, id = FirstChId, @@ -2020,7 +2020,7 @@ build_segment_info(SegFile, LastChunkPos, IdxFile) -> last = LastChInfo}}; _ -> % last chunk is corrupted - try the previous one - _ = file:close(Fd), + _ = osiris_file:close(Fd), {ok, ?IDX_MATCH(_ChId, _E, PrevChPos)} = nth_last_idx_record(IdxFile, 2), case PrevChPos == LastChunkPos of @@ -2217,10 +2217,10 @@ last_epoch_chunk_ids0([IdxFile | _] = Files, undefined) -> {ok, Fd} = open(IdxFile, [read, raw, binary]), case first_idx_record(Fd) of {ok, ?IDX_MATCH(FstChId, FstEpoch, _)} -> - ok = file:close(Fd), + ok = osiris_file:close(Fd), last_epoch_chunk_ids0(Files, {FstEpoch, FstChId, []}); _ -> - ok = file:close(Fd), + ok = osiris_file:close(Fd), [] end; last_epoch_chunk_ids0([IdxFile | Rem], {PrevE, _PrevChId, EOs} = Acc0) -> @@ -2232,7 +2232,7 @@ last_epoch_chunk_ids0([IdxFile | Rem], {PrevE, _PrevChId, EOs} = Acc0) -> Acc = idx_skip_search(Fd, ?IDX_HEADER_SIZE, fun leo_search_fun/3, Acc0), - ok = file:close(Fd), + ok = osiris_file:close(Fd), last_epoch_chunk_ids0(Rem, Acc); {ok, ?IDX_MATCH(LstChId, LstEpoch, _)} -> %% no scan needed, just pass last epoch chunk id pair @@ -2287,7 +2287,7 @@ idx_skip_search(Fd, Pos, Fun, Acc0) -> {return, Acc} -> Acc; {scan, Acc1} -> - {ok, Data} = file:pread(Fd, Pos, SkipSize + ?INDEX_RECORD_SIZE_B), + {ok, Data} = osiris_file:pread(Fd, Pos, SkipSize + ?INDEX_RECORD_SIZE_B), case idx_lin_scan(Data, Fun, Acc1) of {continue, Acc} -> idx_skip_search(Fd, PeekPos + ?INDEX_RECORD_SIZE_B, @@ -2298,7 +2298,7 @@ idx_skip_search(Fd, Pos, Fun, Acc0) -> end; _ -> %% eof or invalid index record - case file:pread(Fd, Pos, SkipSize + ?INDEX_RECORD_SIZE_B) of + case osiris_file:pread(Fd, Pos, SkipSize + ?INDEX_RECORD_SIZE_B) of {ok, Data} -> case idx_lin_scan(Data, Fun, Acc0) of {continue, Acc} -> @@ -2412,10 +2412,10 @@ write_chunk(Chunk, false -> NextOffset = Next + NumRecords, Size = iolist_size(Chunk), - {ok, Cur} = file:position(Fd, cur), - ok = file:write(Fd, Chunk), + {ok, Cur} = osiris_file:position(Fd, cur), + ok = osiris_file:write(Fd, Chunk), - ok = file:write(IdxFd, + ok = osiris_file:write(IdxFd, < ok; sendfile(tcp = Transport, Fd, Sock, Pos, ToSend) -> - case file:sendfile(Fd, Sock, Pos, ToSend, []) of + case osiris_file:sendfile(Fd, Sock, Pos, ToSend, []) of {ok, 0} -> %% TODO add counter for this? 
sendfile(Transport, Fd, Sock, Pos, ToSend); @@ -2460,7 +2460,7 @@ sendfile(tcp = Transport, Fd, Sock, Pos, ToSend) -> Err end; sendfile(ssl, Fd, Sock, Pos, ToSend) -> - case file:pread(Fd, Pos, ToSend) of + case osiris_file:pread(Fd, Pos, ToSend) of {ok, Data} -> ssl:send(Sock, Data); {error, _} = Err -> @@ -2473,7 +2473,7 @@ send(ssl, Sock, Data) -> ssl:send(Sock, Data). last_timestamp_in_index_file(IdxFile) -> - case file:open(IdxFile, [raw, binary, read]) of + case osiris_file:open(IdxFile, [raw, binary, read]) of {ok, IdxFd} -> case last_idx_record(IdxFd) of {ok, <<_O:64/unsigned, @@ -2481,10 +2481,10 @@ last_timestamp_in_index_file(IdxFile) -> _E:64/unsigned, _ChunkPos:32/unsigned, _ChType:8/unsigned>>} -> - _ = file:close(IdxFd), + _ = osiris_file:close(IdxFd), {ok, LastTimestamp}; Err -> - _ = file:close(IdxFd), + _ = osiris_file:close(IdxFd), Err end; Err -> @@ -2492,7 +2492,7 @@ last_timestamp_in_index_file(IdxFile) -> end. first_timestamp_from_index_files([IdxFile | _]) -> - case file:open(IdxFile, [raw, binary, read]) of + case osiris_file:open(IdxFile, [raw, binary, read]) of {ok, IdxFd} -> case first_idx_record(IdxFd) of {ok, <<_FstO:64/unsigned, @@ -2500,10 +2500,10 @@ first_timestamp_from_index_files([IdxFile | _]) -> _FstE:64/unsigned, _FstChunkPos:32/unsigned, _FstChType:8/unsigned>>} -> - _ = file:close(IdxFd), + _ = osiris_file:close(IdxFd), FstTimestamp; _ -> - _ = file:close(IdxFd), + _ = osiris_file:close(IdxFd), 0 end; _Err -> @@ -2527,22 +2527,22 @@ chunk_id_range_from_idx_files(FstIdxFile, LstIdxFile) -> {ok, LstFd} = open(LstIdxFile, [read, raw, binary]), case position_at_idx_record_boundary(LstFd, eof) of {ok, Pos} -> - case file:pread(LstFd, Pos - ?INDEX_RECORD_SIZE_B, + case osiris_file:pread(LstFd, Pos - ?INDEX_RECORD_SIZE_B, ?INDEX_RECORD_SIZE_B) of {ok, ?IDX_MATCH(LstChId, _, _)} -> - ok = file:close(LstFd), + ok = osiris_file:close(LstFd), {ok, FstFd} = open(FstIdxFile, [read, raw, binary]), - case file:pread(FstFd, ?IDX_HEADER_SIZE, + case osiris_file:pread(FstFd, ?IDX_HEADER_SIZE, ?INDEX_RECORD_SIZE_B) of {ok, ?IDX_MATCH(FstChId, _, _)} -> - ok = file:close(FstFd), + ok = osiris_file:close(FstFd), {ok, {FstChId, LstChId}}; Err -> - ok = file:close(LstFd), + ok = osiris_file:close(LstFd), Err end; Err -> - ok = file:close(LstFd), + ok = osiris_file:close(LstFd), Err end end. @@ -2649,16 +2649,16 @@ open_new_segment(#?MODULE{cfg = #cfg{name = Name, IdxFilename = make_file_name(NextOffset, "index"), ?DEBUG_(Name, "~ts", [Filename]), {ok, IdxFd} = - file:open( + osiris_file:open( filename:join(Dir, IdxFilename), ?FILE_OPTS_WRITE), - ok = file:write(IdxFd, ?IDX_HEADER), + ok = osiris_file:write(IdxFd, ?IDX_HEADER), {ok, Fd} = - file:open( + osiris_file:open( filename:join(Dir, Filename), ?FILE_OPTS_WRITE), - ok = file:write(Fd, ?LOG_HEADER), + ok = osiris_file:write(Fd, ?LOG_HEADER), %% we always move to the end of the file - {ok, _} = file:position(Fd, eof), - {ok, _} = file:position(IdxFd, eof), + {ok, _} = osiris_file:position(Fd, eof), + {ok, _} = osiris_file:position(IdxFd, eof), counters:add(Cnt, ?C_SEGMENTS, 1), State0#?MODULE{current_file = Filename, @@ -2672,7 +2672,7 @@ open_index_read(File) -> %% We can't use the assertion that index header is correct because of a %% race condition between opening the file and writing the header %% It seems to happen when retention policies are applied - {ok, ?IDX_HEADER_SIZE} = file:position(Fd, ?IDX_HEADER_SIZE), + {ok, ?IDX_HEADER_SIZE} = osiris_file:position(Fd, ?IDX_HEADER_SIZE), Fd. 
offset_idx_scan(Name, Offset, #seg_info{index = IndexFile} = SegmentInfo) -> @@ -2688,12 +2688,12 @@ offset_idx_scan(Name, Offset, #seg_info{index = IndexFile} = SegmentInfo) -> false -> {ok, IdxFd} = open(IndexFile, [read, raw, binary]), - _ = file:advise(IdxFd, 0, 0, random), + _ = osiris_file:advise(IdxFd, 0, 0, random), {Offset, SearchResult} = idx_skip_search(IdxFd, ?IDX_HEADER_SIZE, fun offset_search_fun/3, {Offset, not_found}), - ok = file:close(IdxFd), + ok = osiris_file:close(IdxFd), SearchResult end end, @@ -2720,7 +2720,7 @@ throw_missing(Any) -> Any. open(File, Options) -> - throw_missing(file:open(File, Options)). + throw_missing(osiris_file:open(File, Options)). chunk_location_for_timestamp(Idx, Ts) -> %% TODO: optimise using skip search approach @@ -2730,7 +2730,7 @@ chunk_location_for_timestamp(Idx, Ts) -> {ChunkId, FilePos}. timestamp_idx_scan(Fd, Ts) -> - case file:read(Fd, ?INDEX_RECORD_SIZE_B) of + case osiris_file:read(Fd, ?INDEX_RECORD_SIZE_B) of {ok, < _ChType:8/unsigned>>} -> case Ts =< Timestamp of true -> - ok = file:close(Fd), + ok = osiris_file:close(Fd), {ChunkId, Timestamp, Epoch, FilePos}; false -> timestamp_idx_scan(Fd, Ts) end; eof -> - ok = file:close(Fd), + ok = osiris_file:close(Fd), eof end. @@ -2806,19 +2806,20 @@ recover_tracking(#?MODULE{cfg = #cfg{directory = Dir, %% we need to open a new file handle here as we cannot use the one that is %% being used for appending to the segment as pread _may_ move the file %% position on some systems (such as windows) - {ok, Fd} = open(filename:join(Dir, File), [read, raw, binary]), - _ = file:advise(Fd, 0, 0, random), + {ok, {Mod, Fdn}} = open(filename:join(Dir, File), [read, raw, binary]), + Fd = {Mod, Fdn}, + _ = osiris_file:advise(Fd, 0, 0, random), %% TODO: if the first chunk in the segment isn't a tracking snapshot and %% there are prior segments we could scan at least two segments increasing %% the chance of encountering a snapshot and thus ensure we don't miss any %% tracking entries Trk0 = osiris_tracking:init(undefined, TrkConfig), Trk = recover_tracking(Fd, Trk0, ?LOG_HEADER_SIZE), - _ = file:close(Fd), + _ = osiris_file:close(Fd), Trk. 
recover_tracking(Fd, Trk0, Pos0) -> - case file:pread(Fd, Pos0, ?HEADER_SIZE_B) of + case osiris_file:pread(Fd, Pos0, ?HEADER_SIZE_B) of {ok, < %% tracking is written a single record so we don't %% have to parse {ok, <<0:1, S:31, Data:S/binary>>} = - file:pread(Fd, Pos + FSize, Size), + osiris_file:pread(Fd, Pos + FSize, Size), Trk = osiris_tracking:append_trailer(ChunkId, Data, Trk0), %% A tracking delta chunk will not have any writer data %% so no need to parse writers here recover_tracking(Fd, Trk, NextPos); ?CHNK_TRK_SNAPSHOT -> {ok, <<0:1, S:31, Data:S/binary>>} = - file:pread(Fd, Pos + FSize, Size), + osiris_file:pread(Fd, Pos + FSize, Size), Trk = osiris_tracking:init(Data, Trk0), recover_tracking(Fd, Trk, NextPos); ?CHNK_USER when TSize > 0 -> - {ok, TData} = file:pread(Fd, Pos + FSize + Size, TSize), + {ok, TData} = osiris_file:pread(Fd, Pos + FSize + Size, TSize), Trk = osiris_tracking:append_trailer(ChunkId, TData, Trk0), recover_tracking(Fd, Trk, NextPos); ?CHNK_USER -> @@ -2876,7 +2877,7 @@ read_header0(#?MODULE{cfg = #cfg{directory = Dir, %% optimistically read 64 bytes (small binary) as it may save us %% a syscall reading the filter if the filter is of the default %% 16 byte size - case file:pread(Fd, Pos, ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE) of + case osiris_file:pread(Fd, Pos, ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE) of {ok, < %% the filter is larger than default - case file:pread(Fd, Pos + ?HEADER_SIZE_B, + case osiris_file:pread(Fd, Pos + ?HEADER_SIZE_B, FilterSize) of {ok, F} -> F; @@ -2956,10 +2957,10 @@ read_header0(#?MODULE{cfg = #cfg{directory = Dir, %% log but would cause an infinite loop if it does {end_of_stream, State}; false -> - case file:open(filename:join(Dir, SegFile), + case osiris_file:open(filename:join(Dir, SegFile), [raw, binary, read]) of {ok, Fd2} -> - ok = file:close(Fd), + ok = osiris_file:close(Fd), Read = Read0#read{next_offset = NextChId, position = ?LOG_HEADER_SIZE}, read_header0( @@ -3029,9 +3030,9 @@ index_file_first_offset(IdxFile) when is_binary(IdxFile) -> binary_to_integer(filename:basename(IdxFile, <<".index">>)). first_last_timestamps(IdxFile) -> - case file:open(IdxFile, [raw, read, binary]) of + case osiris_file:open(IdxFile, [raw, read, binary]) of {ok, Fd} -> - _ = file:advise(Fd, 0, 0, random), + _ = osiris_file:advise(Fd, 0, 0, random), case first_idx_record(Fd) of {ok, <<_:64/unsigned, FirstTs:64/signed, @@ -3044,7 +3045,7 @@ first_last_timestamps(IdxFile) -> _:64/unsigned, _:32/unsigned, _:8/unsigned>>} = last_idx_record(Fd), - ok = file:close(Fd), + ok = osiris_file:close(Fd), {FirstTs, LastTs}; {error, einval} -> %% empty index @@ -3114,22 +3115,22 @@ timestamp_idx_file_search0(Ts, [IdxFile | Older], Prev) -> close_fd(undefined) -> ok; close_fd(Fd) -> - _ = file:close(Fd), + _ = osiris_file:close(Fd), ok. dump_init(File) -> - {ok, Fd} = file:open(File, [raw, binary, read]), - {ok, <<"OSIL", _V:4/binary>> } = file:read(Fd, ?LOG_HEADER_SIZE), + {ok, Fd} = osiris_file:open(File, [raw, binary, read]), + {ok, <<"OSIL", _V:4/binary>> } = osiris_file:read(Fd, ?LOG_HEADER_SIZE), Fd. dump_init_idx(File) -> - {ok, Fd} = file:open(File, [raw, binary, read]), - {ok, <<"OSII", _V:4/binary>> } = file:read(Fd, ?IDX_HEADER_SIZE), + {ok, Fd} = osiris_file:open(File, [raw, binary, read]), + {ok, <<"OSII", _V:4/binary>> } = osiris_file:read(Fd, ?IDX_HEADER_SIZE), Fd. 
dump_index(Fd) -> - case file:read(Fd, ?INDEX_RECORD_SIZE_B) of + case osiris_file:read(Fd, ?INDEX_RECORD_SIZE_B) of {ok, < dump_chunk(Fd) -> - {ok, Pos} = file:position(Fd, cur), - case file:read(Fd, ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE) of + {ok, Pos} = osiris_file:position(Fd, cur), + case osiris_file:read(Fd, ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE) of {ok, < F; _ when FilterSize > 0 -> %% the filter is larger than default - case file:pread(Fd, Pos + ?HEADER_SIZE_B, + case osiris_file:pread(Fd, Pos + ?HEADER_SIZE_B, FilterSize) of {ok, F} -> F; @@ -3183,9 +3184,9 @@ dump_chunk(Fd) -> _ -> <<>> end, - {ok, Data} = file:pread(Fd, Pos + FilterSize + ?HEADER_SIZE_B, DataSize), + {ok, Data} = osiris_file:pread(Fd, Pos + FilterSize + ?HEADER_SIZE_B, DataSize), CrcMatch = erlang:crc32(Data) =:= Crc, - _ = file:position(Fd, NextPos), + _ = osiris_file:position(Fd, NextPos), #{chunk_id => NextChId0, epoch => Epoch, type => ChType, @@ -3221,7 +3222,7 @@ iter_read_ahead(_Fd, _Pos, _ChunkId, _Crc, 1, _DataSize, _NumEntries) -> undefined; iter_read_ahead(Fd, Pos, ChunkId, Crc, Credit, DataSize, NumEntries) when Credit == all orelse NumEntries == 1 -> - {ok, Data} = file:pread(Fd, Pos, DataSize), + {ok, Data} = osiris_file:pread(Fd, Pos, DataSize), validate_crc(ChunkId, Crc, Data), Data; iter_read_ahead(Fd, Pos, _ChunkId, _Crc, Credit0, DataSize, NumEntries) -> @@ -3230,7 +3231,7 @@ iter_read_ahead(Fd, Pos, _ChunkId, _Crc, Credit0, DataSize, NumEntries) -> %% We can only practically validate CRC if we read the whole data Credit = min(Credit0, NumEntries), Size = DataSize div NumEntries * Credit, - {ok, Data} = file:pread(Fd, Pos, Size + ?ITER_READ_AHEAD_B), + {ok, Data} = osiris_file:pread(Fd, Pos, Size + ?ITER_READ_AHEAD_B), Data. list_dir(Dir) -> From 7c2fef9bdd9bd8dbe5225dd054e2f62e720b8c32 Mon Sep 17 00:00:00 2001 From: Simon Unge Date: Mon, 19 May 2025 17:29:14 +0000 Subject: [PATCH 4/9] Put all file* operations in a new module --- src/osiris_file.erl | 142 ++++++++++++++++++++++++++++++++++++++++---- src/osiris_log.erl | 28 ++++----- 2 files changed, 145 insertions(+), 25 deletions(-) diff --git a/src/osiris_file.erl b/src/osiris_file.erl index b69ae78..c703264 100644 --- a/src/osiris_file.erl +++ b/src/osiris_file.erl @@ -12,6 +12,7 @@ open/2, position/2, pread/3, + prim_delete/1, read/2, read_file_info/1, sendfile/5, @@ -20,7 +21,7 @@ ]). -callback advise(Handle :: term(), Offset :: non_neg_integer(), - Length :: non_neg_integer(), Advise :: term()) -> + Length :: non_neg_integer(), Advise :: term()) -> ok | {error, Reason :: term()}. -callback close(Handle :: term()) -> @@ -60,8 +61,8 @@ {ok, file:file_info()} | {error, Reason :: term()}. -callback sendfile(Handle :: term(), Socket :: term(), - Offset :: non_neg_integer(), Length :: non_neg_integer(), - Options :: list()) -> + Offset :: non_neg_integer(), Length :: non_neg_integer(), + Options :: list()) -> {ok, BytesSent :: non_neg_integer()} | {error, Reason :: term()}. -callback truncate(Handle :: term()) -> @@ -72,40 +73,97 @@ -optional_callbacks([write/2]). +-type file_handle() :: {module(), term()} | file:io_device(). + +-type posix_file_advise() :: normal | + sequential | + random | + no_reuse | + will_need | + dont_need. + +-spec advise(Handle, Offset, Length, Advise) -> ok | {error, Reason} when + Handle :: file_handle(), + Offset :: integer(), + Length :: integer(), + Advise :: posix_file_advise(), + Reason :: file:posix() | badarg. 
+ advise({Mod, Handle}, Offset, Length, Advise) -> Mod:advise(Handle, Offset, Length, Advise); advise(Handle, Offset, Length, Advise) -> file:advise(Handle, Offset, Length, Advise). + +-spec close(Handle) -> ok | {error, Reason} when + Handle :: file_handle(), + Reason :: file:posix() | badarg | terminated. + close({Mod, Handle}) -> Mod:close(Handle); close(Handle) -> file:close(Handle). +-spec copy(file:filename_all(), file:filename_all()) -> + {ok, non_neg_integer()} | {error, term()}. %% TODO copy(Source, Destination) -> file:copy(Source, Destination). + + +-spec del_dir(file:filename_all()) -> + ok | {error, term()}. %% TODO +%% Used when a queue is deleted, should perhaps move the entire osiris_log:delete_directory, and +%% let the Mod handle deletion of storage on its side too. del_dir(Dir) -> - file:del_dir(Dir). + Mod = get_mod(), + Mod:del_dir(Dir). -%% TODO + +-spec delete(file:filename_all()) -> + ok | {error, term()}. +%% Do we need the prim_* function calls? delete(File) -> - prim_file:delete(File). + Mod = get_mod(), + Mod:delete(File). -%% TODO +-spec prim_delete(file:filename_all()) -> + ok | {error, term()}. + +prim_delete(File) -> + Mod = get_mod(prim_file), + Mod:delete(File). + + +-spec ensure_dir(file:filename_all()) -> + ok | {error, term()}. +%% Only used for local files ensure_dir(Dir) -> filelib:ensure_dir(Dir). + +-spec list_dir(file:filename_all()) -> + {ok, [file:filename()]} | {error, term()}. %% TODO list_dir(Dir) -> - prim_file:list_dir(Dir). + Mod = get_mod(prim_file), + Mod:list_dir(Dir). -%% TODO + +-spec make_dir(file:filename_all()) -> + ok | {error, term()}. +%% Only used for the local segment file, no need to change it. make_dir(Dir) -> file:make_dir(Dir). + +-spec open(File, Modes) -> {ok, file_handle()} | {error, Reason} when + File :: Filename | file:iodata(), + Filename :: file:name_all(), + Modes :: [file:mode() | ram | directory], + Reason :: file:posix() | badarg | system_limit. open(File, Options) -> case lists:member(write, Options) of true -> @@ -119,44 +177,94 @@ open(File, Options) -> end. +-spec position(Handle, Location) -> {ok, NewPosition} | {error, Reason} when + Handle :: file_handle(), + Location :: file:location(), + NewPosition :: integer(), + Reason :: file:posix() | badarg | terminated. + position({Mod, Handle}, Position) -> Mod:position(Handle, Position); position(Handle, Position) -> file:position(Handle, Position). +-spec pread(Handle, Location, Number) -> + {ok, Data} | eof | {error, Reason} when + Handle :: file_handle(), + Location :: file:location(), + Number :: non_neg_integer(), + Data :: string() | binary(), + Reason :: file:posix() | badarg | terminated. + pread({Mod, Handle}, Position, Size) -> Mod:pread(Handle, Position, Size); pread(Handle, Position, Size) -> file:pread(Handle, Position, Size). +-spec read(Handle, Number) -> {ok, Data} | eof | {error, Reason} when + Handle :: file_handle() | io:device(), + Number :: non_neg_integer(), + Data :: string() | binary(), + Reason :: file:posix() + | badarg + | terminated + | {no_translation, unicode, latin1}. + read({Mod, Handle}, Size) -> Mod:read(Handle, Size); read(Handle, Size) -> file:read(Handle, Size). +-spec read_file_info(file:filename_all()) -> + {ok, file:file_info()} | {error, term()}. %% Todo read_file_info(File) -> - prim_file:read_file_info(File). + Mod = get_mod(prim_file), + Mod:read_file_info(File). + + +-type sendfile_option() :: {chunk_size, non_neg_integer()} + | {use_threads, boolean()}. 
+-spec sendfile(Handle, Socket, Offset, Bytes, Opts) -> + {'ok', non_neg_integer()} | {'error', inet:posix() | + closed | badarg | not_owner} when + Handle :: file_handle(), + Socket :: inet:socket() | socket:socket() | + fun ((iolist()) -> ok | {error, inet:posix() | closed}), + Offset :: non_neg_integer(), + Bytes :: non_neg_integer(), + Opts :: [sendfile_option()]. sendfile({Mod, Handle}, Socket, Offset, Length, Options) -> Mod:sendfile(Handle, Socket, Offset, Length, Options); sendfile(Handle, Socket, Offset, Length, Options) -> file:sendfile(Handle, Socket, Offset, Length, Options). + +-spec truncate(Handle) -> ok | {error, Reason} when + Handle :: file_handle(), + Reason :: file:posix() | badarg | terminated. + truncate({Mod, Handle}) -> Mod:truncate(Handle); truncate(Handle) -> file:truncate(Handle). +-spec write(Handle, Bytes) -> ok | {error, Reason} when + Handle :: file_handle() | io:device(), + Bytes :: iodata(), + Reason :: file:posix() | badarg | terminated. write({Mod, Handle}, Data) -> Mod:write(Handle, Data); write(Handle, Data) -> file:write(Handle, Data). +%% -spec try_write(module(), term(), iodata()) -> +%% ok | {error, term()}. %% try_write(Mod, Handle, Data) -> %% case erlang:function_exported(Mod, write, 2) of %% true -> @@ -165,5 +273,17 @@ write(Handle, Data) -> %% file:write(Handle, Data) %% end. + +-spec get_mod() -> module(). + get_mod() -> - file. + get_mod(file). + +get_mod(file) -> + %% TODO. This will figure out the correct module to use, based on + %% info in the magical manifest file. + file; +get_mod(prim_file) -> + %% Just temporary solutin till I figure out why + %% we even use prim_file? + prim_file. diff --git a/src/osiris_log.erl b/src/osiris_log.erl index acb1170..9936831 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -497,8 +497,8 @@ init(#{dir := Dir, ?DEBUG_(Name, "max_segment_size_bytes: ~b, max_segment_size_chunks ~b, retention ~w, filter size ~b", [MaxSizeBytes, MaxSizeChunks, Retention, FilterSize]), - ok = filelib:ensure_dir(Dir), - case file:make_dir(Dir) of + ok = osiris_file:ensure_dir(Dir), + case osiris_file:make_dir(Dir) of ok -> ok; {error, eexist} -> @@ -631,7 +631,7 @@ maybe_fix_corrupted_files(#{dir := Dir}) -> [begin ?INFO("deleting left over segment '~s' in directory ~s", [F, Dir]), - ok = prim_file:delete(filename:join(Dir, F)) + ok = osiris_file:prim_delete(filename:join(Dir, F)) end|| F <- orphaned_segments(Dir)], ok; maybe_fix_corrupted_files([IdxFile]) -> @@ -664,8 +664,8 @@ maybe_fix_corrupted_files(IdxFiles) -> N when N =< ?HEADER_SIZE_B -> % if the segment doesn't contain any chunks, just delete it ?WARNING("deleting an empty segment file: ~0p", [LastSegFile]), - ok = prim_file:delete(LastIdxFile), - ok = prim_file:delete(LastSegFile), + ok = osiris_file:prim_delete(LastIdxFile), + ok = osiris_file:prim_delete(LastSegFile), maybe_fix_corrupted_files(IdxFiles -- [LastIdxFile]); LastSegFileSize -> ok = truncate_invalid_idx_records(LastIdxFile, LastSegFileSize) @@ -673,7 +673,7 @@ maybe_fix_corrupted_files(IdxFiles) -> % if the last segment is missing, just delete its index ?WARNING("deleting index of the missing last segment file: ~0p", [LastSegFile]), - ok = prim_file:delete(LastIdxFile), + ok = osiris_file:prim_delete(LastIdxFile), maybe_fix_corrupted_files(IdxFiles -- [LastIdxFile]) end. 
@@ -896,8 +896,8 @@ chunk_id_index_scan0(Fd, ChunkId) -> delete_segment_from_index(Index) -> File = segment_from_index_file(Index), ?DEBUG("osiris_log: deleting segment ~ts", [File]), - ok = prim_file:delete(Index), - ok = prim_file:delete(File), + ok = osiris_file:prim_delete(Index), + ok = osiris_file:prim_delete(File), ok. truncate_to(_Name, _Range, _EpochOffsets, []) -> @@ -1761,13 +1761,13 @@ delete_directory(Name) when ?IS_STRING(Name) -> delete_dir(Dir). delete_dir(Dir) -> - case file:list_dir(Dir) of + case osiris_file:list_dir(Dir) of {ok, Files} -> [ok = - file:delete( + osiris_file:delete( filename:join(Dir, F)) || F <- Files], - ok = file:del_dir(Dir); + ok = osiris_file:del_dir(Dir); {error, enoent} -> ok end. @@ -2183,7 +2183,7 @@ eval_max_bytes([IdxFile | Rest], Limit, Acc) -> end. file_size(Path) -> - case prim_file:read_file_info(Path) of + case osiris_file:read_file_info(Path) of {ok, #file_info{size = Size}} -> Size; {error, enoent} -> @@ -2191,7 +2191,7 @@ file_size(Path) -> end. file_size_or_zero(Path) -> - case prim_file:read_file_info(Path) of + case osiris_file:read_file_info(Path) of {ok, #file_info{size = Size}} -> Size; {error, enoent} -> @@ -3235,7 +3235,7 @@ iter_read_ahead(Fd, Pos, _ChunkId, _Crc, Credit0, DataSize, NumEntries) -> Data. list_dir(Dir) -> - case prim_file:list_dir(Dir) of + case osiris_file:list_dir(Dir) of {error, enoent} -> []; {ok, Files} -> From 8cad52ae021548bea7bfa50783b7f8c988202895 Mon Sep 17 00:00:00 2001 From: Simon Unge Date: Mon, 19 May 2025 17:49:54 +0000 Subject: [PATCH 5/9] Correct callback and spec --- src/osiris_file.erl | 163 +++++++++++++++++++++++++++++--------------- 1 file changed, 107 insertions(+), 56 deletions(-) diff --git a/src/osiris_file.erl b/src/osiris_file.erl index c703264..0be076b 100644 --- a/src/osiris_file.erl +++ b/src/osiris_file.erl @@ -20,67 +20,108 @@ write/2 ]). --callback advise(Handle :: term(), Offset :: non_neg_integer(), - Length :: non_neg_integer(), Advise :: term()) -> - ok | {error, Reason :: term()}. --callback close(Handle :: term()) -> - ok | {error, Reason :: term()}. +-type file_handle() :: {module(), term()} | file:io_device(). --callback copy(Source :: file:filename_all(), Destination :: file:filename_all()) -> - {ok, BytesCopied :: non_neg_integer()} | {error, Reason :: term()}. +-type posix_file_advise() :: normal | + sequential | + random | + no_reuse | + will_need | + dont_need. --callback del_dir(Dir :: file:filename_all()) -> - ok | {error, Reason :: term()}. +-type sendfile_option() :: {chunk_size, non_neg_integer()} + | {use_threads, boolean()}. --callback delete(File :: file:filename_all()) -> - ok | {error, Reason :: term()}. --callback make_dir(Dir :: file:filename_all()) -> - ok | {error, Reason :: term()}. +-callback advise(Handle, Offset, Length, Advise) -> ok | {error, Reason} when + Handle :: file_handle(), + Offset :: integer(), + Length :: integer(), + Advise :: posix_file_advise(), + Reason :: file:posix() | badarg. --callback ensure_dir(Dir :: file:filename_all()) -> - ok | {error, Reason :: term()}. +-callback close(Handle) -> ok | {error, Reason} when + Handle :: file_handle(), + Reason :: file:posix() | badarg | terminated. --callback list_dir(Dir :: file:filename_all()) -> - {ok, [file:filename()]} | {error, Reason :: term()}. 
+-callback copy(Source, Destination) -> {ok, BytesCopied} | {error, Reason} when + Source :: file_handle() | Filename | {Filename, Modes}, + Destination :: file_handle() | Filename | {Filename, Modes}, + Filename :: file:name_all(), + Modes :: [file:mode()], + BytesCopied :: non_neg_integer(), + Reason :: file:posix() | badarg | terminated. --callback open(File :: file:filename_all(), Options :: list()) -> - {ok, Handle :: term()} | {error, Reason :: term()}. +-callback del_dir(Dir) -> ok | {error, Reason} when + Dir :: file:name_all(), + Reason :: file:posix() | badarg. --callback position(Handle :: term(), Position :: file:position()) -> - {ok, NewPosition :: non_neg_integer()} | {error, Reason :: term()}. +-callback delete(Filename) -> ok | {error, Reason} when + Filename :: file:name_all(), + Reason :: file:posix() | badarg. --callback pread(Handle :: term(), Position :: non_neg_integer(), Size :: non_neg_integer()) -> - {ok, Data :: binary()} | eof | {error, Reason :: term()}. +-callback list_dir(Dir) -> {ok, Filenames} | {error, Reason} when + Dir :: file:name_all(), + Filenames :: [file:filename()], + Reason :: file:posix() + | badarg + | {no_translation, Filename :: unicode:latin1_binary()}. --callback read(Handle :: term(), Size :: non_neg_integer()) -> - {ok, Data :: binary()} | eof | {error, Reason :: term()}. +-callback open(File, Modes) -> {ok, file_handle()} | {error, Reason} when + File :: Filename | file:iodata(), + Filename :: file:name_all(), + Modes :: [file:mode() | ram | directory], + Reason :: file:posix() | badarg | system_limit. --callback read_file_info(File :: file:filename_all()) -> - {ok, file:file_info()} | {error, Reason :: term()}. +-callback position(Handle, Location) -> {ok, NewPosition} | {error, Reason} when + Handle :: file_handle(), + Location :: file:location(), + NewPosition :: integer(), + Reason :: file:posix() | badarg | terminated. --callback sendfile(Handle :: term(), Socket :: term(), - Offset :: non_neg_integer(), Length :: non_neg_integer(), - Options :: list()) -> - {ok, BytesSent :: non_neg_integer()} | {error, Reason :: term()}. +-callback pread(Handle, Location, Number) -> + {ok, Data} | eof | {error, Reason} when + Handle :: file_handle(), + Location :: file:location(), + Number :: non_neg_integer(), + Data :: string() | binary(), + Reason :: file:posix() | badarg | terminated. --callback truncate(Handle :: term()) -> - ok | {error, Reason :: term()}. +-callback read(Handle, Number) -> {ok, Data} | eof | {error, Reason} when + Handle :: file_handle() | io:device(), + Number :: non_neg_integer(), + Data :: string() | binary(), + Reason :: file:posix() + | badarg + | terminated + | {no_translation, unicode, latin1}. --callback write(Handle :: term(), Data :: iodata()) -> - ok | {error, Reason :: term()}. +-callback read_file_info(File) -> {ok, FileInfo} | {error, Reason} when + File :: file:name_all() | file_handle(), + FileInfo :: file:file_info(), + Reason :: file:posix() | badarg. --optional_callbacks([write/2]). +-callback sendfile(Handle, Socket, Offset, Bytes, Opts) -> + {'ok', non_neg_integer()} | {'error', inet:posix() | + closed | badarg | not_owner} when + Handle :: file_handle(), + Socket :: inet:socket() | socket:socket() | + fun ((iolist()) -> ok | {error, inet:posix() | closed}), + Offset :: non_neg_integer(), + Bytes :: non_neg_integer(), + Opts :: [sendfile_option()]. --type file_handle() :: {module(), term()} | file:io_device(). 
+-callback truncate(Handle) -> ok | {error, Reason} when + Handle :: file_handle(), + Reason :: file:posix() | badarg | terminated. --type posix_file_advise() :: normal | - sequential | - random | - no_reuse | - will_need | - dont_need. +-callback write(Handle, Bytes) -> ok | {error, Reason} when + Handle :: file_handle() | io:device(), + Bytes :: iodata(), + Reason :: file:posix() | badarg | terminated. + +-optional_callbacks([write/2]). -spec advise(Handle, Offset, Length, Advise) -> ok | {error, Reason} when Handle :: file_handle(), @@ -105,15 +146,20 @@ close(Handle) -> file:close(Handle). --spec copy(file:filename_all(), file:filename_all()) -> - {ok, non_neg_integer()} | {error, term()}. +-spec copy(Source, Destination) -> {ok, BytesCopied} | {error, Reason} when + Source :: file_handle() | Filename | {Filename, Modes}, + Destination :: file_handle() | Filename | {Filename, Modes}, + Filename :: file:name_all(), + Modes :: [file:mode()], + BytesCopied :: non_neg_integer(), + Reason :: file:posix() | badarg | terminated. %% TODO copy(Source, Destination) -> file:copy(Source, Destination). - --spec del_dir(file:filename_all()) -> - ok | {error, term()}. +-spec del_dir(Dir) -> ok | {error, Reason} when + Dir :: file:name_all(), + Reason :: file:posix() | badarg. %% TODO %% Used when a queue is deleted, should perhaps move the entire osiris_log:delete_directory, and %% let the Mod handle deletion of storage on its side too. @@ -122,8 +168,9 @@ del_dir(Dir) -> Mod:del_dir(Dir). --spec delete(file:filename_all()) -> - ok | {error, term()}. +-spec delete(Filename) -> ok | {error, Reason} when + Filename :: file:name_all(), + Reason :: file:posix() | badarg. %% Do we need the prim_* function calls? delete(File) -> Mod = get_mod(), @@ -144,8 +191,12 @@ ensure_dir(Dir) -> filelib:ensure_dir(Dir). --spec list_dir(file:filename_all()) -> - {ok, [file:filename()]} | {error, term()}. +-spec list_dir(Dir) -> {ok, Filenames} | {error, Reason} when + Dir :: file:name_all(), + Filenames :: [file:filename()], + Reason :: file:posix() + | badarg + | {no_translation, Filename :: unicode:latin1_binary()}. %% TODO list_dir(Dir) -> Mod = get_mod(prim_file), @@ -218,16 +269,16 @@ read(Handle, Size) -> file:read(Handle, Size). --spec read_file_info(file:filename_all()) -> - {ok, file:file_info()} | {error, term()}. +-spec read_file_info(File) -> {ok, FileInfo} | {error, Reason} when + File :: file:name_all() | file_handle(), + FileInfo :: file:file_info(), + Reason :: file:posix() | badarg. %% Todo read_file_info(File) -> Mod = get_mod(prim_file), Mod:read_file_info(File). --type sendfile_option() :: {chunk_size, non_neg_integer()} - | {use_threads, boolean()}. -spec sendfile(Handle, Socket, Offset, Bytes, Opts) -> {'ok', non_neg_integer()} | {'error', inet:posix() | closed | badarg | not_owner} when From 597fe2dfc154f881b265f30703608525dc70ce3f Mon Sep 17 00:00:00 2001 From: Simon Unge Date: Tue, 20 May 2025 23:51:42 +0000 Subject: [PATCH 6/9] eh --- src/osiris_file.erl | 58 +++++++++++++++++++++++++++++++-------------- 1 file changed, 40 insertions(+), 18 deletions(-) diff --git a/src/osiris_file.erl b/src/osiris_file.erl index 0be076b..a02ac5d 100644 --- a/src/osiris_file.erl +++ b/src/osiris_file.erl @@ -81,7 +81,7 @@ Reason :: file:posix() | badarg | terminated. 
-callback pread(Handle, Location, Number) -> - {ok, Data} | eof | {error, Reason} when + {ok, Data} | eof | {error, Reason} when Handle :: file_handle(), Location :: file:location(), Number :: non_neg_integer(), @@ -103,14 +103,14 @@ Reason :: file:posix() | badarg. -callback sendfile(Handle, Socket, Offset, Bytes, Opts) -> - {'ok', non_neg_integer()} | {'error', inet:posix() | - closed | badarg | not_owner} when + {ok, non_neg_integer()} | {error, inet:posix() | + closed | badarg | not_owner} when Handle :: file_handle(), Socket :: inet:socket() | socket:socket() | fun ((iolist()) -> ok | {error, inet:posix() | closed}), - Offset :: non_neg_integer(), - Bytes :: non_neg_integer(), - Opts :: [sendfile_option()]. + Offset :: non_neg_integer(), + Bytes :: non_neg_integer(), + Opts :: [sendfile_option()]. -callback truncate(Handle) -> ok | {error, Reason} when Handle :: file_handle(), @@ -176,16 +176,19 @@ delete(File) -> Mod = get_mod(), Mod:delete(File). --spec prim_delete(file:filename_all()) -> - ok | {error, term()}. + +-spec prim_delete(Filename) -> ok | {error, Reason} when + Filename :: file:name_all(), + Reason :: file:posix() | badarg. prim_delete(File) -> - Mod = get_mod(prim_file), + Mod = get_mod(prim_file, File), Mod:delete(File). --spec ensure_dir(file:filename_all()) -> - ok | {error, term()}. +-spec ensure_dir(Name) -> ok | {error, Reason} when + Name :: file:name_all(), + Reason :: file:posix(). %% Only used for local files ensure_dir(Dir) -> filelib:ensure_dir(Dir). @@ -203,8 +206,9 @@ list_dir(Dir) -> Mod:list_dir(Dir). --spec make_dir(file:filename_all()) -> - ok | {error, term()}. +-spec make_dir(Dir) -> ok | {error, Reason} when + Dir :: file:name(), + Reason :: file:posix() | badarg. %% Only used for the local segment file, no need to change it. make_dir(Dir) -> file:make_dir(Dir). @@ -222,7 +226,7 @@ open(File, Options) -> file:open(File, Options); false -> %% Here we will get the correct Mod based on config/manifest file etc. - Mod = get_mod(), + Mod = get_mod(File), {ok, Fd} = Mod:open(File, Options), {ok, {Mod, Fd}} end. @@ -275,13 +279,13 @@ read(Handle, Size) -> Reason :: file:posix() | badarg. %% Todo read_file_info(File) -> - Mod = get_mod(prim_file), + Mod = get_mod(prim_file, File), Mod:read_file_info(File). -spec sendfile(Handle, Socket, Offset, Bytes, Opts) -> - {'ok', non_neg_integer()} | {'error', inet:posix() | - closed | badarg | not_owner} when + {ok, non_neg_integer()} | {error, inet:posix() | + closed | badarg | not_owner} when Handle :: file_handle(), Socket :: inet:socket() | socket:socket() | fun ((iolist()) -> ok | {error, inet:posix() | closed}), @@ -337,4 +341,22 @@ get_mod(file) -> get_mod(prim_file) -> %% Just temporary solutin till I figure out why %% we even use prim_file? - prim_file. + prim_file; +get_mod(File) -> + case filelib:is_file(File) of + true -> + file; + false -> + application:get_env(osiris, io_segment_module, file) + end. + + +get_mod(prim_file, File) -> + %% Just temporary solutin till I figure out why + %% we even use prim_file? + case filelib:is_file(File) of + true -> + prim_file; + false -> + application:get_env(osiris, io_segment_module, prim_file) + end. 
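[Editorial note on the patch above: as of this patch, osiris_file:open/2 resolves the backend via get_mod/1, falling back to the io_segment_module application environment key when the requested file is not present on local disk, so a pluggable backend is simply a module implementing the osiris_file callbacks. The sketch below is illustrative only: the module name osiris_remote_file_example is invented, it merely delegates to the local file module where a real tiered-storage backend would talk to remote storage, and the callback set is still reworked in the later patches of this series.

-module(osiris_remote_file_example).
-behaviour(osiris_file).

-export([advise/4, close/1, copy/2, del_dir/1, delete/1, list_dir/1,
         open/2, position/2, pread/3, read/2, read_file_info/1,
         sendfile/5, truncate/1, write/2]).

%% open/2 is where a remote backend would fetch or lazily map the segment;
%% this sketch simply opens the local file.
open(File, Modes) -> file:open(File, Modes).

advise(Fd, Offset, Length, Advise) -> file:advise(Fd, Offset, Length, Advise).
close(Fd) -> file:close(Fd).
copy(Src, Dst) -> file:copy(Src, Dst).
del_dir(Dir) -> file:del_dir(Dir).
delete(File) -> file:delete(File).
list_dir(Dir) -> file:list_dir(Dir).
position(Fd, Location) -> file:position(Fd, Location).
pread(Fd, Location, Num) -> file:pread(Fd, Location, Num).
read(Fd, Num) -> file:read(Fd, Num).
read_file_info(File) -> file:read_file_info(File).
sendfile(Fd, Sock, Offset, Bytes, Opts) -> file:sendfile(Fd, Sock, Offset, Bytes, Opts).
truncate(Fd) -> file:truncate(Fd).
write(Fd, Data) -> file:write(Fd, Data).

Selecting such a backend then amounts to setting the environment key the patch reads, e.g. application:set_env(osiris, io_segment_module, osiris_remote_file_example).]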
From d5de5e48f1962ddc94ace5d74012a41cd1140286 Mon Sep 17 00:00:00 2001 From: Simon Unge Date: Tue, 20 May 2025 23:52:06 +0000 Subject: [PATCH 7/9] log fix --- src/osiris_log.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 9936831..ed3acee 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -2447,10 +2447,13 @@ max_segment_size_reached( CurrentSizeBytes >= MaxSizeBytes orelse CurrentSizeChunks >= MaxSizeChunks. +%%TODO This should be fully up to the backend. sendfile(_Transport, _Fd, _Sock, _Pos, 0) -> ok; sendfile(tcp = Transport, Fd, Sock, Pos, ToSend) -> case osiris_file:sendfile(Fd, Sock, Pos, ToSend, []) of + ok -> + ok; {ok, 0} -> %% TODO add counter for this? sendfile(Transport, Fd, Sock, Pos, ToSend); @@ -2806,8 +2809,7 @@ recover_tracking(#?MODULE{cfg = #cfg{directory = Dir, %% we need to open a new file handle here as we cannot use the one that is %% being used for appending to the segment as pread _may_ move the file %% position on some systems (such as windows) - {ok, {Mod, Fdn}} = open(filename:join(Dir, File), [read, raw, binary]), - Fd = {Mod, Fdn}, + {ok, Fd} = open(filename:join(Dir, File), [read, raw, binary]), _ = osiris_file:advise(Fd, 0, 0, random), %% TODO: if the first chunk in the segment isn't a tracking snapshot and %% there are prior segments we could scan at least two segments increasing From b99f92c684a49af79aa3a31aa0cae841a2194aa4 Mon Sep 17 00:00:00 2001 From: Simon Unge Date: Fri, 23 May 2025 19:40:13 +0000 Subject: [PATCH 8/9] sendfile fix --- src/osiris_file.erl | 58 ++++++++++++++++++------------------- src/osiris_file_default.erl | 22 ++++++++++++-- src/osiris_log.erl | 25 +--------------- 3 files changed, 50 insertions(+), 55 deletions(-) diff --git a/src/osiris_file.erl b/src/osiris_file.erl index a02ac5d..b38a938 100644 --- a/src/osiris_file.erl +++ b/src/osiris_file.erl @@ -123,6 +123,8 @@ -optional_callbacks([write/2]). +-define(DEFAULT_FILE, osiris_file_default). + -spec advise(Handle, Offset, Length, Advise) -> ok | {error, Reason} when Handle :: file_handle(), Offset :: integer(), @@ -133,7 +135,7 @@ advise({Mod, Handle}, Offset, Length, Advise) -> Mod:advise(Handle, Offset, Length, Advise); advise(Handle, Offset, Length, Advise) -> - file:advise(Handle, Offset, Length, Advise). + ?DEFAULT_FILE:advise(Handle, Offset, Length, Advise). -spec close(Handle) -> ok | {error, Reason} when @@ -143,7 +145,7 @@ advise(Handle, Offset, Length, Advise) -> close({Mod, Handle}) -> Mod:close(Handle); close(Handle) -> - file:close(Handle). + ?DEFAULT_FILE:close(Handle). -spec copy(Source, Destination) -> {ok, BytesCopied} | {error, Reason} when @@ -155,7 +157,7 @@ close(Handle) -> Reason :: file:posix() | badarg | terminated. %% TODO copy(Source, Destination) -> - file:copy(Source, Destination). + ?DEFAULT_FILE:copy(Source, Destination). -spec del_dir(Dir) -> ok | {error, Reason} when Dir :: file:name_all(), @@ -191,7 +193,7 @@ prim_delete(File) -> Reason :: file:posix(). %% Only used for local files ensure_dir(Dir) -> - filelib:ensure_dir(Dir). + ?DEFAULT_FILE:ensure_dir(Dir). -spec list_dir(Dir) -> {ok, Filenames} | {error, Reason} when @@ -211,7 +213,7 @@ list_dir(Dir) -> Reason :: file:posix() | badarg. %% Only used for the local segment file, no need to change it. make_dir(Dir) -> - file:make_dir(Dir). + ?DEFAULT_FILE:make_dir(Dir). 
-spec open(File, Modes) -> {ok, file_handle()} | {error, Reason} when @@ -223,7 +225,7 @@ open(File, Options) -> case lists:member(write, Options) of true -> %% We do not use tiered storage for writes - file:open(File, Options); + ?DEFAULT_FILE:open(File, Options); false -> %% Here we will get the correct Mod based on config/manifest file etc. Mod = get_mod(File), @@ -241,7 +243,7 @@ open(File, Options) -> position({Mod, Handle}, Position) -> Mod:position(Handle, Position); position(Handle, Position) -> - file:position(Handle, Position). + ?DEFAULT_FILE:position(Handle, Position). -spec pread(Handle, Location, Number) -> @@ -270,7 +272,7 @@ pread(Handle, Position, Size) -> read({Mod, Handle}, Size) -> Mod:read(Handle, Size); read(Handle, Size) -> - file:read(Handle, Size). + ?DEFAULT_FILE:read(Handle, Size). -spec read_file_info(File) -> {ok, FileInfo} | {error, Reason} when @@ -283,20 +285,20 @@ read_file_info(File) -> Mod:read_file_info(File). --spec sendfile(Handle, Socket, Offset, Bytes, Opts) -> - {ok, non_neg_integer()} | {error, inet:posix() | - closed | badarg | not_owner} when +-spec sendfile(Transport, Handle, Socket, Offset, Bytes) -> + ok | {error, inet:posix() | + closed | badarg | not_owner} when + Transport :: tcp | ssl, Handle :: file_handle(), Socket :: inet:socket() | socket:socket() | fun ((iolist()) -> ok | {error, inet:posix() | closed}), Offset :: non_neg_integer(), - Bytes :: non_neg_integer(), - Opts :: [sendfile_option()]. + Bytes :: non_neg_integer(). -sendfile({Mod, Handle}, Socket, Offset, Length, Options) -> - Mod:sendfile(Handle, Socket, Offset, Length, Options); -sendfile(Handle, Socket, Offset, Length, Options) -> - file:sendfile(Handle, Socket, Offset, Length, Options). +sendfile(Transport, {Mod, Handle}, Socket, Offset, Length) -> + Mod:sendfile(Transport, Handle, Socket, Offset, Length); +sendfile(Transport, Handle, Socket, Offset, Length) -> + ?DEFAULT_FILE:sendfile(Transport, Handle, Socket, Offset, Length). -spec truncate(Handle) -> ok | {error, Reason} when @@ -306,7 +308,7 @@ sendfile(Handle, Socket, Offset, Length, Options) -> truncate({Mod, Handle}) -> Mod:truncate(Handle); truncate(Handle) -> - file:truncate(Handle). + ?DEFAULT_FILE:truncate(Handle). -spec write(Handle, Bytes) -> ok | {error, Reason} when @@ -316,7 +318,7 @@ truncate(Handle) -> write({Mod, Handle}, Data) -> Mod:write(Handle, Data); write(Handle, Data) -> - file:write(Handle, Data). + ?DEFAULT_FILE:write(Handle, Data). %% -spec try_write(module(), term(), iodata()) -> %% ok | {error, term()}. @@ -329,25 +331,23 @@ write(Handle, Data) -> %% end. +%% TODO code below just hack to make it work for now. -spec get_mod() -> module(). get_mod() -> - get_mod(file). + ?DEFAULT_FILE; -get_mod(file) -> - %% TODO. This will figure out the correct module to use, based on - %% info in the magical manifest file. - file; get_mod(prim_file) -> %% Just temporary solutin till I figure out why %% we even use prim_file? - prim_file; + %% prim_file; + ?DEFAULT_FILE; get_mod(File) -> case filelib:is_file(File) of true -> - file; + ?DEFAULT_FILE; false -> - application:get_env(osiris, io_segment_module, file) + application:get_env(osiris, io_segment_module, ?DEFAULT_FILE) end. @@ -356,7 +356,7 @@ get_mod(prim_file, File) -> %% we even use prim_file? case filelib:is_file(File) of true -> - prim_file; + ?DEFAULT_FILE; false -> - application:get_env(osiris, io_segment_module, prim_file) + application:get_env(osiris, io_segment_module, ?DEFAULT_FILE) end. 
diff --git a/src/osiris_file_default.erl b/src/osiris_file_default.erl index c4a9de2..2695f5d 100644 --- a/src/osiris_file_default.erl +++ b/src/osiris_file_default.erl @@ -1,3 +1,4 @@ +%% Really need a better name... -module(osiris_file_default). -behaviour(osiris_file). @@ -61,8 +62,25 @@ read(Handle, Size) -> read_file_info(File) -> prim_file:read_file_info(File). -sendfile(Handle, Socket, Offset, Length, Options) -> - file:sendfile(Handle, Socket, Offset, Length, Options). +sendfile(_Transport, _Handle, _Sock, _Pos, 0) -> + ok; +sendfile(tcp = Transport, Handle, Sock, Pos, ToSend) -> + case file:sendfile(Handle, Sock, Pos, ToSend, []) of + {ok, 0} -> + %% TODO add counter for this? + sendfile(Transport, Handle, Sock, Pos, ToSend); + {ok, BytesSent} -> + sendfile(Transport, Handle, Sock, Pos + BytesSent, ToSend - BytesSent); + {error, _} = Err -> + Err + end; +sendfile(ssl, Handle, Sock, Pos, ToSend) -> + case file:pread(Handle, Pos, ToSend) of + {ok, Data} -> + ssl:send(Sock, Data); + {error, _} = Err -> + Err + end. truncate(Handle) -> file:truncate(Handle). diff --git a/src/osiris_log.erl b/src/osiris_log.erl index ed3acee..327414a 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -1696,7 +1696,7 @@ send_file(Sock, _ = Callback(Header, ToSend + byte_size(HeaderData)), case send(Transport, Sock, HeaderData) of ok -> - case sendfile(Transport, Fd, Sock, + case osiris_file:sendfile(Transport, Fd, Sock, Pos + ?HEADER_SIZE_B + ToSkip, ToSend) of ok -> State = State1#?MODULE{mode = Read}, @@ -2447,29 +2447,6 @@ max_segment_size_reached( CurrentSizeBytes >= MaxSizeBytes orelse CurrentSizeChunks >= MaxSizeChunks. -%%TODO This should be fully up to the backend. -sendfile(_Transport, _Fd, _Sock, _Pos, 0) -> - ok; -sendfile(tcp = Transport, Fd, Sock, Pos, ToSend) -> - case osiris_file:sendfile(Fd, Sock, Pos, ToSend, []) of - ok -> - ok; - {ok, 0} -> - %% TODO add counter for this? - sendfile(Transport, Fd, Sock, Pos, ToSend); - {ok, BytesSent} -> - sendfile(Transport, Fd, Sock, Pos + BytesSent, ToSend - BytesSent); - {error, _} = Err -> - Err - end; -sendfile(ssl, Fd, Sock, Pos, ToSend) -> - case osiris_file:pread(Fd, Pos, ToSend) of - {ok, Data} -> - ssl:send(Sock, Data); - {error, _} = Err -> - Err - end. - send(tcp, Sock, Data) -> gen_tcp:send(Sock, Data); send(ssl, Sock, Data) -> From fb1d529dc7de543dc4071541486648cb8cc763d8 Mon Sep 17 00:00:00 2001 From: Simon Unge Date: Thu, 29 May 2025 17:24:13 +0000 Subject: [PATCH 9/9] no prim special handling --- src/osiris_file.erl | 55 ++++++++++--------------------------- src/osiris_file_default.erl | 2 +- src/osiris_log.erl | 12 ++++---- 3 files changed, 22 insertions(+), 47 deletions(-) diff --git a/src/osiris_file.erl b/src/osiris_file.erl index b38a938..ed42699 100644 --- a/src/osiris_file.erl +++ b/src/osiris_file.erl @@ -12,7 +12,6 @@ open/2, position/2, pread/3, - prim_delete/1, read/2, read_file_info/1, sendfile/5, @@ -178,16 +177,6 @@ delete(File) -> Mod = get_mod(), Mod:delete(File). - --spec prim_delete(Filename) -> ok | {error, Reason} when - Filename :: file:name_all(), - Reason :: file:posix() | badarg. - -prim_delete(File) -> - Mod = get_mod(prim_file, File), - Mod:delete(File). - - -spec ensure_dir(Name) -> ok | {error, Reason} when Name :: file:name_all(), Reason :: file:posix(). @@ -204,7 +193,7 @@ ensure_dir(Dir) -> | {no_translation, Filename :: unicode:latin1_binary()}. %% TODO list_dir(Dir) -> - Mod = get_mod(prim_file), + Mod = get_mod(), Mod:list_dir(Dir). 
@@ -281,7 +270,7 @@ read(Handle, Size) ->
      Reason :: file:posix() | badarg.
 %% Todo
 read_file_info(File) ->
-    Mod = get_mod(prim_file, File),
+    Mod = get_mod(File),
     Mod:read_file_info(File).


@@ -316,32 +305,29 @@ truncate(Handle) ->
      Bytes :: iodata(),
      Reason :: file:posix() | badarg | terminated.
 write({Mod, Handle}, Data) ->
-    Mod:write(Handle, Data);
+    try_write(Mod, Handle, Data);
 write(Handle, Data) ->
     ?DEFAULT_FILE:write(Handle, Data).

-%% -spec try_write(module(), term(), iodata()) ->
-%%                    ok | {error, term()}.
-%% try_write(Mod, Handle, Data) ->
-%%     case erlang:function_exported(Mod, write, 2) of
-%%         true ->
-%%             Mod:write(Handle, Data);
-%%         false ->
-%%             file:write(Handle, Data)
-%%     end.
+
+-spec try_write(module(), term(), iodata()) ->
+          ok | {error, term()}.
+try_write(Mod, Handle, Data) ->
+    case erlang:function_exported(Mod, write, 2) of
+        true ->
+            Mod:write(Handle, Data);
+        false ->
+            ?DEFAULT_FILE:write(Handle, Data)
+    end.

 %% TODO code below just hack to make it work for now.
 -spec get_mod() -> module().

 get_mod() ->
-    ?DEFAULT_FILE;
+    ?DEFAULT_FILE.

-get_mod(prim_file) ->
-    %% Just temporary solutin till I figure out why
-    %% we even use prim_file?
-    %% prim_file;
-    ?DEFAULT_FILE;
 get_mod(File) ->
     case filelib:is_file(File) of
         true ->
@@ -349,14 +335,3 @@ get_mod(File) ->
         false ->
             application:get_env(osiris, io_segment_module, ?DEFAULT_FILE)
     end.
-
-
-get_mod(prim_file, File) ->
-    %% Just temporary solutin till I figure out why
-    %% we even use prim_file?
-    case filelib:is_file(File) of
-        true ->
-            ?DEFAULT_FILE;
-        false ->
-            application:get_env(osiris, io_segment_module, ?DEFAULT_FILE)
-    end.
diff --git a/src/osiris_file_default.erl b/src/osiris_file_default.erl
index 2695f5d..e946d83 100644
--- a/src/osiris_file_default.erl
+++ b/src/osiris_file_default.erl
@@ -33,7 +33,7 @@ copy(Source, Destination) ->
     file:copy(Source, Destination).

 del_dir(Dir) ->
-    file:del_dir(Dir).
+    prim_file:del_dir(Dir).

 delete(File) ->
     prim_file:delete(File).
diff --git a/src/osiris_log.erl b/src/osiris_log.erl
index 327414a..ea056af 100644
--- a/src/osiris_log.erl
+++ b/src/osiris_log.erl
@@ -631,7 +631,7 @@ maybe_fix_corrupted_files(#{dir := Dir}) ->
     [begin
          ?INFO("deleting left over segment '~s' in directory ~s", [F, Dir]),
-         ok = osiris_file:prim_delete(filename:join(Dir, F))
+         ok = osiris_file:delete(filename:join(Dir, F))
      end|| F <- orphaned_segments(Dir)],
     ok;
 maybe_fix_corrupted_files([IdxFile]) ->
@@ -664,8 +664,8 @@ maybe_fix_corrupted_files(IdxFiles) ->
         N when N =< ?HEADER_SIZE_B ->
             % if the segment doesn't contain any chunks, just delete it
             ?WARNING("deleting an empty segment file: ~0p", [LastSegFile]),
-            ok = osiris_file:prim_delete(LastIdxFile),
-            ok = osiris_file:prim_delete(LastSegFile),
+            ok = osiris_file:delete(LastIdxFile),
+            ok = osiris_file:delete(LastSegFile),
             maybe_fix_corrupted_files(IdxFiles -- [LastIdxFile]);
         LastSegFileSize ->
             ok = truncate_invalid_idx_records(LastIdxFile, LastSegFileSize)
@@ -673,7 +673,7 @@ maybe_fix_corrupted_files(IdxFiles) ->
             % if the last segment is missing, just delete its index
             ?WARNING("deleting index of the missing last segment file: ~0p", [LastSegFile]),
-            ok = osiris_file:prim_delete(LastIdxFile),
+            ok = osiris_file:delete(LastIdxFile),
             maybe_fix_corrupted_files(IdxFiles -- [LastIdxFile])
     end.
@@ -896,8 +896,8 @@ chunk_id_index_scan0(Fd, ChunkId) -> delete_segment_from_index(Index) -> File = segment_from_index_file(Index), ?DEBUG("osiris_log: deleting segment ~ts", [File]), - ok = osiris_file:prim_delete(Index), - ok = osiris_file:prim_delete(File), + ok = osiris_file:delete(Index), + ok = osiris_file:delete(File), ok. truncate_to(_Name, _Range, _EpochOffsets, []) ->