diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index f1e9ab4c0c..ecf1ac0066 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -10,6 +10,7 @@ import ( "github.com/google/uuid" lru "github.com/hashicorp/golang-lru/v2" + iec "github.com/nspcc-dev/neofs-node/internal/ec" coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client" containercore "github.com/nspcc-dev/neofs-node/pkg/core/container" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" @@ -49,7 +50,7 @@ import ( ) type objectSvc struct { - put *putsvc.Service + *putsvc.Service search *searchsvc.Service @@ -69,10 +70,6 @@ func (c *cfg) MaxObjectSize() uint64 { return sz } -func (s *objectSvc) Put(ctx context.Context) (*putsvc.Streamer, error) { - return s.put.Put(ctx) -} - func (s *objectSvc) Head(ctx context.Context, prm getsvc.HeadPrm) error { return s.get.Head(ctx, prm) } @@ -276,10 +273,10 @@ func initObjectService(c *cfg) { ) objSvc := &objectSvc{ - put: sPut, - search: sSearch, - get: sGet, - delete: sDelete, + Service: sPut, + search: sSearch, + get: sGet, + delete: sDelete, } // cachedFirstObjectsNumber is a total cached objects number; the V2 split scheme @@ -804,6 +801,7 @@ type containerNodesSorter struct { func (x *containerNodesSorter) Unsorted() [][]netmapsdk.NodeInfo { return x.policy.nodeSets } func (x *containerNodesSorter) PrimaryCounts() []uint { return x.policy.repCounts } +func (x *containerNodesSorter) ECRules() []iec.Rule { return nil } func (x *containerNodesSorter) SortForObject(obj oid.ID) ([][]netmapsdk.NodeInfo, error) { cacheKey := objectNodesCacheKey{epoch: x.curEpoch} cacheKey.addr.SetContainer(x.cnrID) diff --git a/go.mod b/go.mod index d666d4e75f..f2392af354 100644 --- a/go.mod +++ b/go.mod @@ -13,10 +13,12 @@ require ( github.com/google/uuid v1.6.0 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/klauspost/compress v1.17.11 + github.com/klauspost/reedsolomon v1.12.4 github.com/mitchellh/go-homedir v1.1.0 
github.com/mitchellh/mapstructure v1.5.0 github.com/mr-tron/base58 v1.2.0 github.com/multiformats/go-multiaddr v0.12.2 + github.com/mxschmitt/golang-combinations v1.2.0 github.com/nspcc-dev/hrw/v2 v2.0.3 github.com/nspcc-dev/locode-db v0.6.0 github.com/nspcc-dev/neo-go v0.111.0 @@ -62,7 +64,7 @@ require ( github.com/holiman/uint256 v1.3.2 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/ipfs/go-cid v0.4.1 // indirect - github.com/klauspost/cpuid/v2 v2.2.7 // indirect + github.com/klauspost/cpuid/v2 v2.2.11 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect github.com/minio/sha256-simd v1.0.1 // indirect diff --git a/go.sum b/go.sum index 8d68b5140c..1d389f75be 100644 --- a/go.sum +++ b/go.sum @@ -121,8 +121,10 @@ github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNU github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= -github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= -github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU= +github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= +github.com/klauspost/reedsolomon v1.12.4 h1:5aDr3ZGoJbgu/8+j45KtUJxzYm8k08JGtB9Wx1VQ4OA= +github.com/klauspost/reedsolomon v1.12.4/go.mod h1:d3CzOMOt0JXGIFZm1StgkyF14EYr3xneR2rNWo7NcMU= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -185,6 +187,8 
@@ github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/n github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/mxschmitt/golang-combinations v1.2.0 h1:V5E7MncIK8Yr1SL/SpdqMuSquFsfoIs5auI7Y3n8z14= +github.com/mxschmitt/golang-combinations v1.2.0/go.mod h1:RCm5eR03B+JrBOMRDLsKZWShluXdrHu+qwhPEJ0miBM= github.com/nspcc-dev/bbolt v0.0.0-20250612101626-5df2544a4a22 h1:M5Nmg1iCnbZngzIBDIlMr9vW+okFfcSMBvBlXG8r+14= github.com/nspcc-dev/bbolt v0.0.0-20250612101626-5df2544a4a22/go.mod h1:AsD+OCi/qPN1giOX1aiLAha3o1U8rAz65bvN4j0sRuk= github.com/nspcc-dev/dbft v0.4.0 h1:4/atD4GrrMEtrYBDiZPrPzdKZ6ws7PR/cg0M4DEdVeI= @@ -358,7 +362,6 @@ golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/internal/ec/ec.go b/internal/ec/ec.go new file mode 100644 index 0000000000..3a7a4bf5b8 --- /dev/null +++ b/internal/ec/ec.go @@ -0,0 +1,81 @@ +package ec + +import ( + "fmt" + "slices" + "strconv" + + "github.com/klauspost/reedsolomon" + "golang.org/x/exp/constraints" +) + +// Erasure coding 
attributes. +const ( + AttributePrefix = "__NEOFS__EC_" + AttributeRuleIdx = AttributePrefix + "RULE_IDX" + AttributePartIdx = AttributePrefix + "PART_IDX" +) + +// Rule represents erasure coding rule for object payload's encoding and placement. +type Rule struct { + DataPartNum uint8 + ParityPartNum uint8 +} + +// String implements [fmt.Stringer]. +func (x Rule) String() string { + return strconv.FormatUint(uint64(x.DataPartNum), 10) + "/" + strconv.FormatUint(uint64(x.ParityPartNum), 10) +} + +// Encode encodes given data according to specified EC rule and returns coded +// parts. First [Rule.DataPartNum] elements are data parts, other +// [Rule.ParityPartNum] ones are parity blocks. +// +// All parts are the same length. If data len is not divisible by +// [Rule.DataPartNum], last data part is aligned with zeros. +// +// If data is empty, all parts are nil. +func Encode(rule Rule, data []byte) ([][]byte, error) { + if len(data) == 0 { + return make([][]byte, rule.DataPartNum+rule.ParityPartNum), nil + } + + // TODO: Explore reedsolomon.Option for performance improvement. https://github.com/nspcc-dev/neofs-node/issues/3501 + enc, err := reedsolomon.New(int(rule.DataPartNum), int(rule.ParityPartNum)) + if err != nil { // should never happen with correct rule + return nil, fmt.Errorf("init Reed-Solomon encoder: %w", err) + } + + parts, err := enc.Split(data) + if err != nil { + return nil, fmt.Errorf("split data: %w", err) + } + + if err := enc.Encode(parts); err != nil { + return nil, fmt.Errorf("calculate Reed-Solomon parity: %w", err) + } + + return parts, nil +} + +// Decode decodes source data of known len from EC parts obtained by applying +// specified rule. +func Decode[LT constraints.Unsigned](rule Rule, dataLen LT, parts [][]byte) ([]byte, error) { + // TODO: Explore reedsolomon.Option for performance improvement. 
https://github.com/nspcc-dev/neofs-node/issues/3501 + dec, err := reedsolomon.New(int(rule.DataPartNum), int(rule.ParityPartNum)) + if err != nil { // should never happen with correct rule + return nil, fmt.Errorf("init Reed-Solomon decoder: %w", err) + } + + required := make([]bool, rule.DataPartNum+rule.ParityPartNum) + for i := range rule.DataPartNum { + required[i] = true + } + + if err := dec.ReconstructSome(parts, required); err != nil { + return nil, fmt.Errorf("restore Reed-Solomon: %w", err) + } + + // TODO: last part may be shorter, do not overallocate buffer. + return slices.Concat(parts[:rule.DataPartNum]...)[:dataLen], nil +} diff --git a/internal/ec/ec_test.go b/internal/ec/ec_test.go new file mode 100644 index 0000000000..2e37013e2e --- /dev/null +++ b/internal/ec/ec_test.go @@ -0,0 +1,76 @@ +package ec_test + +import ( + "testing" + + "github.com/klauspost/reedsolomon" + iec "github.com/nspcc-dev/neofs-node/internal/ec" + islices "github.com/nspcc-dev/neofs-node/internal/slices" + "github.com/nspcc-dev/neofs-node/internal/testutil" + "github.com/stretchr/testify/require" +) + +func TestRule_String(t *testing.T) { + r := iec.Rule{ + DataPartNum: 12, + ParityPartNum: 23, + } + require.Equal(t, "12/23", r.String()) +} + +func testEncode(t *testing.T, rule iec.Rule, data []byte) { + ln := uint(len(data)) + + parts, err := iec.Encode(rule, data) + require.NoError(t, err) + + res, err := iec.Decode(rule, ln, parts) + require.NoError(t, err) + require.Equal(t, data, res) + + for lostCount := 1; lostCount <= int(rule.ParityPartNum); lostCount++ { + for _, lostIdxs := range islices.IndexCombos(len(parts), lostCount) { + res, err := iec.Decode(rule, ln, islices.NilTwoDimSliceElements(parts, lostIdxs)) + require.NoError(t, err) + require.Equal(t, data, res) + } + } + + for _, lostIdxs := range islices.IndexCombos(len(parts), int(rule.ParityPartNum)+1) { + _, err := iec.Decode(rule, ln, islices.NilTwoDimSliceElements(parts, lostIdxs)) + require.ErrorContains(t, 
err, "restore Reed-Solomon") + require.ErrorIs(t, err, reedsolomon.ErrTooFewShards) + } +} + +func TestEncode(t *testing.T) { + rules := []iec.Rule{ + {DataPartNum: 3, ParityPartNum: 1}, + {DataPartNum: 12, ParityPartNum: 4}, + } + + data := testutil.RandByteSlice(4 << 10) + + t.Run("empty", func(t *testing.T) { + for _, rule := range rules { + t.Run(rule.String(), func(t *testing.T) { + test := func(t *testing.T, data []byte) { + res, err := iec.Encode(rule, []byte{}) + require.NoError(t, err) + + total := int(rule.DataPartNum + rule.ParityPartNum) + require.Len(t, res, total) + require.EqualValues(t, total, islices.CountNilsInTwoDimSlice(res)) + } + test(t, nil) + test(t, []byte{}) + }) + } + }) + + for _, rule := range rules { + t.Run(rule.String(), func(t *testing.T) { + testEncode(t, rule, data) + }) + } +} diff --git a/internal/ec/object_test.go b/internal/ec/object_test.go new file mode 100644 index 0000000000..4110c5f040 --- /dev/null +++ b/internal/ec/object_test.go @@ -0,0 +1,174 @@ +package ec_test + +import ( + "crypto/sha256" + "math/rand/v2" + "testing" + + iec "github.com/nspcc-dev/neofs-node/internal/ec" + "github.com/nspcc-dev/neofs-node/internal/testutil" + "github.com/nspcc-dev/neofs-sdk-go/checksum" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + neofscryptotest "github.com/nspcc-dev/neofs-sdk-go/crypto/test" + "github.com/nspcc-dev/neofs-sdk-go/object" + sessiontest "github.com/nspcc-dev/neofs-sdk-go/session/test" + usertest "github.com/nspcc-dev/neofs-sdk-go/user/test" + "github.com/nspcc-dev/neofs-sdk-go/version" + "github.com/nspcc-dev/tzhash/tz" + "github.com/stretchr/testify/require" +) + +func TestGetPartInfo(t *testing.T) { + var obj object.Object + otherAttr := object.NewAttribute("any_attribute", "val") + + obj.SetAttributes(otherAttr) + + t.Run("missing", func(t *testing.T) { + pi, err := iec.GetPartInfo(obj) + require.NoError(t, err) + require.EqualValues(t, -1, pi.RuleIndex) + }) + + t.Run("failure", func(t 
*testing.T) { + for _, tc := range []struct { + name string + attrs map[string]string + assertErr func(t *testing.T, err error) + }{ + {name: "non-int rule index", + attrs: map[string]string{"__NEOFS__EC_RULE_IDX": "not_an_int", "__NEOFS__EC_PART_IDX": "456"}, + assertErr: func(t *testing.T, err error) { + require.ErrorContains(t, err, "invalid index attribute __NEOFS__EC_RULE_IDX: ") + require.ErrorContains(t, err, "invalid syntax") + }, + }, + {name: "negative rule index", + attrs: map[string]string{"__NEOFS__EC_RULE_IDX": "-123", "__NEOFS__EC_PART_IDX": "456"}, + assertErr: func(t *testing.T, err error) { + require.EqualError(t, err, "invalid index attribute __NEOFS__EC_RULE_IDX: negative value -123") + }, + }, + {name: "non-int part index", + attrs: map[string]string{"__NEOFS__EC_RULE_IDX": "123", "__NEOFS__EC_PART_IDX": "not_an_int"}, + assertErr: func(t *testing.T, err error) { + require.ErrorContains(t, err, "invalid index attribute __NEOFS__EC_PART_IDX: ") + require.ErrorContains(t, err, "invalid syntax") + }, + }, + {name: "negative part index", + attrs: map[string]string{"__NEOFS__EC_RULE_IDX": "123", "__NEOFS__EC_PART_IDX": "-456"}, + assertErr: func(t *testing.T, err error) { + require.EqualError(t, err, "invalid index attribute __NEOFS__EC_PART_IDX: negative value -456") + }, + }, + {name: "rule index without part index", + attrs: map[string]string{"__NEOFS__EC_RULE_IDX": "123"}, + assertErr: func(t *testing.T, err error) { + require.EqualError(t, err, "__NEOFS__EC_RULE_IDX attribute is set while __NEOFS__EC_PART_IDX is not") + }, + }, + {name: "part index without rule index", + attrs: map[string]string{"__NEOFS__EC_PART_IDX": "456"}, + assertErr: func(t *testing.T, err error) { + require.EqualError(t, err, "__NEOFS__EC_PART_IDX attribute is set while __NEOFS__EC_RULE_IDX is not") + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + attrs := make([]object.Attribute, 0, len(tc.attrs)/2) + for k, v := range tc.attrs { + attrs = append(attrs, 
object.NewAttribute(k, v)) + } + + obj.SetAttributes(append([]object.Attribute{otherAttr}, attrs...)...) + + _, err := iec.GetPartInfo(obj) + tc.assertErr(t, err) + }) + } + }) + + obj.SetAttributes( + otherAttr, + object.NewAttribute("__NEOFS__EC_RULE_IDX", "123"), + object.NewAttribute("__NEOFS__EC_PART_IDX", "456"), + ) + + pi, err := iec.GetPartInfo(obj) + require.NoError(t, err) + require.Equal(t, iec.PartInfo{RuleIndex: 123, Index: 456}, pi) +} + +func TestFormObjectForECPart(t *testing.T) { + ver := version.Current() + st := sessiontest.Object() + signer := neofscryptotest.Signer() + + var parent object.Object + parent.SetVersion(&ver) + parent.SetContainerID(cidtest.ID()) + parent.SetOwner(usertest.ID()) + parent.SetCreationEpoch(rand.Uint64()) + parent.SetType(object.Type(rand.Int32())) + parent.SetSessionToken(&st) + require.NoError(t, parent.SetVerificationFields(signer)) + + partInfo := iec.PartInfo{RuleIndex: 123, Index: 456} + part := testutil.RandByteSlice(32) + + t.Run("signer failure", func(t *testing.T) { + signer := neofscryptotest.FailSigner(signer) + _, sigErr := signer.Sign(nil) + require.Error(t, sigErr) + + _, err := iec.FormObjectForECPart(signer, parent, part, partInfo) + require.ErrorContains(t, err, "set verification fields: could not set signature:") + require.ErrorContains(t, err, sigErr.Error()) + }) + + obj, err := iec.FormObjectForECPart(signer, parent, part, partInfo) + require.NoError(t, err) + + require.NoError(t, obj.VerifyID()) + require.True(t, obj.VerifySignature()) + + require.True(t, obj.HasParent()) + require.NotNil(t, obj.Parent()) + require.Equal(t, parent, *obj.Parent()) + + require.Equal(t, part, obj.Payload()) + require.EqualValues(t, len(part), obj.PayloadSize()) + + pcs, ok := obj.PayloadChecksum() + require.True(t, ok) + require.Equal(t, checksum.NewSHA256(sha256.Sum256(part)), pcs) + + require.Equal(t, parent.Version(), obj.Version()) + require.Equal(t, parent.GetContainerID(), obj.GetContainerID()) + 
require.Equal(t, parent.Owner(), obj.Owner()) + require.Equal(t, parent.CreationEpoch(), obj.CreationEpoch()) + require.Equal(t, object.TypeRegular, obj.Type()) + require.Equal(t, parent.SessionToken(), obj.SessionToken()) + + _, ok = obj.PayloadHomomorphicHash() + require.False(t, ok) + + require.Len(t, obj.Attributes(), 2) + + pi, err := iec.GetPartInfo(obj) + require.NoError(t, err) + require.Equal(t, partInfo, pi) + + t.Run("with homomorphic hash", func(t *testing.T) { + anyHash := checksum.NewTillichZemor([tz.Size]byte{1, 2, 3}) + parent.SetPayloadHomomorphicHash(anyHash) + + obj, err := iec.FormObjectForECPart(signer, parent, part, partInfo) + require.NoError(t, err) + + phh, ok := obj.PayloadHomomorphicHash() + require.True(t, ok) + require.Equal(t, checksum.NewTillichZemor(tz.Sum(part)), phh) + }) +} diff --git a/internal/ec/objects.go b/internal/ec/objects.go new file mode 100644 index 0000000000..4dd4693fd4 --- /dev/null +++ b/internal/ec/objects.go @@ -0,0 +1,75 @@ +package ec + +import ( + "fmt" + + iobject "github.com/nspcc-dev/neofs-node/internal/object" + "github.com/nspcc-dev/neofs-sdk-go/checksum" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" + "github.com/nspcc-dev/neofs-sdk-go/object" + "github.com/nspcc-dev/tzhash/tz" +) + +// PartInfo groups information about single EC part produced according to some [Rule]. +type PartInfo struct { + // Index of EC rule in the container storage policy. + RuleIndex int + // Part index. + Index int +} + +// GetPartInfo fetches EC part info from given object header. It one of +// [AttributeRuleIdx] or [AttributeRuleIdx] attributes is set, the other must be +// set too. If both are missing, GetPartInfo returns [PartInfo.RuleIndex] = -1 +// without error. 
+func GetPartInfo(obj object.Object) (PartInfo, error) { + ruleIdx, err := iobject.GetIndexAttribute(obj, AttributeRuleIdx) + if err != nil { + return PartInfo{}, fmt.Errorf("invalid index attribute %s: %w", AttributeRuleIdx, err) + } + + partIdx, err := iobject.GetIndexAttribute(obj, AttributePartIdx) + if err != nil { + return PartInfo{}, fmt.Errorf("invalid index attribute %s: %w", AttributePartIdx, err) + } + + if ruleIdx < 0 { + if partIdx >= 0 { + return PartInfo{}, fmt.Errorf("%s attribute is set while %s is not", AttributePartIdx, AttributeRuleIdx) + } + } else if partIdx < 0 { + return PartInfo{}, fmt.Errorf("%s attribute is set while %s is not", AttributeRuleIdx, AttributePartIdx) + } + + return PartInfo{ + RuleIndex: ruleIdx, + Index: partIdx, + }, nil +} + +// FormObjectForECPart forms object for EC part produced from given parent object. +func FormObjectForECPart(signer neofscrypto.Signer, parent object.Object, part []byte, partInfo PartInfo) (object.Object, error) { + var obj object.Object + obj.SetVersion(parent.Version()) + obj.SetContainerID(parent.GetContainerID()) + obj.SetOwner(parent.Owner()) + obj.SetCreationEpoch(parent.CreationEpoch()) + obj.SetType(object.TypeRegular) + obj.SetSessionToken(parent.SessionToken()) + + obj.SetParent(&parent) + iobject.SetIntAttribute(&obj, AttributeRuleIdx, partInfo.RuleIndex) + iobject.SetIntAttribute(&obj, AttributePartIdx, partInfo.Index) + + obj.SetPayload(part) + obj.SetPayloadSize(uint64(len(part))) + if _, ok := parent.PayloadHomomorphicHash(); ok { + obj.SetPayloadHomomorphicHash(checksum.NewTillichZemor(tz.Sum(part))) + } + + if err := obj.SetVerificationFields(signer); err != nil { + return object.Object{}, fmt.Errorf("set verification fields: %w", err) + } + + return obj, nil +} diff --git a/internal/object/attributes.go b/internal/object/attributes.go new file mode 100644 index 0000000000..8e6109166f --- /dev/null +++ b/internal/object/attributes.go @@ -0,0 +1,80 @@ +package object + +import ( + 
"errors" + "fmt" + "strconv" + + "github.com/nspcc-dev/neofs-sdk-go/object" +) + +// ErrAttributeNotFound is returned when some object attribute not found. +var ErrAttributeNotFound = errors.New("attribute not found") + +// GetIndexAttribute looks up for specified index attribute in the given object +// header. Returns -1 if the attribute is missing. +// +// GetIndexAttribute ignores all attribute values except the first. +// +// Note that if attribute exists but negative, GetIndexAttribute returns error. +func GetIndexAttribute(hdr object.Object, attr string) (int, error) { + i, err := GetIntAttribute(hdr, attr) + if err != nil { + if errors.Is(err, ErrAttributeNotFound) { + return -1, nil + } + return 0, err + } + + if i < 0 { + return 0, fmt.Errorf("negative value %d", i) + } + + return i, nil +} + +// GetIntAttribute looks up for specified int attribute in the given object +// header. Returns [ErrAttributeNotFound] if the attribute is missing. +// +// GetIntAttribute ignores all attribute values except the first. +func GetIntAttribute(hdr object.Object, attr string) (int, error) { + if s := GetAttribute(hdr, attr); s != "" { + return strconv.Atoi(s) + } + return 0, ErrAttributeNotFound +} + +// GetAttribute looks up for specified attribute in the given object header. +// Returns empty string if the attribute is missing. +// +// GetIntAttribute ignores all attribute values except the first. +func GetAttribute(hdr object.Object, attr string) string { + attrs := hdr.Attributes() + for i := range attrs { + if attrs[i].Key() == attr { + return attrs[i].Value() + } + } + return "" +} + +// SetIntAttribute sets int value for the object attribute. If the attribute +// already exists, SetIntAttribute overwrites its value. +func SetIntAttribute(dst *object.Object, attr string, val int) { + SetAttribute(dst, attr, strconv.Itoa(val)) +} + +// SetAttribute sets value for the object attribute. If the attribute already +// exists, SetAttribute overwrites its value. 
+func SetAttribute(dst *object.Object, attr, val string) { + attrs := dst.Attributes() + for i := range attrs { + if attrs[i].Key() == attr { + attrs[i].SetValue(val) + dst.SetAttributes(attrs...) + return + } + } + + dst.SetAttributes(append(attrs, object.NewAttribute(attr, val))...) +} diff --git a/internal/object/attributes_test.go b/internal/object/attributes_test.go new file mode 100644 index 0000000000..88c6412dce --- /dev/null +++ b/internal/object/attributes_test.go @@ -0,0 +1,174 @@ +package object_test + +import ( + "strconv" + "testing" + + iobject "github.com/nspcc-dev/neofs-node/internal/object" + "github.com/nspcc-dev/neofs-sdk-go/object" + "github.com/stretchr/testify/require" +) + +func TestGetIndexAttribute(t *testing.T) { + var obj object.Object + const attr = "attr" + + t.Run("missing", func(t *testing.T) { + i, err := iobject.GetIndexAttribute(obj, attr) + require.NoError(t, err) + require.EqualValues(t, -1, i) + }) + + t.Run("not an integer", func(t *testing.T) { + obj.SetAttributes(object.NewAttribute(attr, "not_an_int")) + + _, err := iobject.GetIndexAttribute(obj, attr) + require.ErrorContains(t, err, "invalid syntax") + }) + + t.Run("negative", func(t *testing.T) { + obj.SetAttributes(object.NewAttribute(attr, "-123")) + + _, err := iobject.GetIndexAttribute(obj, attr) + require.EqualError(t, err, "negative value -123") + }) + + obj.SetAttributes(object.NewAttribute(attr, "1234567890")) + + i, err := iobject.GetIndexAttribute(obj, attr) + require.NoError(t, err) + require.EqualValues(t, 1234567890, i) + + t.Run("multiple", func(t *testing.T) { + for _, s := range []string{ + "not_an_int", + "-1", + "2", + } { + obj.SetAttributes( + object.NewAttribute(attr, "1"), + object.NewAttribute(attr, s), + ) + + i, err := iobject.GetIndexAttribute(obj, attr) + require.NoError(t, err) + require.EqualValues(t, 1, i) + } + }) +} + +func TestGetIntAttribute(t *testing.T) { + var obj object.Object + const attr = "attr" + + t.Run("missing", func(t 
*testing.T) { + _, err := iobject.GetIntAttribute(obj, attr) + require.ErrorIs(t, err, iobject.ErrAttributeNotFound) + }) + + t.Run("not an integer", func(t *testing.T) { + obj.SetAttributes(object.NewAttribute(attr, "not_an_int")) + + _, err := iobject.GetIntAttribute(obj, attr) + require.ErrorContains(t, err, "invalid syntax") + }) + + for _, tc := range []struct { + s string + i int + }{ + {s: "1234567890", i: 1234567890}, + {s: "0", i: 0}, + {s: "-1234567890", i: -1234567890}, + } { + obj.SetAttributes(object.NewAttribute(attr, tc.s)) + + i, err := iobject.GetIntAttribute(obj, attr) + require.NoError(t, err, tc.s) + require.EqualValues(t, tc.i, i) + } + + t.Run("multiple", func(t *testing.T) { + for _, s := range []string{ + "not_an_int", + "-1", + "2", + } { + obj.SetAttributes( + object.NewAttribute(attr, "1"), + object.NewAttribute(attr, s), + ) + + i, err := iobject.GetIntAttribute(obj, attr) + require.NoError(t, err) + require.EqualValues(t, 1, i) + } + }) +} + +func TestGetAttribute(t *testing.T) { + var obj object.Object + const attr = "attr" + + t.Run("missing", func(t *testing.T) { + require.Empty(t, iobject.GetAttribute(obj, attr)) + }) + + obj.SetAttributes(object.NewAttribute(attr, "val")) + require.Equal(t, "val", iobject.GetAttribute(obj, attr)) + + t.Run("multiple", func(t *testing.T) { + obj.SetAttributes( + object.NewAttribute(attr, "val1"), + object.NewAttribute(attr, "val2"), + ) + + require.Equal(t, "val1", iobject.GetAttribute(obj, attr)) + }) +} + +func TestSetIntAttribute(t *testing.T) { + var obj object.Object + const attr = "attr" + + obj.SetAttributes(object.NewAttribute(attr+"_other", "val")) + + check := func(t *testing.T, val int) { + iobject.SetIntAttribute(&obj, attr, val) + + attrs := obj.Attributes() + require.Len(t, attrs, 2) + require.Equal(t, attr, attrs[1].Key()) + require.Equal(t, strconv.Itoa(val), attrs[1].Value()) + + got, err := iobject.GetIntAttribute(obj, attr) + require.NoError(t, err, val) + require.EqualValues(t, 
val, got) + } + + check(t, 1234567890) + check(t, 0) + check(t, -1234567890) +} + +func TestSetAttribute(t *testing.T) { + var obj object.Object + const attr = "attr" + + obj.SetAttributes(object.NewAttribute(attr+"_other", "val")) + + check := func(t *testing.T, val string) { + iobject.SetAttribute(&obj, attr, val) + + attrs := obj.Attributes() + require.Len(t, attrs, 2) + require.Equal(t, attr, attrs[1].Key()) + require.Equal(t, val, attrs[1].Value()) + + got := iobject.GetAttribute(obj, attr) + require.Equal(t, val, got) + } + + check(t, "val1") + check(t, "val2") +} diff --git a/internal/slices/index.go b/internal/slices/index.go new file mode 100644 index 0000000000..69a0107e07 --- /dev/null +++ b/internal/slices/index.go @@ -0,0 +1,17 @@ +package slices + +import combinations "github.com/mxschmitt/golang-combinations" + +// IndexCombos returns all combinations of n indexes taken k. +func IndexCombos(n, k int) [][]int { + return combinations.Combinations(Indexes(n), k) +} + +// Indexes returns slices filled with n indexes. 
+func Indexes(n int) []int { + s := make([]int, n) + for i := range s { + s[i] = i + } + return s +} diff --git a/internal/slices/index_test.go b/internal/slices/index_test.go new file mode 100644 index 0000000000..404bf751bb --- /dev/null +++ b/internal/slices/index_test.go @@ -0,0 +1,24 @@ +package slices_test + +import ( + "testing" + + islices "github.com/nspcc-dev/neofs-node/internal/slices" + "github.com/stretchr/testify/require" +) + +func TestIndexes(t *testing.T) { + require.Empty(t, islices.Indexes(0)) + require.Equal(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, islices.Indexes(10)) +} + +func TestIndexCombos(t *testing.T) { + require.ElementsMatch(t, islices.IndexCombos(4, 2), [][]int{ + {0, 1}, + {0, 2}, + {0, 3}, + {1, 2}, + {1, 3}, + {2, 3}, + }) +} diff --git a/internal/slices/slices.go b/internal/slices/slices.go new file mode 100644 index 0000000000..1e81e9e22e --- /dev/null +++ b/internal/slices/slices.go @@ -0,0 +1,38 @@ +package slices + +import "slices" + +// TwoDimSliceElementCount returns sum len for ss. +func TwoDimSliceElementCount[E any](s [][]E) int { + var n int + for i := range s { + n += len(s[i]) + } + return n +} + +// NilTwoDimSliceElements returns clone of ss with nil-ed given indexes. +func NilTwoDimSliceElements[T any](s [][]T, idxs []int) [][]T { + if s == nil { + return nil + } + + c := make([][]T, len(s)) + for i := range c { + if !slices.Contains(idxs, i) { + c[i] = slices.Clone(s[i]) + } + } + return c +} + +// CountNilsInTwoDimSlice counts nil elements of s. 
+func CountNilsInTwoDimSlice[T any](s [][]T) int { + var n int + for i := range s { + if s[i] == nil { + n++ + } + } + return n +} diff --git a/internal/slices/slices_test.go b/internal/slices/slices_test.go new file mode 100644 index 0000000000..0cc125218b --- /dev/null +++ b/internal/slices/slices_test.go @@ -0,0 +1,40 @@ +package slices_test + +import ( + "testing" + + islices "github.com/nspcc-dev/neofs-node/internal/slices" + "github.com/stretchr/testify/require" +) + +func TestTwoDimElementCount(t *testing.T) { + require.Zero(t, islices.TwoDimSliceElementCount([][]int(nil))) + require.Zero(t, islices.TwoDimSliceElementCount(make([][]int, 10))) + require.EqualValues(t, 10, islices.TwoDimSliceElementCount([][]int{ + {1}, + {2, 3}, + {4, 5, 6}, + {7, 8, 9, 10}, + })) +} + +func TestNilTwoDimSliceElements(t *testing.T) { + require.Nil(t, islices.NilTwoDimSliceElements([][]int(nil), []int{1, 2, 3})) + require.Empty(t, islices.NilTwoDimSliceElements([][]int{}, []int{1, 2, 3})) + + excl := []int{1, 3} + res := islices.NilTwoDimSliceElements([][]int{ + {1}, + {2, 3}, + {4, 5, 6}, + {7, 8, 9, 10}, + }, excl) + + require.Equal(t, [][]int{ + {1}, + nil, + {4, 5, 6}, + nil, + }, res) + require.EqualValues(t, len(excl), islices.CountNilsInTwoDimSlice(res)) +} diff --git a/pkg/services/object/delete/util.go b/pkg/services/object/delete/util.go index a5be698cbc..b47ea4dda4 100644 --- a/pkg/services/object/delete/util.go +++ b/pkg/services/object/delete/util.go @@ -8,33 +8,24 @@ import ( type putSvcWrapper putsvc.Service func (w *putSvcWrapper) put(exec *execCtx) (*oid.ID, error) { - streamer, err := (*putsvc.Service)(w).Put(exec.context()) - if err != nil { - return nil, err - } - payload := exec.tombstoneObj.Payload() - initPrm := new(putsvc.PutInitPrm). - WithCommonPrm(exec.commonParameters()). 
- WithObject(exec.tombstoneObj.CutPayload()) + var opts putsvc.PutInitOptions - err = streamer.Init(initPrm) + pw, err := (*putsvc.Service)(w).InitPut(exec.context(), exec.tombstoneObj.CutPayload(), exec.commonParameters(), opts) if err != nil { return nil, err } - err = streamer.SendChunk(new(putsvc.PutChunkPrm).WithChunk(payload)) + _, err = pw.Write(payload) if err != nil { return nil, err } - r, err := streamer.Close() + id, err := pw.Close() if err != nil { return nil, err } - id := r.ObjectID() - return &id, nil } diff --git a/pkg/services/object/internal/target.go b/pkg/services/object/internal/target.go index 828eaacf8e..cdfacd5c17 100644 --- a/pkg/services/object/internal/target.go +++ b/pkg/services/object/internal/target.go @@ -21,9 +21,8 @@ type HeaderWriter interface { WriteHeader(*object.Object) error } -// Target is an interface of the object writer. -type Target interface { - HeaderWriter +// PayloadWriter is an interface of the object payload writer. +type PayloadWriter interface { // Writer writes object payload chunk. // // Can be called multiple times. @@ -41,3 +40,9 @@ type Target interface { // that depends on the implementation. Close() (oid.ID, error) } + +// Target is an interface of the object writer. 
+type Target interface { + HeaderWriter + PayloadWriter +} diff --git a/pkg/services/object/put/distibuted_test.go b/pkg/services/object/put/distibuted_test.go index 679c0c4d3e..5a24e7506e 100644 --- a/pkg/services/object/put/distibuted_test.go +++ b/pkg/services/object/put/distibuted_test.go @@ -11,7 +11,6 @@ import ( cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/netmap" - oid "github.com/nspcc-dev/neofs-sdk-go/object/id" oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -32,26 +31,6 @@ func allocNodes(n []uint) [][]netmap.NodeInfo { return res } -type testContainerNodes struct { - objID oid.ID - - sortErr error - cnrNodes [][]netmap.NodeInfo - - primCounts []uint -} - -func (x testContainerNodes) Unsorted() [][]netmap.NodeInfo { return x.cnrNodes } - -func (x testContainerNodes) SortForObject(obj oid.ID) ([][]netmap.NodeInfo, error) { - if x.objID != obj { - return nil, errors.New("[test] unexpected object ID") - } - return x.cnrNodes, x.sortErr -} - -func (x testContainerNodes) PrimaryCounts() []uint { return x.primCounts } - type testNetwork struct { localPubKey []byte } @@ -89,21 +68,20 @@ func TestIterateNodesForObject(t *testing.T) { cnrNodes[2][0].SetPublicKey(cnrNodes[0][1].PublicKey()) cnrNodes[1][1].SetPublicKey(cnrNodes[0][2].PublicKey()) var rwp testWorkerPool - iter := placementIterator{ - log: zap.NewNop(), - neoFSNet: testNetwork{ - localPubKey: cnrNodes[0][2].PublicKey(), - }, - remotePool: &rwp, - containerNodes: testContainerNodes{ - objID: objID, - cnrNodes: cnrNodes, - primCounts: []uint{2, 3, 2}, + iter := distributedTarget{ + svc: &Service{ + cfg: &cfg{ + log: zap.NewNop(), + remotePool: &rwp, + }, + neoFSNet: testNetwork{ + localPubKey: cnrNodes[0][2].PublicKey(), + }, }, } var handlerMtx sync.Mutex var handlerCalls []nodeDesc - err := iter.iterateNodesForObject(objID, func(node nodeDesc) error { + err := 
iter.iterateNodesForObject(objID, []uint{2, 3, 2}, cnrNodes, false, func(node nodeDesc) error { handlerMtx.Lock() handlerCalls = append(handlerCalls, node) handlerMtx.Unlock() @@ -182,20 +160,19 @@ func TestIterateNodesForObject(t *testing.T) { objID := oidtest.ID() cnrNodes := allocNodes([]uint{3, 3, 2}) cnrNodes[1][1].SetPublicKey(cnrNodes[0][1].PublicKey()) - iter := placementIterator{ - log: zap.NewNop(), - neoFSNet: new(testNetwork), - remotePool: new(testWorkerPool), - containerNodes: testContainerNodes{ - objID: objID, - cnrNodes: cnrNodes, - primCounts: []uint{2, 1, 2}, + iter := distributedTarget{ + svc: &Service{ + cfg: &cfg{ + log: zap.NewNop(), + remotePool: new(testWorkerPool), + }, + neoFSNet: new(testNetwork), }, linearReplNum: 4, } var handlerMtx sync.Mutex var handlerCalls [][]byte - err := iter.iterateNodesForObject(objID, func(node nodeDesc) error { + err := iter.iterateNodesForObject(objID, []uint{2, 1, 2}, cnrNodes, false, func(node nodeDesc) error { handlerMtx.Lock() handlerCalls = append(handlerCalls, node.info.PublicKey()) handlerMtx.Unlock() @@ -223,20 +200,18 @@ func TestIterateNodesForObject(t *testing.T) { objID := oidtest.ID() cnrNodes := allocNodes([]uint{2, 3, 2}) cnrNodes[1][2].SetPublicKey(cnrNodes[0][1].PublicKey()) - iter := placementIterator{ - log: zap.NewNop(), - neoFSNet: new(testNetwork), - remotePool: new(testWorkerPool), - containerNodes: testContainerNodes{ - objID: objID, - cnrNodes: cnrNodes, - primCounts: []uint{1, 1, 1}, + iter := distributedTarget{ + svc: &Service{ + cfg: &cfg{ + log: zap.NewNop(), + remotePool: new(testWorkerPool), + }, + neoFSNet: new(testNetwork), }, - broadcast: true, } var handlerMtx sync.Mutex var handlerCalls [][]byte - err := iter.iterateNodesForObject(objID, func(node nodeDesc) error { + err := iter.iterateNodesForObject(objID, []uint{1, 1, 1}, cnrNodes, true, func(node nodeDesc) error { handlerMtx.Lock() handlerCalls = append(handlerCalls, node.info.PublicKey()) handlerMtx.Unlock() @@ 
-260,21 +235,6 @@ func TestIterateNodesForObject(t *testing.T) { }, key) } }) - t.Run("sort nodes for object failure", func(t *testing.T) { - objID := oidtest.ID() - iter := placementIterator{ - log: zap.NewNop(), - containerNodes: testContainerNodes{ - objID: objID, - sortErr: errors.New("any sort error"), - }, - } - err := iter.iterateNodesForObject(objID, func(nodeDesc) error { - t.Fatal("must not be called") - return nil - }) - require.EqualError(t, err, "sort container nodes for the object: any sort error") - }) t.Run("worker pool failure", func(t *testing.T) { // nodes: [A B] [C D E] [F] // policy: [2 2 1] @@ -286,19 +246,18 @@ func TestIterateNodesForObject(t *testing.T) { nFail: 5, err: errors.New("any worker pool error"), } - iter := placementIterator{ - log: zap.NewNop(), - neoFSNet: new(testNetwork), - remotePool: &wp, - containerNodes: testContainerNodes{ - objID: objID, - cnrNodes: cnrNodes, - primCounts: []uint{2, 2, 1}, + iter := distributedTarget{ + svc: &Service{ + cfg: &cfg{ + log: zap.NewNop(), + remotePool: &wp, + }, + neoFSNet: new(testNetwork), }, } var handlerMtx sync.Mutex var handlerCalls [][]byte - err := iter.iterateNodesForObject(objID, func(node nodeDesc) error { + err := iter.iterateNodesForObject(objID, []uint{2, 2, 1}, cnrNodes, false, func(node nodeDesc) error { handlerMtx.Lock() handlerCalls = append(handlerCalls, node.info.PublicKey()) handlerMtx.Unlock() @@ -330,19 +289,18 @@ func TestIterateNodesForObject(t *testing.T) { objID := oidtest.ID() cnrNodes := allocNodes([]uint{2, 3, 1}) var wp testWorkerPool - iter := placementIterator{ - log: zap.NewNop(), - neoFSNet: new(testNetwork), - remotePool: &wp, - containerNodes: testContainerNodes{ - objID: objID, - cnrNodes: cnrNodes, - primCounts: []uint{2, 4, 1}, + iter := distributedTarget{ + svc: &Service{ + cfg: &cfg{ + log: zap.NewNop(), + remotePool: &wp, + }, + neoFSNet: new(testNetwork), }, } var handlerMtx sync.Mutex var handlerCalls [][]byte - err := 
iter.iterateNodesForObject(objID, func(node nodeDesc) error { + err := iter.iterateNodesForObject(objID, []uint{2, 4, 1}, cnrNodes, false, func(node nodeDesc) error { handlerMtx.Lock() handlerCalls = append(handlerCalls, node.info.PublicKey()) handlerMtx.Unlock() @@ -368,19 +326,18 @@ func TestIterateNodesForObject(t *testing.T) { cnrNodes := allocNodes([]uint{2, 3, 1}) cnrNodes[1][2].SetNetworkEndpoints("definitely invalid network address") var wp testWorkerPool - iter := placementIterator{ - log: zap.NewNop(), - neoFSNet: new(testNetwork), - remotePool: &wp, - containerNodes: testContainerNodes{ - objID: objID, - cnrNodes: cnrNodes, - primCounts: []uint{2, 2, 1}, + iter := distributedTarget{ + svc: &Service{ + cfg: &cfg{ + log: zap.NewNop(), + remotePool: &wp, + }, + neoFSNet: new(testNetwork), }, } var handlerMtx sync.Mutex var handlerCalls [][]byte - err := iter.iterateNodesForObject(objID, func(node nodeDesc) error { + err := iter.iterateNodesForObject(objID, []uint{2, 2, 1}, cnrNodes, false, func(node nodeDesc) error { handlerMtx.Lock() handlerCalls = append(handlerCalls, node.info.PublicKey()) handlerMtx.Unlock() @@ -409,19 +366,18 @@ func TestIterateNodesForObject(t *testing.T) { objID := oidtest.ID() cnrNodes := allocNodes([]uint{2, 3, 1}) var wp testWorkerPool - iter := placementIterator{ - log: zap.NewNop(), - neoFSNet: new(testNetwork), - remotePool: &wp, - containerNodes: testContainerNodes{ - objID: objID, - cnrNodes: cnrNodes, - primCounts: []uint{2, 2, 1}, + iter := distributedTarget{ + svc: &Service{ + cfg: &cfg{ + log: zap.NewNop(), + remotePool: &wp, + }, + neoFSNet: new(testNetwork), }, } var handlerMtx sync.Mutex var handlerCalls [][]byte - err := iter.iterateNodesForObject(objID, func(node nodeDesc) error { + err := iter.iterateNodesForObject(objID, []uint{2, 2, 1}, cnrNodes, false, func(node nodeDesc) error { handlerMtx.Lock() handlerCalls = append(handlerCalls, node.info.PublicKey()) handlerMtx.Unlock() @@ -445,23 +401,22 @@ func 
TestIterateNodesForObject(t *testing.T) { t.Run("return only after worker pool finished", func(t *testing.T) { objID := oidtest.ID() cnrNodes := allocNodes([]uint{2, 3, 1}) - iter := placementIterator{ - log: zap.NewNop(), - neoFSNet: new(testNetwork), - remotePool: &testWorkerPool{ - err: errors.New("pool err"), - nFail: 2, - }, - containerNodes: testContainerNodes{ - objID: objID, - cnrNodes: cnrNodes, - primCounts: []uint{2, 3, 1}, + iter := distributedTarget{ + svc: &Service{ + cfg: &cfg{ + log: zap.NewNop(), + remotePool: &testWorkerPool{ + err: errors.New("pool err"), + nFail: 2, + }, + }, + neoFSNet: new(testNetwork), }, } blockCh := make(chan struct{}) returnCh := make(chan struct{}) go func() { - err := iter.iterateNodesForObject(objID, func(node nodeDesc) error { + err := iter.iterateNodesForObject(objID, []uint{2, 3, 1}, cnrNodes, false, func(node nodeDesc) error { <-blockCh return nil }) diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index be377f46ae..f58ba0ac5c 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -11,14 +11,11 @@ import ( "sync" "sync/atomic" + iec "github.com/nspcc-dev/neofs-node/internal/ec" "github.com/nspcc-dev/neofs-node/pkg/core/client" - netmapcore "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/core/object" - chaincontainer "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" "github.com/nspcc-dev/neofs-node/pkg/network" - "github.com/nspcc-dev/neofs-node/pkg/services/meta" svcutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/nspcc-dev/neofs-node/pkg/util" neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" "github.com/nspcc-dev/neofs-sdk-go/netmap" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" @@ -26,43 +23,43 @@ import ( "go.uber.org/zap" ) -type distributedTarget struct { - opCtx context.Context - - placementIterator placementIterator - - obj 
*objectSDK.Object - objMeta object.ContentMeta - networkMagicNumber uint32 - fsState netmapcore.StateDetailed - - cnrClient *chaincontainer.Client - metainfoConsistencyAttr string - - metaSvc *meta.Meta - metaMtx sync.RWMutex - metaSigner neofscrypto.Signer - objSharedMeta []byte - collectedSignatures [][]byte - - localNodeInContainer bool - localNodeSigner neofscrypto.Signer +type distributedTargetState struct { + obj *objectSDK.Object // - object if localOnly // - replicate request if localNodeInContainer // - payload otherwise encodedObject encodedObject - relay func(nodeDesc) error + objSharedMeta []byte - fmt *object.FormatValidator + collectedSignaturesMtx sync.Mutex + collectedSignatures [][]byte +} - localStorage ObjectStorage - clientConstructor ClientConstructor - transport Transport - commonPrm *svcutil.CommonPrm - keyStorage *svcutil.KeyStorage +type distributedTarget struct { + svc *Service + localNodeSigner neofscrypto.Signer + metaSigner neofscrypto.Signer + /* request parameters */ + opCtx context.Context + commonPrm *svcutil.CommonPrm localOnly bool + // when non-zero, this setting simplifies the object's storage policy + // requirements to a fixed number of object replicas to be retained + linearReplNum uint + metainfoConsistencyAttr string + relay func(nodeDesc) error + + /* processing data */ + containerNodes ContainerNodes + localNodeInContainer bool + sessionSigner neofscrypto.Signer + // When object from request is an EC part, ecPart.RuleIndex is >= 0. + // Undefined when policy have no EC rules. 
+ ecPart iec.PartInfo + + state distributedTargetState } type nodeDesc struct { @@ -95,9 +92,9 @@ func (t *distributedTarget) WriteHeader(hdr *objectSDK.Object) error { if t.localNodeInContainer { var err error if t.localOnly { - t.encodedObject, err = encodeObjectWithoutPayload(*hdr, int(payloadLen)) + t.state.encodedObject, err = encodeObjectWithoutPayload(*hdr, int(payloadLen)) } else { - t.encodedObject, err = encodeReplicateRequestWithoutPayload(t.localNodeSigner, *hdr, int(payloadLen), t.metainfoConsistencyAttr != "") + t.state.encodedObject, err = encodeReplicateRequestWithoutPayload(t.localNodeSigner, *hdr, int(payloadLen), t.metainfoConsistencyAttr != "") } if err != nil { return fmt.Errorf("encode object into binary: %w", err) @@ -108,75 +105,122 @@ func (t *distributedTarget) WriteHeader(hdr *objectSDK.Object) error { putPayload(b) b = make([]byte, 0, payloadLen) } - t.encodedObject = encodedObject{b: b} + t.state.encodedObject = encodedObject{b: b} } - t.obj = hdr + t.state.obj = hdr return nil } func (t *distributedTarget) Write(p []byte) (n int, err error) { - t.encodedObject.b = append(t.encodedObject.b, p...) + t.state.encodedObject.b = append(t.state.encodedObject.b, p...) 
return len(p), nil } func (t *distributedTarget) Close() (oid.ID, error) { defer func() { - putPayload(t.encodedObject.b) - t.encodedObject.b = nil - t.collectedSignatures = nil + putPayload(t.state.encodedObject.b) + t.state.encodedObject.b = nil }() - t.obj.SetPayload(t.encodedObject.b[t.encodedObject.pldOff:]) + t.state.obj.SetPayload(t.state.encodedObject.b[t.state.encodedObject.pldOff:]) - tombOrLink := t.obj.Type() == objectSDK.TypeLink || t.obj.Type() == objectSDK.TypeTombstone - - if !t.placementIterator.broadcast && len(t.obj.Children()) > 0 || tombOrLink { - // enabling extra broadcast for linking and tomb objects - t.placementIterator.broadcast = true - } + typ := t.state.obj.Type() + tombOrLink := typ == objectSDK.TypeLink || typ == objectSDK.TypeTombstone // v2 split link object and tombstone validations are expensive routines // and are useless if the node does not belong to the container, since // another node is responsible for the validation and may decline it, // does not matter what this node thinks about it + var objMeta object.ContentMeta if !tombOrLink || t.localNodeInContainer { var err error - if t.objMeta, err = t.fmt.ValidateContent(t.obj); err != nil { + if objMeta, err = t.svc.fmtValidator.ValidateContent(t.state.obj); err != nil { return oid.ID{}, fmt.Errorf("(%T) could not validate payload content: %w", t, err) } } + err := t.saveObject(*t.state.obj, objMeta, t.state.encodedObject) + if err != nil { + return oid.ID{}, err + } + + return t.state.obj.GetID(), nil +} + +func (t *distributedTarget) saveObject(obj objectSDK.Object, objMeta object.ContentMeta, encObj encodedObject) error { + if t.localOnly && t.sessionSigner == nil { + return t.distributeObject(obj, objMeta, encObj, nil) + } + + objNodeLists, err := t.containerNodes.SortForObject(t.state.obj.GetID()) + if err != nil { + return fmt.Errorf("sort container nodes by object ID: %w", err) + } + + // TODO: handle rules in parallel. 
https://github.com/nspcc-dev/neofs-node/issues/3503 + + repRules := t.containerNodes.PrimaryCounts() + if len(repRules) > 0 { + typ := obj.Type() + broadcast := typ == objectSDK.TypeTombstone || typ == objectSDK.TypeLink || typ == objectSDK.TypeLock || len(obj.Children()) > 0 + return t.distributeObject(obj, objMeta, encObj, func(obj objectSDK.Object, objMeta object.ContentMeta, encObj encodedObject) error { + return t.iterateNodesForObject(obj.GetID(), repRules, objNodeLists, broadcast, func(node nodeDesc) error { + return t.sendObject(obj, objMeta, encObj, node) + }) + }) + } + + if ecRules := t.containerNodes.ECRules(); len(ecRules) > 0 { + if t.ecPart.RuleIndex >= 0 { // already encoded EC part + total := int(ecRules[t.ecPart.RuleIndex].DataPartNum + ecRules[t.ecPart.RuleIndex].ParityPartNum) + nodes := objNodeLists[len(repRules)+t.ecPart.RuleIndex] + return t.saveECPart(obj, objMeta, encObj, t.ecPart.Index, total, nodes) + } + + if t.sessionSigner != nil { + if err := t.ecAndSaveObject(t.sessionSigner, obj, ecRules, objNodeLists[len(repRules):]); err != nil { + return err + } + } + } + + return nil +} + +func (t *distributedTarget) distributeObject(obj objectSDK.Object, objMeta object.ContentMeta, encObj encodedObject, + placementFn func(obj objectSDK.Object, objMeta object.ContentMeta, encObj encodedObject) error) error { + defer func() { + t.state.collectedSignatures = nil + }() + if t.localNodeInContainer && t.metainfoConsistencyAttr != "" { - t.objSharedMeta = t.encodeCurrentObjectMetadata() + t.state.objSharedMeta = t.encodeObjectMetadata(obj) } - id := t.obj.GetID() + id := obj.GetID() var err error if t.localOnly { - var l = t.placementIterator.log.With(zap.Stringer("oid", t.obj.GetID())) + var l = t.svc.log.With(zap.Stringer("oid", id)) - err = t.writeObjectLocally() + err = t.writeObjectLocally(obj, objMeta, encObj) if err != nil { err = fmt.Errorf("write object locally: %w", err) svcutil.LogServiceError(l, "PUT", nil, err) err = 
errIncompletePut{singleErr: fmt.Errorf("%w (last node error: %w)", errNotEnoughNodes{required: 1}, err)} } } else { - err = t.placementIterator.iterateNodesForObject(id, t.sendObject) + err = placementFn(obj, objMeta, encObj) } if err != nil { - return oid.ID{}, err + return err } if t.localNodeInContainer && t.metainfoConsistencyAttr != "" { - t.metaMtx.RLock() - defer t.metaMtx.RUnlock() - - if len(t.collectedSignatures) == 0 { - return oid.ID{}, fmt.Errorf("skip metadata chain submit for %s object: no signatures were collected", id) + if len(t.state.collectedSignatures) == 0 { + return fmt.Errorf("skip metadata chain submit for %s object: no signatures were collected", id) } var await bool @@ -187,68 +231,68 @@ func (t *distributedTarget) Close() (oid.ID, error) { case "optimistic": await = false default: - return id, nil + return nil } - addr := object.AddressOf(t.obj) + addr := object.AddressOf(&obj) var objAccepted chan struct{} if await { objAccepted = make(chan struct{}, 1) - t.metaSvc.NotifyObjectSuccess(objAccepted, addr) + t.svc.metaSvc.NotifyObjectSuccess(objAccepted, addr) } - err = t.cnrClient.SubmitObjectPut(t.objSharedMeta, t.collectedSignatures) + err = t.svc.cnrClient.SubmitObjectPut(t.state.objSharedMeta, t.state.collectedSignatures) if err != nil { if await { - t.metaSvc.UnsubscribeFromObject(addr) + t.svc.metaSvc.UnsubscribeFromObject(addr) } - return oid.ID{}, fmt.Errorf("failed to submit %s object meta information: %w", addr, err) + return fmt.Errorf("failed to submit %s object meta information: %w", addr, err) } if await { select { case <-t.opCtx.Done(): - t.metaSvc.UnsubscribeFromObject(addr) - return oid.ID{}, fmt.Errorf("interrupted awaiting for %s object meta information: %w", addr, t.opCtx.Err()) + t.svc.metaSvc.UnsubscribeFromObject(addr) + return fmt.Errorf("interrupted awaiting for %s object meta information: %w", addr, t.opCtx.Err()) case <-objAccepted: } } - t.placementIterator.log.Debug("submitted object meta information", 
zap.Stringer("addr", addr)) + t.svc.log.Debug("submitted object meta information", zap.Stringer("addr", addr)) } - return id, nil + return nil } -func (t *distributedTarget) encodeCurrentObjectMetadata() []byte { - currBlock := t.fsState.CurrentBlock() - currEpochDuration := t.fsState.CurrentEpochDuration() +func (t *distributedTarget) encodeObjectMetadata(obj objectSDK.Object) []byte { + currBlock := t.svc.networkState.CurrentBlock() + currEpochDuration := t.svc.networkState.CurrentEpochDuration() expectedVUB := (uint64(currBlock)/currEpochDuration + 2) * currEpochDuration - firstObj := t.obj.GetFirstID() - if t.obj.HasParent() && firstObj.IsZero() { + firstObj := obj.GetFirstID() + if obj.HasParent() && firstObj.IsZero() { // object itself is the first one - firstObj = t.obj.GetID() + firstObj = obj.GetID() } var deletedObjs []oid.ID var lockedObjs []oid.ID - typ := t.obj.Type() + typ := obj.Type() switch typ { case objectSDK.TypeTombstone: - deletedObjs = append(deletedObjs, t.obj.AssociatedObject()) + deletedObjs = append(deletedObjs, obj.AssociatedObject()) case objectSDK.TypeLock: - lockedObjs = append(lockedObjs, t.obj.AssociatedObject()) + lockedObjs = append(lockedObjs, obj.AssociatedObject()) default: } - return object.EncodeReplicationMetaInfo(t.obj.GetContainerID(), t.obj.GetID(), firstObj, t.obj.GetPreviousID(), - t.obj.PayloadSize(), typ, deletedObjs, lockedObjs, expectedVUB, t.networkMagicNumber) + return object.EncodeReplicationMetaInfo(obj.GetContainerID(), obj.GetID(), firstObj, obj.GetPreviousID(), + obj.PayloadSize(), typ, deletedObjs, lockedObjs, expectedVUB, t.svc.networkMagic) } -func (t *distributedTarget) sendObject(node nodeDesc) error { +func (t *distributedTarget) sendObject(obj objectSDK.Object, objMeta object.ContentMeta, encObj encodedObject, node nodeDesc) error { if node.local { - if err := t.writeObjectLocally(); err != nil { + if err := t.writeObjectLocally(obj, objMeta, encObj); err != nil { return fmt.Errorf("write object 
locally: %w", err) } return nil @@ -260,13 +304,13 @@ func (t *distributedTarget) sendObject(node nodeDesc) error { var sigsRaw []byte var err error - if t.encodedObject.hdrOff > 0 { - sigsRaw, err = t.transport.SendReplicationRequestToNode(t.opCtx, t.encodedObject.b, node.info) + if encObj.hdrOff > 0 { + sigsRaw, err = t.svc.transport.SendReplicationRequestToNode(t.opCtx, encObj.b, node.info) if err != nil { err = fmt.Errorf("replicate object to remote node (key=%x): %w", node.info.PublicKey(), err) } } else { - err = putObjectToNode(t.opCtx, node.info, t.obj, t.keyStorage, t.clientConstructor, t.commonPrm) + err = putObjectToNode(t.opCtx, node.info, &obj, t.svc.keyStorage, t.svc.clientConstructor, t.commonPrm) } if err != nil { return fmt.Errorf("could not close object stream: %w", err) @@ -275,7 +319,7 @@ func (t *distributedTarget) sendObject(node nodeDesc) error { if t.localNodeInContainer && t.metainfoConsistencyAttr != "" { // These should technically be errors, but we don't have // a complete implementation now, so errors are substituted with logs. 
- var l = t.placementIterator.log.With(zap.Stringer("oid", t.obj.GetID()), + var l = t.svc.log.With(zap.Stringer("oid", obj.GetID()), zap.String("node", network.StringifyGroup(node.info.AddressGroup()))) sigs, err := decodeSignatures(sigsRaw) @@ -289,13 +333,13 @@ func (t *distributedTarget) sendObject(node nodeDesc) error { continue } - if !sig.Verify(t.objSharedMeta) { + if !sig.Verify(t.state.objSharedMeta) { continue } - t.metaMtx.Lock() - t.collectedSignatures = append(t.collectedSignatures, sig.Value()) - t.metaMtx.Unlock() + t.state.collectedSignaturesMtx.Lock() + t.state.collectedSignatures = append(t.state.collectedSignatures, sig.Value()) + t.state.collectedSignaturesMtx.Unlock() return nil } @@ -306,20 +350,20 @@ func (t *distributedTarget) sendObject(node nodeDesc) error { return nil } -func (t *distributedTarget) writeObjectLocally() error { - if err := putObjectLocally(t.localStorage, t.obj, t.objMeta, &t.encodedObject); err != nil { +func (t *distributedTarget) writeObjectLocally(obj objectSDK.Object, objMeta object.ContentMeta, encObj encodedObject) error { + if err := putObjectLocally(t.svc.localStore, &obj, objMeta, &encObj); err != nil { return err } if t.localNodeInContainer && t.metainfoConsistencyAttr != "" { - sig, err := t.metaSigner.Sign(t.objSharedMeta) + sig, err := t.metaSigner.Sign(t.state.objSharedMeta) if err != nil { return fmt.Errorf("failed to sign object metadata: %w", err) } - t.metaMtx.Lock() - t.collectedSignatures = append(t.collectedSignatures, sig) - t.metaMtx.Unlock() + t.state.collectedSignaturesMtx.Lock() + t.state.collectedSignatures = append(t.state.collectedSignatures, sig) + t.state.collectedSignaturesMtx.Unlock() } return nil @@ -371,33 +415,12 @@ func (x errNotEnoughNodes) Error() string { x.listIndex, x.required, x.left) } -type placementIterator struct { - log *zap.Logger - neoFSNet NeoFSNetwork - remotePool util.WorkerPool - /* request-dependent */ - containerNodes ContainerNodes - // when non-zero, this setting 
simplifies the object's storage policy - // requirements to a fixed number of object replicas to be retained - linearReplNum uint - // whether to perform additional best-effort of sending the object replica to - // all reserve nodes of the container - broadcast bool -} - -func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) error) error { - var replCounts []uint - var l = x.log.With(zap.Stringer("oid", obj)) - nodeLists, err := x.containerNodes.SortForObject(obj) - if err != nil { - return fmt.Errorf("sort container nodes for the object: %w", err) - } - if x.linearReplNum > 0 { +func (t *distributedTarget) iterateNodesForObject(obj oid.ID, replCounts []uint, nodeLists [][]netmap.NodeInfo, broadcast bool, f func(nodeDesc) error) error { + var l = t.svc.log.With(zap.Stringer("oid", obj)) + if t.linearReplNum > 0 { ns := slices.Concat(nodeLists...) nodeLists = [][]netmap.NodeInfo{ns} - replCounts = []uint{x.linearReplNum} - } else { - replCounts = x.containerNodes.PrimaryCounts() + replCounts = []uint{t.linearReplNum} } var processedNodesMtx sync.RWMutex var nextNodeGroupKeys []string @@ -441,7 +464,7 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) er // latency and volume of "unfinished" data to be garbage-collected. Also after // the failure of any of the nodes the ability to comply with the policy // requirements may be lost. 
- for i := range nodeLists { + for i := range replCounts { listInd := i for { replRem := replCounts[listInd] - nodesCounters[listInd].stored @@ -450,7 +473,7 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) er } listLen := uint(len(nodeLists[listInd])) if listLen-nodesCounters[listInd].processed < replRem { - err = errNotEnoughNodes{listIndex: listInd, required: replRem, left: listLen - nodesCounters[listInd].processed} + var err error = errNotEnoughNodes{listIndex: listInd, required: replRem, left: listLen - nodesCounters[listInd].processed} if e, _ := lastRespErr.Load().(error); e != nil { err = fmt.Errorf("%w (last node error: %w)", err, e) } @@ -471,8 +494,8 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) er } continue } - if nr.desc.local = x.neoFSNet.IsLocalNodePublicKey(pk); !nr.desc.local { - nr.desc.info, nr.convertErr = x.convertNodeInfo(nodeLists[listInd][j]) + if nr.desc.local = t.svc.neoFSNet.IsLocalNodePublicKey(pk); !nr.desc.local { + nr.desc.info, nr.convertErr = convertNodeInfo(nodeLists[listInd][j]) } processedNodesMtx.Lock() nodeResults[pks] = nr @@ -486,7 +509,7 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) er l.Error("failed to decode network endpoints of the storage node from the network map, skip the node", zap.String("public key", netmap.StringifyPublicKey(nodeLists[listInd][j])), zap.Error(nr.convertErr)) if listLen-nodesCounters[listInd].processed-1 < replRem { // -1 includes current node failure - err = fmt.Errorf("%w (last node error: failed to decode network addresses: %w)", + err := fmt.Errorf("%w (last node error: failed to decode network addresses: %w)", errNotEnoughNodes{listIndex: listInd, required: replRem, left: listLen - nodesCounters[listInd].processed - 1}, nr.convertErr) return errIncompletePut{singleErr: err} @@ -503,7 +526,7 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) er go 
processNode(pks, listInd, nr, &wg) continue } - if err := x.remotePool.Submit(func() { + if err := t.svc.remotePool.Submit(func() { processNode(pks, listInd, nr, &wg) }); err != nil { wg.Done() @@ -514,7 +537,7 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) er wg.Wait() } } - if !x.broadcast { + if !broadcast { return nil } // TODO: since main part of the operation has already been completed, and @@ -531,8 +554,8 @@ broadcast: if ok { continue } - if nr.desc.local = x.neoFSNet.IsLocalNodePublicKey(pk); !nr.desc.local { - nr.desc.info, nr.convertErr = x.convertNodeInfo(nodeLists[i][j]) + if nr.desc.local = t.svc.neoFSNet.IsLocalNodePublicKey(pk); !nr.desc.local { + nr.desc.info, nr.convertErr = convertNodeInfo(nodeLists[i][j]) } processedNodesMtx.Lock() nodeResults[pks] = nr @@ -549,7 +572,7 @@ broadcast: go processNode(pks, -1, nr, &wg) continue } - if err := x.remotePool.Submit(func() { + if err := t.svc.remotePool.Submit(func() { processNode(pks, -1, nr, &wg) }); err != nil { wg.Done() @@ -562,7 +585,7 @@ broadcast: return nil } -func (x placementIterator) convertNodeInfo(nodeInfo netmap.NodeInfo) (client.NodeInfo, error) { +func convertNodeInfo(nodeInfo netmap.NodeInfo) (client.NodeInfo, error) { var res client.NodeInfo var endpoints network.AddressGroup if err := endpoints.FromIterator(network.NodeEndpointsIterator(nodeInfo)); err != nil { diff --git a/pkg/services/object/put/ec.go b/pkg/services/object/put/ec.go new file mode 100644 index 0000000000..56820c25b5 --- /dev/null +++ b/pkg/services/object/put/ec.go @@ -0,0 +1,135 @@ +package putsvc + +import ( + "fmt" + "math" + "slices" + + iec "github.com/nspcc-dev/neofs-node/internal/ec" + objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" + "github.com/nspcc-dev/neofs-sdk-go/netmap" + "github.com/nspcc-dev/neofs-sdk-go/object" + "go.uber.org/zap" +) + +func (t *distributedTarget) ecAndSaveObject(signer 
neofscrypto.Signer, obj object.Object, ecRules []iec.Rule, nodeLists [][]netmap.NodeInfo) error { + for i := range ecRules { + if slices.Contains(ecRules[:i], ecRules[i]) { // has already been processed, see below + continue + } + + payloadParts, err := iec.Encode(ecRules[i], obj.Payload()) + if err != nil { + return fmt.Errorf("split object payload into EC parts for rule #%d (%s): %w", i, ecRules[i], err) + } + + if err := t.applyECRule(signer, obj, i, payloadParts, nodeLists[i]); err != nil { + return fmt.Errorf("apply EC rule #%d (%s): %w", i, ecRules[i], err) + } + + for j := i + 1; j < len(ecRules); j++ { + if ecRules[i] != ecRules[j] { + continue + } + if err := t.applyECRule(signer, obj, j, payloadParts, nodeLists[j]); err != nil { + return fmt.Errorf("apply EC rule #%d (%s): %w", j, ecRules[j], err) + } + } + } + + return nil +} + +func (t *distributedTarget) applyECRule(signer neofscrypto.Signer, obj object.Object, ruleIdx int, payloadParts [][]byte, nodeList []netmap.NodeInfo) error { + for partIdx := range payloadParts { + // TODO: each part is handled independently, so this worth concurrent execution. https://github.com/nspcc-dev/neofs-node/issues/3504 + // Note that distributeTarget.distributeObject is not thread-safe. 
+ if err := t.formAndSaveObjectForECPart(signer, obj, ruleIdx, partIdx, payloadParts, nodeList); err != nil { + return fmt.Errorf("form and save object for part %d: %w", partIdx, err) + } + } + + return nil +} + +func (t *distributedTarget) formAndSaveObjectForECPart(signer neofscrypto.Signer, obj object.Object, ruleIdx, partIdx int, payloadParts [][]byte, nodeList []netmap.NodeInfo) error { + partObj, err := iec.FormObjectForECPart(signer, obj, payloadParts[partIdx], iec.PartInfo{ + RuleIndex: ruleIdx, + Index: partIdx, + }) + if err != nil { + return fmt.Errorf("form object for part: %w", err) + } + + var encObj encodedObject + // similar to pkg/services/object/put/distributed.go:95 + if t.localNodeInContainer { + payloadLen := partObj.PayloadSize() + if payloadLen > math.MaxInt { + return fmt.Errorf("too big payload of physically stored for this server %d > %d", payloadLen, math.MaxInt) + } + + hdr := partObj + hdr.SetPayload(nil) + + if t.localOnly { + encObj, err = encodeObjectWithoutPayload(hdr, int(payloadLen)) + } else { + encObj, err = encodeReplicateRequestWithoutPayload(t.localNodeSigner, hdr, int(payloadLen), t.metainfoConsistencyAttr != "") + } + if err != nil { + return fmt.Errorf("encode object into binary: %w", err) + } + + defer putPayload(encObj.b) + + encObj.b = append(encObj.b, partObj.Payload()...) 
+ } + + if err := t.saveECPart(partObj, objectcore.ContentMeta{}, encObj, partIdx, len(payloadParts), nodeList); err != nil { + return fmt.Errorf("save part object: %w", err) + } + + return nil +} + +func (t *distributedTarget) saveECPart(part object.Object, objMeta objectcore.ContentMeta, encObj encodedObject, idx, total int, nodeList []netmap.NodeInfo) error { + return t.distributeObject(part, objMeta, encObj, func(obj object.Object, objMeta objectcore.ContentMeta, encObj encodedObject) error { + return t.distributeECPart(obj, objMeta, encObj, idx, total, nodeList) + }) +} + +func (t *distributedTarget) distributeECPart(part object.Object, objMeta objectcore.ContentMeta, enc encodedObject, idx, total int, nodeList []netmap.NodeInfo) error { + var firstErr error + for { + err := t.saveECPartOnNode(part, objMeta, enc, nodeList[idx]) + if err == nil { + return nil + } + + na := slices.Collect(nodeList[idx].NetworkEndpoints()) + if firstErr == nil { + firstErr = fmt.Errorf("save on SN #%d (%s): %w", idx, na, err) + } else { + t.svc.log.Info("failed to save EC part on reserve SN", zap.Error(err), zap.Strings("addresses", na)) + } + + if idx += total; idx >= len(nodeList) { + return errIncompletePut{singleErr: firstErr} + } + } +} + +func (t *distributedTarget) saveECPartOnNode(obj object.Object, objMeta objectcore.ContentMeta, enc encodedObject, node netmap.NodeInfo) error { + var n nodeDesc + n.local = t.svc.neoFSNet.IsLocalNodePublicKey(node.PublicKey()) + if !n.local { + var err error + if n.info, err = convertNodeInfo(node); err != nil { + return fmt.Errorf("convert node info: %w", err) + } + } + + return t.sendObject(obj, objMeta, enc, n) +} diff --git a/pkg/services/object/put/local.go b/pkg/services/object/put/local.go index cf8a9d5be8..abe9671e05 100644 --- a/pkg/services/object/put/local.go +++ b/pkg/services/object/put/local.go @@ -69,7 +69,7 @@ func putObjectLocally(storage ObjectStorage, obj *object.Object, meta objectCore // ValidateAndStoreObjectLocally 
checks format of given object and, if it's // correct, stores it in the underlying local object storage. Serves operation -// similar to local-only [Service.Put] one. +// similar to local-only [Service.InitPut] one. func (p *Service) ValidateAndStoreObjectLocally(obj object.Object) error { cnrID := obj.GetContainerID() if cnrID.IsZero() { diff --git a/pkg/services/object/put/prm.go b/pkg/services/object/put/prm.go index 106b1ed966..a4b2b9c068 100644 --- a/pkg/services/object/put/prm.go +++ b/pkg/services/object/put/prm.go @@ -2,69 +2,13 @@ package putsvc import ( "github.com/nspcc-dev/neofs-node/pkg/core/client" - "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - containerSDK "github.com/nspcc-dev/neofs-sdk-go/container" - neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" - "github.com/nspcc-dev/neofs-sdk-go/object" ) -type PutInitPrm struct { - common *util.CommonPrm +// RelayFunc relays request using given connection to SN. +type RelayFunc = func(client.NodeInfo, client.MultiAddressClient) error - hdr *object.Object +type PutInitOptions struct { + CopiesNumber uint32 - cnr containerSDK.Container - - copiesNumber uint32 - - relay func(client.NodeInfo, client.MultiAddressClient) error - - containerNodes ContainerNodes - localNodeInContainer bool - localSignerRFC6979 neofscrypto.Signer - localNodeSigner neofscrypto.Signer -} - -type PutChunkPrm struct { - chunk []byte -} - -func (p *PutInitPrm) WithCommonPrm(v *util.CommonPrm) *PutInitPrm { - if p != nil { - p.common = v - } - - return p -} - -func (p *PutInitPrm) WithObject(v *object.Object) *PutInitPrm { - if p != nil { - p.hdr = v - } - - return p -} - -func (p *PutInitPrm) WithRelay(f func(client.NodeInfo, client.MultiAddressClient) error) *PutInitPrm { - if p != nil { - p.relay = f - } - - return p -} - -func (p *PutInitPrm) WithCopiesNumber(cn uint32) *PutInitPrm { - if p != nil { - p.copiesNumber = cn - } - - return p -} - -func (p *PutChunkPrm) WithChunk(v []byte) *PutChunkPrm { - if p != nil 
{ - p.chunk = v - } - - return p + Relay RelayFunc } diff --git a/pkg/services/object/put/res.go b/pkg/services/object/put/res.go deleted file mode 100644 index 920a86a0cd..0000000000 --- a/pkg/services/object/put/res.go +++ /dev/null @@ -1,13 +0,0 @@ -package putsvc - -import ( - oid "github.com/nspcc-dev/neofs-sdk-go/object/id" -) - -type PutResponse struct { - id oid.ID -} - -func (r *PutResponse) ObjectID() oid.ID { - return r.id -} diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index b556a579ff..e08e538d9e 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -3,6 +3,7 @@ package putsvc import ( "context" + iec "github.com/nspcc-dev/neofs-node/internal/ec" "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/container" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" @@ -51,6 +52,8 @@ type ContainerNodes interface { // matching storage policy of the container. Nodes are identified by their // public keys and can be repeated in different sets. // + // First PrimaryCounts() sets are for replication, the rest are for ECRules(). + // // Unsorted callers do not change resulting slices and their elements. Unsorted() [][]netmapsdk.NodeInfo // SortForObject sorts container nodes for the referenced object's storage. @@ -63,6 +66,11 @@ type ContainerNodes interface { // - first N nodes of each L are primary data holders while others (if any) // are backup. PrimaryCounts() []uint + // ECRules returns list of erasure coding rules for all objects in the + // container. Same rule may repeat. + // + // ECRules callers do not change resulting slice. + ECRules() []iec.Rule } // NeoFSNetwork provides access to the NeoFS network to get information @@ -138,15 +146,6 @@ func NewService(transport Transport, neoFSNet NeoFSNetwork, m *meta.Meta, opts . 
} } -func (p *Service) Put(ctx context.Context) (*Streamer, error) { - return &Streamer{ - cfg: p.cfg, - ctx: ctx, - transport: p.transport, - neoFSNet: p.neoFSNet, - }, nil -} - func WithKeyStorage(v *objutil.KeyStorage) Option { return func(c *cfg) { c.keyStorage = v diff --git a/pkg/services/object/put/service_test.go b/pkg/services/object/put/service_test.go index d912150be1..5d70d8c26a 100644 --- a/pkg/services/object/put/service_test.go +++ b/pkg/services/object/put/service_test.go @@ -2,18 +2,25 @@ package putsvc import ( "bytes" + "cmp" "context" "crypto/sha256" "errors" "fmt" "io" "slices" + "strconv" "sync" "testing" "github.com/google/uuid" + "github.com/klauspost/reedsolomon" + iec "github.com/nspcc-dev/neofs-node/internal/ec" + iobject "github.com/nspcc-dev/neofs-node/internal/object" + islices "github.com/nspcc-dev/neofs-node/internal/slices" "github.com/nspcc-dev/neofs-node/internal/testutil" clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client" + "github.com/nspcc-dev/neofs-node/pkg/services/object/internal" objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/session/storage" "github.com/nspcc-dev/neofs-sdk-go/checksum" @@ -68,6 +75,37 @@ func Test_Slicing_REP3(t *testing.T) { } } +func Test_Slicing_EC(t *testing.T) { + rules := []iec.Rule{ + {DataPartNum: 2, ParityPartNum: 2}, + {DataPartNum: 3, ParityPartNum: 1}, + {DataPartNum: 6, ParityPartNum: 3}, + {DataPartNum: 12, ParityPartNum: 4}, + } + + for _, tc := range []struct { + name string + ln uint64 + skip string + }{ + {name: "no payload", ln: 0}, + {name: "1B", ln: 1}, + {name: "limit-1B", ln: maxObjectSize - 1}, + {name: "exactly limit", ln: maxObjectSize}, + {name: "limit+1b", ln: maxObjectSize + 1, skip: "https://github.com/nspcc-dev/neofs-node/issues/3500"}, + {name: "limitX2", ln: maxObjectSize * 2, skip: "https://github.com/nspcc-dev/neofs-node/issues/3500"}, + {name: "limitX4-1", ln: maxObjectSize*4 - 1, skip: 
"https://github.com/nspcc-dev/neofs-node/issues/3500"}, + {name: "limitX5", ln: maxObjectSize * 5, skip: "https://github.com/nspcc-dev/neofs-node/issues/3500"}, + } { + t.Run(tc.name, func(t *testing.T) { + if tc.skip != "" { + t.Skip(tc.skip) + } + testSlicingECRules(t, tc.ln, rules) + }) + } +} + func testSlicingREP3(t *testing.T, ln uint64) { const repNodes = 3 const cnrReserveNodes = 2 @@ -146,6 +184,75 @@ func testSlicingREP3(t *testing.T, ln uint64) { } } +func testSlicingECRules(t *testing.T, ln uint64, rules []iec.Rule) { + maxRule := slices.MaxFunc(rules, func(a, b iec.Rule) int { + return cmp.Compare(a.DataPartNum+a.ParityPartNum, b.DataPartNum+b.ParityPartNum) + }) + + maxTotalParts := int(maxRule.DataPartNum + maxRule.ParityPartNum) + const cnrReserveNodes = 2 + const outCnrNodes = 2 + + cluster := newTestClusterForRepPolicy(t, uint(maxTotalParts), cnrReserveNodes, outCnrNodes) + for i := range cluster.nodeNetworks { + // TODO: add alternative to newTestClusterForRepPolicy for EC instead + cluster.nodeNetworks[i].cnrNodes.repCounts = nil + for range len(rules) - 1 { + cluster.nodeNetworks[i].cnrNodes.unsorted = append(cluster.nodeNetworks[i].cnrNodes.unsorted, cluster.nodeNetworks[i].cnrNodes.unsorted[0]) + cluster.nodeNetworks[i].cnrNodes.sorted = append(cluster.nodeNetworks[i].cnrNodes.sorted, cluster.nodeNetworks[i].cnrNodes.sorted[0]) + } + cluster.nodeNetworks[i].cnrNodes.ecRules = rules + } + + var srcObj object.Object + srcObj.SetContainerID(cidtest.ID()) + srcObj.SetOwner(usertest.ID()) + srcObj.SetAttributes( + object.NewAttribute("attr1", "val1"), + object.NewAttribute("attr2", "val2"), + ) + + var sessionToken session.Object + sessionToken.SetID(uuid.New()) + sessionToken.SetExp(1) + sessionToken.BindContainer(cidtest.ID()) + srcObj.SetPayload(testutil.RandByteSlice(ln)) + + testThroughNode := func(t *testing.T, idx int) { + sessionToken.SetAuthKey(cluster.nodeSessions[idx].signer.Public()) + require.NoError(t, 
sessionToken.Sign(usertest.User())) + + storeObjectWithSession(t, cluster.nodeServices[idx], srcObj, sessionToken) + + nodeObjLists := cluster.allStoredObjects() + + var restoredObj object.Object + if ln > maxObjectSize { + restoredObj = checkAndCutSplitECObject(t, ln, sessionToken, rules, nodeObjLists) + } else { + restoredObj = checkAndCutUnsplitECObject(t, rules, nodeObjLists) + } + + require.Zero(t, islices.TwoDimSliceElementCount(nodeObjLists)) + + assertObjectIntegrity(t, restoredObj) + require.Equal(t, sessionToken, *restoredObj.SessionToken()) + require.Equal(t, srcObj.GetContainerID(), restoredObj.GetContainerID()) + require.Equal(t, sessionToken.Issuer(), restoredObj.Owner()) + require.EqualValues(t, currentEpoch, restoredObj.CreationEpoch()) + require.Equal(t, object.TypeRegular, restoredObj.Type()) + require.Equal(t, srcObj.Attributes(), restoredObj.Attributes()) + require.False(t, restoredObj.HasParent()) + require.True(t, bytes.Equal(srcObj.Payload(), restoredObj.Payload())) + + cluster.resetAllStoredObjects() + } + + for i := range maxTotalParts + cnrReserveNodes + outCnrNodes { + testThroughNode(t, i) + } +} + func newTestClusterForRepPolicy(t *testing.T, repNodes, cnrReserveNodes, outCnrNodes uint) *testCluster { allNodes := allocNodes([]uint{repNodes + cnrReserveNodes + outCnrNodes})[0] cnrNodes := allNodes[:repNodes+cnrReserveNodes] @@ -166,7 +273,7 @@ func newTestClusterForRepPolicy(t *testing.T, repNodes, cnrReserveNodes, outCnrN for i := range allNodes { nodeKey := neofscryptotest.ECDSAPrivateKey() - nodeWorkerPool, err := ants.NewPool(10, ants.WithNonblocking(true)) + nodeWorkerPool, err := ants.NewPool(len(cnrNodes), ants.WithNonblocking(true)) require.NoError(t, err) cluster.nodeNetworks[i] = mockNetwork{ @@ -240,6 +347,7 @@ type mockContainerNodes struct { unsorted [][]netmap.NodeInfo sorted [][]netmap.NodeInfo repCounts []uint + ecRules []iec.Rule } func (x mockContainerNodes) Unsorted() [][]netmap.NodeInfo { @@ -254,6 +362,10 @@ func (x 
mockContainerNodes) PrimaryCounts() []uint { return x.repCounts } +func (x mockContainerNodes) ECRules() []iec.Rule { + return x.ecRules +} + type mockMaxSize uint64 func (x mockMaxSize) MaxObjectSize() uint64 { @@ -375,11 +487,6 @@ func (m *serviceClient) ContainerAnnounceUsedSpace(context.Context, []container. } func (m *serviceClient) ObjectPutInit(ctx context.Context, hdr object.Object, _ user.Signer, _ client.PrmObjectPutInit) (client.ObjectWriter, error) { - stream, err := (*Service)(m).Put(ctx) - if err != nil { - return nil, err - } - // TODO: following is needed because struct parameters privatize some data. Refactor to avoid this. localReq := &protoobject.PutRequest{ MetaHeader: &protosession.RequestMetaHeader{Ttl: 1}, @@ -389,15 +496,14 @@ func (m *serviceClient) ObjectPutInit(ctx context.Context, hdr object.Object, _ panic(err) } - var ip PutInitPrm - ip.WithObject(hdr.CutPayload()) - ip.WithCommonPrm(commonPrm) + var opts PutInitOptions - if err := stream.Init(&ip); err != nil { + pw, err := (*Service)(m).InitPut(ctx, hdr.CutPayload(), commonPrm, opts) + if err != nil { return nil, err } - return (*testPayloadStream)(stream), nil + return &testPayloadStream{PayloadWriter: pw}, nil } func (m *serviceClient) ReplicateObject(context.Context, oid.ID, io.ReadSeeker, neofscrypto.Signer, bool) (*neofscrypto.Signature, error) { @@ -445,17 +551,12 @@ func (m *serviceClient) ForEachGRPCConn(context.Context, func(context.Context, * panic("unimplemented") } -type testPayloadStream Streamer - -func (x *testPayloadStream) Write(p []byte) (int, error) { - if err := (*Streamer)(x).SendChunk(new(PutChunkPrm).WithChunk(p)); err != nil { - return 0, err - } - return len(p), nil +type testPayloadStream struct { + internal.PayloadWriter } func (x *testPayloadStream) Close() error { - _, err := (*Streamer)(x).Close() + _, err := x.PayloadWriter.Close() return err } @@ -485,9 +586,6 @@ func (x *testCluster) resetAllStoredObjects() { } func storeObjectWithSession(t 
*testing.T, svc *Service, obj object.Object, st session.Object) { - stream, err := svc.Put(context.Background()) - require.NoError(t, err) - req := &protoobject.PutRequest{ MetaHeader: &protosession.RequestMetaHeader{ Ttl: 2, @@ -498,16 +596,14 @@ func storeObjectWithSession(t *testing.T, svc *Service, obj object.Object, st se commonPrm, err := objutil.CommonPrmFromRequest(req) require.NoError(t, err) - ip := new(PutInitPrm). - WithObject(obj.CutPayload()). - WithCommonPrm(commonPrm) - require.NoError(t, stream.Init(ip)) + var opts PutInitOptions + pw, err := svc.InitPut(context.Background(), obj.CutPayload(), commonPrm, opts) + require.NoError(t, err) - cp := new(PutChunkPrm). - WithChunk(obj.Payload()) - require.NoError(t, stream.SendChunk(cp)) + _, err = pw.Write(obj.Payload()) + require.NoError(t, err) - _, err = stream.Close() + _, err = pw.Close() require.NoError(t, err) } @@ -637,3 +733,154 @@ func assertObjectIntegrity(t *testing.T, obj object.Object) { require.Zero(t, obj.SplitID()) } + +func checkAndGetObjectFromECParts(t *testing.T, limit uint64, rule iec.Rule, parts []object.Object) object.Object { + require.Len(t, parts, int(rule.DataPartNum+rule.ParityPartNum)) + + for _, part := range parts { + assertObjectIntegrity(t, part) + require.LessOrEqual(t, part.PayloadSize(), limit) + } + + hdr := checkAndCutParentHeaderFromECPart(t, parts[0]) + + for i := 1; i < len(parts); i++ { + hdrI := checkAndCutParentHeaderFromECPart(t, parts[i]) + require.Equal(t, hdr, hdrI) + } + + payload := checkAndGetPayloadFromECParts(t, hdr.PayloadSize(), rule, parts) + + res := hdr + res.SetPayload(payload) + + return res +} + +func checkAndGetPayloadFromECParts(t *testing.T, ln uint64, rule iec.Rule, parts []object.Object) []byte { + var payloadParts [][]byte + for i := range parts { + payloadParts = append(payloadParts, parts[i].Payload()) + } + + if ln == 0 { + require.Negative(t, slices.IndexFunc(payloadParts, func(e []byte) bool { return len(e) > 0 })) + return nil + } + 
+ enc, err := reedsolomon.New(int(rule.DataPartNum), int(rule.ParityPartNum)) + require.NoError(t, err) + + ok, err := enc.Verify(payloadParts) + require.NoError(t, err) + require.True(t, ok) + + required := make([]bool, rule.DataPartNum+rule.ParityPartNum) + for i := range rule.DataPartNum { + required[i] = true + } + + for lostCount := 1; lostCount <= int(rule.ParityPartNum); lostCount++ { + for _, lostIdxs := range islices.IndexCombos(len(payloadParts), lostCount) { + brokenParts := islices.NilTwoDimSliceElements(payloadParts, lostIdxs) + require.NoError(t, enc.Reconstruct(brokenParts)) + require.Equal(t, payloadParts, brokenParts) + + brokenParts = islices.NilTwoDimSliceElements(payloadParts, lostIdxs) + require.NoError(t, enc.ReconstructSome(brokenParts, required)) + require.Equal(t, payloadParts[:rule.DataPartNum], brokenParts[:rule.DataPartNum]) + } + } + + for _, lostIdxs := range islices.IndexCombos(len(payloadParts), int(rule.ParityPartNum)+1) { + require.Error(t, enc.Reconstruct(islices.NilTwoDimSliceElements(payloadParts, lostIdxs))) + require.Error(t, enc.ReconstructSome(islices.NilTwoDimSliceElements(payloadParts, lostIdxs), required)) + } + + payload := slices.Concat(payloadParts[:rule.DataPartNum]...) 
+ + require.GreaterOrEqual(t, uint64(len(payload)), ln) + + require.False(t, slices.ContainsFunc(payload[ln:], func(b byte) bool { return b != 0 })) + + return payload[:ln] +} + +func checkAndCutParentHeaderFromECPart(t *testing.T, part object.Object) object.Object { + par := part.Parent() + require.NotNil(t, par) + + require.Equal(t, par.Version(), part.Version()) + require.Equal(t, par.GetContainerID(), part.GetContainerID()) + require.Equal(t, par.Owner(), part.Owner()) + require.Equal(t, par.CreationEpoch(), part.CreationEpoch()) + require.Equal(t, object.TypeRegular, part.Type()) + require.Equal(t, par.SessionToken(), part.SessionToken()) + + return *par +} + +func checkAndGetECPartInfo(t testing.TB, part object.Object) (int, int) { + ruleIdxAttr := iobject.GetAttribute(part, "__NEOFS__EC_RULE_IDX") + require.NotZero(t, ruleIdxAttr) + ruleIdx, err := strconv.Atoi(ruleIdxAttr) + require.NoError(t, err) + require.True(t, ruleIdx >= 0) + + partIdxAttr := iobject.GetAttribute(part, "__NEOFS__EC_PART_IDX") + require.NotZero(t, partIdxAttr) + partIdx, err := strconv.Atoi(partIdxAttr) + require.NoError(t, err) + require.True(t, partIdx >= 0) + + return ruleIdx, partIdx +} + +func checkAndCutSplitECObject(t *testing.T, ln uint64, sessionToken session.Object, rules []iec.Rule, nodeObjLists [][]object.Object) object.Object { + splitPartCount := splitMembersCount(maxObjectSize, ln) + + var expectedCount int + for i := range rules { + expectedCount += int(rules[i].DataPartNum+rules[i].ParityPartNum) * splitPartCount + } + + require.EqualValues(t, expectedCount, islices.TwoDimSliceElementCount(nodeObjLists)) + + var splitParts []object.Object + for range splitPartCount { + splitPart := checkAndCutUnsplitECObject(t, rules, nodeObjLists) + splitParts = append(splitParts, splitPart) + } + + restoredObj := assertSplitChain(t, maxObjectSize, ln, sessionToken, splitParts) + + return restoredObj +} + +func checkAndCutUnsplitECObject(t *testing.T, rules []iec.Rule, nodeObjLists 
[][]object.Object) object.Object { + ecParts := checkAndCutECPartsForRule(t, 0, rules[0], nodeObjLists) + restoredObj := checkAndGetObjectFromECParts(t, maxObjectSize, rules[0], ecParts) + + for i := 1; i < len(rules); i++ { + ecPartsI := checkAndCutECPartsForRule(t, i, rules[i], nodeObjLists) + restoredObjI := checkAndGetObjectFromECParts(t, maxObjectSize, rules[i], ecPartsI) + require.Equal(t, restoredObj, restoredObjI) + } + + return restoredObj +} + +func checkAndCutECPartsForRule(t *testing.T, ruleIdx int, rule iec.Rule, nodeObjLists [][]object.Object) []object.Object { + var parts []object.Object + + for i := range rule.DataPartNum + rule.ParityPartNum { + gotRuleIdx, partIdx := checkAndGetECPartInfo(t, nodeObjLists[i][0]) + require.EqualValues(t, ruleIdx, gotRuleIdx) + require.EqualValues(t, i, partIdx) + + parts = append(parts, nodeObjLists[i][0]) + nodeObjLists[i] = nodeObjLists[i][1:] + } + + return parts +} diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index a30f8b0044..623c8eadce 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -5,7 +5,7 @@ import ( "errors" "fmt" - "github.com/nspcc-dev/neofs-node/pkg/core/client" + iec "github.com/nspcc-dev/neofs-node/internal/ec" "github.com/nspcc-dev/neofs-node/pkg/services/object/internal" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-sdk-go/container" @@ -14,79 +14,124 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/user" ) -type Streamer struct { - *cfg - - ctx context.Context - - target internal.Target - - relay func(client.NodeInfo, client.MultiAddressClient) error +func (p *Service) InitPut(ctx context.Context, hdr *object.Object, cp *util.CommonPrm, opts PutInitOptions) (internal.PayloadWriter, error) { + // initialize destination target + target, err := p.initTarget(ctx, hdr, cp, opts) + if err != nil { + return nil, err + } - maxPayloadSz uint64 // network config + if err := 
target.WriteHeader(hdr); err != nil { + return nil, err + } - transport Transport - neoFSNet NeoFSNetwork + return target, nil } -var errNotInit = errors.New("stream not initialized") - -var errInitRecall = errors.New("init recall") +func (p *Service) initTarget(ctx context.Context, hdr *object.Object, cp *util.CommonPrm, opts PutInitOptions) (internal.Target, error) { + localOnly := cp.LocalOnly() + if localOnly && opts.CopiesNumber > 1 { + return nil, errors.New("storage of multiple object replicas is requested for a local operation") + } -func (p *Streamer) Init(prm *PutInitPrm) error { - // initialize destination target - if err := p.initTarget(prm); err != nil { - return fmt.Errorf("(%T) could not initialize object target: %w", p, err) + localNodeKey, err := p.keyStorage.GetKey(nil) + if err != nil { + return nil, fmt.Errorf("get local node's private key: %w", err) } - if err := p.target.WriteHeader(prm.hdr); err != nil { - return fmt.Errorf("(%T) could not write header to target: %w", p, err) + idCnr := hdr.GetContainerID() + if idCnr.IsZero() { + return nil, errors.New("missing container ID") } - return nil -} -// MaxObjectSize returns maximum payload size for the streaming session. -// -// Must be called after the successful Init. 
-func (p *Streamer) MaxObjectSize() uint64 { - return p.maxPayloadSz -} + // get container to store the object + cnr, err := p.cnrSrc.Get(idCnr) + if err != nil { + return nil, fmt.Errorf("(%T) could not get container by ID: %w", p, err) + } -func (p *Streamer) initTarget(prm *PutInitPrm) error { - // prevent re-calling - if p.target != nil { - return errInitRecall + containerNodes, err := p.neoFSNet.GetContainerNodes(idCnr) + if err != nil { + return nil, fmt.Errorf("select storage nodes for the container: %w", err) } - // prepare needed put parameters - if err := p.preparePrm(prm); err != nil { - return fmt.Errorf("(%T) could not prepare put parameters: %w", p, err) + cnrNodes := containerNodes.Unsorted() + ecRulesN := len(containerNodes.ECRules()) + + var localNodeInContainer bool + var ecPart iec.PartInfo + if ecRulesN > 0 { + ecPart, err = iec.GetPartInfo(*hdr) + if err != nil { + return nil, fmt.Errorf("get EC part info from object header: %w", err) + } + + repRulesN := len(containerNodes.PrimaryCounts()) + if ecPart.Index >= 0 { + if ecPart.RuleIndex >= ecRulesN { + return nil, fmt.Errorf("invalid EC part info in object header: EC rule idx=%d with %d rules in total", ecPart.RuleIndex, ecRulesN) + } + if hdr.Signature() == nil { + return nil, errors.New("unsigned EC part object") + } + localNodeInContainer = localNodeInSet(p.neoFSNet, cnrNodes[repRulesN+ecPart.RuleIndex]) + } else { + if repRulesN == 0 && hdr.Signature() != nil { + return nil, errors.New("missing EC part info in signed object") + } + localNodeInContainer = localNodeInSets(p.neoFSNet, cnrNodes) + } + } else { + localNodeInContainer = localNodeInSets(p.neoFSNet, cnrNodes) + } + if !localNodeInContainer && localOnly { + return nil, errors.New("local operation on the node not compliant with the container storage policy") } - p.maxPayloadSz = p.maxSizeSrc.MaxObjectSize() - if p.maxPayloadSz == 0 { - return fmt.Errorf("(%T) could not obtain max object size parameter", p) + maxPayloadSz := 
p.maxSizeSrc.MaxObjectSize() + if maxPayloadSz == 0 { + return nil, fmt.Errorf("(%T) could not obtain max object size parameter", p) } - homomorphicChecksumRequired := !prm.cnr.IsHomomorphicHashingDisabled() + homomorphicChecksumRequired := !cnr.IsHomomorphicHashingDisabled() - if prm.hdr.Signature() != nil { - p.relay = prm.relay + target := &distributedTarget{ + svc: p, + localNodeSigner: (*neofsecdsa.Signer)(localNodeKey), + metaSigner: (*neofsecdsa.SignerRFC6979)(localNodeKey), + opCtx: ctx, + commonPrm: cp, + localOnly: cp.LocalOnly(), + linearReplNum: uint(opts.CopiesNumber), + metainfoConsistencyAttr: metaAttribute(cnr), + containerNodes: containerNodes, + localNodeInContainer: localNodeInContainer, + ecPart: ecPart, + } + if hdr.Signature() != nil { // prepare untrusted-Put object target - p.target = &validatingTarget{ - nextTarget: p.newCommonTarget(prm), + if opts.Relay != nil { + target.relay = func(node nodeDesc) error { + c, err := p.clientConstructor.Get(node.info) + if err != nil { + return fmt.Errorf("could not create SDK client %s: %w", node.info.AddressGroup(), err) + } + + return opts.Relay(node.info, c) + } + } + return &validatingTarget{ + nextTarget: target, fmt: p.fmtValidator, - maxPayloadSz: p.maxPayloadSz, + maxPayloadSz: maxPayloadSz, homomorphicChecksumRequired: homomorphicChecksumRequired, - } - - return nil + }, nil } - sToken := prm.common.SessionToken() + sToken := cp.SessionToken() // prepare trusted-Put object target @@ -102,7 +147,7 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error { sessionKey, err := p.keyStorage.GetKey(sessionInfo) if err != nil { - return fmt.Errorf("(%T) could not receive session key: %w", p, err) + return nil, fmt.Errorf("(%T) could not receive session key: %w", p, err) } signer := neofsecdsa.SignerRFC6979(*sessionKey) @@ -110,154 +155,33 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error { // In case session token is missing, the line above returns the default key. 
// If it isn't owner key, replication attempts will fail, thus this check. if sToken == nil { - ownerObj := prm.hdr.Owner() + ownerObj := hdr.Owner() if ownerObj.IsZero() { - return errors.New("missing object owner") + return nil, errors.New("missing object owner") } ownerSession := user.NewFromECDSAPublicKey(signer.PublicKey) if ownerObj != ownerSession { - return fmt.Errorf("(%T) session token is missing but object owner id is different from the default key", p) + return nil, fmt.Errorf("(%T) session token is missing but object owner id is different from the default key", p) } } - p.target = &validatingTarget{ + sessionSigner := user.NewAutoIDSigner(*sessionKey) + target.sessionSigner = sessionSigner + return &validatingTarget{ fmt: p.fmtValidator, unpreparedObject: true, nextTarget: newSlicingTarget( - p.ctx, - p.maxPayloadSz, + ctx, + maxPayloadSz, !homomorphicChecksumRequired, - user.NewAutoIDSigner(*sessionKey), + sessionSigner, sToken, p.networkState.CurrentEpoch(), - p.newCommonTarget(prm), + target, ), homomorphicChecksumRequired: homomorphicChecksumRequired, - } - - return nil -} - -func (p *Streamer) preparePrm(prm *PutInitPrm) error { - localOnly := prm.common.LocalOnly() - if localOnly && prm.copiesNumber > 1 { - return errors.New("storage of multiple object replicas is requested for a local operation") - } - - localNodeKey, err := p.keyStorage.GetKey(nil) - if err != nil { - return fmt.Errorf("get local node's private key: %w", err) - } - - idCnr := prm.hdr.GetContainerID() - if idCnr.IsZero() { - return errors.New("missing container ID") - } - - // get container to store the object - prm.cnr, err = p.cnrSrc.Get(idCnr) - if err != nil { - return fmt.Errorf("(%T) could not get container by ID: %w", p, err) - } - - prm.containerNodes, err = p.neoFSNet.GetContainerNodes(idCnr) - if err != nil { - return fmt.Errorf("select storage nodes for the container: %w", err) - } - cnrNodes := prm.containerNodes.Unsorted() -nextSet: - for i := range cnrNodes { - for 
j := range cnrNodes[i] { - prm.localNodeInContainer = p.neoFSNet.IsLocalNodePublicKey(cnrNodes[i][j].PublicKey()) - if prm.localNodeInContainer { - break nextSet - } - } - } - if !prm.localNodeInContainer && localOnly { - return errors.New("local operation on the node not compliant with the container storage policy") - } - - prm.localNodeSigner = (*neofsecdsa.Signer)(localNodeKey) - prm.localSignerRFC6979 = (*neofsecdsa.SignerRFC6979)(localNodeKey) - - return nil -} - -func (p *Streamer) newCommonTarget(prm *PutInitPrm) internal.Target { - var relay func(nodeDesc) error - if p.relay != nil { - relay = func(node nodeDesc) error { - c, err := p.clientConstructor.Get(node.info) - if err != nil { - return fmt.Errorf("could not create SDK client %s: %w", node.info.AddressGroup(), err) - } - - return p.relay(node.info, c) - } - } - - // enable additional container broadcast on non-local operation - // if object has TOMBSTONE or LOCK type. - typ := prm.hdr.Type() - localOnly := prm.common.LocalOnly() - withBroadcast := !localOnly && (typ == object.TypeTombstone || typ == object.TypeLock) - - return &distributedTarget{ - opCtx: p.ctx, - fsState: p.networkState, - networkMagicNumber: p.networkMagic, - metaSvc: p.metaSvc, - placementIterator: placementIterator{ - log: p.log, - neoFSNet: p.neoFSNet, - remotePool: p.remotePool, - containerNodes: prm.containerNodes, - linearReplNum: uint(prm.copiesNumber), - broadcast: withBroadcast, - }, - localStorage: p.localStore, - keyStorage: p.keyStorage, - commonPrm: prm.common, - clientConstructor: p.clientConstructor, - transport: p.transport, - relay: relay, - fmt: p.fmtValidator, - localNodeInContainer: prm.localNodeInContainer, - localNodeSigner: prm.localNodeSigner, - cnrClient: p.cfg.cnrClient, - metainfoConsistencyAttr: metaAttribute(prm.cnr), - metaSigner: prm.localSignerRFC6979, - localOnly: localOnly, - } -} - -func (p *Streamer) SendChunk(prm *PutChunkPrm) error { - if p.target == nil { - return errNotInit - } - - if _, err 
:= p.target.Write(prm.chunk); err != nil { - return fmt.Errorf("(%T) could not write payload chunk to target: %w", p, err) - } - - return nil -} - -func (p *Streamer) Close() (*PutResponse, error) { - if p.target == nil { - return nil, errNotInit - } - - id, err := p.target.Close() - if err != nil { - return nil, fmt.Errorf("(%T) could not close object target: %w", p, err) - } - - return &PutResponse{ - id: id, }, nil } diff --git a/pkg/services/object/put/util.go b/pkg/services/object/put/util.go new file mode 100644 index 0000000000..380bcf2c14 --- /dev/null +++ b/pkg/services/object/put/util.go @@ -0,0 +1,19 @@ +package putsvc + +import ( + "slices" + + "github.com/nspcc-dev/neofs-sdk-go/netmap" +) + +func localNodeInSets(n NeoFSNetwork, ss [][]netmap.NodeInfo) bool { + return slices.ContainsFunc(ss, func(s []netmap.NodeInfo) bool { + return localNodeInSet(n, s) + }) +} + +func localNodeInSet(n NeoFSNetwork, nodes []netmap.NodeInfo) bool { + return slices.ContainsFunc(nodes, func(node netmap.NodeInfo) bool { + return n.IsLocalNodePublicKey(node.PublicKey()) + }) +} diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go index 7e8c7445a7..16852dee99 100644 --- a/pkg/services/object/server.go +++ b/pkg/services/object/server.go @@ -50,11 +50,15 @@ import ( "google.golang.org/grpc" ) +type putHandler interface { + InitPut(context.Context, *object.Object, *objutil.CommonPrm, putsvc.PutInitOptions) (internal.PayloadWriter, error) +} + // Handlers represents storage node's internal handler Object service op // payloads. 
type Handlers interface { Get(context.Context, getsvc.Prm) error - Put(context.Context) (*putsvc.Streamer, error) + putHandler Head(context.Context, getsvc.HeadPrm) error Search(context.Context, searchsvc.Prm) error Delete(context.Context, deletesvc.Prm) error @@ -248,7 +252,9 @@ func (s *Server) sendStatusPutResponse(stream protoobject.ObjectService_PutServe type putStream struct { ctx context.Context signer ecdsa.PrivateKey - base *putsvc.Streamer + base putHandler + + payloadWriter internal.PayloadWriter cacheReqs bool initReq *protoobject.PutRequest @@ -257,7 +263,7 @@ type putStream struct { expBytes, recvBytes uint64 // payload } -func newIntermediatePutStream(signer ecdsa.PrivateKey, base *putsvc.Streamer, ctx context.Context) *putStream { +func newIntermediatePutStream(signer ecdsa.PrivateKey, base putHandler, ctx context.Context) *putStream { return &putStream{ ctx: ctx, signer: signer, @@ -326,7 +332,7 @@ func (x *putStream) resignRequest(req *protoobject.PutRequest) (*protoobject.Put func (x *putStream) forwardRequest(req *protoobject.PutRequest) error { switch v := req.GetBody().GetObjectPart().(type) { default: - return fmt.Errorf("invalid object put stream part type %T", v) + panic(fmt.Errorf("invalid object put stream part type %T", v)) case *protoobject.PutRequest_Body_Init_: if v == nil || v.Init == nil { // TODO: seems like this is done several times, deduplicate return errors.New("nil oneof field with heading part") @@ -348,12 +354,10 @@ func (x *putStream) forwardRequest(req *protoobject.PutRequest) error { return err } - var p putsvc.PutInitPrm - p.WithCommonPrm(cp) - p.WithObject(obj) - p.WithCopiesNumber(v.Init.CopiesNumber) - p.WithRelay(x.sendToRemoteNode) - if err = x.base.Init(&p); err != nil { + var opts putsvc.PutInitOptions + opts.CopiesNumber = v.Init.CopiesNumber + opts.Relay = x.sendToRemoteNode + if x.payloadWriter, err = x.base.InitPut(x.ctx, obj, cp, opts); err != nil { return fmt.Errorf("could not init object put stream: %w", 
err) } @@ -362,9 +366,6 @@ func (x *putStream) forwardRequest(req *protoobject.PutRequest) error { } x.expBytes = v.Init.Header.GetPayloadLength() - if m := x.base.MaxObjectSize(); x.expBytes > m { - return putsvc.ErrExceedingMaxSize - } signed, err := x.resignRequest(req) // TODO: resign only when needed if err != nil { return err // TODO: add context @@ -377,7 +378,7 @@ func (x *putStream) forwardRequest(req *protoobject.PutRequest) error { return putsvc.ErrWrongPayloadSize } } - if err := x.base.SendChunk(new(putsvc.PutChunkPrm).WithChunk(c)); err != nil { + if _, err := x.payloadWriter.Write(c); err != nil { return fmt.Errorf("could not send payload chunk: %w", err) } if !x.cacheReqs { @@ -397,12 +398,11 @@ func (x *putStream) close() (*protoobject.PutResponse, error) { return nil, putsvc.ErrWrongPayloadSize } - resp, err := x.base.Close() + id, err := x.payloadWriter.Close() if err != nil { return nil, fmt.Errorf("could not object put stream: %w", err) } - id := resp.ObjectID() return &protoobject.PutResponse{ Body: &protoobject.PutResponse_Body{ ObjectId: id.ProtoMessage(), @@ -412,20 +412,22 @@ func (x *putStream) close() (*protoobject.PutResponse, error) { func (s *Server) Put(gStream protoobject.ObjectService_PutServer) error { t := time.Now() - stream, err := s.handlers.Put(gStream.Context()) + var err error defer func() { s.pushOpExecResult(stat.MethodObjectPut, err, t) }() - if err != nil { - return err - } var req *protoobject.PutRequest var resp *protoobject.PutResponse + var headerWas bool - ps := newIntermediatePutStream(s.signer, stream, gStream.Context()) + ps := newIntermediatePutStream(s.signer, s.handlers, gStream.Context()) for { if req, err = gStream.Recv(); err != nil { if errors.Is(err, io.EOF) { + if !headerWas { + return s.sendStatusPutResponse(gStream, errors.New("stream is closed without messages")) + } + resp, err = ps.close() if err != nil { return s.sendStatusPutResponse(gStream, err) @@ -437,8 +439,26 @@ func (s *Server) Put(gStream 
protoobject.ObjectService_PutServer) error { return err } - if c := req.GetBody().GetChunk(); c != nil { - s.metrics.AddPutPayload(len(c)) + switch v := req.GetBody().GetObjectPart().(type) { + default: + err = fmt.Errorf("invalid object put stream part type %T", v) + case *protoobject.PutRequest_Body_Init_: + if headerWas { + err = errors.New("repeated message with object header") + break + } + headerWas = true + case *protoobject.PutRequest_Body_Chunk: + if !headerWas { + err = errors.New("message with payload chunk before object header") + break + } + + s.metrics.AddPutPayload(len(v.Chunk)) + } + if err != nil { + err = s.sendStatusPutResponse(gStream, err) // assign for defer + return err } if err = icrypto.VerifyRequestSignaturesN3(req, s.fsChain); err != nil { diff --git a/pkg/services/object/server_test.go b/pkg/services/object/server_test.go index ff7132bb7d..ecda980e91 100644 --- a/pkg/services/object/server_test.go +++ b/pkg/services/object/server_test.go @@ -24,8 +24,10 @@ import ( v2 "github.com/nspcc-dev/neofs-node/pkg/services/object/acl/v2" deletesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/delete" getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" + "github.com/nspcc-dev/neofs-node/pkg/services/object/internal" putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search" + "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-sdk-go/client" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" "github.com/nspcc-dev/neofs-sdk-go/container" @@ -59,7 +61,7 @@ func (x noCallObjectService) Get(context.Context, getsvc.Prm) error { panic("must not be called") } -func (x noCallObjectService) Put(context.Context) (*putsvc.Streamer, error) { +func (x noCallObjectService) InitPut(context.Context, *object.Object, *util.CommonPrm, putsvc.PutInitOptions) (internal.PayloadWriter, error) { panic("must not be called") }