@@ -22,6 +22,7 @@ import (
22
22
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
23
23
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
24
24
"go.uber.org/zap"
25
+ "google.golang.org/protobuf/encoding/protowire"
25
26
)
26
27
27
28
// FSTree represents an object storage as a filesystem tree.
@@ -90,6 +91,26 @@ const (
90
91
// combinedDataOff is the offset from the start of the combined prefix to object data.
91
92
// It's also the length of the prefix in total.
92
93
combinedDataOff = combinedLengthOff + combinedLenSize
94
+
95
+ // streamPrefix is the prefix for streamed objects. It is used to distinguish
96
+ // streamed objects from regular ones.
97
+ streamPrefix = 0x7e
98
+
99
+ // streamLenHeaderOff is the offset from the start of the stream prefix to
100
+ // the length of the header data.
101
+ streamLenHeaderOff = 2
102
+
103
+ // streamLenSize is sizeof(uint32), length of a serialized 32-bit BE integer
104
+ // that represents the length of the header or payload data.
105
+ streamLenSize = 4
106
+
107
+ // streamLenDataOff is the offset from the start of the stream prefix to
108
+ // the length of the data.
109
+ streamLenDataOff = streamLenHeaderOff + streamLenSize
110
+
111
+ // streamDataOff is the offset from the start of the stream prefix to the
112
+ // start of the data. It is used to read the data after the header.
113
+ streamDataOff = streamLenDataOff + streamLenSize
93
114
)
94
115
95
116
var _ common.Storage = (* FSTree )(nil )
@@ -329,7 +350,7 @@ func (t *FSTree) getPath(addr oid.Address) (string, error) {
329
350
}
330
351
331
352
// Put puts an object in the storage.
332
- func (t * FSTree ) Put (addr oid.Address , data []byte ) error {
353
+ func (t * FSTree ) Put (addr oid.Address , data []byte , header [] byte ) error {
333
354
if t .readOnly {
334
355
return common .ErrReadOnly
335
356
}
@@ -339,7 +360,7 @@ func (t *FSTree) Put(addr oid.Address, data []byte) error {
339
360
if err := util .MkdirAllX (filepath .Dir (p ), t .Permissions ); err != nil {
340
361
return fmt .Errorf ("mkdirall for %q: %w" , p , err )
341
362
}
342
- data = t .Compress ( data )
363
+ data = t .joinHeaderAndPayload ( header , data )
343
364
344
365
err := t .writer .writeData (addr .Object (), p , data )
345
366
if err != nil {
@@ -349,7 +370,7 @@ func (t *FSTree) Put(addr oid.Address, data []byte) error {
349
370
}
350
371
351
372
// PutBatch puts a batch of objects in the storage.
352
- func (t * FSTree ) PutBatch (objs map [oid.Address ][]byte ) error {
373
+ func (t * FSTree ) PutBatch (objs map [oid.Address ][2 ][ ]byte ) error {
353
374
if t .readOnly {
354
375
return common .ErrReadOnly
355
376
}
@@ -363,7 +384,7 @@ func (t *FSTree) PutBatch(objs map[oid.Address][]byte) error {
363
384
writeDataUnits = append (writeDataUnits , writeDataUnit {
364
385
id : addr .Object (),
365
386
path : p ,
366
- data : t .Compress (data ),
387
+ data : t .joinHeaderAndPayload (data [ 1 ], data [ 0 ] ),
367
388
})
368
389
}
369
390
@@ -375,6 +396,28 @@ func (t *FSTree) PutBatch(objs map[oid.Address][]byte) error {
375
396
return nil
376
397
}
377
398
399
+ // joinHeaderAndPayload combines header and payload into a single byte slice.
400
+ func (t * FSTree ) joinHeaderAndPayload (header , payload []byte ) []byte {
401
+ if header == nil {
402
+ return payload
403
+ }
404
+
405
+ hLen := len (header )
406
+ payload = t .Compress (payload )
407
+ pLen := len (payload )
408
+
409
+ data := make ([]byte , hLen + pLen + streamDataOff )
410
+ data [0 ] = streamPrefix
411
+ data [1 ] = 0 // version 0
412
+ binary .BigEndian .PutUint32 (data [streamLenHeaderOff :], uint32 (hLen ))
413
+ binary .BigEndian .PutUint32 (data [streamLenDataOff :], uint32 (pLen ))
414
+
415
+ copy (data [streamDataOff :], header )
416
+ copy (data [streamDataOff + hLen :], payload )
417
+
418
+ return data
419
+ }
420
+
378
421
// Get returns an object from the storage by address.
379
422
func (t * FSTree ) Get (addr oid.Address ) (* objectSDK.Object , error ) {
380
423
data , err := t .getObjBytes (addr )
@@ -433,6 +476,16 @@ func parseCombinedPrefix(p []byte) ([]byte, uint32) {
433
476
binary .BigEndian .Uint32 (p [combinedLengthOff :combinedDataOff ])
434
477
}
435
478
479
+ // parseStreamPrefix checks the given byte slice for stream prefix and returns
480
+ // the length of the header and data if so (0, 0 otherwise).
481
+ func parseStreamPrefix (p []byte ) (uint32 , uint32 ) {
482
+ if p [0 ] != streamPrefix || p [1 ] != 0 { // Only version 0 is supported now.
483
+ return 0 , 0
484
+ }
485
+ return binary .BigEndian .Uint32 (p [streamLenHeaderOff :streamLenDataOff ]),
486
+ binary .BigEndian .Uint32 (p [streamLenDataOff :])
487
+ }
488
+
436
489
func (t * FSTree ) extractCombinedObject (id oid.ID , f * os.File ) ([]byte , error ) {
437
490
var (
438
491
comBuf [combinedDataOff ]byte
@@ -485,6 +538,23 @@ func (t *FSTree) readFullObject(f io.Reader, initial []byte, size int64) ([]byte
485
538
return nil , fmt .Errorf ("read: %w" , err )
486
539
}
487
540
data = data [:len (initial )+ n ]
541
+ hLen , _ := parseStreamPrefix (data )
542
+ if hLen > 0 {
543
+ data = data [streamDataOff :]
544
+ payload , err := t .Decompress (data [hLen :])
545
+ if err != nil {
546
+ return nil , fmt .Errorf ("decompress payload: %w" , err )
547
+ }
548
+ pLen := len (payload )
549
+ payloadNum := protowire .Number (4 )
550
+ n := protowire .SizeTag (payloadNum ) + protowire .SizeVarint (uint64 (pLen ))
551
+ buf := make ([]byte , int (hLen )+ pLen + n )
552
+ copy (buf [:hLen ], data )
553
+ off := binary .PutUvarint (buf [hLen :], protowire .EncodeTag (payloadNum , protowire .BytesType )) + int (hLen )
554
+ off += binary .PutUvarint (buf [off :], uint64 (pLen ))
555
+ copy (buf [off :], payload )
556
+ data = buf
557
+ }
488
558
489
559
return t .Decompress (data )
490
560
}
0 commit comments