1
1
package meta
2
2
3
3
import (
4
+ "bytes"
4
5
"fmt"
5
6
6
7
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr"
@@ -31,6 +32,8 @@ const (
31
32
type ReviveStatus struct {
32
33
statusType reviveStatusType
33
34
message string
35
+ // tombstoneAddr holds the address of the tombstone used to inhume the object (if any).
36
+ tombstoneAddr oid.Address
34
37
}
35
38
36
39
// Message returns message of status.
@@ -43,6 +46,11 @@ func (s *ReviveStatus) StatusType() reviveStatusType {
43
46
return s .statusType
44
47
}
45
48
49
+ // TombstoneAddress returns the tombstone address.
50
+ func (s * ReviveStatus ) TombstoneAddress () oid.Address {
51
+ return s .tombstoneAddr
52
+ }
53
+
46
54
func (s * ReviveStatus ) setStatusGraveyard (tomb string ) {
47
55
s .statusType = ReviveStatusGraveyard
48
56
s .message = fmt .Sprintf ("successful revival from graveyard, tomb: %s" , tomb )
@@ -73,6 +81,7 @@ func (db *DB) ReviveObject(addr oid.Address) (res ReviveStatus, err error) {
73
81
}
74
82
75
83
currEpoch := db .epochState .CurrentEpoch ()
84
+ cnr := addr .Container ()
76
85
77
86
err = db .boltDB .Update (func (tx * bbolt.Tx ) error {
78
87
garbageObjectsBKT := tx .Bucket (garbageObjectsBucketName )
@@ -101,6 +110,7 @@ func (db *DB) ReviveObject(addr oid.Address) (res ReviveStatus, err error) {
101
110
return err
102
111
}
103
112
res .setStatusGraveyard (tombAddress .EncodeToString ())
113
+ res .tombstoneAddr = tombAddress
104
114
} else {
105
115
val = garbageContainersBKT .Get (targetKey [:cidSize ])
106
116
if val != nil {
@@ -116,6 +126,20 @@ func (db *DB) ReviveObject(addr oid.Address) (res ReviveStatus, err error) {
116
126
// nor was marked with GC mark
117
127
return ErrObjectWasNotRemoved
118
128
}
129
+
130
+ metaBucket := tx .Bucket (metaBucketKey (cnr ))
131
+ var metaCursor * bbolt.Cursor
132
+ if metaBucket != nil {
133
+ metaCursor = metaBucket .Cursor ()
134
+ }
135
+
136
+ tombID , err := getTombstoneByAssociatedObject (metaCursor , addr .Object ())
137
+ if err != nil {
138
+ return fmt .Errorf ("iterate covered by tombstones: %w" , err )
139
+ }
140
+ if ! tombID .IsZero () {
141
+ res .tombstoneAddr = oid .NewAddress (cnr , tombID )
142
+ }
119
143
}
120
144
121
145
if err := garbageObjectsBKT .Delete (targetKey ); err != nil {
@@ -126,7 +150,7 @@ func (db *DB) ReviveObject(addr oid.Address) (res ReviveStatus, err error) {
126
150
// if object is stored, and it is regular object then update bucket
127
151
// with container size estimations
128
152
if obj .Type () == object .TypeRegular {
129
- if err := changeContainerInfo (tx , addr . Container () , int (obj .PayloadSize ()), 1 ); err != nil {
153
+ if err := changeContainerInfo (tx , cnr , int (obj .PayloadSize ()), 1 ); err != nil {
130
154
return err
131
155
}
132
156
}
@@ -145,3 +169,42 @@ func (db *DB) ReviveObject(addr oid.Address) (res ReviveStatus, err error) {
145
169
146
170
return
147
171
}
172
+
173
+ // getTombstoneByAssociatedObject iterates meta index to find tombstone object
174
+ // associated with the provided object ID. If found, returns its ID, otherwise
175
+ // returns zero ID and nil error.
176
+ func getTombstoneByAssociatedObject (metaCursor * bbolt.Cursor , idObj oid.ID ) (oid.ID , error ) {
177
+ var id oid.ID
178
+ if metaCursor == nil {
179
+ return id , fmt .Errorf ("nil meta cursor" )
180
+ }
181
+
182
+ var (
183
+ typString = object .TypeTombstone .String ()
184
+ idStr = idObj .EncodeToString ()
185
+ accPrefix = make ([]byte , 1 + len (object .AttributeAssociatedObject )+ 1 + len (idStr )+ 1 )
186
+ typeKey = make ([]byte , metaIDTypePrefixSize + len (typString ))
187
+ expirationPrefix = make ([]byte , attrIDFixedLen + len (object .AttributeExpirationEpoch ))
188
+ )
189
+
190
+ expirationPrefix [0 ] = metaPrefixIDAttr
191
+ copy (expirationPrefix [1 + oid .Size :], object .AttributeExpirationEpoch )
192
+
193
+ accPrefix [0 ] = metaPrefixAttrIDPlain
194
+ copy (accPrefix [1 :], object .AttributeAssociatedObject )
195
+ copy (accPrefix [1 + len (object .AttributeAssociatedObject )+ 1 :], idStr )
196
+
197
+ fillIDTypePrefix (typeKey )
198
+ copy (typeKey [metaIDTypePrefixSize :], typString )
199
+
200
+ for k , _ := metaCursor .Seek (accPrefix ); bytes .HasPrefix (k , accPrefix ); k , _ = metaCursor .Next () {
201
+ mainObj := k [len (accPrefix ):]
202
+ copy (typeKey [1 :], mainObj )
203
+
204
+ if metaCursor .Bucket ().Get (typeKey ) != nil {
205
+ return id , id .Decode (mainObj )
206
+ }
207
+ }
208
+
209
+ return id , nil
210
+ }
0 commit comments