@@ -22,6 +22,7 @@ import (
2222 objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
2323 oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
2424 "go.uber.org/zap"
25+ "google.golang.org/protobuf/encoding/protowire"
2526)
2627
2728// FSTree represents an object storage as a filesystem tree.
@@ -90,6 +91,26 @@ const (
9091 // combinedDataOff is the offset from the start of the combined prefix to object data.
9192 // It's also the length of the prefix in total.
9293 combinedDataOff = combinedLengthOff + combinedLenSize
94+
95+ // streamPrefix is the prefix for streamed objects. It is used to distinguish
96+ // streamed objects from regular ones.
97+ streamPrefix = 0x7e
98+
99+ // streamLenHeaderOff is the offset from the start of the stream prefix to
100+ // the length of the header data.
101+ streamLenHeaderOff = 2
102+
103+ // streamLenSize is sizeof(uint32), length of a serialized 32-bit BE integer
104+ // that represents the length of the header or payload data.
105+ streamLenSize = 4
106+
107+ // streamLenDataOff is the offset from the start of the stream prefix to
108+ // the length of the data.
109+ streamLenDataOff = streamLenHeaderOff + streamLenSize
110+
111+ // streamDataOff is the offset from the start of the stream prefix to the
112+ // start of the data. It is used to read the data after the header.
113+ streamDataOff = streamLenDataOff + streamLenSize
93114)
94115
95116var _ common.Storage = (* FSTree )(nil )
@@ -329,7 +350,7 @@ func (t *FSTree) getPath(addr oid.Address) (string, error) {
329350}
330351
331352// Put puts an object in the storage.
332- func (t * FSTree ) Put (addr oid.Address , data []byte ) error {
353+ func (t * FSTree ) Put (addr oid.Address , data []byte , header [] byte ) error {
333354 if t .readOnly {
334355 return common .ErrReadOnly
335356 }
@@ -339,7 +360,7 @@ func (t *FSTree) Put(addr oid.Address, data []byte) error {
339360 if err := util .MkdirAllX (filepath .Dir (p ), t .Permissions ); err != nil {
340361 return fmt .Errorf ("mkdirall for %q: %w" , p , err )
341362 }
342- data = t .Compress ( data )
363+ data = t .joinHeaderAndPayload ( header , data )
343364
344365 err := t .writer .writeData (addr .Object (), p , data )
345366 if err != nil {
@@ -349,7 +370,7 @@ func (t *FSTree) Put(addr oid.Address, data []byte) error {
349370}
350371
351372// PutBatch puts a batch of objects in the storage.
352- func (t * FSTree ) PutBatch (objs map [oid.Address ][]byte ) error {
373+ func (t * FSTree ) PutBatch (objs map [oid.Address ][2 ][ ]byte ) error {
353374 if t .readOnly {
354375 return common .ErrReadOnly
355376 }
@@ -363,7 +384,7 @@ func (t *FSTree) PutBatch(objs map[oid.Address][]byte) error {
363384 writeDataUnits = append (writeDataUnits , writeDataUnit {
364385 id : addr .Object (),
365386 path : p ,
366- data : t .Compress (data ),
387+ data : t .joinHeaderAndPayload (data [ 1 ], data [ 0 ] ),
367388 })
368389 }
369390
@@ -375,6 +396,28 @@ func (t *FSTree) PutBatch(objs map[oid.Address][]byte) error {
375396 return nil
376397}
377398
399+ // joinHeaderAndPayload combines header and payload into a single byte slice.
400+ func (t * FSTree ) joinHeaderAndPayload (header , payload []byte ) []byte {
401+ if header == nil {
402+ return payload
403+ }
404+
405+ hLen := len (header )
406+ payload = t .Compress (payload )
407+ pLen := len (payload )
408+
409+ data := make ([]byte , hLen + pLen + streamDataOff )
410+ data [0 ] = streamPrefix
411+ data [1 ] = 0 // version 0
412+ binary .BigEndian .PutUint32 (data [streamLenHeaderOff :], uint32 (hLen ))
413+ binary .BigEndian .PutUint32 (data [streamLenDataOff :], uint32 (pLen ))
414+
415+ copy (data [streamDataOff :], header )
416+ copy (data [streamDataOff + hLen :], payload )
417+
418+ return data
419+ }
420+
378421// Get returns an object from the storage by address.
379422func (t * FSTree ) Get (addr oid.Address ) (* objectSDK.Object , error ) {
380423 data , err := t .getObjBytes (addr )
@@ -433,6 +476,16 @@ func parseCombinedPrefix(p []byte) ([]byte, uint32) {
433476 binary .BigEndian .Uint32 (p [combinedLengthOff :combinedDataOff ])
434477}
435478
479+ // parseStreamPrefix checks the given byte slice for stream prefix and returns
480+ // the length of the header and data if so (0, 0 otherwise).
481+ func parseStreamPrefix (p []byte ) (uint32 , uint32 ) {
482+ if p [0 ] != streamPrefix || p [1 ] != 0 { // Only version 0 is supported now.
483+ return 0 , 0
484+ }
485+ return binary .BigEndian .Uint32 (p [streamLenHeaderOff :streamLenDataOff ]),
486+ binary .BigEndian .Uint32 (p [streamLenDataOff :])
487+ }
488+
436489func (t * FSTree ) extractCombinedObject (id oid.ID , f * os.File ) ([]byte , error ) {
437490 var (
438491 comBuf [combinedDataOff ]byte
@@ -485,6 +538,23 @@ func (t *FSTree) readFullObject(f io.Reader, initial []byte, size int64) ([]byte
485538 return nil , fmt .Errorf ("read: %w" , err )
486539 }
487540 data = data [:len (initial )+ n ]
541+ hLen , _ := parseStreamPrefix (data )
542+ if hLen > 0 {
543+ data = data [streamDataOff :]
544+ payload , err := t .Decompress (data [hLen :])
545+ if err != nil {
546+ return nil , fmt .Errorf ("decompress payload: %w" , err )
547+ }
548+ pLen := len (payload )
549+ payloadNum := protowire .Number (4 )
550+ n := protowire .SizeTag (payloadNum ) + protowire .SizeVarint (uint64 (pLen ))
551+ buf := make ([]byte , int (hLen )+ pLen + n )
552+ copy (buf [:hLen ], data )
553+ off := binary .PutUvarint (buf [hLen :], protowire .EncodeTag (payloadNum , protowire .BytesType )) + int (hLen )
554+ off += binary .PutUvarint (buf [off :], uint64 (pLen ))
555+ copy (buf [off :], payload )
556+ data = buf
557+ }
488558
489559 return t .Decompress (data )
490560}
0 commit comments