diff --git a/src/osiris_file.erl b/src/osiris_file.erl new file mode 100644 index 0000000..ed42699 --- /dev/null +++ b/src/osiris_file.erl @@ -0,0 +1,337 @@ +-module(osiris_file). + +-export([ + advise/4, + close/1, + copy/2, + del_dir/1, + delete/1, + ensure_dir/1, + list_dir/1, + make_dir/1, + open/2, + position/2, + pread/3, + read/2, + read_file_info/1, + sendfile/5, + truncate/1, + write/2 + ]). + + +-type file_handle() :: {module(), term()} | file:io_device(). + +-type posix_file_advise() :: normal | + sequential | + random | + no_reuse | + will_need | + dont_need. + +-type sendfile_option() :: {chunk_size, non_neg_integer()} + | {use_threads, boolean()}. + + +-callback advise(Handle, Offset, Length, Advise) -> ok | {error, Reason} when + Handle :: file_handle(), + Offset :: integer(), + Length :: integer(), + Advise :: posix_file_advise(), + Reason :: file:posix() | badarg. + +-callback close(Handle) -> ok | {error, Reason} when + Handle :: file_handle(), + Reason :: file:posix() | badarg | terminated. + +-callback copy(Source, Destination) -> {ok, BytesCopied} | {error, Reason} when + Source :: file_handle() | Filename | {Filename, Modes}, + Destination :: file_handle() | Filename | {Filename, Modes}, + Filename :: file:name_all(), + Modes :: [file:mode()], + BytesCopied :: non_neg_integer(), + Reason :: file:posix() | badarg | terminated. + +-callback del_dir(Dir) -> ok | {error, Reason} when + Dir :: file:name_all(), + Reason :: file:posix() | badarg. + +-callback delete(Filename) -> ok | {error, Reason} when + Filename :: file:name_all(), + Reason :: file:posix() | badarg. + +-callback list_dir(Dir) -> {ok, Filenames} | {error, Reason} when + Dir :: file:name_all(), + Filenames :: [file:filename()], + Reason :: file:posix() + | badarg + | {no_translation, Filename :: unicode:latin1_binary()}. + +-callback open(File, Modes) -> {ok, file_handle()} | {error, Reason} when + File :: Filename | file:iodata(), + Filename :: file:name_all(), + Modes :: [file:mode() | ram | directory], + Reason :: file:posix() | badarg | system_limit. + +-callback position(Handle, Location) -> {ok, NewPosition} | {error, Reason} when + Handle :: file_handle(), + Location :: file:location(), + NewPosition :: integer(), + Reason :: file:posix() | badarg | terminated. + +-callback pread(Handle, Location, Number) -> + {ok, Data} | eof | {error, Reason} when + Handle :: file_handle(), + Location :: file:location(), + Number :: non_neg_integer(), + Data :: string() | binary(), + Reason :: file:posix() | badarg | terminated. + +-callback read(Handle, Number) -> {ok, Data} | eof | {error, Reason} when + Handle :: file_handle() | io:device(), + Number :: non_neg_integer(), + Data :: string() | binary(), + Reason :: file:posix() + | badarg + | terminated + | {no_translation, unicode, latin1}. + +-callback read_file_info(File) -> {ok, FileInfo} | {error, Reason} when + File :: file:name_all() | file_handle(), + FileInfo :: file:file_info(), + Reason :: file:posix() | badarg. + +-callback sendfile(Handle, Socket, Offset, Bytes, Opts) -> + {ok, non_neg_integer()} | {error, inet:posix() | + closed | badarg | not_owner} when + Handle :: file_handle(), + Socket :: inet:socket() | socket:socket() | + fun ((iolist()) -> ok | {error, inet:posix() | closed}), + Offset :: non_neg_integer(), + Bytes :: non_neg_integer(), + Opts :: [sendfile_option()]. + +-callback truncate(Handle) -> ok | {error, Reason} when + Handle :: file_handle(), + Reason :: file:posix() | badarg | terminated. + +-callback write(Handle, Bytes) -> ok | {error, Reason} when + Handle :: file_handle() | io:device(), + Bytes :: iodata(), + Reason :: file:posix() | badarg | terminated. + +-optional_callbacks([write/2]). + +-define(DEFAULT_FILE, osiris_file_default). + +-spec advise(Handle, Offset, Length, Advise) -> ok | {error, Reason} when + Handle :: file_handle(), + Offset :: integer(), + Length :: integer(), + Advise :: posix_file_advise(), + Reason :: file:posix() | badarg. + +advise({Mod, Handle}, Offset, Length, Advise) -> + Mod:advise(Handle, Offset, Length, Advise); +advise(Handle, Offset, Length, Advise) -> + ?DEFAULT_FILE:advise(Handle, Offset, Length, Advise). + + +-spec close(Handle) -> ok | {error, Reason} when + Handle :: file_handle(), + Reason :: file:posix() | badarg | terminated. + +close({Mod, Handle}) -> + Mod:close(Handle); +close(Handle) -> + ?DEFAULT_FILE:close(Handle). + + +-spec copy(Source, Destination) -> {ok, BytesCopied} | {error, Reason} when + Source :: file_handle() | Filename | {Filename, Modes}, + Destination :: file_handle() | Filename | {Filename, Modes}, + Filename :: file:name_all(), + Modes :: [file:mode()], + BytesCopied :: non_neg_integer(), + Reason :: file:posix() | badarg | terminated. +%% TODO +copy(Source, Destination) -> + ?DEFAULT_FILE:copy(Source, Destination). + +-spec del_dir(Dir) -> ok | {error, Reason} when + Dir :: file:name_all(), + Reason :: file:posix() | badarg. +%% TODO +%% Used when a queue is deleted, should perhaps move the entire osiris_log:delete_directory, and +%% let the Mod handle deletion of storage on its side too. +del_dir(Dir) -> + Mod = get_mod(), + Mod:del_dir(Dir). + + +-spec delete(Filename) -> ok | {error, Reason} when + Filename :: file:name_all(), + Reason :: file:posix() | badarg. +%% Do we need the prim_* function calls? +delete(File) -> + Mod = get_mod(), + Mod:delete(File). + +-spec ensure_dir(Name) -> ok | {error, Reason} when + Name :: file:name_all(), + Reason :: file:posix(). +%% Only used for local files +ensure_dir(Dir) -> + ?DEFAULT_FILE:ensure_dir(Dir). + + +-spec list_dir(Dir) -> {ok, Filenames} | {error, Reason} when + Dir :: file:name_all(), + Filenames :: [file:filename()], + Reason :: file:posix() + | badarg + | {no_translation, Filename :: unicode:latin1_binary()}. +%% TODO +list_dir(Dir) -> + Mod = get_mod(), + Mod:list_dir(Dir). + + +-spec make_dir(Dir) -> ok | {error, Reason} when + Dir :: file:name(), + Reason :: file:posix() | badarg. +%% Only used for the local segment file, no need to change it. +make_dir(Dir) -> + ?DEFAULT_FILE:make_dir(Dir). + + +-spec open(File, Modes) -> {ok, file_handle()} | {error, Reason} when + File :: Filename | file:iodata(), + Filename :: file:name_all(), + Modes :: [file:mode() | ram | directory], + Reason :: file:posix() | badarg | system_limit. +open(File, Options) -> + case lists:member(write, Options) of + true -> + %% We do not use tiered storage for writes + ?DEFAULT_FILE:open(File, Options); + false -> + %% Here we will get the correct Mod based on config/manifest file etc. + Mod = get_mod(File), + {ok, Fd} = Mod:open(File, Options), + {ok, {Mod, Fd}} + end. + + +-spec position(Handle, Location) -> {ok, NewPosition} | {error, Reason} when + Handle :: file_handle(), + Location :: file:location(), + NewPosition :: integer(), + Reason :: file:posix() | badarg | terminated. + +position({Mod, Handle}, Position) -> + Mod:position(Handle, Position); +position(Handle, Position) -> + ?DEFAULT_FILE:position(Handle, Position). + + +-spec pread(Handle, Location, Number) -> + {ok, Data} | eof | {error, Reason} when + Handle :: file_handle(), + Location :: file:location(), + Number :: non_neg_integer(), + Data :: string() | binary(), + Reason :: file:posix() | badarg | terminated. + +pread({Mod, Handle}, Position, Size) -> + Mod:pread(Handle, Position, Size); +pread(Handle, Position, Size) -> + file:pread(Handle, Position, Size). + + +-spec read(Handle, Number) -> {ok, Data} | eof | {error, Reason} when + Handle :: file_handle() | io:device(), + Number :: non_neg_integer(), + Data :: string() | binary(), + Reason :: file:posix() + | badarg + | terminated + | {no_translation, unicode, latin1}. + +read({Mod, Handle}, Size) -> + Mod:read(Handle, Size); +read(Handle, Size) -> + ?DEFAULT_FILE:read(Handle, Size). + + +-spec read_file_info(File) -> {ok, FileInfo} | {error, Reason} when + File :: file:name_all() | file_handle(), + FileInfo :: file:file_info(), + Reason :: file:posix() | badarg. +%% Todo +read_file_info(File) -> + Mod = get_mod(File), + Mod:read_file_info(File). + + +-spec sendfile(Transport, Handle, Socket, Offset, Bytes) -> + ok | {error, inet:posix() | + closed | badarg | not_owner} when + Transport :: tcp | ssl, + Handle :: file_handle(), + Socket :: inet:socket() | socket:socket() | + fun ((iolist()) -> ok | {error, inet:posix() | closed}), + Offset :: non_neg_integer(), + Bytes :: non_neg_integer(). + +sendfile(Transport, {Mod, Handle}, Socket, Offset, Length) -> + Mod:sendfile(Transport, Handle, Socket, Offset, Length); +sendfile(Transport, Handle, Socket, Offset, Length) -> + ?DEFAULT_FILE:sendfile(Transport, Handle, Socket, Offset, Length). + + +-spec truncate(Handle) -> ok | {error, Reason} when + Handle :: file_handle(), + Reason :: file:posix() | badarg | terminated. + +truncate({Mod, Handle}) -> + Mod:truncate(Handle); +truncate(Handle) -> + ?DEFAULT_FILE:truncate(Handle). + + +-spec write(Handle, Bytes) -> ok | {error, Reason} when + Handle :: file_handle() | io:device(), + Bytes :: iodata(), + Reason :: file:posix() | badarg | terminated. +write({Mod, Handle}, Data) -> + Mod:write(Handle, Data), + try_write(Mod, Handle, Data); +write(Handle, Data) -> + ?DEFAULT_FILE:write(Handle, Data). + + +-spec try_write(module(), term(), iodata()) -> + ok | {error, term()}. +try_write(Mod, Handle, Data) -> + case erlang:function_exported(Mod, write, 2) of + true -> + Mod:write(Handle, Data); + false -> + ?DEFAULT_FILE:write(Handle, Data) + end. + + +%% TODO code below just hack to make it work for now. +-spec get_mod() -> module(). + +get_mod() -> + ?DEFAULT_FILE. + +get_mod(File) -> + case filelib:is_file(File) of + true -> + ?DEFAULT_FILE; + false -> + application:get_env(osiris, io_segment_module, ?DEFAULT_FILE) + end. diff --git a/src/osiris_file_default.erl b/src/osiris_file_default.erl new file mode 100644 index 0000000..e946d83 --- /dev/null +++ b/src/osiris_file_default.erl @@ -0,0 +1,89 @@ +%% Really need a better name... +-module(osiris_file_default). +-behaviour(osiris_file). + +%% Standard file operations +-export([ + advise/4, + close/1, + copy/2, + del_dir/1, + delete/1, + ensure_dir/1, + list_dir/1, + make_dir/1, + open/2, + position/2, + pread/3, + read/2, + read_file_info/1, + sendfile/5, + truncate/1, + write/2 + ]). + +%% Simple wrappers around file module +advise(Handle, Offset, Length, Advise) -> + file:advise(Handle, Offset, Length, Advise). + +close(Handle) -> + file:close(Handle). + +copy(Source, Destination) -> + file:copy(Source, Destination). + +del_dir(Dir) -> + prim_file:del_dir(Dir). + +delete(File) -> + prim_file:delete(File). + +ensure_dir(Dir) -> + filelib:ensure_dir(Dir). + +list_dir(Dir) -> + prim_file:list_dir(Dir). + +make_dir(Dir) -> + file:make_dir(Dir). + +open(File, Options) -> + file:open(File, Options). + +position(Handle, Position) -> + file:position(Handle, Position). + +pread(Handle, Position, Size) -> + file:pread(Handle, Position, Size). + +read(Handle, Size) -> + file:read(Handle, Size). + +read_file_info(File) -> + prim_file:read_file_info(File). + +sendfile(_Transport, _Handle, _Sock, _Pos, 0) -> + ok; +sendfile(tcp = Transport, Handle, Sock, Pos, ToSend) -> + case file:sendfile(Handle, Sock, Pos, ToSend, []) of + {ok, 0} -> + %% TODO add counter for this? + sendfile(Transport, Handle, Sock, Pos, ToSend); + {ok, BytesSent} -> + sendfile(Transport, Handle, Sock, Pos + BytesSent, ToSend - BytesSent); + {error, _} = Err -> + Err + end; +sendfile(ssl, Handle, Sock, Pos, ToSend) -> + case file:pread(Handle, Pos, ToSend) of + {ok, Data} -> + ssl:send(Sock, Data); + {error, _} = Err -> + Err + end. + +truncate(Handle) -> + file:truncate(Handle). + +write(Handle, Data) -> + file:write(Handle, Data). diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 5cfb238..ea056af 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -497,8 +497,8 @@ init(#{dir := Dir, ?DEBUG_(Name, "max_segment_size_bytes: ~b, max_segment_size_chunks ~b, retention ~w, filter size ~b", [MaxSizeBytes, MaxSizeChunks, Retention, FilterSize]), - ok = filelib:ensure_dir(Dir), - case file:make_dir(Dir) of + ok = osiris_file:ensure_dir(Dir), + case osiris_file:make_dir(Dir) of ok -> ok; {error, eexist} -> @@ -578,14 +578,14 @@ init(#{dir := Dir, [element(1, TailInfo), FstChId]), {ok, SegFd} = open(Filename, ?FILE_OPTS_WRITE), - {ok, Size} = file:position(SegFd, Size), + {ok, Size} = osiris_file:position(SegFd, Size), %% maybe_fix_corrupted_files has truncated the index to the last %% record pointing %% at a valid chunk we can now truncate the segment to size in %% case there is trailing data - ok = file:truncate(SegFd), + ok = osiris_file:truncate(SegFd), {ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE), - {ok, IdxEof} = file:position(IdxFd, eof), + {ok, IdxEof} = osiris_file:position(IdxFd, eof), NumChunks = (IdxEof - ?IDX_HEADER_SIZE) div ?INDEX_RECORD_SIZE_B, #?MODULE{cfg = Cfg, mode = @@ -602,13 +602,13 @@ init(#{dir := Dir, %% the empty log case {ok, SegFd} = open(Filename, ?FILE_OPTS_WRITE), {ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE), - {ok, _} = file:position(SegFd, ?LOG_HEADER_SIZE), + {ok, _} = osiris_file:position(SegFd, ?LOG_HEADER_SIZE), counters:put(Cnt, ?C_SEGMENTS, 1), %% the segment could potentially have trailing data here so we'll %% do a truncate just in case. The index would have been truncated %% earlier - ok = file:truncate(SegFd), - {ok, _} = file:position(IdxFd, ?IDX_HEADER_SIZE), + ok = osiris_file:truncate(SegFd), + {ok, _} = osiris_file:position(IdxFd, ?IDX_HEADER_SIZE), osiris_log_shared:set_first_chunk_id(Shared, DefaultNextOffset - 1), osiris_log_shared:set_last_chunk_id(Shared, DefaultNextOffset - 1), #?MODULE{cfg = Cfg, @@ -631,7 +631,7 @@ maybe_fix_corrupted_files(#{dir := Dir}) -> [begin ?INFO("deleting left over segment '~s' in directory ~s", [F, Dir]), - ok = prim_file:delete(filename:join(Dir, F)) + ok = osiris_file:delete(filename:join(Dir, F)) end|| F <- orphaned_segments(Dir)], ok; maybe_fix_corrupted_files([IdxFile]) -> @@ -641,9 +641,9 @@ maybe_fix_corrupted_files([IdxFile]) -> true -> % the only index doesn't contain a single valid record % make sure it has a valid header - {ok, IdxFd} = file:open(IdxFile, ?FILE_OPTS_WRITE), - ok = file:write(IdxFd, ?IDX_HEADER), - ok = file:close(IdxFd); + {ok, IdxFd} = osiris_file:open(IdxFile, ?FILE_OPTS_WRITE), + ok = osiris_file:write(IdxFd, ?IDX_HEADER), + ok = osiris_file:close(IdxFd); false -> ok end, @@ -651,9 +651,9 @@ maybe_fix_corrupted_files([IdxFile]) -> true -> % the only segment doesn't contain a single valid chunk % make sure it has a valid header - {ok, SegFd} = file:open(SegFile, ?FILE_OPTS_WRITE), - ok = file:write(SegFd, ?LOG_HEADER), - ok = file:close(SegFd); + {ok, SegFd} = osiris_file:open(SegFile, ?FILE_OPTS_WRITE), + ok = osiris_file:write(SegFd, ?LOG_HEADER), + ok = osiris_file:close(SegFd); false -> ok end; @@ -664,8 +664,8 @@ maybe_fix_corrupted_files(IdxFiles) -> N when N =< ?HEADER_SIZE_B -> % if the segment doesn't contain any chunks, just delete it ?WARNING("deleting an empty segment file: ~0p", [LastSegFile]), - ok = prim_file:delete(LastIdxFile), - ok = prim_file:delete(LastSegFile), + ok = osiris_file:delete(LastIdxFile), + ok = osiris_file:delete(LastSegFile), maybe_fix_corrupted_files(IdxFiles -- [LastIdxFile]); LastSegFileSize -> ok = truncate_invalid_idx_records(LastIdxFile, LastSegFileSize) @@ -673,7 +673,7 @@ maybe_fix_corrupted_files(IdxFiles) -> % if the last segment is missing, just delete its index ?WARNING("deleting index of the missing last segment file: ~0p", [LastSegFile]), - ok = prim_file:delete(LastIdxFile), + ok = osiris_file:delete(LastIdxFile), maybe_fix_corrupted_files(IdxFiles -- [LastIdxFile]) end. @@ -701,14 +701,14 @@ truncate_invalid_idx_records(IdxFile, SegSize) -> {ok, IdxFd} = open(IdxFile, [raw, binary, write, read]), {ok, Pos} = position_at_idx_record_boundary(IdxFd, eof), ok = skip_invalid_idx_records(IdxFd, SegFile, SegSize, Pos), - ok = file:truncate(IdxFd), - file:close(IdxFd). + ok = osiris_file:truncate(IdxFd), + osiris_file:close(IdxFd). skip_invalid_idx_records(IdxFd, SegFile, SegSize, Pos) -> case Pos >= ?IDX_HEADER_SIZE + ?INDEX_RECORD_SIZE_B of true -> - {ok, _} = file:position(IdxFd, Pos - ?INDEX_RECORD_SIZE_B), - case file:read(IdxFd, ?INDEX_RECORD_SIZE_B) of + {ok, _} = osiris_file:position(IdxFd, Pos - ?INDEX_RECORD_SIZE_B), + case osiris_file:read(IdxFd, ?INDEX_RECORD_SIZE_B) of {ok, ?ZERO_IDX_MATCH(_)} -> % trailing zeros found skip_invalid_idx_records(IdxFd, SegFile, SegSize, @@ -729,7 +729,7 @@ skip_invalid_idx_records(IdxFd, SegFile, SegSize, Pos) -> end; false -> %% TODO should we validate the correctness of index/segment headers? - {ok, _} = file:position(IdxFd, ?IDX_HEADER_SIZE), + {ok, _} = osiris_file:position(IdxFd, ?IDX_HEADER_SIZE), ok end. @@ -881,23 +881,23 @@ chunk_id_index_scan(IdxFile, ChunkId) chunk_id_index_scan0(Fd, ChunkId). chunk_id_index_scan0(Fd, ChunkId) -> - case file:read(Fd, ?INDEX_RECORD_SIZE_B) of + case osiris_file:read(Fd, ?INDEX_RECORD_SIZE_B) of {ok, ?IDX_MATCH(ChunkId, Epoch, FilePos)} -> - {ok, IdxPos} = file:position(Fd, cur), - ok = file:close(Fd), + {ok, IdxPos} = osiris_file:position(Fd, cur), + ok = osiris_file:close(Fd), {ChunkId, Epoch, FilePos, IdxPos - ?INDEX_RECORD_SIZE_B}; {ok, _} -> chunk_id_index_scan0(Fd, ChunkId); eof -> - ok = file:close(Fd), + ok = osiris_file:close(Fd), eof end. delete_segment_from_index(Index) -> File = segment_from_index_file(Index), ?DEBUG("osiris_log: deleting segment ~ts", [File]), - ok = prim_file:delete(Index), - ok = prim_file:delete(File), + ok = osiris_file:delete(Index), + ok = osiris_file:delete(File), ok. truncate_to(_Name, _Range, _EpochOffsets, []) -> @@ -955,18 +955,18 @@ truncate_to(Name, RemoteRange, [{E, ChId} | NextEOs], IdxFiles) -> %% the Chunk id was found and has the right epoch %% lets truncate to this point %% FilePos could be eof here which means the next offset - {ok, Fd} = file:open(File, [read, write, binary, raw]), - _ = file:advise(Fd, 0, 0, random), - {ok, IdxFd} = file:open(IdxFile, [read, write, binary, raw]), + {ok, Fd} = osiris_file:open(File, [read, write, binary, raw]), + _ = osiris_file:advise(Fd, 0, 0, random), + {ok, IdxFd} = osiris_file:open(IdxFile, [read, write, binary, raw]), NextPos = next_chunk_pos(Fd, Pos), - {ok, _} = file:position(Fd, NextPos), - ok = file:truncate(Fd), + {ok, _} = osiris_file:position(Fd, NextPos), + ok = osiris_file:truncate(Fd), - {ok, _} = file:position(IdxFd, IdxPos + ?INDEX_RECORD_SIZE_B), - ok = file:truncate(IdxFd), - ok = file:close(Fd), - ok = file:close(IdxFd), + {ok, _} = osiris_file:position(IdxFd, IdxPos + ?INDEX_RECORD_SIZE_B), + ok = osiris_file:truncate(IdxFd), + ok = osiris_file:close(Fd), + ok = osiris_file:close(IdxFd), %% delete all segments with a first offset larger then ChId %% and return the remainder lists:filter( @@ -1046,7 +1046,7 @@ init_data_reader_at(ChunkId, FilePos, File, #{dir := Dir, name := Name, shared := Shared, readers_counter_fun := CountersFun} = Config) -> - case file:open(File, [raw, binary, read]) of + case osiris_file:open(File, [raw, binary, read]) of {ok, Fd} -> Cnt = make_counter(Config), counters:put(Cnt, ?C_OFFSET, ChunkId - 1), @@ -1156,7 +1156,7 @@ init_offset_reader0({timestamp, Ts}, #{} = Conf) -> SegmentFile = segment_from_index_file(IdxFile), open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf); {first_in, IdxFile} -> - {ok, Fd} = file:open(IdxFile, [raw, binary, read]), + {ok, Fd} = osiris_file:open(IdxFile, [raw, binary, read]), {ok, ?IDX_MATCH(ChunkId, _, FilePos)} = first_idx_record(Fd), SegmentFile = segment_from_index_file(IdxFile), open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf); @@ -1310,7 +1310,7 @@ last_user_chunk_id0(Name, [IdxFile | Rest]) -> {ok, IdxFd} = open(IdxFile, [read, raw, binary]), {ok, EofPos} = position_at_idx_record_boundary(IdxFd, eof), Last = last_user_chunk_id_in_index(EofPos - ?INDEX_RECORD_SIZE_B, IdxFd), - _ = file:close(IdxFd), + _ = osiris_file:close(IdxFd), case Last of {ok, Id, Pos} -> {Id, Pos, IdxFile}; @@ -1322,7 +1322,7 @@ last_user_chunk_id0(Name, [IdxFile | Rest]) -> %% Searches the index file backwards for the chunk id of the last user chunk. last_user_chunk_id_in_index(NextPos, IdxFd) -> - case file:pread(IdxFd, NextPos, ?INDEX_RECORD_SIZE_B) of + case osiris_file:pread(IdxFd, NextPos, ?INDEX_RECORD_SIZE_B) of {ok, <>} = - file:pread(Fd, Pos + ?REC_HDR_SZ_SIMPLE_B, + osiris_file:pread(Fd, Pos + ?REC_HDR_SZ_SIMPLE_B, Len + ?ITER_READ_AHEAD_B), {Record0, Rem1} end, @@ -1516,7 +1516,7 @@ iterator_next(#iterator{fd = Fd, %% not enough in Rem0 to read the entire record %% so we need to read it from disk {ok, <>} = - file:pread(Fd, Pos + ?REC_HDR_SZ_SUBBATCH_B, + osiris_file:pread(Fd, Pos + ?REC_HDR_SZ_SUBBATCH_B, Len + ?ITER_READ_AHEAD_B), {Record0, Rem1} end, @@ -1528,7 +1528,7 @@ iterator_next(#iterator{fd = Fd, {{NextOffs, Record}, I}; iterator_next(#iterator{fd = Fd, next_record_pos = Pos} = I) -> - {ok, Data} = file:pread(Fd, Pos, ?ITER_READ_AHEAD_B), + {ok, Data} = osiris_file:pread(Fd, Pos, ?ITER_READ_AHEAD_B), iterator_next(I#iterator{data = Data}). -spec read_chunk(state()) -> @@ -1552,7 +1552,7 @@ read_chunk(#?MODULE{cfg = #cfg{}} = State0) -> trailer_size := TrailerSize}, #?MODULE{fd = Fd, mode = #read{next_offset = ChId} = Read} = State} -> ToRead = ?HEADER_SIZE_B + FilterSize + DataSize + TrailerSize, - {ok, ChData} = file:pread(Fd, Pos, ToRead), + {ok, ChData} = osiris_file:pread(Fd, Pos, ToRead), <<_:?HEADER_SIZE_B/binary, _:FilterSize/binary, RecordData:DataSize/binary, @@ -1608,7 +1608,7 @@ is_valid_chunk_on_disk(SegFile, Pos) -> %% then checks the CRC case open(SegFile, [read, raw, binary]) of {ok, SegFd} -> - IsValid = case file:pread(SegFd, Pos, ?HEADER_SIZE_B) of + IsValid = case osiris_file:pread(SegFd, Pos, ?HEADER_SIZE_B) of {ok, < FilterSize:8/unsigned, _Reserved:24>>} -> DataPos = Pos + FilterSize + ?HEADER_SIZE_B, - case file:pread(SegFd, DataPos, DataSize) of + case osiris_file:pread(SegFd, DataPos, DataSize) of {ok, Data} -> case erlang:crc32(Data) of Crc -> @@ -1638,7 +1638,7 @@ is_valid_chunk_on_disk(SegFile, Pos) -> _ -> false end, - _ = file:close(SegFd), + _ = osiris_file:close(SegFd), IsValid; _Err -> false @@ -1696,7 +1696,7 @@ send_file(Sock, _ = Callback(Header, ToSend + byte_size(HeaderData)), case send(Transport, Sock, HeaderData) of ok -> - case sendfile(Transport, Fd, Sock, + case osiris_file:sendfile(Transport, Fd, Sock, Pos + ?HEADER_SIZE_B + ToSkip, ToSend) of ok -> State = State1#?MODULE{mode = Read}, @@ -1761,13 +1761,13 @@ delete_directory(Name) when ?IS_STRING(Name) -> delete_dir(Dir). delete_dir(Dir) -> - case file:list_dir(Dir) of + case osiris_file:list_dir(Dir) of {ok, Files} -> [ok = - file:delete( + osiris_file:delete( filename:join(Dir, F)) || F <- Files], - ok = file:del_dir(Dir); + ok = osiris_file:del_dir(Dir); {error, enoent} -> ok end. @@ -1787,7 +1787,7 @@ next_chunk_pos(Fd, Pos) -> Size:32/unsigned, TSize:32/unsigned, FSize:8/unsigned, - _Reserved:24>>} = file:pread(Fd, Pos, ?HEADER_SIZE_B), + _Reserved:24>>} = osiris_file:pread(Fd, Pos, ?HEADER_SIZE_B), Pos + ?HEADER_SIZE_B + FSize + Size + TSize. @@ -1898,12 +1898,12 @@ last_idx_record(IdxFd) -> nth_last_idx_record(IdxFile, N) when ?IS_STRING(IdxFile) -> {ok, IdxFd} = open(IdxFile, [read, raw, binary]), IdxRecord = nth_last_idx_record(IdxFd, N), - _ = file:close(IdxFd), + _ = osiris_file:close(IdxFd), IdxRecord; nth_last_idx_record(IdxFd, N) -> case position_at_idx_record_boundary(IdxFd, {eof, -?INDEX_RECORD_SIZE_B * N}) of {ok, _} -> - file:read(IdxFd, ?INDEX_RECORD_SIZE_B); + osiris_file:read(IdxFd, ?INDEX_RECORD_SIZE_B); Err -> Err end. @@ -1915,17 +1915,17 @@ last_valid_idx_record(IdxFile) -> SegFile = segment_from_index_file(IdxFile), SegSize = file_size(SegFile), ok = skip_invalid_idx_records(IdxFd, SegFile, SegSize, Pos), - case file:position(IdxFd, {cur, -?INDEX_RECORD_SIZE_B}) of + case osiris_file:position(IdxFd, {cur, -?INDEX_RECORD_SIZE_B}) of {ok, _} -> - IdxRecord = file:read(IdxFd, ?INDEX_RECORD_SIZE_B), - _ = file:close(IdxFd), + IdxRecord = osiris_file:read(IdxFd, ?INDEX_RECORD_SIZE_B), + _ = osiris_file:close(IdxFd), IdxRecord; _ -> - _ = file:close(IdxFd), + _ = osiris_file:close(IdxFd), undefined end; Err -> - _ = file:close(IdxFd), + _ = osiris_file:close(IdxFd), Err end. @@ -1933,7 +1933,7 @@ first_idx_record(IdxFd) -> idx_read_at(IdxFd, ?IDX_HEADER_SIZE). idx_read_at(Fd, Pos) when is_integer(Pos) -> - case file:pread(Fd, Pos, ?INDEX_RECORD_SIZE_B) of + case osiris_file:pread(Fd, Pos, ?INDEX_RECORD_SIZE_B) of {ok, ?ZERO_IDX_MATCH(_)} -> {error, empty_idx_record}; Ret -> @@ -1944,11 +1944,11 @@ idx_read_at(Fd, Pos) when is_integer(Pos) -> %% in the middle of a record being written concurrently. If that happens, we need to re-position at the nearest %% record boundry. See https://github.com/rabbitmq/osiris/issues/73 position_at_idx_record_boundary(IdxFd, At) -> - case file:position(IdxFd, At) of + case osiris_file:position(IdxFd, At) of {ok, Pos} -> case (Pos - ?IDX_HEADER_SIZE) rem ?INDEX_RECORD_SIZE_B of 0 -> {ok, Pos}; - N -> file:position(IdxFd, {cur, -N}) + N -> osiris_file:position(IdxFd, {cur, -N}) end; Error -> Error end. @@ -1956,10 +1956,10 @@ position_at_idx_record_boundary(IdxFd, At) -> build_segment_info(SegFile, LastChunkPos, IdxFile) -> {ok, Fd} = open(SegFile, [read, binary, raw]), %% we don't want to read blocks into page cache we are unlikely to need - _ = file:advise(Fd, 0, 0, random), - case file:pread(Fd, ?LOG_HEADER_SIZE, ?HEADER_SIZE_B) of + _ = osiris_file:advise(Fd, 0, 0, random), + case osiris_file:pread(Fd, ?LOG_HEADER_SIZE, ?HEADER_SIZE_B) of eof -> - _ = file:close(Fd), + _ = osiris_file:close(Fd), eof; {ok, < FirstFSize:8/unsigned, FirstTSize:24/unsigned, _/binary>>} -> - case file:pread(Fd, LastChunkPos, ?HEADER_SIZE_B) of + case osiris_file:pread(Fd, LastChunkPos, ?HEADER_SIZE_B) of {ok, < Size = LastChunkPos + ?HEADER_SIZE_B + LastChunkSize, %% TODO: this file:position/2 all has no actual function and %% is only used to emit a debug log. Remove? - {ok, Eof} = file:position(Fd, eof), + {ok, Eof} = osiris_file:position(Fd, eof), ?DEBUG_IF("~s: segment ~ts has trailing data ~w ~w", [?MODULE, filename:basename(SegFile), Size, Eof], Size =/= Eof), - _ = file:close(Fd), + _ = osiris_file:close(Fd), FstChInfo = #chunk_info{epoch = FirstEpoch, timestamp = FirstTs, id = FirstChId, @@ -2020,7 +2020,7 @@ build_segment_info(SegFile, LastChunkPos, IdxFile) -> last = LastChInfo}}; _ -> % last chunk is corrupted - try the previous one - _ = file:close(Fd), + _ = osiris_file:close(Fd), {ok, ?IDX_MATCH(_ChId, _E, PrevChPos)} = nth_last_idx_record(IdxFile, 2), case PrevChPos == LastChunkPos of @@ -2183,7 +2183,7 @@ eval_max_bytes([IdxFile | Rest], Limit, Acc) -> end. file_size(Path) -> - case prim_file:read_file_info(Path) of + case osiris_file:read_file_info(Path) of {ok, #file_info{size = Size}} -> Size; {error, enoent} -> @@ -2191,7 +2191,7 @@ file_size(Path) -> end. file_size_or_zero(Path) -> - case prim_file:read_file_info(Path) of + case osiris_file:read_file_info(Path) of {ok, #file_info{size = Size}} -> Size; {error, enoent} -> @@ -2217,10 +2217,10 @@ last_epoch_chunk_ids0([IdxFile | _] = Files, undefined) -> {ok, Fd} = open(IdxFile, [read, raw, binary]), case first_idx_record(Fd) of {ok, ?IDX_MATCH(FstChId, FstEpoch, _)} -> - ok = file:close(Fd), + ok = osiris_file:close(Fd), last_epoch_chunk_ids0(Files, {FstEpoch, FstChId, []}); _ -> - ok = file:close(Fd), + ok = osiris_file:close(Fd), [] end; last_epoch_chunk_ids0([IdxFile | Rem], {PrevE, _PrevChId, EOs} = Acc0) -> @@ -2232,7 +2232,7 @@ last_epoch_chunk_ids0([IdxFile | Rem], {PrevE, _PrevChId, EOs} = Acc0) -> Acc = idx_skip_search(Fd, ?IDX_HEADER_SIZE, fun leo_search_fun/3, Acc0), - ok = file:close(Fd), + ok = osiris_file:close(Fd), last_epoch_chunk_ids0(Rem, Acc); {ok, ?IDX_MATCH(LstChId, LstEpoch, _)} -> %% no scan needed, just pass last epoch chunk id pair @@ -2287,7 +2287,7 @@ idx_skip_search(Fd, Pos, Fun, Acc0) -> {return, Acc} -> Acc; {scan, Acc1} -> - {ok, Data} = file:pread(Fd, Pos, SkipSize + ?INDEX_RECORD_SIZE_B), + {ok, Data} = osiris_file:pread(Fd, Pos, SkipSize + ?INDEX_RECORD_SIZE_B), case idx_lin_scan(Data, Fun, Acc1) of {continue, Acc} -> idx_skip_search(Fd, PeekPos + ?INDEX_RECORD_SIZE_B, @@ -2298,7 +2298,7 @@ idx_skip_search(Fd, Pos, Fun, Acc0) -> end; _ -> %% eof or invalid index record - case file:pread(Fd, Pos, SkipSize + ?INDEX_RECORD_SIZE_B) of + case osiris_file:pread(Fd, Pos, SkipSize + ?INDEX_RECORD_SIZE_B) of {ok, Data} -> case idx_lin_scan(Data, Fun, Acc0) of {continue, Acc} -> @@ -2412,10 +2412,10 @@ write_chunk(Chunk, false -> NextOffset = Next + NumRecords, Size = iolist_size(Chunk), - {ok, Cur} = file:position(Fd, cur), - ok = file:write(Fd, Chunk), + {ok, Cur} = osiris_file:position(Fd, cur), + ok = osiris_file:write(Fd, Chunk), - ok = file:write(IdxFd, + ok = osiris_file:write(IdxFd, <= MaxSizeBytes orelse CurrentSizeChunks >= MaxSizeChunks. -sendfile(_Transport, _Fd, _Sock, _Pos, 0) -> - ok; -sendfile(tcp = Transport, Fd, Sock, Pos, ToSend) -> - case file:sendfile(Fd, Sock, Pos, ToSend, []) of - {ok, 0} -> - %% TODO add counter for this? - sendfile(Transport, Fd, Sock, Pos, ToSend); - {ok, BytesSent} -> - sendfile(Transport, Fd, Sock, Pos + BytesSent, ToSend - BytesSent); - {error, _} = Err -> - Err - end; -sendfile(ssl, Fd, Sock, Pos, ToSend) -> - case file:pread(Fd, Pos, ToSend) of - {ok, Data} -> - ssl:send(Sock, Data); - {error, _} = Err -> - Err - end. - send(tcp, Sock, Data) -> gen_tcp:send(Sock, Data); send(ssl, Sock, Data) -> ssl:send(Sock, Data). last_timestamp_in_index_file(IdxFile) -> - case file:open(IdxFile, [raw, binary, read]) of + case osiris_file:open(IdxFile, [raw, binary, read]) of {ok, IdxFd} -> case last_idx_record(IdxFd) of {ok, <<_O:64/unsigned, @@ -2481,10 +2461,10 @@ last_timestamp_in_index_file(IdxFile) -> _E:64/unsigned, _ChunkPos:32/unsigned, _ChType:8/unsigned>>} -> - _ = file:close(IdxFd), + _ = osiris_file:close(IdxFd), {ok, LastTimestamp}; Err -> - _ = file:close(IdxFd), + _ = osiris_file:close(IdxFd), Err end; Err -> @@ -2492,7 +2472,7 @@ last_timestamp_in_index_file(IdxFile) -> end. first_timestamp_from_index_files([IdxFile | _]) -> - case file:open(IdxFile, [raw, binary, read]) of + case osiris_file:open(IdxFile, [raw, binary, read]) of {ok, IdxFd} -> case first_idx_record(IdxFd) of {ok, <<_FstO:64/unsigned, @@ -2500,10 +2480,10 @@ first_timestamp_from_index_files([IdxFile | _]) -> _FstE:64/unsigned, _FstChunkPos:32/unsigned, _FstChType:8/unsigned>>} -> - _ = file:close(IdxFd), + _ = osiris_file:close(IdxFd), FstTimestamp; _ -> - _ = file:close(IdxFd), + _ = osiris_file:close(IdxFd), 0 end; _Err -> @@ -2527,22 +2507,22 @@ chunk_id_range_from_idx_files(FstIdxFile, LstIdxFile) -> {ok, LstFd} = open(LstIdxFile, [read, raw, binary]), case position_at_idx_record_boundary(LstFd, eof) of {ok, Pos} -> - case file:pread(LstFd, Pos - ?INDEX_RECORD_SIZE_B, + case osiris_file:pread(LstFd, Pos - ?INDEX_RECORD_SIZE_B, ?INDEX_RECORD_SIZE_B) of {ok, ?IDX_MATCH(LstChId, _, _)} -> - ok = file:close(LstFd), + ok = osiris_file:close(LstFd), {ok, FstFd} = open(FstIdxFile, [read, raw, binary]), - case file:pread(FstFd, ?IDX_HEADER_SIZE, + case osiris_file:pread(FstFd, ?IDX_HEADER_SIZE, ?INDEX_RECORD_SIZE_B) of {ok, ?IDX_MATCH(FstChId, _, _)} -> - ok = file:close(FstFd), + ok = osiris_file:close(FstFd), {ok, {FstChId, LstChId}}; Err -> - ok = file:close(LstFd), + ok = osiris_file:close(LstFd), Err end; Err -> - ok = file:close(LstFd), + ok = osiris_file:close(LstFd), Err end end. @@ -2649,16 +2629,16 @@ open_new_segment(#?MODULE{cfg = #cfg{name = Name, IdxFilename = make_file_name(NextOffset, "index"), ?DEBUG_(Name, "~ts", [Filename]), {ok, IdxFd} = - file:open( + osiris_file:open( filename:join(Dir, IdxFilename), ?FILE_OPTS_WRITE), - ok = file:write(IdxFd, ?IDX_HEADER), + ok = osiris_file:write(IdxFd, ?IDX_HEADER), {ok, Fd} = - file:open( + osiris_file:open( filename:join(Dir, Filename), ?FILE_OPTS_WRITE), - ok = file:write(Fd, ?LOG_HEADER), + ok = osiris_file:write(Fd, ?LOG_HEADER), %% we always move to the end of the file - {ok, _} = file:position(Fd, eof), - {ok, _} = file:position(IdxFd, eof), + {ok, _} = osiris_file:position(Fd, eof), + {ok, _} = osiris_file:position(IdxFd, eof), counters:add(Cnt, ?C_SEGMENTS, 1), State0#?MODULE{current_file = Filename, @@ -2672,7 +2652,7 @@ open_index_read(File) -> %% We can't use the assertion that index header is correct because of a %% race condition between opening the file and writing the header %% It seems to happen when retention policies are applied - {ok, ?IDX_HEADER_SIZE} = file:position(Fd, ?IDX_HEADER_SIZE), + {ok, ?IDX_HEADER_SIZE} = osiris_file:position(Fd, ?IDX_HEADER_SIZE), Fd. offset_idx_scan(Name, Offset, #seg_info{index = IndexFile} = SegmentInfo) -> @@ -2688,12 +2668,12 @@ offset_idx_scan(Name, Offset, #seg_info{index = IndexFile} = SegmentInfo) -> false -> {ok, IdxFd} = open(IndexFile, [read, raw, binary]), - _ = file:advise(IdxFd, 0, 0, random), + _ = osiris_file:advise(IdxFd, 0, 0, random), {Offset, SearchResult} = idx_skip_search(IdxFd, ?IDX_HEADER_SIZE, fun offset_search_fun/3, {Offset, not_found}), - ok = file:close(IdxFd), + ok = osiris_file:close(IdxFd), SearchResult end end, @@ -2720,7 +2700,7 @@ throw_missing(Any) -> Any. open(File, Options) -> - throw_missing(file:open(File, Options)). + throw_missing(osiris_file:open(File, Options)). chunk_location_for_timestamp(Idx, Ts) -> %% TODO: optimise using skip search approach @@ -2730,7 +2710,7 @@ chunk_location_for_timestamp(Idx, Ts) -> {ChunkId, FilePos}. timestamp_idx_scan(Fd, Ts) -> - case file:read(Fd, ?INDEX_RECORD_SIZE_B) of + case osiris_file:read(Fd, ?INDEX_RECORD_SIZE_B) of {ok, < _ChType:8/unsigned>>} -> case Ts =< Timestamp of true -> - ok = file:close(Fd), + ok = osiris_file:close(Fd), {ChunkId, Timestamp, Epoch, FilePos}; false -> timestamp_idx_scan(Fd, Ts) end; eof -> - ok = file:close(Fd), + ok = osiris_file:close(Fd), eof end. @@ -2807,18 +2787,18 @@ recover_tracking(#?MODULE{cfg = #cfg{directory = Dir, %% being used for appending to the segment as pread _may_ move the file %% position on some systems (such as windows) {ok, Fd} = open(filename:join(Dir, File), [read, raw, binary]), - _ = file:advise(Fd, 0, 0, random), + _ = osiris_file:advise(Fd, 0, 0, random), %% TODO: if the first chunk in the segment isn't a tracking snapshot and %% there are prior segments we could scan at least two segments increasing %% the chance of encountering a snapshot and thus ensure we don't miss any %% tracking entries Trk0 = osiris_tracking:init(undefined, TrkConfig), Trk = recover_tracking(Fd, Trk0, ?LOG_HEADER_SIZE), - _ = file:close(Fd), + _ = osiris_file:close(Fd), Trk. recover_tracking(Fd, Trk0, Pos0) -> - case file:pread(Fd, Pos0, ?HEADER_SIZE_B) of + case osiris_file:pread(Fd, Pos0, ?HEADER_SIZE_B) of {ok, < %% tracking is written a single record so we don't %% have to parse {ok, <<0:1, S:31, Data:S/binary>>} = - file:pread(Fd, Pos + FSize, Size), + osiris_file:pread(Fd, Pos + FSize, Size), Trk = osiris_tracking:append_trailer(ChunkId, Data, Trk0), %% A tracking delta chunk will not have any writer data %% so no need to parse writers here recover_tracking(Fd, Trk, NextPos); ?CHNK_TRK_SNAPSHOT -> {ok, <<0:1, S:31, Data:S/binary>>} = - file:pread(Fd, Pos + FSize, Size), + osiris_file:pread(Fd, Pos + FSize, Size), Trk = osiris_tracking:init(Data, Trk0), recover_tracking(Fd, Trk, NextPos); ?CHNK_USER when TSize > 0 -> - {ok, TData} = file:pread(Fd, Pos + FSize + Size, TSize), + {ok, TData} = osiris_file:pread(Fd, Pos + FSize + Size, TSize), Trk = osiris_tracking:append_trailer(ChunkId, TData, Trk0), recover_tracking(Fd, Trk, NextPos); ?CHNK_USER -> @@ -2876,7 +2856,7 @@ read_header0(#?MODULE{cfg = #cfg{directory = Dir, %% optimistically read 64 bytes (small binary) as it may save us %% a syscall reading the filter if the filter is of the default %% 16 byte size - case file:pread(Fd, Pos, ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE) of + case osiris_file:pread(Fd, Pos, ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE) of {ok, < %% the filter is larger than default - case file:pread(Fd, Pos + ?HEADER_SIZE_B, + case osiris_file:pread(Fd, Pos + ?HEADER_SIZE_B, FilterSize) of {ok, F} -> F; @@ -2956,10 +2936,10 @@ read_header0(#?MODULE{cfg = #cfg{directory = Dir, %% log but would cause an infinite loop if it does {end_of_stream, State}; false -> - case file:open(filename:join(Dir, SegFile), + case osiris_file:open(filename:join(Dir, SegFile), [raw, binary, read]) of {ok, Fd2} -> - ok = file:close(Fd), + ok = osiris_file:close(Fd), Read = Read0#read{next_offset = NextChId, position = ?LOG_HEADER_SIZE}, read_header0( @@ -3029,9 +3009,9 @@ index_file_first_offset(IdxFile) when is_binary(IdxFile) -> binary_to_integer(filename:basename(IdxFile, <<".index">>)). first_last_timestamps(IdxFile) -> - case file:open(IdxFile, [raw, read, binary]) of + case osiris_file:open(IdxFile, [raw, read, binary]) of {ok, Fd} -> - _ = file:advise(Fd, 0, 0, random), + _ = osiris_file:advise(Fd, 0, 0, random), case first_idx_record(Fd) of {ok, <<_:64/unsigned, FirstTs:64/signed, @@ -3044,7 +3024,7 @@ first_last_timestamps(IdxFile) -> _:64/unsigned, _:32/unsigned, _:8/unsigned>>} = last_idx_record(Fd), - ok = file:close(Fd), + ok = osiris_file:close(Fd), {FirstTs, LastTs}; {error, einval} -> %% empty index @@ -3114,22 +3094,22 @@ timestamp_idx_file_search0(Ts, [IdxFile | Older], Prev) -> close_fd(undefined) -> ok; close_fd(Fd) -> - _ = file:close(Fd), + _ = osiris_file:close(Fd), ok. dump_init(File) -> - {ok, Fd} = file:open(File, [raw, binary, read]), - {ok, <<"OSIL", _V:4/binary>> } = file:read(Fd, ?LOG_HEADER_SIZE), + {ok, Fd} = osiris_file:open(File, [raw, binary, read]), + {ok, <<"OSIL", _V:4/binary>> } = osiris_file:read(Fd, ?LOG_HEADER_SIZE), Fd. dump_init_idx(File) -> - {ok, Fd} = file:open(File, [raw, binary, read]), - {ok, <<"OSII", _V:4/binary>> } = file:read(Fd, ?IDX_HEADER_SIZE), + {ok, Fd} = osiris_file:open(File, [raw, binary, read]), + {ok, <<"OSII", _V:4/binary>> } = osiris_file:read(Fd, ?IDX_HEADER_SIZE), Fd. dump_index(Fd) -> - case file:read(Fd, ?INDEX_RECORD_SIZE_B) of + case osiris_file:read(Fd, ?INDEX_RECORD_SIZE_B) of {ok, < dump_chunk(Fd) -> - {ok, Pos} = file:position(Fd, cur), - case file:read(Fd, ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE) of + {ok, Pos} = osiris_file:position(Fd, cur), + case osiris_file:read(Fd, ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE) of {ok, < F; _ when FilterSize > 0 -> %% the filter is larger than default - case file:pread(Fd, Pos + ?HEADER_SIZE_B, + case osiris_file:pread(Fd, Pos + ?HEADER_SIZE_B, FilterSize) of {ok, F} -> F; @@ -3183,9 +3163,9 @@ dump_chunk(Fd) -> _ -> <<>> end, - {ok, Data} = file:pread(Fd, Pos + FilterSize + ?HEADER_SIZE_B, DataSize), + {ok, Data} = osiris_file:pread(Fd, Pos + FilterSize + ?HEADER_SIZE_B, DataSize), CrcMatch = erlang:crc32(Data) =:= Crc, - _ = file:position(Fd, NextPos), + _ = osiris_file:position(Fd, NextPos), #{chunk_id => NextChId0, epoch => Epoch, type => ChType, @@ -3221,7 +3201,7 @@ iter_read_ahead(_Fd, _Pos, _ChunkId, _Crc, 1, _DataSize, _NumEntries) -> undefined; iter_read_ahead(Fd, Pos, ChunkId, Crc, Credit, DataSize, NumEntries) when Credit == all orelse NumEntries == 1 -> - {ok, Data} = file:pread(Fd, Pos, DataSize), + {ok, Data} = osiris_file:pread(Fd, Pos, DataSize), validate_crc(ChunkId, Crc, Data), Data; iter_read_ahead(Fd, Pos, _ChunkId, _Crc, Credit0, DataSize, NumEntries) -> @@ -3230,11 +3210,11 @@ iter_read_ahead(Fd, Pos, _ChunkId, _Crc, Credit0, DataSize, NumEntries) -> %% We can only practically validate CRC if we read the whole data Credit = min(Credit0, NumEntries), Size = DataSize div NumEntries * Credit, - {ok, Data} = file:pread(Fd, Pos, Size + ?ITER_READ_AHEAD_B), + {ok, Data} = osiris_file:pread(Fd, Pos, Size + ?ITER_READ_AHEAD_B), Data. list_dir(Dir) -> - case prim_file:list_dir(Dir) of + case osiris_file:list_dir(Dir) of {error, enoent} -> []; {ok, Files} ->