diff --git a/src/osiris.hrl b/src/osiris.hrl index ffb39c9..91f723f 100644 --- a/src/osiris.hrl +++ b/src/osiris.hrl @@ -47,6 +47,18 @@ -define(FILE_OPTS_WRITE, [raw, binary, write, read]). -define(INDEX_RECORD_SIZE_B, 29). +%% record chunk_info does not map exactly to an index record (field 'num' differs) +-record(chunk_info, + {id :: osiris:offset(), + timestamp :: non_neg_integer(), + epoch :: osiris:epoch(), + num :: non_neg_integer(), + type :: osiris_log:chunk_type(), + %% size of data + filter + trailer + size :: non_neg_integer(), + %% position in segment file + pos :: integer() + }). %% chunk types -define(CHNK_USER, 0). diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 7e221b8..35c3be3 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -7,7 +7,7 @@ -module(osiris_log). --include("osiris.hrl"). +-include("src/osiris.hrl"). -include_lib("kernel/include/file.hrl"). -export([init/1, @@ -54,6 +54,15 @@ delete_directory/1, make_counter/1]). +%% osiris_log_manifest callbacks (default implementations) +-export([writer_manifest/1, + acceptor_manifest/3, + find_data_reader_position/2, + find_offset_reader_position/2, + handle_event/2, + close_manifest/1, + delete/1]). + -export([dump_init/1, dump_init_idx/1, dump_chunk/1, @@ -67,6 +76,8 @@ orphaned_segments/1 ]). +-behaviour(osiris_log_manifest). + % maximum size of a segment in bytes -define(DEFAULT_MAX_SEGMENT_SIZE_B, 500 * 1000 * 1000). % maximum number of chunks per segment @@ -420,6 +431,7 @@ }). -record(read, {type :: data | offset, + reader :: {module(), osiris_log_reader:state()}, next_offset = 0 :: offset(), transport :: transport(), chunk_selector :: all | user_data, @@ -429,42 +441,42 @@ {type = writer :: writer | acceptor, segment_size = {?LOG_HEADER_SIZE, 0} :: {non_neg_integer(), non_neg_integer()}, current_epoch :: non_neg_integer(), - tail_info = {0, empty} :: osiris:tail_info() + tail_info = {0, empty} :: osiris:tail_info(), + manifest :: {module(), osiris_log_manifest:state()} }). -record(?MODULE, {cfg :: #cfg{}, mode :: #read{} | #write{}, current_file :: undefined | file:filename_all(), - index_fd :: undefined | file:io_device(), fd :: undefined | file:io_device() }). -%% record chunk_info does not map exactly to an index record (field 'num' differs) --record(chunk_info, - {id :: offset(), - timestamp :: non_neg_integer(), - epoch :: epoch(), - num :: non_neg_integer(), - type :: chunk_type(), - %% size of data + filter + trailer - size :: non_neg_integer(), - %% position in segment file - pos :: integer() - }). -record(seg_info, {file :: file:filename_all(), size = 0 :: non_neg_integer(), + chunks = 0 :: non_neg_integer(), index :: file:filename_all(), first :: undefined | #chunk_info{}, last :: undefined | #chunk_info{}}). +%% Default manifest implementation which lists the configured dir for index +%% files. +-record(manifest, + {name :: osiris:name(), + directory :: file:filename_all(), + index_fd :: file:io_device() | undefined, + retention :: [retention_spec()], + retention_eval_fun :: fun()}). -opaque state() :: #?MODULE{}. +-type manifest() :: #manifest{}. -export_type([state/0, chunk_iterator/0, range/0, config/0, counter_spec/0, - transport/0]). + transport/0, + chunk_type/0, + manifest/0]). -spec directory(osiris:config() | list()) -> file:filename_all(). directory(#{name := Name, dir := Dir}) -> @@ -483,20 +495,10 @@ init(Config) -> -spec init(config(), writer | acceptor) -> state(). 
init(#{dir := Dir, name := Name, - epoch := Epoch} = Config, + epoch := Epoch} = Config0, WriterType) -> - %% scan directory for segments if in write mode - MaxSizeBytes = maps:get(max_segment_size_bytes, Config, - ?DEFAULT_MAX_SEGMENT_SIZE_B), - MaxSizeChunks = application:get_env(osiris, max_segment_size_chunks, - ?DEFAULT_MAX_SEGMENT_SIZE_C), - Retention = maps:get(retention, Config, []), - FilterSize = maps:get(filter_size, Config, ?DEFAULT_FILTER_SIZE), ?INFO("Stream: ~ts will use ~ts for osiris log data directory", [Name, Dir]), - ?DEBUG_(Name, "max_segment_size_bytes: ~b, - max_segment_size_chunks ~b, retention ~w, filter size ~b", - [MaxSizeBytes, MaxSizeChunks, Retention, FilterSize]), ok = filelib:ensure_dir(Dir), case file:make_dir(Dir) of ok -> @@ -507,29 +509,42 @@ init(#{dir := Dir, throw(Err) end, - Cnt = make_counter(Config), + ManifestMod = application:get_env(osiris, log_manifest, ?MODULE), + {Info, Config, Manifest0} = case Config0 of + #{acceptor_manifest := {I, M}} -> + {I, Config0, M}; + _ -> + Config1 = with_defaults(Config0), + ManifestMod:writer_manifest(Config1) + end, + + MaxSizeChunks = application:get_env(osiris, max_segment_size_chunks, + ?DEFAULT_MAX_SEGMENT_SIZE_C), + #{max_segment_size_bytes := MaxSizeBytes, + retention := Retention, + filter_size := FilterSize, + shared := Shared, + counter := Cnt, + counter_id := CounterId, + tracking_config := TrackingConfig} = Config, + ?DEBUG_(Name, "max_segment_size_bytes: ~b, + max_segment_size_chunks ~b, retention ~w, filter size ~b", + [MaxSizeBytes, MaxSizeChunks, Retention, FilterSize]), %% initialise offset counter to -1 as 0 is the first offset in the log and %% it hasn't necessarily been written yet, for an empty log the first offset %% is initialised to 0 however and will be updated after each retention run. 
counters:put(Cnt, ?C_OFFSET, -1), counters:put(Cnt, ?C_SEGMENTS, 0), - Shared = case Config of - #{shared := S} -> - S; - _ -> - osiris_log_shared:new() - end, Cfg = #cfg{directory = Dir, name = Name, max_segment_size_bytes = MaxSizeBytes, max_segment_size_chunks = MaxSizeChunks, - tracking_config = maps:get(tracking_config, Config, #{}), + tracking_config = TrackingConfig, retention = Retention, counter = Cnt, - counter_id = counter_id(Config), + counter_id = CounterId, shared = Shared, filter_size = FilterSize}, - ok = maybe_fix_corrupted_files(Config), DefaultNextOffset = case Config of #{initial_offset := IO} when WriterType == acceptor -> @@ -537,8 +552,9 @@ init(#{dir := Dir, _ -> 0 end, - case first_and_last_seginfos(Config) of - none -> + + case Info of + #{num_segments := 0} -> osiris_log_shared:set_first_chunk_id(Shared, DefaultNextOffset - 1), osiris_log_shared:set_last_chunk_id(Shared, DefaultNextOffset - 1), open_new_segment(#?MODULE{cfg = Cfg, @@ -546,17 +562,19 @@ init(#{dir := Dir, #write{type = WriterType, tail_info = {DefaultNextOffset, empty}, + manifest = {ManifestMod, + Manifest0}, current_epoch = Epoch}}); - {NumSegments, - #seg_info{first = #chunk_info{id = FstChId, - timestamp = FstTs}}, - #seg_info{file = Filename, - index = IdxFilename, - size = Size, - last = #chunk_info{epoch = LastEpoch, - timestamp = LastTs, - id = LastChId, - num = LastNum}}} -> + #{num_segments := NumSegments, + first_offset := FstChId, + first_timestamp := FstTs, + active_segment := #{file := Filename, + size := Size, + chunks := NumChunks, + last := #chunk_info{epoch = LastEpoch, + timestamp = LastTs, + id = LastChId, + num = LastNum}}} -> %% assert epoch is same or larger %% than last known epoch case LastEpoch > Epoch of @@ -584,43 +602,98 @@ init(#{dir := Dir, %% at a valid chunk we can now truncate the segment to size in %% case there is trailing data ok = file:truncate(SegFd), - {ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE), - {ok, IdxEof} = file:position(IdxFd, eof), - NumChunks = (IdxEof - ?IDX_HEADER_SIZE) div ?INDEX_RECORD_SIZE_B, + Event = {segment_opened, undefined, Filename}, + Manifest = ManifestMod:handle_event(Event, Manifest0), #?MODULE{cfg = Cfg, mode = #write{type = WriterType, tail_info = TailInfo, segment_size = {Size, NumChunks}, + manifest = {ManifestMod, Manifest}, current_epoch = Epoch}, current_file = filename:basename(Filename), - fd = SegFd, - index_fd = IdxFd}; - {1, #seg_info{file = Filename, - index = IdxFilename, - last = undefined}, _} -> + fd = SegFd}; + #{num_segments := 1, + active_segment := #{file := Filename}} -> %% the empty log case {ok, SegFd} = open(Filename, ?FILE_OPTS_WRITE), - {ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE), {ok, _} = file:position(SegFd, ?LOG_HEADER_SIZE), counters:put(Cnt, ?C_SEGMENTS, 1), %% the segment could potentially have trailing data here so we'll %% do a truncate just in case. The index would have been truncated %% earlier ok = file:truncate(SegFd), - {ok, _} = file:position(IdxFd, ?IDX_HEADER_SIZE), osiris_log_shared:set_first_chunk_id(Shared, DefaultNextOffset - 1), osiris_log_shared:set_last_chunk_id(Shared, DefaultNextOffset - 1), + Event = {segment_opened, undefined, Filename}, + Manifest = ManifestMod:handle_event(Event, Manifest0), #?MODULE{cfg = Cfg, mode = #write{type = WriterType, tail_info = {DefaultNextOffset, empty}, + manifest = {ManifestMod, Manifest}, current_epoch = Epoch}, current_file = filename:basename(Filename), - fd = SegFd, - index_fd = IdxFd} + fd = SegFd} end. 
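Editor's note: the manifest and reader implementations are resolved at runtime from the osiris application environment: log_manifest (defaulting to osiris_log's built-in, index-file based implementation) and log_reader (used further down for segment I/O, defaulting to osiris_log_reader). A minimal sketch of how an embedding application could plug in its own modules; the module names my_log_manifest and my_log_reader are placeholders, not part of this patch:

    %% sys.config (sketch; my_log_manifest/my_log_reader are made-up names)
    [{osiris,
      [{log_manifest, my_log_manifest},  %% must implement osiris_log_manifest
       {log_reader,   my_log_reader}     %% must implement osiris_log_reader
      ]}].

With neither key set the log behaves as before: osiris_log acts as its own manifest and osiris_log_reader performs the segment reads.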
+writer_manifest(#{dir := Dir, + name := Name, + retention := RetentionSpec, + counter := Cnt, + shared := Shared} = Config) -> + ok = maybe_fix_corrupted_files(Config), + Info = case first_and_last_seginfos(Config) of + none -> + #{num_segments => 0, + segment_offsets => []}; + {SegmentOffsets, + #seg_info{first = #chunk_info{id = FirstOffset, + timestamp = FirstTimestamp}}, + #seg_info{file = LastFile, + size = LastSize, + chunks = LastNumChunks, + first = LastFirstChunk, + last = #chunk_info{} = LastLastChunk}} -> + SegInfo = #{file => LastFile, + size => LastSize, + chunks => LastNumChunks, + first => LastFirstChunk, + last => LastLastChunk}, + #{num_segments => length(SegmentOffsets), + first_offset => FirstOffset, + first_timestamp => FirstTimestamp, + active_segment => SegInfo, + segment_offsets => SegmentOffsets}; + {[_] = SegmentOffsets, + #seg_info{last = undefined, + file = Filename, + size = Size}, _} -> + SegInfo = #{file => Filename, + size => Size}, + #{num_segments => 1, + active_segment => SegInfo, + segment_offsets => SegmentOffsets} + end, + %% updates first offset and first timestamp + %% after retention has been evaluated + EvalFun = fun ({{FstOff, _}, FstTs, NumSegLeft}) + when is_integer(FstOff), + is_integer(FstTs) -> + osiris_log_shared:set_first_chunk_id(Shared, FstOff), + counters:put(Cnt, ?C_FIRST_OFFSET, FstOff), + counters:put(Cnt, ?C_FIRST_TIMESTAMP, FstTs), + counters:put(Cnt, ?C_SEGMENTS, NumSegLeft); + (_) -> + ok + end, + Manifest = #manifest{name = Name, + directory = Dir, + retention = RetentionSpec, + retention_eval_fun = EvalFun}, + %% The segment_opened event will create the index fd. + {Info, Config, Manifest}. + maybe_fix_corrupted_files([]) -> ok; maybe_fix_corrupted_files(#{dir := Dir}) -> @@ -847,11 +920,25 @@ evaluate_tracking_snapshot(#?MODULE{mode = #write{type = writer}} = State0, Trk0 {State0, Trk0} end. -% -spec -spec init_acceptor(range(), list(), config()) -> state(). -init_acceptor(Range, EpochOffsets0, - #{name := Name, dir := Dir} = Conf) -> +init_acceptor(Range, EpochOffsets0, Conf0) -> + EpochOffsets = + lists:reverse( + lists:sort(EpochOffsets0)), + ManifestMod = application:get_env(osiris, log_manifest, ?MODULE), + Conf1 = with_defaults(Conf0), + {Info, Conf, Manifest} = ManifestMod:acceptor_manifest(Range, EpochOffsets, + Conf1), + InitOffset = case Range of + empty -> 0; + {O, _} -> O + end, + init(Conf#{initial_offset => InitOffset, + acceptor_manifest => {Info, Manifest}}, acceptor). + +acceptor_manifest(Range, EpochOffsets, + #{name := Name, dir := Dir} = Conf) -> %% truncate to first common last epoch offset %% * if the last local chunk offset has the same epoch but is lower %% than the last chunk offset then just attach at next offset. @@ -859,21 +946,13 @@ init_acceptor(Range, EpochOffsets0, %% * if it has a higher epoch than last provided - truncate to last offset %% of previous %% sort them so that the highest epochs go first - EpochOffsets = - lists:reverse( - lists:sort(EpochOffsets0)), %% then truncate to IdxFiles = sorted_index_files(Dir), ?DEBUG_(Name, "from epoch offsets: ~w range ~w", [EpochOffsets, Range]), RemIdxFiles = truncate_to(Name, Range, EpochOffsets, IdxFiles), %% after truncation we can do normal init - InitOffset = case Range of - empty -> 0; - {O, _} -> O - end, - init(Conf#{initial_offset => InitOffset, - index_files => RemIdxFiles}, acceptor). + writer_manifest(Conf#{index_files => RemIdxFiles}). 
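Editor's note: for reference, the log_info() map produced by the default writer_manifest/1 (and reused by the acceptor path above) for a non-empty stream looks roughly like the sketch below. All values are illustrative; FirstChunk and LastChunk stand for #chunk_info{} records. An empty stream yields #{num_segments => 0, segment_offsets => []} instead.

    %% Illustrative only: two segments, the active one holding 42 chunks.
    Info = #{num_segments => 2,
             first_offset => 0,
             first_timestamp => 1690000000000,
             active_segment => #{file => "00000000000000100000.segment",
                                 size => 1048576,
                                 chunks => 42,
                                 first => FirstChunk,
                                 last => LastChunk},
             segment_offsets => [0, 100000]}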
chunk_id_index_scan(IdxFile, ChunkId) when ?IS_STRING(IdxFile) -> @@ -989,8 +1068,18 @@ truncate_to(Name, RemoteRange, [{E, ChId} | NextEOs], IdxFiles) -> {error, {offset_out_of_range, empty | {offset(), offset()}}} | {error, {invalid_last_offset_epoch, epoch(), offset()}} | {error, file:posix()}. -init_data_reader({StartChunkId, PrevEOT}, #{dir := Dir, - name := Name} = Config) -> +init_data_reader(TailInfo, Config) -> + ManifestMod = application:get_env(osiris, log_manifest, ?MODULE), + case ManifestMod:find_data_reader_position(TailInfo, Config) of + {ok, ChunkId, Pos, Segment} -> + init_data_reader_at(ChunkId, Pos, Segment, Config); + {error, _} = Err -> + Err + end. + +find_data_reader_position({StartChunkId, PrevEOT}, + #{dir := Dir, + name := Name} = Config) -> IdxFiles = sorted_index_files(Dir), Range = offset_range_from_idx_files(IdxFiles), ?DEBUG_(Name, " at ~b prev ~w local range: ~w", @@ -1006,19 +1095,19 @@ init_data_reader({StartChunkId, PrevEOT}, #{dir := Dir, _ when PrevEOT == empty -> %% this assumes the offset is in range %% first we need to validate PrevEO - init_data_reader_from(StartChunkId, - find_segment_for_offset(StartChunkId, - IdxFiles), - Config); + find_data_reader_pos(StartChunkId, + find_segment_for_offset(StartChunkId, + IdxFiles), + Config); _ -> {PrevEpoch, PrevChunkId, _PrevTs} = PrevEOT, case check_chunk_has_expected_epoch(Name, PrevChunkId, PrevEpoch, IdxFiles) of ok -> - init_data_reader_from(StartChunkId, - find_segment_for_offset(StartChunkId, - IdxFiles), - Config); + find_data_reader_pos(StartChunkId, + find_segment_for_offset(StartChunkId, + IdxFiles), + Config); {error, _} = Err -> Err end @@ -1046,8 +1135,9 @@ init_data_reader_at(ChunkId, FilePos, File, #{dir := Dir, name := Name, shared := Shared, readers_counter_fun := CountersFun} = Config) -> - case file:open(File, [raw, binary, read]) of - {ok, Fd} -> + ReaderMod = application:get_env(osiris, log_reader, osiris_log_reader), + case ReaderMod:open(File) of + {ok, Reader} -> Cnt = make_counter(Config), counters:put(Cnt, ?C_OFFSET, ChunkId - 1), CountersFun(1), @@ -1062,27 +1152,26 @@ init_data_reader_at(ChunkId, FilePos, File, }, mode = #read{type = data, + reader = {ReaderMod, Reader}, next_offset = ChunkId, chunk_selector = all, position = FilePos, - transport = maps:get(transport, Config, tcp)}, - fd = Fd}}; + transport = maps:get(transport, Config, tcp)}}}; Err -> Err end. -init_data_reader_from(ChunkId, - {end_of_log, #seg_info{file = File, - last = LastChunk}}, - Config) -> +find_data_reader_pos(ChunkId, + {end_of_log, #seg_info{file = File, last = LastChunk}}, + _Config) -> {ChunkId, AttachPos} = next_location(LastChunk), - init_data_reader_at(ChunkId, AttachPos, File, Config); -init_data_reader_from(ChunkId, - {found, #seg_info{file = File} = SegInfo}, - Config) -> + {ok, ChunkId, AttachPos, File}; +find_data_reader_pos(ChunkId, + {found, #seg_info{file = File} = SegInfo}, + Config) -> Name = maps:get(name, Config, <<>>), {ChunkId, _Epoch, FilePos} = offset_idx_scan(Name, ChunkId, SegInfo), - init_data_reader_at(ChunkId, FilePos, File, Config). + {ok, ChunkId, FilePos, File}. 
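Editor's note: since osiris_log exports its default callback implementations, a custom manifest does not have to re-implement the on-disk index handling; it can delegate and only layer its own bookkeeping on top. A rough sketch under that assumption (the module name and the wrapping tuple are made up for illustration):

    -module(my_log_manifest).
    -behaviour(osiris_log_manifest).

    -export([writer_manifest/1, acceptor_manifest/3,
             find_data_reader_position/2, find_offset_reader_position/2,
             handle_event/2, close_manifest/1, delete/1]).

    writer_manifest(Config) ->
        {Info, Config1, Inner} = osiris_log:writer_manifest(Config),
        {Info, Config1, {my_state, Inner}}.

    acceptor_manifest(Range, EpochOffsets, Config) ->
        {Info, Config1, Inner} =
            osiris_log:acceptor_manifest(Range, EpochOffsets, Config),
        {Info, Config1, {my_state, Inner}}.

    %% the position lookups are stateless and can be delegated as-is
    find_data_reader_position(TailInfo, Config) ->
        osiris_log:find_data_reader_position(TailInfo, Config).

    find_offset_reader_position(OffsetSpec, Config) ->
        osiris_log:find_offset_reader_position(OffsetSpec, Config).

    handle_event(Event, {my_state, Inner}) ->
        %% observe Event here (metrics, external index, ...) before delegating
        {my_state, osiris_log:handle_event(Event, Inner)}.

    close_manifest({my_state, Inner}) ->
        osiris_log:close_manifest(Inner).

    delete(Config) ->
        osiris_log:delete(Config).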
%% @doc Initialise a new offset reader %% @param OffsetSpec specifies where in the log to attach the reader @@ -1114,20 +1203,23 @@ init_offset_reader(OffsetSpec, Conf) -> init_offset_reader(_OffsetSpec, _Conf, 0) -> {error, retries_exhausted}; init_offset_reader(OffsetSpec, Conf, Attempt) -> - try - init_offset_reader0(OffsetSpec, Conf) + ManifestMod = application:get_env(osiris, log_manifest, ?MODULE), + try ManifestMod:find_offset_reader_position(OffsetSpec, Conf) of + {ok, ChunkId, Pos, Segment} -> + open_offset_reader_at(Segment, ChunkId, Pos, Conf); + {error, _} = Err -> + Err catch missing_file -> %% Retention policies are likely being applied, let's try again %% TODO: should we limit the number of retries? %% Remove cached index_files from config - init_offset_reader(OffsetSpec, - maps:remove(index_files, Conf), Attempt - 1); + init_offset_reader(OffsetSpec, Conf, Attempt - 1); {retry_with, NewOffsSpec, NewConf} -> init_offset_reader(NewOffsSpec, NewConf, Attempt - 1) end. -init_offset_reader0({abs, Offs}, #{dir := Dir} = Conf) -> +find_offset_reader_position({abs, Offs}, #{dir := Dir} = Config) -> case sorted_index_files(Dir) of [] -> {error, no_index_file}; @@ -1140,13 +1232,13 @@ init_offset_reader0({abs, Offs}, #{dir := Dir} = Conf) -> {error, {offset_out_of_range, Range}}; _ -> %% it is in range, convert to standard offset - init_offset_reader0(Offs, Conf) + find_offset_reader_position(Offs, Config) end end; -init_offset_reader0({timestamp, Ts}, #{} = Conf) -> +find_offset_reader_position({timestamp, Ts}, #{} = Conf) -> case sorted_index_files_rev(Conf) of [] -> - init_offset_reader0(next, Conf); + find_offset_reader_position(next, Conf); IdxFilesRev -> case timestamp_idx_file_search(Ts, IdxFilesRev) of {scan, IdxFile} -> @@ -1154,20 +1246,19 @@ init_offset_reader0({timestamp, Ts}, #{} = Conf) -> %% find nearest offset {ChunkId, FilePos} = chunk_location_for_timestamp(IdxFile, Ts), SegmentFile = segment_from_index_file(IdxFile), - open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf); + {ok, ChunkId, FilePos, SegmentFile}; {first_in, IdxFile} -> {ok, Fd} = file:open(IdxFile, [raw, binary, read]), {ok, ?IDX_MATCH(ChunkId, _, FilePos)} = first_idx_record(Fd), SegmentFile = segment_from_index_file(IdxFile), - open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf); + {ok, ChunkId, FilePos, SegmentFile}; next -> %% segment was not found, attach next - %% this should be rare so no need to call the more optimal - %% open_offset_reader_at/4 function - init_offset_reader0(next, Conf) + %% this should be rare + find_offset_reader_position(next, Conf) end end; -init_offset_reader0(first, #{} = Conf) -> +find_offset_reader_position(first, #{} = Conf) -> case sorted_index_files(Conf) of [] -> {error, no_index_file}; @@ -1176,16 +1267,16 @@ init_offset_reader0(first, #{} = Conf) -> {ok, #seg_info{file = File, first = undefined}} -> %% empty log, attach at 0 - open_offset_reader_at(File, 0, ?LOG_HEADER_SIZE, Conf); + {ok, 0, ?LOG_HEADER_SIZE, File}; {ok, #seg_info{file = File, first = #chunk_info{id = FirstChunkId, pos = FilePos}}} -> - open_offset_reader_at(File, FirstChunkId, FilePos, Conf); + {ok, FirstChunkId, FilePos, File}; {error, _} = Err -> exit(Err) end end; -init_offset_reader0(next, #{} = Conf) -> +find_offset_reader_position(next, #{} = Conf) -> case sorted_index_files_rev(Conf) of [] -> {error, no_index_file}; @@ -1194,12 +1285,12 @@ init_offset_reader0(next, #{} = Conf) -> {ok, #seg_info{file = File, last = LastChunk}} -> {NextChunkId, FilePos} = 
next_location(LastChunk), - open_offset_reader_at(File, NextChunkId, FilePos, Conf); + {ok, NextChunkId, FilePos, File}; Err -> exit(Err) end end; -init_offset_reader0(last, #{name := Name} = Conf) -> +find_offset_reader_position(last, #{name := Name} = Conf) -> case sorted_index_files_rev(Conf) of [] -> {error, no_index_file}; @@ -1208,13 +1299,13 @@ init_offset_reader0(last, #{name := Name} = Conf) -> not_found -> ?DEBUG_(Name, "offset spec: 'last', user chunk not found, fall back to next", []), %% no user chunks in stream, this is awkward, fall back to next - init_offset_reader0(next, Conf); + find_offset_reader_position(next, Conf); {ChunkId, FilePos, IdxFile} -> File = segment_from_index_file(IdxFile), - open_offset_reader_at(File, ChunkId, FilePos, Conf) + {ok, ChunkId, FilePos, File} end end; -init_offset_reader0(OffsetSpec, #{} = Conf) +find_offset_reader_position(OffsetSpec, #{} = Conf) when is_integer(OffsetSpec) -> Name = maps:get(name, Conf, <<>>), case sorted_index_files(Conf) of @@ -1235,11 +1326,12 @@ init_offset_reader0(OffsetSpec, #{} = Conf) case find_segment_for_offset(StartOffset, IdxFiles) of {not_found, high} -> + %% TODO: shouldn't this cache the index files? throw({retry_with, next, Conf}); {end_of_log, #seg_info{file = SegmentFile, last = LastChunk}} -> {ChunkId, FilePos} = next_location(LastChunk), - open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf); + {ok, ChunkId, FilePos, SegmentFile}; {found, #seg_info{file = SegmentFile} = SegmentInfo} -> {ChunkId, _Epoch, FilePos} = case offset_idx_scan(Name, StartOffset, SegmentInfo) of @@ -1256,7 +1348,7 @@ init_offset_reader0(OffsetSpec, #{} = Conf) end, ?DEBUG_(Name, "resolved chunk_id ~b" " at file pos: ~w ", [ChunkId, FilePos]), - open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf) + {ok, ChunkId, FilePos, SegmentFile} end end. @@ -1267,7 +1359,8 @@ open_offset_reader_at(SegmentFile, NextChunkId, FilePos, readers_counter_fun := ReaderCounterFun, options := Options} = Conf) -> - {ok, Fd} = open(SegmentFile, [raw, binary, read]), + ReaderMod = application:get_env(osiris, log_reader, osiris_log_reader), + {ok, Reader} = throw_missing(ReaderMod:open(SegmentFile)), Cnt = make_counter(Conf), ReaderCounterFun(1), FilterMatcher = case Options of @@ -1284,13 +1377,13 @@ open_offset_reader_at(SegmentFile, NextChunkId, FilePos, shared = Shared }, mode = #read{type = offset, + reader = {ReaderMod, Reader}, position = FilePos, chunk_selector = maps:get(chunk_selector, Options, user_data), next_offset = NextChunkId, transport = maps:get(transport, Options, tcp), - filter = FilterMatcher}, - fd = Fd}}. + filter = FilterMatcher}}}. %% Searches the index files backwards for the ID of the last user chunk. last_user_chunk_location(Name, RevdIdxFiles) @@ -1401,7 +1494,7 @@ read_header(#?MODULE{cfg = #cfg{}} = State0) -> Err end. 
--record(iterator, {fd :: file:io_device(), +-record(iterator, {reader :: {module(), osiris_log_reader:state()}, next_offset :: offset(), %% entries left num_left :: non_neg_integer(), @@ -1452,15 +1545,18 @@ chunk_iterator(#?MODULE{cfg = #cfg{}, filter_size := FilterSize, position := Pos, next_position := NextPos} = Header, - #?MODULE{fd = Fd, mode = #read{next_offset = ChId} = Read} = State1} -> + #?MODULE{mode = #read{reader = {ReaderMod, Reader0}, + next_offset = ChId} = Read} = State1} -> State = State1#?MODULE{mode = Read#read{next_offset = ChId + NumRecords, position = NextPos}}, case needs_handling(RType, Selector, ChType) of true -> DataPos = Pos + ?HEADER_SIZE_B + FilterSize, - Data = iter_read_ahead(Fd, DataPos, ChId, Crc, CreditHint, - DataSize, NumEntries), - Iterator = #iterator{fd = Fd, + {Data, Reader} = iter_read_ahead(ReaderMod, Reader0, + DataPos, ChId, Crc, + CreditHint, DataSize, + NumEntries), + Iterator = #iterator{reader = {ReaderMod, Reader}, data = Data, next_offset = ChId, num_left = NumEntries, @@ -1478,58 +1574,62 @@ chunk_iterator(#?MODULE{cfg = #cfg{}, end_of_chunk | {offset_entry(), chunk_iterator()}. iterator_next(#iterator{num_left = 0}) -> end_of_chunk; -iterator_next(#iterator{fd = Fd, +iterator_next(#iterator{reader = {ReaderMod, Reader0}, next_offset = NextOffs, num_left = Num, data = ?REC_MATCH_SIMPLE(Len, Rem0), next_record_pos = Pos} = I0) -> - {Record, Rem} = + {Record, Rem, Reader} = case Rem0 of <> -> - {Record0, Rem1}; + {Record0, Rem1, Reader0}; _ -> %% not enough in Rem0 to read the entire record %% so we need to read it from disk - {ok, <>} = - file:pread(Fd, Pos + ?REC_HDR_SZ_SIMPLE_B, - Len + ?ITER_READ_AHEAD_B), - {Record0, Rem1} + {ok, <>, Reader1} = + ReaderMod:pread(Reader0, Pos + ?REC_HDR_SZ_SIMPLE_B, + Len + ?ITER_READ_AHEAD_B, within), + {Record0, Rem1, Reader1} end, - I = I0#iterator{next_offset = NextOffs + 1, + I = I0#iterator{reader = {ReaderMod, Reader}, + next_offset = NextOffs + 1, num_left = Num - 1, data = Rem, next_record_pos = Pos + ?REC_HDR_SZ_SIMPLE_B + Len}, {{NextOffs, Record}, I}; -iterator_next(#iterator{fd = Fd, +iterator_next(#iterator{reader = {ReaderMod, Reader0}, next_offset = NextOffs, num_left = Num, data = ?REC_MATCH_SUBBATCH(CompType, NumRecs, UncompressedLen, Len, Rem0), next_record_pos = Pos} = I0) -> - {Data, Rem} = + {Data, Rem, Reader} = case Rem0 of <> -> - {Record0, Rem1}; + {Record0, Rem1, Reader0}; _ -> %% not enough in Rem0 to read the entire record %% so we need to read it from disk - {ok, <>} = - file:pread(Fd, Pos + ?REC_HDR_SZ_SUBBATCH_B, - Len + ?ITER_READ_AHEAD_B), - {Record0, Rem1} + {ok, <>, Reader1} = + ReaderMod:pread(Reader0, Pos + ?REC_HDR_SZ_SUBBATCH_B, + Len + ?ITER_READ_AHEAD_B, within), + {Record0, Rem1, Reader1} end, Record = {batch, NumRecs, CompType, UncompressedLen, Data}, - I = I0#iterator{next_offset = NextOffs + NumRecs, + I = I0#iterator{reader = {ReaderMod, Reader}, + next_offset = NextOffs + NumRecs, num_left = Num - 1, data = Rem, next_record_pos = Pos + ?REC_HDR_SZ_SUBBATCH_B + Len}, {{NextOffs, Record}, I}; -iterator_next(#iterator{fd = Fd, +iterator_next(#iterator{reader = {ReaderMod, Reader0}, next_record_pos = Pos} = I) -> - {ok, Data} = file:pread(Fd, Pos, ?ITER_READ_AHEAD_B), - iterator_next(I#iterator{data = Data}). + {ok, Data, Reader} = ReaderMod:pread(Reader0, Pos, ?ITER_READ_AHEAD_B, + within), + iterator_next(I#iterator{reader = {ReaderMod, Reader}, + data = Data}). 
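Editor's note: the iterator API itself is unchanged for callers; only the underlying I/O now goes through the pluggable reader. A small usage sketch, assuming chunk_iterator/1 keeps its existing {ok, Header, Iterator, State} success shape and Log is a state obtained from init_offset_reader/2 (error handling elided):

    %% Collect all entries of the next available chunk.
    read_one_chunk(Log0) ->
        case osiris_log:chunk_iterator(Log0) of
            {ok, _Header, Iter, Log} ->
                {lists:reverse(collect(Iter, [])), Log};
            {end_of_stream, Log} ->
                {[], Log}
        end.

    collect(Iter0, Acc) ->
        case osiris_log:iterator_next(Iter0) of
            end_of_chunk ->
                Acc;
            {{Offset, Entry}, Iter} ->
                collect(Iter, [{Offset, Entry} | Acc])
        end.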
-spec read_chunk(state()) -> {ok, binary(), state()} | @@ -1550,16 +1650,19 @@ read_chunk(#?MODULE{cfg = #cfg{}} = State0) -> position := Pos, next_position := NextPos, trailer_size := TrailerSize}, - #?MODULE{fd = Fd, mode = #read{next_offset = ChId} = Read} = State} -> + #?MODULE{mode = #read{reader = {ReaderMod, Reader0}, + next_offset = ChId} = Read} = State} -> ToRead = ?HEADER_SIZE_B + FilterSize + DataSize + TrailerSize, - {ok, ChData} = file:pread(Fd, Pos, ToRead), + {ok, ChData, Reader} = ReaderMod:pread(Reader0, Pos, ToRead, + within), <<_:?HEADER_SIZE_B/binary, _:FilterSize/binary, RecordData:DataSize/binary, _/binary>> = ChData, validate_crc(ChId, Crc, RecordData), {ok, ChData, - State#?MODULE{mode = Read#read{next_offset = ChId + NumRecords, + State#?MODULE{mode = Read#read{reader = {ReaderMod, Reader}, + next_offset = ChId + NumRecords, position = NextPos}}}; Other -> Other @@ -1644,7 +1747,7 @@ is_valid_chunk_on_disk(SegFile, Pos) -> false end. --spec send_file(gen_tcp:socket(), state()) -> +-spec send_file(gen_tcp:socket() | ssl:socket(), state()) -> {ok, state()} | {error, term()} | {end_of_stream, state()}. @@ -1671,8 +1774,8 @@ send_file(Sock, position := Pos, next_position := NextPos, header_data := HeaderData} = Header, - #?MODULE{fd = Fd, - mode = #read{next_offset = ChId} = Read0} = State1} -> + #?MODULE{mode = #read{reader = {ReaderMod, Reader0}, + next_offset = ChId} = Read0} = State1} -> %% read header %% used to write frame headers to socket %% and return the number of bytes to sendfile @@ -1687,8 +1790,8 @@ send_file(Sock, {0, FilterSize + DataSize + TrailerSize} end, - Read = Read0#read{next_offset = ChId + NumRecords, - position = NextPos}, + Read1 = Read0#read{next_offset = ChId + NumRecords, + position = NextPos}, %% only sendfile if either the reader is a data reader %% or the chunk is a user type (for offset readers) case needs_handling(RType, Selector, ChType) of @@ -1696,9 +1799,12 @@ send_file(Sock, _ = Callback(Header, ToSend + byte_size(HeaderData)), case send(Transport, Sock, HeaderData) of ok -> - case sendfile(Transport, Fd, Sock, - Pos + ?HEADER_SIZE_B + ToSkip, ToSend) of - ok -> + case ReaderMod:sendfile(Transport, Reader0, Sock, + Pos + ?HEADER_SIZE_B + ToSkip, + ToSend) of + {ok, Reader} -> + Read = Read1#read{reader = {ReaderMod, + Reader}}, State = State1#?MODULE{mode = Read}, {ok, State}; Err -> @@ -1710,7 +1816,7 @@ send_file(Sock, Err end; false -> - State = State1#?MODULE{mode = Read}, + State = State1#?MODULE{mode = Read1}, %% skip chunk and recurse send_file(Sock, State, Callback) end; @@ -1736,10 +1842,14 @@ needs_handling(_, _, _) -> -spec close(state()) -> ok. 
close(#?MODULE{cfg = #cfg{counter_id = CntId, readers_counter_fun = Fun}, - fd = SegFd, - index_fd = IdxFd}) -> - close_fd(IdxFd), + fd = SegFd} = State) -> close_fd(SegFd), + case State of + #?MODULE{mode = #write{manifest = {ManifestMod, Manifest}}} -> + ok = ManifestMod:close_manifest(Manifest); + _ -> + ok + end, Fun(-1), case CntId of undefined -> @@ -1752,7 +1862,13 @@ delete_directory(#{name := Name, dir := _} = Config) -> Dir = directory(Config), ?DEBUG_(Name, " deleting directory ~ts", [Dir]), - delete_dir(Dir); + delete_dir(Dir), + case application:get_env(osiris, log_manifest) of + {ok, ManifestMod} -> + ok = ManifestMod:delete(Config); + undefined -> + ok + end; delete_directory(#{name := Name}) -> delete_directory(Name); delete_directory(Name) when ?IS_STRING(Name) -> @@ -1854,7 +1970,7 @@ first_and_last_seginfos0([]) -> none; first_and_last_seginfos0([FstIdxFile]) -> {ok, SegInfo} = build_seg_info(FstIdxFile), - {1, SegInfo, SegInfo}; + {index_file_offsets([FstIdxFile]), SegInfo, SegInfo}; first_and_last_seginfos0([FstIdxFile | Rem] = IdxFiles) -> %% this function is only used by init case build_seg_info(FstIdxFile) of @@ -1868,7 +1984,7 @@ first_and_last_seginfos0([FstIdxFile | Rem] = IdxFiles) -> [_ | RetryIndexFiles] = lists:reverse(IdxFiles), first_and_last_seginfos0(lists:reverse(RetryIndexFiles)); {ok, LastSegInfo} -> - {length(Rem) + 1, FstSegInfo, LastSegInfo}; + {index_file_offsets(IdxFiles), FstSegInfo, LastSegInfo}; {error, Err} -> ?ERROR("~s: failed to build seg_info from file ~ts, error: ~w", [?MODULE, LastIdxFile, Err]), @@ -1879,6 +1995,12 @@ first_and_last_seginfos0([FstIdxFile | Rem] = IdxFiles) -> first_and_last_seginfos0(Rem) end. +index_file_offsets(IdxFiles) -> + [begin + <> = filename:basename(I), + binary_to_integer(Offset) + end || I <- IdxFiles]. + build_seg_info(IdxFile) -> case last_valid_idx_record(IdxFile) of {ok, ?IDX_MATCH(_, _, LastChunkPos)} -> @@ -2016,6 +2138,7 @@ build_segment_info(SegFile, LastChunkPos, IdxFile) -> {ok, #seg_info{file = SegFile, index = IdxFile, size = Size, + chunks = chunks_in_idx(IdxFile), first = FstChInfo, last = LastChInfo}}; _ -> @@ -2035,6 +2158,9 @@ build_segment_info(SegFile, LastChunkPos, IdxFile) -> end end. +chunks_in_idx(IdxFile) -> + (file_size(IdxFile) - ?IDX_HEADER_SIZE) div ?INDEX_RECORD_SIZE_B. + -spec overview(file:filename_all()) -> {range(), [{epoch(), offset()}]}. overview(Dir) -> @@ -2099,12 +2225,14 @@ format_status(#?MODULE{cfg = #cfg{directory = Dir, -spec update_retention([retention_spec()], state()) -> state(). update_retention(Retention, - #?MODULE{cfg = #cfg{name = Name, - retention = Retention0} = Cfg} = State0) + #?MODULE{mode = #write{manifest = {ManifestMod, Manifest0}} = + Write0, + cfg = #cfg{name = Name}} = State0) when is_list(Retention) -> - ?DEBUG_(Name, " from: ~w to ~w", [Retention0, Retention]), - State = State0#?MODULE{cfg = Cfg#cfg{retention = Retention}}, - trigger_retention_eval(State). + ?DEBUG_(Name, " updating retention to ~w", [Retention]), + Manifest = ManifestMod:handle_event({retention_updated, Retention}, + Manifest0), + State0#?MODULE{mode = Write0#write{manifest = {ManifestMod, Manifest}}}. 
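Editor's note: close/1 now hands the manifest back to its module via close_manifest/1 (writer mode only), and delete_directory/1 additionally calls the configured manifest's delete/1, giving implementations a chance to drop any state they keep outside the stream directory. A sketch of those two callbacks for a hypothetical manifest backed by an external metadata store (my_metadata_store and the map-shaped state are made up):

    close_manifest(#{conn := Conn} = _State) ->
        %% release per-stream resources held by the manifest state
        ok = my_metadata_store:release(Conn).

    delete(#{name := Name} = _Config) ->
        %% called from delete_directory/1 after the on-disk data is gone
        ok = my_metadata_store:delete_stream(Name).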
-spec evaluate_retention(file:filename_all(), [retention_spec()]) -> {range(), FirstTimestamp :: osiris:timestamp(), @@ -2394,33 +2522,31 @@ write_chunk(Chunk, #?MODULE{cfg = #cfg{counter = CntRef, shared = Shared} = Cfg, fd = Fd, - index_fd = IdxFd, mode = #write{segment_size = {SegSizeBytes, SegSizeChunks}, + manifest = {ManifestMod, Manifest0}, tail_info = {Next, _}} = Write} = State) -> case max_segment_size_reached(State) of true -> - trigger_retention_eval( - write_chunk(Chunk, - ChType, - Timestamp, - Epoch, - NumRecords, - open_new_segment(State))); + write_chunk(Chunk, + ChType, + Timestamp, + Epoch, + NumRecords, + open_new_segment(State)); false -> NextOffset = Next + NumRecords, Size = iolist_size(Chunk), {ok, Cur} = file:position(Fd, cur), ok = file:write(Fd, Chunk), - ok = file:write(IdxFd, - <>), + ChunkInfo = #chunk_info{id = Next, timestamp = Timestamp, + epoch = Epoch, num = NumRecords, + type = ChType, size = Size, pos = Cur}, + Event = {chunk_written, ChunkInfo, Chunk}, + Manifest = ManifestMod:handle_event(Event, Manifest0), osiris_log_shared:set_last_chunk_id(Shared, Next), %% update counters counters:put(CntRef, ?C_OFFSET, NextOffset - 1), @@ -2429,6 +2555,7 @@ write_chunk(Chunk, State#?MODULE{mode = Write#write{tail_info = {NextOffset, {Epoch, Next, Timestamp}}, + manifest = {ManifestMod, Manifest}, segment_size = {SegSizeBytes + Size, SegSizeChunks + 1}}} end. @@ -2447,26 +2574,6 @@ max_segment_size_reached( CurrentSizeBytes >= MaxSizeBytes orelse CurrentSizeChunks >= MaxSizeChunks. -sendfile(_Transport, _Fd, _Sock, _Pos, 0) -> - ok; -sendfile(tcp = Transport, Fd, Sock, Pos, ToSend) -> - case file:sendfile(Fd, Sock, Pos, ToSend, []) of - {ok, 0} -> - %% TODO add counter for this? - sendfile(Transport, Fd, Sock, Pos, ToSend); - {ok, BytesSent} -> - sendfile(Transport, Fd, Sock, Pos + BytesSent, ToSend - BytesSent); - {error, _} = Err -> - Err - end; -sendfile(ssl, Fd, Sock, Pos, ToSend) -> - case file:pread(Fd, Pos, ToSend) of - {ok, Data} -> - ssl:send(Sock, Data); - {error, _} = Err -> - Err - end. - send(tcp, Sock, Data) -> gen_tcp:send(Sock, Data); send(ssl, Sock, Data) -> @@ -2639,33 +2746,31 @@ open_new_segment(#?MODULE{cfg = #cfg{name = Name, directory = Dir, counter = Cnt}, fd = OldFd, - index_fd = OldIdxFd, + current_file = OldFilename, mode = #write{type = _WriterType, + manifest = {ManifestMod, Manifest0}, tail_info = {NextOffset, _}} = Write} = State0) -> _ = close_fd(OldFd), - _ = close_fd(OldIdxFd), Filename = make_file_name(NextOffset, "segment"), - IdxFilename = make_file_name(NextOffset, "index"), ?DEBUG_(Name, "~ts", [Filename]), - {ok, IdxFd} = - file:open( - filename:join(Dir, IdxFilename), ?FILE_OPTS_WRITE), - ok = file:write(IdxFd, ?IDX_HEADER), {ok, Fd} = file:open( filename:join(Dir, Filename), ?FILE_OPTS_WRITE), ok = file:write(Fd, ?LOG_HEADER), %% we always move to the end of the file {ok, _} = file:position(Fd, eof), - {ok, _} = file:position(IdxFd, eof), + + Event = {segment_opened, OldFilename, Filename}, + Manifest = ManifestMod:handle_event(Event, Manifest0), + counters:add(Cnt, ?C_SEGMENTS, 1), State0#?MODULE{current_file = Filename, fd = Fd, %% reset segment_size counter - index_fd = IdxFd, - mode = Write#write{segment_size = {?LOG_HEADER_SIZE, 0}}}. + mode = Write#write{segment_size = {?LOG_HEADER_SIZE, 0}, + manifest = {ManifestMod, Manifest}}}. 
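Editor's note: with this change the hot write path no longer touches the index file directly. Every appended chunk is surfaced to the manifest as {chunk_written, #chunk_info{}, Chunk}, a segment roll-over as {segment_opened, OldSegment, NewSegment}, and update_retention/2 forwards {retention_updated, Retention}. A handle_event/2 sketch for a manifest that mirrors index records into an in-memory table (the map-shaped state and the ETS table are illustrative assumptions):

    handle_event({segment_opened, _OldSegment, NewSegment}, State) ->
        %% a new segment file just became the active one
        State#{current_segment => NewSegment};
    handle_event({chunk_written, #chunk_info{id = ChunkId,
                                             timestamp = Ts,
                                             epoch = Epoch,
                                             pos = FilePos,
                                             size = Size}, _IoData},
                 #{table := Tid, current_segment := Seg} = State) ->
        true = ets:insert(Tid, {ChunkId, Ts, Epoch, Seg, FilePos, Size}),
        State;
    handle_event({retention_updated, _Retention}, State) ->
        State.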
open_index_read(File) -> {ok, Fd} = open(File, [read, raw, binary, read_ahead]), @@ -2760,6 +2865,24 @@ validate_crc(ChunkId, Crc, IOData) -> exit({crc_validation_failure, {chunk_id, ChunkId}}) end. + +-spec with_defaults(config()) -> config(). +with_defaults(Config0) -> + Shared = case Config0 of + #{shared := S} -> + S; + _ -> + osiris_log_shared:new() + end, + maps:merge(#{max_segment_size_bytes => ?DEFAULT_MAX_SEGMENT_SIZE_B, + retention => [], + filter_size => ?DEFAULT_FILTER_SIZE, + shared => Shared, + counter => make_counter(Config0), + counter_id => counter_id(Config0), + tracking_config => #{}}, + Config0). + -spec make_counter(osiris_log:config()) -> counters:counters_ref(). make_counter(#{counter := Counter}) -> @@ -2864,11 +2987,11 @@ recover_tracking(Fd, Trk0, Pos0) -> read_header0(#?MODULE{cfg = #cfg{directory = Dir, shared = Shared, counter = CntRef}, - mode = #read{next_offset = NextChId0, + mode = #read{reader = {ReaderMod, Reader0}, + next_offset = NextChId0, position = Pos, filter = Filter} = Read0, - current_file = CurFile, - fd = Fd} = + current_file = CurFile} = State) -> %% reads the next header if permitted case can_read_next(State) of @@ -2876,7 +2999,9 @@ read_header0(#?MODULE{cfg = #cfg{directory = Dir, %% optimistically read 64 bytes (small binary) as it may save us %% a syscall reading the filter if the filter is of the default %% 16 byte size - case file:pread(Fd, Pos, ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE) of + case ReaderMod:pread(Reader0, Pos, + ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE, + boundary) of {ok, <> = HeaderData0} -> + MaybeFilter/binary>> = HeaderData0, Reader1} -> <> = HeaderData0, counters:put(CntRef, ?C_OFFSET, NextChId0 + NumRecords), counters:add(CntRef, ?C_CHUNKS, 1), NextPos = Pos + ?HEADER_SIZE_B + FilterSize + DataSize + TrailerSize, - ChunkFilter = case MaybeFilter of - <> -> - %% filter is of default size or 0 - F; - _ when Filter =/= undefined -> - %% the filter is larger than default - case file:pread(Fd, Pos + ?HEADER_SIZE_B, - FilterSize) of - {ok, F} -> - F; - eof -> - throw({end_of_stream, State}) - end; - _ -> - <<>> - end, + {ChunkFilter, Reader} = + case MaybeFilter of + <> -> + %% filter is of default size or 0 + {F, Reader1}; + _ when Filter =/= undefined -> + %% the filter is larger than default + case ReaderMod:pread(Reader1, + Pos + ?HEADER_SIZE_B, + FilterSize, within) of + {ok, F, Reader2} -> + {F, Reader2}; + eof -> + throw({end_of_stream, State}) + end; + _ -> + {<<>>, Reader1} + end, case osiris_bloom:is_match(ChunkFilter, Filter) of true -> @@ -2930,10 +3057,12 @@ read_header0(#?MODULE{cfg = #cfg{directory = Dir, position => Pos}, State}; false -> Read = Read0#read{next_offset = NextChId0 + NumRecords, - position = NextPos}, + position = NextPos, + reader = {ReaderMod, Reader}}, read_header0(State#?MODULE{mode = Read}); {retry_with, NewFilter} -> - Read = Read0#read{filter = NewFilter}, + Read = Read0#read{filter = NewFilter, + reader = {ReaderMod, Reader}}, read_header0(State#?MODULE{mode = Read}) end; {ok, Bin} when byte_size(Bin) < ?HEADER_SIZE_B -> @@ -2956,15 +3085,15 @@ read_header0(#?MODULE{cfg = #cfg{directory = Dir, %% log but would cause an infinite loop if it does {end_of_stream, State}; false -> - case file:open(filename:join(Dir, SegFile), - [raw, binary, read]) of - {ok, Fd2} -> - ok = file:close(Fd), - Read = Read0#read{next_offset = NextChId, + ok = ReaderMod:close(Reader0), + case ReaderMod:open(filename:join(Dir, SegFile)) of + {ok, Reader} -> + Read = Read0#read{reader = {ReaderMod, + Reader}, + 
next_offset = NextChId, position = ?LOG_HEADER_SIZE}, read_header0( State#?MODULE{current_file = SegFile, - fd = Fd2, mode = Read}); {error, enoent} -> {end_of_stream, State} @@ -2982,7 +3111,7 @@ read_header0(#?MODULE{cfg = #cfg{directory = Dir, _Crc:32/integer, _DataSize:32/unsigned, _TrailerSize:32/unsigned, - _Reserved:32>>} -> + _Reserved:32>>, _Reader} -> %% TODO: we may need to return the new state here if %% we've crossed segments {error, {unexpected_chunk_id, UnexpectedChId, NextChId0}}; @@ -2993,28 +3122,6 @@ read_header0(#?MODULE{cfg = #cfg{directory = Dir, {end_of_stream, State} end. -trigger_retention_eval(#?MODULE{cfg = - #cfg{name = Name, - directory = Dir, - retention = RetentionSpec, - counter = Cnt, - shared = Shared}} = State) -> - - %% updates first offset and first timestamp - %% after retention has been evaluated - EvalFun = fun ({{FstOff, _}, FstTs, NumSegLeft}) - when is_integer(FstOff), - is_integer(FstTs) -> - osiris_log_shared:set_first_chunk_id(Shared, FstOff), - counters:put(Cnt, ?C_FIRST_OFFSET, FstOff), - counters:put(Cnt, ?C_FIRST_TIMESTAMP, FstTs), - counters:put(Cnt, ?C_SEGMENTS, NumSegLeft); - (_) -> - ok - end, - ok = osiris_retention:eval(Name, Dir, RetentionSpec, EvalFun), - State. - next_location(undefined) -> {0, ?LOG_HEADER_SIZE}; next_location(#chunk_info{id = Id, @@ -3215,23 +3322,28 @@ dump_crc_check(Fd) -> dump_crc_check(Fd) end. -iter_read_ahead(_Fd, _Pos, _ChunkId, _Crc, 1, _DataSize, _NumEntries) -> +iter_read_ahead(_ReaderMod, Reader, _Pos, _ChunkId, _Crc, 1, _DataSize, + _NumEntries) -> %% no point reading ahead if there is only one entry to be read at this %% time - undefined; -iter_read_ahead(Fd, Pos, ChunkId, Crc, Credit, DataSize, NumEntries) + {undefined, Reader}; +iter_read_ahead(ReaderMod, Reader0, Pos, ChunkId, Crc, Credit, DataSize, + NumEntries) when Credit == all orelse NumEntries == 1 -> - {ok, Data} = file:pread(Fd, Pos, DataSize), + {ok, Data, Reader} = ReaderMod:pread(Reader0, Pos, DataSize, within), validate_crc(ChunkId, Crc, Data), - Data; -iter_read_ahead(Fd, Pos, _ChunkId, _Crc, Credit0, DataSize, NumEntries) -> + {Data, Reader}; +iter_read_ahead(ReaderMod, Reader0, Pos, _ChunkId, _Crc, Credit0, DataSize, + NumEntries) -> %% read ahead, assumes roughly equal entry sizes which may not be the case %% TODO round up to nearest block? %% We can only practically validate CRC if we read the whole data Credit = min(Credit0, NumEntries), Size = DataSize div NumEntries * Credit, - {ok, Data} = file:pread(Fd, Pos, Size + ?ITER_READ_AHEAD_B), - Data. + {ok, Data, Reader} = ReaderMod:pread(Reader0, Pos, + Size + ?ITER_READ_AHEAD_B, + within), + {Data, Reader}. list_dir(Dir) -> case prim_file:list_dir(Dir) of @@ -3241,6 +3353,67 @@ list_dir(Dir) -> [list_to_binary(F) || F <- Files] end. +handle_event({segment_opened, OldSegment, NewSegment}, + #manifest{directory = Dir, + index_fd = Fd0} = Manifest) -> + _ = close_fd(Fd0), + IdxFilename = unicode:characters_to_list( + string:replace(NewSegment, ".segment", ".index", + trailing)), + + case OldSegment of + undefined -> + %% Skip retention evaluation when opening a stream. + ok; + _ -> + ok = trigger_retention_eval(Manifest) + end, + + {ok, Fd} = + file:open( + filename:join(Dir, IdxFilename), ?FILE_OPTS_WRITE), + %% Write the header if this is a new index file. + case file:position(Fd, eof) of + {ok, 0} -> + ok = file:write(Fd, ?IDX_HEADER), + %% TODO: necessary? 
+ {ok, _} = file:position(Fd, eof), + ok; + {ok, _} -> + ok + end, + Manifest#manifest{index_fd = Fd}; +handle_event({chunk_written, #chunk_info{id = Offset, + timestamp = Timestamp, + epoch = Epoch, + pos = SegmentFilePos, + type = ChType}, _Chunk}, + #manifest{index_fd = Fd} = Manifest) -> + ok = file:write(Fd, + <>), + Manifest; +handle_event({retention_updated, Retention}, Manifest0) -> + Manifest = Manifest0#manifest{retention = Retention}, + trigger_retention_eval(Manifest), + Manifest. + +trigger_retention_eval(#manifest{name = Name, + directory = Dir, + retention = RetentionSpec, + retention_eval_fun = EvalFun}) -> + ok = osiris_retention:eval(Name, Dir, RetentionSpec, EvalFun). + +close_manifest(#manifest{index_fd = Fd}) -> + _ = close_fd(Fd), + ok. + +delete(_Config) -> + ok. + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/src/osiris_log_manifest.erl b/src/osiris_log_manifest.erl new file mode 100644 index 0000000..33260bf --- /dev/null +++ b/src/osiris_log_manifest.erl @@ -0,0 +1,65 @@ +-module(osiris_log_manifest). + +-include("src/osiris.hrl"). + +-type state() :: term(). + +-type segment_info() :: + #{file := file:filename_all(), + size := non_neg_integer(), + chunks := non_neg_integer(), + first => #chunk_info{}, + last => #chunk_info{} + }. + +-type log_info() :: + #{num_segments := non_neg_integer(), + %% These keys may be unset if the log is empty. + first_offset => osiris:offset(), + first_timestamp => osiris:timestamp(), + last_segment => segment_info(), + %% Optional. Included by the default impls of writer_manifest/1 and + %% acceptor_manifest/3 for the convenience of other impls. + segment_offsets => [osiris:offset()] + }. + +-type event() :: {segment_opened, + OldSegment :: file:filename_all() | undefined, + NewSegment :: file:filename_all()} | + {chunk_written, #chunk_info{}, iolist()}. + +-export_type([state/0, log_info/0, event/0]). + +-callback acceptor_manifest(osiris_log:range(), + EpochOffsets :: [{osiris:offset(), osiris:epoch()}], + osiris_log:config()) -> + {log_info(), osiris_log:config(), state()}. + +-callback writer_manifest(osiris_log:config()) -> + {log_info(), osiris_log:config(), state()}. + +-callback find_data_reader_position(osiris:tail_info(), osiris_log:config()) -> + {ok, osiris:offset(), Pos :: non_neg_integer(), + Segment :: file:filename_all()} | + {error, + {offset_out_of_range, + empty | {From :: osiris:offset(), To :: osiris:offset()}}} | + {error, {invalid_last_offset_epoch, osiris:epoch(), osiris:offset()}} | + {error, term()}. + +-callback find_offset_reader_position(osiris:offset_spec(), + osiris_log:config()) -> + {ok, osiris:offset(), Pos :: non_neg_integer(), + Segment :: file:filename_all()} | + {error, + {offset_out_of_range, + empty | {From :: osiris:offset(), To :: osiris:offset()}}} | + {error, {invalid_chunk_header, term()}} | + {error, no_index_file} | + {error, retries_exhausted}. + +-callback handle_event(event(), state()) -> state(). + +-callback close_manifest(state()) -> ok. + +-callback delete(osiris_log:config()) -> ok. diff --git a/src/osiris_log_reader.erl b/src/osiris_log_reader.erl new file mode 100644 index 0000000..cc6c3b6 --- /dev/null +++ b/src/osiris_log_reader.erl @@ -0,0 +1,67 @@ +-module(osiris_log_reader). + +-export([open/1, pread/4, sendfile/5, close/1]). + +-type state() :: term(). + +%% A hint for whether the position should lie on a chunk header (i.e. the next +%% data would be the magic), or otherwise is somewhere within the chunk. 
+%% NOTE: sendfile is used exclusively 'within' chunks.
+-type position_hint() :: boundary | within.
+
+-export_type([state/0,
+              position_hint/0]).
+
+-callback open(SegmentFilename :: file:filename_all()) ->
+    {ok, state()} | {error, term()}.
+
+-callback pread(state(),
+                Offset :: non_neg_integer(),
+                Bytes :: non_neg_integer(),
+                position_hint()) ->
+    {ok, Data :: binary(), state()} | eof | {error, term()}.
+
+%% Must return the (possibly updated) reader state once all requested bytes
+%% have been sent on the socket.
+-callback sendfile(tcp | ssl,
+                   state(),
+                   gen_tcp:socket() | ssl:socket(),
+                   Pos :: non_neg_integer(),
+                   ToSend :: non_neg_integer()) ->
+    {ok, state()} | {error, term()}.
+
+-callback close(state()) -> ok | {error, term()}.
+
+%% --- Default implementation
+
+open(SegmentFilename) ->
+    file:open(SegmentFilename, [raw, binary, read]).
+
+pread(Fd, Offset, Bytes, _Hint) ->
+    case file:pread(Fd, Offset, Bytes) of
+        {ok, Data} ->
+            {ok, Data, Fd};
+        eof ->
+            eof;
+        {error, _} = Err ->
+            Err
+    end.
+
+sendfile(_Transport, Fd, _Socket, _Pos, 0) ->
+    {ok, Fd};
+sendfile(tcp, Fd, Socket, Pos, ToSend) ->
+    case file:sendfile(Fd, Socket, Pos, ToSend, []) of
+        {ok, BytesSent} ->
+            sendfile(tcp, Fd, Socket, Pos + BytesSent, ToSend - BytesSent);
+        {error, _} = Err ->
+            Err
+    end;
+sendfile(ssl, Fd, Socket, Pos, ToSend) ->
+    case file:pread(Fd, Pos, ToSend) of
+        {ok, Data} ->
+            ok = ssl:send(Socket, Data),
+            {ok, Fd};
+        {error, _} = Err ->
+            Err
+    end.
+
+close(Fd) ->
+    file:close(Fd).
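Editor's note: putting the reader behaviour together, a minimal custom reader can wrap the default file-based implementation and add instrumentation on top. Everything except the osiris_log_reader calls below is a made-up example; such a module would be selected with the log_reader application environment key mentioned earlier.

    -module(my_log_reader).
    -behaviour(osiris_log_reader).

    -export([open/1, pread/4, sendfile/5, close/1]).

    %% State: the default reader's state plus a running count of bytes read.
    open(SegmentFile) ->
        case osiris_log_reader:open(SegmentFile) of
            {ok, Inner} -> {ok, {Inner, 0}};
            {error, _} = Err -> Err
        end.

    pread({Inner0, Bytes}, Offset, Len, Hint) ->
        case osiris_log_reader:pread(Inner0, Offset, Len, Hint) of
            {ok, Data, Inner} -> {ok, Data, {Inner, Bytes + byte_size(Data)}};
            Other -> Other   %% eof | {error, _}
        end.

    sendfile(Transport, {Inner0, Bytes}, Socket, Pos, ToSend) ->
        case osiris_log_reader:sendfile(Transport, Inner0, Socket, Pos, ToSend) of
            {ok, Inner} -> {ok, {Inner, Bytes + ToSend}};
            {error, _} = Err -> Err
        end.

    close({Inner, _Bytes}) ->
        osiris_log_reader:close(Inner).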