Skip to content

Commit 17e09c9

Browse files
committed
WIP: decoding in GET service
Signed-off-by: Leonard Lyubich <[email protected]>
1 parent 2917378 commit 17e09c9

File tree

15 files changed

+943
-24
lines changed

15 files changed

+943
-24
lines changed

cmd/neofs-node/object.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -716,8 +716,9 @@ func (c *cfg) IsLocalNodePublicKey(b []byte) bool { return c.IsLocalKey(b) }
716716
// object holders. Resulting slices must not be changed.
717717
//
718718
// GetNodesForObject implements [getsvc.NeoFSNetwork].
719-
func (c *cfg) GetNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) {
720-
return c.cfgObject.containerNodes.getNodesForObject(addr)
719+
func (c *cfg) GetNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, []iec.Rule, error) {
720+
nodeSets, repRules, err := c.cfgObject.containerNodes.getNodesForObject(addr)
721+
return nodeSets, repRules, nil, err
721722
}
722723

723724
type netmapSourceWithNodes struct {

internal/ec/ec.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,14 @@ func Decode(rule Rule, dataLen uint64, parts [][]byte) ([]byte, error) {
7575
return nil, fmt.Errorf("restore Reed-Solomon: %w", err)
7676
}
7777

78+
return ConcatDataParts(rule, dataLen, parts), nil
79+
}
80+
81+
// ConcatDataParts returns a new slice of dataLen bytes originating given EC
82+
// parts according to rule.
83+
//
84+
// Panics if there are less than [Rule.DataPartNum] parts.
85+
func ConcatDataParts(rule Rule, dataLen uint64, parts [][]byte) []byte {
7886
// TODO: last part may be shorter, do not overallocate buffer.
79-
return slices.Concat(parts[:rule.DataPartNum]...)[:dataLen], nil
87+
return slices.Concat(parts[:rule.DataPartNum]...)[:dataLen]
8088
}

internal/ec/ec_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package ec_test
22

33
import (
4+
"bytes"
5+
"fmt"
46
"testing"
57

68
"github.com/klauspost/reedsolomon"
@@ -74,3 +76,44 @@ func TestEncode(t *testing.T) {
7476
})
7577
}
7678
}
79+
80+
func testConcatDataParts(t *testing.T, ln uint64) {
81+
data := testutil.RandByteSlice(ln)
82+
83+
for _, rule := range []iec.Rule{
84+
{DataPartNum: 3, ParityPartNum: 1},
85+
{DataPartNum: 12, ParityPartNum: 4},
86+
} {
87+
t.Run(fmt.Sprintf("rule=%s", rule.String()), func(t *testing.T) {
88+
var parts [][]byte
89+
if ln > 0 {
90+
rs, err := reedsolomon.New(int(rule.DataPartNum), int(rule.ParityPartNum))
91+
require.NoError(t, err)
92+
93+
parts, err = rs.Split(data)
94+
require.NoError(t, err)
95+
} else {
96+
parts = make([][]byte, rule.DataPartNum+rule.ParityPartNum)
97+
}
98+
99+
got := iec.ConcatDataParts(rule, ln, parts)
100+
require.True(t, bytes.Equal(data, got))
101+
})
102+
}
103+
}
104+
105+
func TestConcatDataParts(t *testing.T) {
106+
for _, ln := range []uint64{
107+
0,
108+
1,
109+
100,
110+
1 << 10,
111+
1<<10 + 1,
112+
10 << 10,
113+
10<<10 + 1,
114+
} {
115+
t.Run(fmt.Sprintf("len=%d", ln), func(t *testing.T) {
116+
testConcatDataParts(t, ln)
117+
})
118+
}
119+
}

