diff --git a/pkg/local_object_storage/blobstor/fstree/bench_test.go b/pkg/local_object_storage/blobstor/fstree/bench_test.go new file mode 100644 index 0000000000..09188e6851 --- /dev/null +++ b/pkg/local_object_storage/blobstor/fstree/bench_test.go @@ -0,0 +1,217 @@ +package fstree_test + +import ( + "io" + "testing" + + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/stretchr/testify/require" +) + +func BenchmarkFSTree_Head(b *testing.B) { + for _, size := range payloadSizes { + b.Run(generateSizeLabel(size), func(b *testing.B) { + fsTree := fstree.New(fstree.WithPath(b.TempDir())) + + require.NoError(b, fsTree.Open(false)) + require.NoError(b, fsTree.Init()) + + testReadOp(b, fsTree, fsTree.Head, "Head", size) + }) + } +} + +func BenchmarkFSTree_Get(b *testing.B) { + for _, size := range payloadSizes { + b.Run(generateSizeLabel(size), func(b *testing.B) { + fsTree := fstree.New(fstree.WithPath(b.TempDir())) + + require.NoError(b, fsTree.Open(false)) + require.NoError(b, fsTree.Init()) + + testReadOp(b, fsTree, fsTree.Get, "Get", size) + }) + } +} + +func BenchmarkFSTree_GetStream(b *testing.B) { + for _, size := range payloadSizes { + b.Run(generateSizeLabel(size), func(b *testing.B) { + fsTree := fstree.New(fstree.WithPath(b.TempDir())) + + require.NoError(b, fsTree.Open(false)) + require.NoError(b, fsTree.Init()) + + testGetStreamOp(b, fsTree, size) + }) + } +} + +func testReadOp(b *testing.B, fsTree *fstree.FSTree, read func(address oid.Address) (*objectSDK.Object, error), + name string, payloadSize int) { + b.Run(name+"_regular", func(b *testing.B) { + obj := generateTestObject(payloadSize) + addr := object.AddressOf(obj) + + require.NoError(b, fsTree.Put(addr, obj.Marshal())) + b.ReportAllocs() + b.ResetTimer() + for range b.N { + _, err := read(addr) + if err != nil { + b.Fatal(err) + } + } + }) + + b.Run(name+"_combined", func(b *testing.B) { + const numObjects = 10 + + objMap := make(map[oid.Address][]byte, numObjects) + addrs := make([]oid.Address, numObjects) + for i := range numObjects { + o := generateTestObject(payloadSize) + objMap[object.AddressOf(o)] = o.Marshal() + addrs[i] = object.AddressOf(o) + } + require.NoError(b, fsTree.PutBatch(objMap)) + + b.ReportAllocs() + b.ResetTimer() + for k := range b.N { + _, err := read(addrs[k%numObjects]) + if err != nil { + b.Fatal(err) + } + } + }) + + b.Run(name+"_compressed", func(b *testing.B) { + obj := generateTestObject(payloadSize) + addr := object.AddressOf(obj) + + compressConfig := &compression.Config{ + Enabled: true, + } + require.NoError(b, compressConfig.Init()) + fsTree.SetCompressor(compressConfig) + require.NoError(b, fsTree.Put(addr, obj.Marshal())) + + b.ReportAllocs() + b.ResetTimer() + for range b.N { + _, err := read(addr) + if err != nil { + b.Fatal(err) + } + } + }) +} + +func testGetStreamOp(b *testing.B, fsTree *fstree.FSTree, payloadSize int) { + b.Run("GetStream_regular", func(b *testing.B) { + obj := generateTestObject(payloadSize) + addr := object.AddressOf(obj) + + require.NoError(b, fsTree.Put(addr, obj.Marshal())) + b.ReportAllocs() + b.ResetTimer() + for range b.N { + header, reader, err := fsTree.GetStream(addr) + if err != nil { + b.Fatal(err) + } + if header == nil { + b.Fatal("header is nil") + } + if reader != nil { 
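+				// close the stream to release the file descriptor it holds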
+				reader.Close()
+			}
+		}
+	})
+
+	b.Run("GetStream_combined", func(b *testing.B) {
+		const numObjects = 10
+
+		objMap := make(map[oid.Address][]byte, numObjects)
+		addrs := make([]oid.Address, numObjects)
+		for i := range numObjects {
+			o := generateTestObject(payloadSize)
+			objMap[object.AddressOf(o)] = o.Marshal()
+			addrs[i] = object.AddressOf(o)
+		}
+		require.NoError(b, fsTree.PutBatch(objMap))
+
+		b.ReportAllocs()
+		b.ResetTimer()
+		for k := range b.N {
+			header, reader, err := fsTree.GetStream(addrs[k%numObjects])
+			if err != nil {
+				b.Fatal(err)
+			}
+			if header == nil {
+				b.Fatal("header is nil")
+			}
+			if reader != nil {
+				reader.Close()
+			}
+		}
+	})
+
+	b.Run("GetStream_compressed", func(b *testing.B) {
+		obj := generateTestObject(payloadSize)
+		addr := object.AddressOf(obj)
+
+		compressConfig := &compression.Config{
+			Enabled: true,
+		}
+		require.NoError(b, compressConfig.Init())
+		fsTree.SetCompressor(compressConfig)
+		require.NoError(b, fsTree.Put(addr, obj.Marshal()))
+
+		b.ReportAllocs()
+		b.ResetTimer()
+		for range b.N {
+			header, reader, err := fsTree.GetStream(addr)
+			if err != nil {
+				b.Fatal(err)
+			}
+			if header == nil {
+				b.Fatal("header is nil")
+			}
+			if reader != nil {
+				reader.Close()
+			}
+		}
+	})
+
+	b.Run("GetStream_with_payload_read", func(b *testing.B) {
+		obj := generateTestObject(payloadSize)
+		addr := object.AddressOf(obj)
+
+		require.NoError(b, fsTree.Put(addr, obj.Marshal()))
+		b.ReportAllocs()
+		b.ResetTimer()
+		for range b.N {
+			header, reader, err := fsTree.GetStream(addr)
+			if err != nil {
+				b.Fatal(err)
+			}
+			if header == nil {
+				b.Fatal("header is nil")
+			}
+			if reader != nil {
+				// Read the whole payload to simulate real usage.
+				_, err := io.ReadAll(reader)
+				if err != nil {
+					b.Fatal(err)
+				}
+				reader.Close()
+			}
+		}
+	})
+}
diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go
index 3a1abd6544..7253e18e7b 100644
--- a/pkg/local_object_storage/blobstor/fstree/fstree.go
+++ b/pkg/local_object_storage/blobstor/fstree/fstree.go
@@ -22,6 +22,7 @@ import (
 	objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
 	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
 	"go.uber.org/zap"
+	"google.golang.org/protobuf/encoding/protowire"
 )
 
 // FSTree represents an object storage as a filesystem tree.
@@ -90,6 +91,26 @@ const (
 	// combinedDataOff is the offset from the start of the combined prefix to object data.
 	// It's also the length of the prefix in total.
 	combinedDataOff = combinedLengthOff + combinedLenSize
+
+	// streamPrefix is the prefix for streamed objects. It is used to distinguish
+	// streamed objects from regular ones.
+	streamPrefix = 0x7e
+
+	// streamLenHeaderOff is the offset from the start of the stream prefix to the
+	// header length field (bytes 0 and 1 hold the prefix and the format version).
+	streamLenHeaderOff = 2
+
+	// streamLenSize is sizeof(uint32), length of a serialized 32-bit BE integer
+	// that represents the length of the header or payload data.
+	streamLenSize = 4
+
+	// streamLenDataOff is the offset from the start of the stream prefix to
+	// the payload length field.
+	streamLenDataOff = streamLenHeaderOff + streamLenSize
+
+	// streamDataOff is the offset from the start of the stream prefix to the
+	// serialized object header; the payload follows the header immediately.
+	// It's also the length of the prefix in total.
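+	//
+	// The resulting on-disk layout of a stream-prefixed object is:
+	//   [0]     streamPrefix (0x7e)
+	//   [1]     format version (only 0 for now)
+	//   [2:6]   big-endian uint32 header length
+	//   [6:10]  big-endian uint32 payload length (after optional compression)
+	//   [10:]   marshaled header followed by the payload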
+	streamDataOff = streamLenDataOff + streamLenSize
 )
 
 var _ common.Storage = (*FSTree)(nil)
@@ -339,7 +360,7 @@ func (t *FSTree) Put(addr oid.Address, data []byte) error {
 	if err := util.MkdirAllX(filepath.Dir(p), t.Permissions); err != nil {
 		return fmt.Errorf("mkdirall for %q: %w", p, err)
 	}
-	data = t.Compress(data)
+	data = t.processHeaderAndPayload(data)
 
 	err := t.writer.writeData(addr.Object(), p, data)
 	if err != nil {
@@ -363,7 +384,7 @@ func (t *FSTree) PutBatch(objs map[oid.Address][]byte) error {
 		writeDataUnits = append(writeDataUnits, writeDataUnit{
 			id:   addr.Object(),
 			path: p,
-			data: t.Compress(data),
+			data: t.processHeaderAndPayload(data),
 		})
 	}
 
@@ -375,6 +396,32 @@ func (t *FSTree) PutBatch(objs map[oid.Address][]byte) error {
 	return nil
 }
 
+// processHeaderAndPayload splits marshaled object data into header and payload,
+// compresses the payload only and wraps both into the stream-prefixed format.
+// Data that cannot be split (parse error or no payload field) is returned unchanged.
+func (t *FSTree) processHeaderAndPayload(data []byte) []byte {
+	headerEnd, payloadStart, err := extractHeaderAndPayload(data, nil)
+	if err != nil || headerEnd == 0 {
+		return data
+	}
+
+	header := data[:headerEnd]
+	payload := data[payloadStart:]
+
+	hLen := len(header)
+	payload = t.Compress(payload)
+	pLen := len(payload)
+
+	res := make([]byte, hLen+pLen+streamDataOff)
+	res[0] = streamPrefix
+	res[1] = 0 // version 0
+	binary.BigEndian.PutUint32(res[streamLenHeaderOff:], uint32(hLen))
+	binary.BigEndian.PutUint32(res[streamLenDataOff:], uint32(pLen))
+
+	copy(res[streamDataOff:], header)
+	copy(res[streamDataOff+hLen:], payload)
+
+	return res
+}
+
 // Get returns an object from the storage by address.
 func (t *FSTree) Get(addr oid.Address) (*objectSDK.Object, error) {
 	data, err := t.getObjBytes(addr)
@@ -433,6 +480,16 @@ func parseCombinedPrefix(p []byte) ([]byte, uint32) {
 		binary.BigEndian.Uint32(p[combinedLengthOff:combinedDataOff])
 }
 
+// parseStreamPrefix checks the given byte slice for the stream prefix and returns
+// the lengths of the header and payload if it is present (0, 0 otherwise).
+func parseStreamPrefix(p []byte) (uint32, uint32) {
+	if len(p) < streamDataOff || p[0] != streamPrefix || p[1] != 0 { // Only version 0 is supported now.
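+		// Not a stream-prefixed blob (or an unknown version); callers fall back
+		// to the regular (possibly compressed) format.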
+		return 0, 0
+	}
+	return binary.BigEndian.Uint32(p[streamLenHeaderOff:streamLenDataOff]),
+		binary.BigEndian.Uint32(p[streamLenDataOff:])
+}
+
 func (t *FSTree) extractCombinedObject(id oid.ID, f *os.File) ([]byte, error) {
 	var (
 		comBuf [combinedDataOff]byte
@@ -485,6 +542,23 @@ func (t *FSTree) readFullObject(f io.Reader, initial []byte, size int64) ([]byte
 		return nil, fmt.Errorf("read: %w", err)
 	}
 	data = data[:len(initial)+n]
+	hLen, _ := parseStreamPrefix(data)
+	if hLen > 0 {
+		data = data[streamDataOff:]
+		payload, err := t.Decompress(data[hLen:])
+		if err != nil {
+			return nil, fmt.Errorf("decompress payload: %w", err)
+		}
+		// Reassemble a plain marshaled object: header bytes, then the payload
+		// field tag and length, then the decompressed payload.
+		pLen := len(payload)
+		payloadNum := protowire.Number(fieldObjectPayload)
+		n := protowire.SizeTag(payloadNum) + protowire.SizeVarint(uint64(pLen))
+		buf := make([]byte, int(hLen)+pLen+n)
+		copy(buf[:hLen], data)
+		off := binary.PutUvarint(buf[hLen:], protowire.EncodeTag(payloadNum, protowire.BytesType)) + int(hLen)
+		off += binary.PutUvarint(buf[off:], uint64(pLen))
+		copy(buf[off:], payload)
+		return buf, nil
+	}
 
 	return t.Decompress(data)
 }
diff --git a/pkg/local_object_storage/blobstor/fstree/getstream_test.go b/pkg/local_object_storage/blobstor/fstree/getstream_test.go
index ae191482f7..dab5a51177 100644
--- a/pkg/local_object_storage/blobstor/fstree/getstream_test.go
+++ b/pkg/local_object_storage/blobstor/fstree/getstream_test.go
@@ -100,7 +100,7 @@ func TestGetStreamAfterErrors(t *testing.T) {
 	t.Run("corrupt compressed data", func(t *testing.T) {
 		compress := compression.Config{Enabled: true}
 		require.NoError(t, compress.Init())
-		tree.Config = &compress
+		tree.SetCompressor(&compress)
 
 		addr := oidtest.Address()
 		obj := objectSDK.New()
@@ -118,7 +118,16 @@ func TestGetStreamAfterErrors(t *testing.T) {
 		require.NoError(t, err)
 		require.NoError(t, f.Close())
 
-		_, _, err = tree.GetStream(addr)
-		require.Error(t, err)
+		res, reader, err := tree.GetStream(addr)
+		require.NoError(t, err)
+		require.NotNil(t, res)
+		require.Equal(t, res.CutPayload(), res)
+		require.NotNil(t, reader)
+
+		// the payload reader is length-limited (io.LimitReader), so the
+		// corrupted bytes past the stored payload are never read
+		streamedPayload, err := io.ReadAll(reader)
+		require.NoError(t, err)
+		require.Equal(t, payload, streamedPayload)
+		require.NoError(t, reader.Close())
 	})
 }
diff --git a/pkg/local_object_storage/blobstor/fstree/head.go b/pkg/local_object_storage/blobstor/fstree/head.go
index d7a2eee7e1..3513ba5b35 100644
--- a/pkg/local_object_storage/blobstor/fstree/head.go
+++ b/pkg/local_object_storage/blobstor/fstree/head.go
@@ -124,8 +124,57 @@ func (t *FSTree) extractHeaderAndStream(id oid.ID, f *os.File) (*objectSDK.Objec
 
 // readHeaderAndPayload reads an object header from the file and returns reader for payload.
 // This function takes ownership of the io.ReadCloser and will close it if it does not return it.
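+// Stream-prefixed data (see streamPrefix in fstree.go) is handled without
+// reassembling the whole object: the prefix is parsed, the header is
+// unmarshaled, and the returned reader is limited to the stored payload length.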
-func (t *FSTree) readHeaderAndPayload(f io.ReadCloser, initial []byte) (*objectSDK.Object, io.ReadSeekCloser, error) {
+func (t *FSTree) readHeaderAndPayload(f io.ReadSeekCloser, initial []byte) (*objectSDK.Object, io.ReadSeekCloser, error) {
 	var err error
+	var hLen, pLen uint32
+	if len(initial) >= streamDataOff {
+		hLen, pLen = parseStreamPrefix(initial)
+	} else {
+		p := make([]byte, streamDataOff)
+		copy(p, initial)
+		n, err := io.ReadFull(f, p[len(initial):])
+		if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
+			_ = f.Close()
+			return nil, nil, fmt.Errorf("read stream prefix: %w", err)
+		}
+		initial = p[:len(initial)+n]
+		hLen, pLen = parseStreamPrefix(initial)
+	}
+	if hLen > 0 {
+		initial = initial[streamDataOff:]
+		var header []byte
+		if len(initial) < int(hLen) {
+			header = make([]byte, hLen)
+			copy(header, initial)
+			_, err = io.ReadFull(f, header[len(initial):])
+			if err != nil {
+				_ = f.Close()
+				return nil, nil, fmt.Errorf("read stream header: %w", err)
+			}
+			initial = header
+		}
+		header = initial[:hLen]
+		var obj objectSDK.Object
+		err = obj.Unmarshal(header)
+		if err != nil {
+			_ = f.Close()
+			return nil, nil, fmt.Errorf("unmarshal object: %w", err)
+		}
+
+		data := initial[hLen:]
+		reader := io.LimitReader(io.MultiReader(bytes.NewReader(data), f), int64(pLen))
+		if t.IsCompressed(data) {
+			decoder, err := zstd.NewReader(reader)
+			if err != nil {
+				_ = f.Close()
+				return nil, nil, fmt.Errorf("zstd decoder: %w", err)
+			}
+			reader = decoder.IOReadCloser()
+		}
+		return &obj, &payloadReader{
+			Reader: reader,
+			close:  f.Close,
+		}, nil
+	}
+
 	if len(initial) < objectSDK.MaxHeaderLen {
 		_ = f.Close()
 		initial, err = t.Decompress(initial)
@@ -168,80 +217,98 @@ func (t *FSTree) readUntilPayload(f io.ReadCloser, initial []byte) (*objectSDK.O
 		initial = buf[:n]
 	}
 
-	obj, rest, err := extractHeaderAndPayload(initial)
+	var (
+		obj object.Object
+		res objectSDK.Object
+	)
+
+	_, offset, err := extractHeaderAndPayload(initial, func(num int, val []byte) error {
+		switch num {
+		case fieldObjectID:
+			obj.ObjectId = new(refs.ObjectID)
+			err := proto.Unmarshal(val, obj.ObjectId)
+			if err != nil {
+				return fmt.Errorf("unmarshal object ID: %w", err)
+			}
+		case fieldObjectSignature:
+			obj.Signature = new(refs.Signature)
+			err := proto.Unmarshal(val, obj.Signature)
+			if err != nil {
+				return fmt.Errorf("unmarshal object signature: %w", err)
+			}
+		case fieldObjectHeader:
+			obj.Header = new(object.Header)
+			err := proto.Unmarshal(val, obj.Header)
+			if err != nil {
+				return fmt.Errorf("unmarshal object header: %w", err)
+			}
+		default:
+			return fmt.Errorf("unknown field number: %d", num)
+		}
+		return nil
+	})
 	if err != nil {
 		_ = reader.Close()
 		return nil, nil, fmt.Errorf("extract header and payload: %w", err)
 	}
 
-	return obj, &payloadReader{
-		Reader: io.MultiReader(bytes.NewReader(rest), reader),
+	err = res.FromProtoMessage(&obj)
+	if err != nil {
+		_ = reader.Close()
+		return nil, nil, fmt.Errorf("convert to objectSDK.Object: %w", err)
+	}
+
+	return &res, &payloadReader{
+		Reader: io.MultiReader(bytes.NewReader(initial[offset:]), reader),
 		close:  reader.Close,
 	}, nil
 }
 
-// extractHeaderAndPayload extracts the header of an object from the given byte slice and returns rest of the data.
-func extractHeaderAndPayload(data []byte) (*objectSDK.Object, []byte, error) {
-	var (
-		offset int
-		res    objectSDK.Object
-		obj    object.Object
-	)
+// extractHeaderAndPayload scans marshaled object data up to the payload field.
+// It calls the provided dataHandler for each field found before the payload.
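+// A nil dataHandler is allowed: the data is then only split, not decoded (this
+// is how processHeaderAndPayload in fstree.go uses it).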
+// It returns the end offset of the header (0 if no payload field was found),
+// the offset at which the payload data starts, and an error, if any.
+func extractHeaderAndPayload(data []byte, dataHandler func(int, []byte) error) (int, int, error) {
+	var offset, headerEnd int
 
 	if len(data) == 0 {
-		return nil, nil, fmt.Errorf("empty data")
+		return 0, 0, fmt.Errorf("empty data")
 	}
 
 	for offset < len(data) {
 		num, typ, n := protowire.ConsumeTag(data[offset:])
 		if err := protowire.ParseError(n); err != nil {
-			return nil, nil, fmt.Errorf("invalid tag at offset %d: %w", offset, err)
+			return 0, 0, fmt.Errorf("invalid tag at offset %d: %w", offset, err)
 		}
 		offset += n
 
 		if typ != protowire.BytesType {
-			return nil, nil, fmt.Errorf("unexpected wire type: %v", typ)
+			return 0, 0, fmt.Errorf("unexpected wire type: %v", typ)
 		}
 
 		if num == fieldObjectPayload {
+			headerEnd = offset - n
 			_, n = binary.Varint(data[offset:])
 			if err := protowire.ParseError(n); err != nil {
-				return nil, nil, fmt.Errorf("invalid varint at offset %d: %w", offset, err)
+				return 0, 0, fmt.Errorf("invalid varint at offset %d: %w", offset, err)
			}
 			offset += n
 			break
 		}
 
 		val, n := protowire.ConsumeBytes(data[offset:])
 		if err := protowire.ParseError(n); err != nil {
-			return nil, nil, fmt.Errorf("invalid bytes field at offset %d: %w", offset, err)
+			return 0, 0, fmt.Errorf("invalid bytes field at offset %d: %w", offset, err)
 		}
 		offset += n
 
-		switch num {
-		case fieldObjectID:
-			obj.ObjectId = new(refs.ObjectID)
-			err := proto.Unmarshal(val, obj.ObjectId)
-			if err != nil {
-				return nil, nil, fmt.Errorf("unmarshal object ID: %w", err)
-			}
-		case fieldObjectSignature:
-			obj.Signature = new(refs.Signature)
-			err := proto.Unmarshal(val, obj.Signature)
+		if dataHandler != nil {
+			err := dataHandler(int(num), val)
 			if err != nil {
-				return nil, nil, fmt.Errorf("unmarshal object signature: %w", err)
+				return 0, 0, fmt.Errorf("data handler error at offset %d: %w", offset, err)
 			}
-		case fieldObjectHeader:
-			obj.Header = new(object.Header)
-			err := proto.Unmarshal(val, obj.Header)
-			if err != nil {
-				return nil, nil, fmt.Errorf("unmarshal object header: %w", err)
-			}
-		default:
-			return nil, nil, fmt.Errorf("unknown field number: %d", num)
 		}
 	}
 
-	return &res, data[offset:], res.FromProtoMessage(&obj)
+	return headerEnd, offset, nil
 }
 
 type payloadReader struct {