Skip to content

Commit 243ea9c

Browse files
committed
sn/object: Initial support for erasure coding policies by GET server
Follow cd3e5d3 for GET. Closes #3422. Signed-off-by: Leonard Lyubich <[email protected]>
1 parent be59b71 commit 243ea9c

File tree

14 files changed

+1469
-42
lines changed

14 files changed

+1469
-42
lines changed

cmd/neofs-node/object.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -716,8 +716,9 @@ func (c *cfg) IsLocalNodePublicKey(b []byte) bool { return c.IsLocalKey(b) }
716716
// object holders. Resulting slices must not be changed.
717717
//
718718
// GetNodesForObject implements [getsvc.NeoFSNetwork].
719-
func (c *cfg) GetNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) {
720-
return c.cfgObject.containerNodes.getNodesForObject(addr)
719+
func (c *cfg) GetNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, []iec.Rule, error) {
720+
nodeSets, repRules, ecRules, err := c.cfgObject.containerNodes.getNodesForObject(addr)
721+
return nodeSets, repRules, ecRules, err
721722
}
722723

723724
type netmapSourceWithNodes struct {
@@ -805,7 +806,7 @@ type containerNodesSorter struct {
805806

806807
func (x *containerNodesSorter) Unsorted() [][]netmapsdk.NodeInfo { return x.policy.nodeSets }
807808
func (x *containerNodesSorter) PrimaryCounts() []uint { return x.policy.repCounts }
808-
func (x *containerNodesSorter) ECRules() []iec.Rule { return nil }
809+
func (x *containerNodesSorter) ECRules() []iec.Rule { return x.policy.ecRules }
809810
func (x *containerNodesSorter) SortForObject(obj oid.ID) ([][]netmapsdk.NodeInfo, error) {
810811
cacheKey := objectNodesCacheKey{epoch: x.curEpoch}
811812
cacheKey.addr.SetContainer(x.cnrID)
@@ -822,6 +823,7 @@ func (x *containerNodesSorter) SortForObject(obj oid.ID) ([][]netmapsdk.NodeInfo
822823
}
823824
}
824825
res.repCounts = x.policy.repCounts
826+
res.ecRules = x.policy.ecRules
825827
res.nodeSets, res.err = x.containerNodes.sortContainerNodesFunc(*x.networkMap, x.policy.nodeSets, obj)
826828
if res.err != nil {
827829
res.err = fmt.Errorf("sort container nodes for object: %w", res.err)

cmd/neofs-node/policy.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55

66
lru "github.com/hashicorp/golang-lru/v2"
7+
iec "github.com/nspcc-dev/neofs-node/internal/ec"
78
"github.com/nspcc-dev/neofs-node/pkg/core/container"
89
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
910
sdkcontainer "github.com/nspcc-dev/neofs-sdk-go/container"
@@ -17,6 +18,7 @@ import (
1718
type storagePolicyRes struct {
1819
nodeSets [][]netmapsdk.NodeInfo
1920
repCounts []uint
21+
ecRules []iec.Rule
2022
err error
2123
}
2224

@@ -152,33 +154,34 @@ func (x *containerNodes) forEachContainerNode(cnrID cid.ID, withPrevEpoch bool,
152154
// the underlying storage, applies the storage policy to it and returns sorted
153155
// lists of selected storage nodes along with the per-list numbers of primary
154156
// object holders. Resulting slices must not be changed.
155-
func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) {
157+
func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, []iec.Rule, error) {
156158
curEpoch, err := x.network.Epoch()
157159
if err != nil {
158-
return nil, nil, fmt.Errorf("read current NeoFS epoch: %w", err)
160+
return nil, nil, nil, fmt.Errorf("read current NeoFS epoch: %w", err)
159161
}
160162
cacheKey := objectNodesCacheKey{curEpoch, addr}
161163
res, ok := x.objCache.Get(cacheKey)
162164
if ok {
163-
return res.nodeSets, res.repCounts, res.err
165+
return res.nodeSets, res.repCounts, res.ecRules, res.err
164166
}
165167
cnrRes, networkMap, err := x.getForCurrentEpoch(curEpoch, addr.Container())
166168
if err != nil {
167-
return nil, nil, err
169+
return nil, nil, nil, err
168170
}
169171
if networkMap == nil {
170172
if networkMap, err = x.network.GetNetMapByEpoch(curEpoch); err != nil {
171173
// non-persistent error => do not cache
172-
return nil, nil, fmt.Errorf("read network map by epoch: %w", err)
174+
return nil, nil, nil, fmt.Errorf("read network map by epoch: %w", err)
173175
}
174176
}
175177
res.repCounts = cnrRes.repCounts
178+
res.ecRules = cnrRes.ecRules
176179
res.nodeSets, res.err = x.sortContainerNodesFunc(*networkMap, cnrRes.nodeSets, addr.Object())
177180
if res.err != nil {
178181
res.err = fmt.Errorf("sort container nodes for object: %w", res.err)
179182
}
180183
x.objCache.Add(cacheKey, res)
181-
return res.nodeSets, res.repCounts, res.err
184+
return res.nodeSets, res.repCounts, res.ecRules, res.err
182185
}
183186

184187
func (x *containerNodes) getForCurrentEpoch(curEpoch uint64, cnr cid.ID) (storagePolicyRes, *netmapsdk.NetMap, error) {

cmd/neofs-node/policy_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,7 @@ func TestContainerNodes_GetNodesForObject(t *testing.T) {
520520
require.NoError(t, err)
521521

522522
for n := 1; n < 10; n++ {
523-
_, _, err = ns.getNodesForObject(anyAddr)
523+
_, _, _, err = ns.getNodesForObject(anyAddr)
524524
require.ErrorIs(t, err, epochErr)
525525
require.EqualError(t, err, "read current NeoFS epoch: any epoch error")
526526
// such error must not be cached
@@ -534,7 +534,7 @@ func TestContainerNodes_GetNodesForObject(t *testing.T) {
534534
require.NoError(t, err)
535535

536536
for n := 1; n < 10; n++ {
537-
_, _, err = ns.getNodesForObject(anyAddr)
537+
_, _, _, err = ns.getNodesForObject(anyAddr)
538538
require.ErrorIs(t, err, cnrErr)
539539
require.EqualError(t, err, "select container nodes for current epoch #42: read container by ID: any container error")
540540
// such error must not be cached
@@ -548,7 +548,7 @@ func TestContainerNodes_GetNodesForObject(t *testing.T) {
548548
require.NoError(t, err)
549549

550550
for n := 1; n <= 10; n++ {
551-
_, _, err = ns.getNodesForObject(anyAddr)
551+
_, _, _, err = ns.getNodesForObject(anyAddr)
552552
require.ErrorIs(t, err, curNetmapErr)
553553
require.EqualError(t, err, "select container nodes for current epoch #42: read network map by epoch: any current netmap error")
554554
network.assertEpochCallCount(t, n)
@@ -569,7 +569,7 @@ func TestContainerNodes_GetNodesForObject(t *testing.T) {
569569
require.NoError(t, err)
570570

571571
for n := 1; n <= 10; n++ {
572-
_, _, err = ns.getNodesForObject(anyAddr)
572+
_, _, _, err = ns.getNodesForObject(anyAddr)
573573
require.EqualError(t, err, fmt.Sprintf("select container nodes for current epoch #42: %v", policyErr))
574574
network.assertEpochCallCount(t, n)
575575
// assert results are cached
@@ -592,7 +592,7 @@ func TestContainerNodes_GetNodesForObject(t *testing.T) {
592592
}
593593

594594
for n := 1; n <= 10; n++ {
595-
_, _, err = ns.getNodesForObject(anyAddr)
595+
_, _, _, err = ns.getNodesForObject(anyAddr)
596596
require.EqualError(t, err, "select container nodes for current epoch #42: "+
597597
"invalid result of container's storage policy application to the network map: "+
598598
"diff number of storage node sets (4) and required replica descriptors (2)")
@@ -622,7 +622,7 @@ func TestContainerNodes_GetNodesForObject(t *testing.T) {
622622
}
623623

624624
for n := 1; n <= 10; n++ {
625-
_, _, err = ns.getNodesForObject(anyAddr)
625+
_, _, _, err = ns.getNodesForObject(anyAddr)
626626
require.EqualError(t, err, "select container nodes for current epoch #42: "+
627627
"invalid result of container's storage policy application to the network map: "+
628628
"invalid storage node set #1: number of nodes (1) is less than minimum required by the container policy (2)")
@@ -647,7 +647,7 @@ func TestContainerNodes_GetNodesForObject(t *testing.T) {
647647
}
648648

649649
for n := 1; n <= 10; n++ {
650-
_, _, err = ns.getNodesForObject(anyAddr)
650+
_, _, _, err = ns.getNodesForObject(anyAddr)
651651
require.EqualError(t, err, "select container nodes for current epoch #42: "+
652652
"invalid result of container's storage policy application to the network map: "+
653653
"diff number of storage node sets (4) and required replica descriptors (2)")
@@ -677,7 +677,7 @@ func TestContainerNodes_GetNodesForObject(t *testing.T) {
677677
}
678678

679679
for n := 1; n <= 10; n++ {
680-
_, _, err = ns.getNodesForObject(anyAddr)
680+
_, _, _, err = ns.getNodesForObject(anyAddr)
681681
require.EqualError(t, err, "sort container nodes for object: any sort error")
682682
network.assertEpochCallCount(t, n)
683683
// assert results are cached
@@ -700,7 +700,7 @@ func TestContainerNodes_GetNodesForObject(t *testing.T) {
700700
require.NoError(t, err)
701701

702702
for n := 1; n <= 10; n++ {
703-
nodeLists, primCounts, err := ns.getNodesForObject(anyAddr)
703+
nodeLists, primCounts, _, err := ns.getNodesForObject(anyAddr)
704704
require.NoError(t, err)
705705
require.Len(t, primCounts, 4)
706706
require.Len(t, nodeLists, 4)

internal/ec/ec.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,15 @@ func Decode(rule Rule, dataLen uint64, parts [][]byte) ([]byte, error) {
7575
return nil, fmt.Errorf("restore Reed-Solomon: %w", err)
7676
}
7777

78+
return ConcatDataParts(rule, dataLen, parts), nil
79+
}
80+
81+
// ConcatDataParts returns a new slice of dataLen bytes originating given EC
82+
// parts according to rule.
83+
//
84+
// Panics if there are less than [Rule.DataPartNum] parts.
85+
func ConcatDataParts(rule Rule, dataLen uint64, parts [][]byte) []byte {
7886
// TODO: last part may be shorter, do not overallocate buffer.
79-
return slices.Concat(parts[:rule.DataPartNum]...)[:dataLen], nil
87+
// FIXME: can panic if less than dataLen passed. Return error or check on callers' side.
88+
return slices.Concat(parts[:rule.DataPartNum]...)[:dataLen]
8089
}

internal/ec/ec_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package ec_test
22

33
import (
4+
"bytes"
5+
"fmt"
46
"testing"
57

68
"github.com/klauspost/reedsolomon"
@@ -74,3 +76,44 @@ func TestEncode(t *testing.T) {
7476
})
7577
}
7678
}
79+
80+
func testConcatDataParts(t *testing.T, ln uint64) {
81+
data := testutil.RandByteSlice(ln)
82+
83+
for _, rule := range []iec.Rule{
84+
{DataPartNum: 3, ParityPartNum: 1},
85+
{DataPartNum: 12, ParityPartNum: 4},
86+
} {
87+
t.Run(fmt.Sprintf("rule=%s", rule.String()), func(t *testing.T) {
88+
var parts [][]byte
89+
if ln > 0 {
90+
rs, err := reedsolomon.New(int(rule.DataPartNum), int(rule.ParityPartNum))
91+
require.NoError(t, err)
92+
93+
parts, err = rs.Split(data)
94+
require.NoError(t, err)
95+
} else {
96+
parts = make([][]byte, rule.DataPartNum+rule.ParityPartNum)
97+
}
98+
99+
got := iec.ConcatDataParts(rule, ln, parts)
100+
require.True(t, bytes.Equal(data, got))
101+
})
102+
}
103+
}
104+
105+
func TestConcatDataParts(t *testing.T) {
106+
for _, ln := range []uint64{
107+
0,
108+
1,
109+
100,
110+
1 << 10,
111+
1<<10 + 1,
112+
10 << 10,
113+
10<<10 + 1,
114+
} {
115+
t.Run(fmt.Sprintf("len=%d", ln), func(t *testing.T) {
116+
testConcatDataParts(t, ln)
117+
})
118+
}
119+
}

pkg/services/object/get/container.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,19 @@ func (exec *execCtx) executeOnContainer() {
1717

1818
exec.log.Debug("trying to execute in container...")
1919

20-
nodeLists, primaryCounts, err := exec.svc.neoFSNet.GetNodesForObject(exec.address())
21-
if err != nil {
22-
exec.status = statusUndefined
23-
exec.err = err
24-
exec.log.Debug("failed to list storage nodes for the object", zap.Error(err))
25-
return
20+
addr := exec.address()
21+
22+
nodeLists := exec.nodeLists
23+
primaryCounts := exec.repRules
24+
if nodeLists == nil {
25+
var err error
26+
nodeLists, primaryCounts, _, err = exec.svc.neoFSNet.GetNodesForObject(addr)
27+
if err != nil {
28+
exec.status = statusUndefined
29+
exec.err = err
30+
exec.log.Debug("failed to list storage nodes for the object", zap.Error(err))
31+
return
32+
}
2633
}
2734

2835
ctx, cancel := context.WithCancel(exec.context())
@@ -34,7 +41,7 @@ func (exec *execCtx) executeOnContainer() {
3441
var j, jLim uint
3542
primary := true
3643

37-
for i := 0; i < len(nodeLists); i++ { // do not use for-range!
44+
for i := 0; i < len(primaryCounts); i++ { // do not use for-range!
3845
if primary {
3946
j, jLim = 0, primaryCounts[i]
4047
} else {
@@ -60,7 +67,7 @@ func (exec *execCtx) executeOnContainer() {
6067

6168
mProcessedNodes[strKey] = struct{}{}
6269

63-
if err = endpoints.FromIterator(network.NodeEndpointsIterator(nodeLists[i][j])); err != nil {
70+
if err := endpoints.FromIterator(network.NodeEndpointsIterator(nodeLists[i][j])); err != nil {
6471
// critical error that may ultimately block the storage service. Normally it
6572
// should not appear because entry into the network map under strict control
6673
exec.log.Error("failed to decode network endpoints of the storage node from the network map, skip the node",
@@ -81,7 +88,7 @@ func (exec *execCtx) executeOnContainer() {
8188
}
8289
}
8390

84-
if primary && i == len(nodeLists)-1 {
91+
if primary && i == len(primaryCounts)-1 {
8592
// switch to reserve nodes
8693
primary = false
8794
i = -1

0 commit comments

Comments
 (0)