pkg/local_object_storage/engine/ec.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package engine
2+
3+
import (
4+
"fmt"
5+
6+
iec "github.com/nspcc-dev/neofs-node/internal/ec"
7+
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
8+
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
9+
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
10+
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
11+
"go.uber.org/zap"
12+
)
13+
14+
// TODO:: docs.
15+
func (e *StorageEngine) GetECPart(cnr cid.ID, parent oid.ID, pi iec.PartInfo) (objectSDK.Object, error) {
16+
// TODO: keep in sync with https://github.com/nspcc-dev/neofs-node/pull/3466.
17+
// TODO: metrics and blockErr like Get
18+
19+
// TODO: sync placement with PUT. They must sort shard equally
20+
shs := e.sortedShards(oid.NewAddress(cnr, parent))
21+
for i := range shs {
22+
obj, err := shs[i].GetECPart(cnr, parent, pi)
23+
if err == nil {
24+
return obj, nil
25+
}
26+
// TODO: debug if 404
27+
e.log.Info("failed to get EC part from shard", zap.Stringer("shardID", shs[i].ID()), zap.Stringer("container", cnr), zap.Stringer("parent", parent),
28+
zap.Int("ruleIdx", pi.RuleIndex), zap.Int("partIdx", pi.Index), zap.Error(err))
29+
// FIXME: some errors like 'akready removed' must abort. Make closer to StorageEngine.Get
30+
}
31+
32+
return objectSDK.Object{}, fmt.Errorf("%w: all shards failed", apistatus.ErrObjectNotFound)
33+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package engine
2+
3+
import (
4+
"strconv"
5+
"testing"
6+
7+
iec "github.com/nspcc-dev/neofs-node/internal/ec"
8+
"github.com/nspcc-dev/neofs-node/internal/testutil"
9+
"github.com/nspcc-dev/neofs-sdk-go/checksum"
10+
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
11+
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
12+
objectsdk "github.com/nspcc-dev/neofs-sdk-go/object"
13+
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
14+
usertest "github.com/nspcc-dev/neofs-sdk-go/user/test"
15+
"github.com/stretchr/testify/require"
16+
)
17+
18+
func TestStorageEngine_GetECPartByIdx(t *testing.T) {
19+
for _, shardNum := range []int{1, 5} {
20+
t.Run("shards="+strconv.Itoa(shardNum), func(t *testing.T) {
21+
testGetECPart(t, shardNum)
22+
})
23+
}
24+
}
25+
26+
func testGetECPart(t *testing.T, shardNum int) {
27+
cnr := cidtest.ID()
28+
parentID := oidtest.ID()
29+
const ruleIdx = 123
30+
const partIdx = 456
31+
32+
var parent objectsdk.Object
33+
parent.SetContainerID(cnr)
34+
parent.SetID(parentID)
35+
parent.SetOwner(usertest.ID())
36+
parent.SetPayloadChecksum(checksum.NewSHA256([32]byte(testutil.RandByteSlice(32))))
37+
38+
part := parent
39+
part.SetID(oidtest.OtherID(parentID))
40+
part.SetParent(&parent)
41+
part.SetAttributes(
42+
objectsdk.NewAttribute("__NEOFS__EC_RULE_IDX", strconv.Itoa(ruleIdx)),
43+
objectsdk.NewAttribute("__NEOFS__EC_PART_IDX", strconv.Itoa(partIdx)),
44+
)
45+
46+
s := testNewEngineWithShardNum(t, shardNum)
47+
48+
checkMissingIdxs := func(t *testing.T, ruleIdx, partIdx int) {
49+
_, err := s.GetECPart(cnr, parentID, iec.PartInfo{
50+
RuleIndex: ruleIdx,
51+
Index: partIdx,
52+
})
53+
require.ErrorIs(t, err, apistatus.ErrObjectNotFound)
54+
}
55+
56+
checkMissingIdxs(t, ruleIdx-1, partIdx-1)
57+
checkMissingIdxs(t, ruleIdx-1, partIdx)
58+
checkMissingIdxs(t, ruleIdx-1, partIdx+1)
59+
checkMissingIdxs(t, ruleIdx, partIdx-1)
60+
checkMissingIdxs(t, ruleIdx, partIdx)
61+
checkMissingIdxs(t, ruleIdx, partIdx+1)
62+
checkMissingIdxs(t, ruleIdx+1, partIdx-1)
63+
checkMissingIdxs(t, ruleIdx+1, partIdx)
64+
checkMissingIdxs(t, ruleIdx+1, partIdx+1)
65+
66+
require.NoError(t, s.Put(&part, nil))
67+
68+
checkMissingIdxs(t, ruleIdx-1, partIdx-1)
69+
checkMissingIdxs(t, ruleIdx-1, partIdx)
70+
checkMissingIdxs(t, ruleIdx-1, partIdx+1)
71+
checkMissingIdxs(t, ruleIdx, partIdx-1)
72+
checkMissingIdxs(t, ruleIdx, partIdx+1)
73+
checkMissingIdxs(t, ruleIdx+1, partIdx-1)
74+
checkMissingIdxs(t, ruleIdx+1, partIdx)
75+
checkMissingIdxs(t, ruleIdx+1, partIdx+1)
76+
77+
got, err := s.GetECPart(cnr, parentID, iec.PartInfo{
78+
RuleIndex: ruleIdx,
79+
Index: partIdx,
80+
})
81+
require.NoError(t, err)
82+
require.Equal(t, part, got)
83+
}

pkg/local_object_storage/metabase/db_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ package meta_test
22

33
import (
44
"crypto/rand"
5+
"crypto/sha256"
56
"os"
67
"path"
78
"strconv"
89
"testing"
910

11+
"github.com/nspcc-dev/neofs-node/internal/testutil"
1012
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
1113
"github.com/nspcc-dev/neofs-sdk-go/checksum"
1214
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
@@ -130,3 +132,12 @@ func setExpiration(o *object.Object, epoch uint64) {
130132

131133
o.SetAttributes(append(o.Attributes(), attr)...)
132134
}
135+
136+
func newBlankObject(cnr cid.ID, id oid.ID) object.Object {
137+
var obj object.Object
138+
obj.SetContainerID(cnr)
139+
obj.SetID(id)
140+
obj.SetOwner(usertest.ID())
141+
obj.SetPayloadChecksum(checksum.NewSHA256([sha256.Size]byte(testutil.RandByteSlice(sha256.Size))))
142+
return obj
143+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package meta
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
"slices"
7+
"strconv"
8+
9+
iec "github.com/nspcc-dev/neofs-node/internal/ec"
10+
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
11+
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
12+
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
13+
"github.com/nspcc-dev/neofs-sdk-go/object"
14+
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
15+
"go.etcd.io/bbolt"
16+
)
17+
18+
// ResolveECPart resolves object that carries EC part produced within cnr for
19+
// parent object and indexed by pi, checks its availability and returns its ID.
20+
//
21+
// If object is missing, ResolveECPart returns [apistatus.ErrObjectNotFound].
22+
//
23+
// If object has expired, ResolveECPart returns [ErrObjectIsExpired].
24+
//
25+
// If object exists but tombstoned (via [DB.Inhume] or stored tombstone object),
26+
// ResolveECPart returns [apistatus.ErrObjectAlreadyRemoved].
27+
//
28+
// If object is marked as garbage (via [DB.MarkGarbage]), ResolveECPart returns
29+
// [apistatus.ErrObjectNotFound].
30+
//
31+
// If object is locked (via [DB.Lock] or stored locker object), ResolveECPart
32+
// ignores expiration, tombstone and garbage marks.
33+
func (db *DB) ResolveECPart(cnr cid.ID, parent oid.ID, pi iec.PartInfo) (oid.ID, error) {
34+
db.modeMtx.RLock()
35+
defer db.modeMtx.RUnlock()
36+
if db.mode.NoMetabase() {
37+
return oid.ID{}, ErrDegradedMode
38+
}
39+
40+
// TODO: check already removed and expires cases.
41+
var res oid.ID
42+
err := db.boltDB.View(func(tx *bbolt.Tx) error {
43+
var err error
44+
res, err = db.resolveECPartTx(tx, cnr, parent, pi)
45+
return err
46+
})
47+
return res, err
48+
}
49+
50+
func (db *DB) resolveECPartTx(tx *bbolt.Tx, cnr cid.ID, parent oid.ID, pi iec.PartInfo) (oid.ID, error) {
51+
metaBkt := tx.Bucket(metaBucketKey(cnr))
52+
if metaBkt == nil {
53+
return oid.ID{}, apistatus.ErrObjectNotFound
54+
}
55+
metaBktCursor := metaBkt.Cursor()
56+
57+
id, err := db.resolveECPartInMetaBucket(metaBkt, metaBktCursor, parent, pi)
58+
if err != nil {
59+
return oid.ID{}, err
60+
}
61+
62+
switch objectStatus(tx, metaBktCursor, oid.NewAddress(cnr, id), db.epochState.CurrentEpoch()) {
63+
case statusGCMarked:
64+
return oid.ID{}, apistatus.ErrObjectNotFound
65+
case statusTombstoned:
66+
return oid.ID{}, apistatus.ErrObjectAlreadyRemoved
67+
case statusExpired:
68+
return oid.ID{}, ErrObjectIsExpired
69+
}
70+
71+
return id, nil
72+
}
73+
74+
func (db *DB) resolveECPartInMetaBucket(bkt *bbolt.Bucket, crs *bbolt.Cursor, parent oid.ID, pi iec.PartInfo) (oid.ID, error) {
75+
pref := slices.Concat([]byte{metaPrefixAttrIDPlain}, []byte(object.FilterParentID), objectcore.MetaAttributeDelimiter,
76+
parent[:], objectcore.MetaAttributeDelimiter,
77+
)
78+
79+
k, _ := crs.Seek(pref)
80+
partID, ok := bytes.CutPrefix(k, pref)
81+
if !ok {
82+
return oid.ID{}, apistatus.ErrObjectNotFound
83+
}
84+
if len(partID) != oid.Size {
85+
return oid.ID{}, invalidMetaBucketKeyErr(k, fmt.Errorf("wrong OID len %d", len(partID)))
86+
}
87+
88+
// TODO: make one buffer for all keys
89+
k = slices.Concat([]byte{metaPrefixIDAttr}, partID, []byte(iec.AttributeRuleIdx), objectcore.MetaAttributeDelimiter, []byte(strconv.Itoa(pi.RuleIndex)))
90+
if bkt.Get(k) == nil {
91+
return oid.ID{}, apistatus.ErrObjectNotFound
92+
}
93+
94+
k = slices.Concat([]byte{metaPrefixIDAttr}, partID, []byte(iec.AttributePartIdx), objectcore.MetaAttributeDelimiter, []byte(strconv.Itoa(pi.Index)))
95+
if bkt.Get(k) == nil {
96+
return oid.ID{}, apistatus.ErrObjectNotFound
97+
}
98+
99+
return oid.ID(partID), nil
100+
}

0 commit comments

Comments
 (0)