1
1
package meta
2
2
3
3
import (
4
+ "bytes"
4
5
"fmt"
5
6
6
7
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr"
@@ -31,6 +32,9 @@ const (
31
32
type ReviveStatus struct {
32
33
statusType reviveStatusType
33
34
message string
35
+ // tombstoneAddr holds the address of the tombstone used to inhume the object (if any).
36
+ // It is set only when revival happened from a graveyard.
37
+ tombstoneAddr oid.Address
34
38
}
35
39
36
40
// Message returns message of status.
@@ -43,6 +47,11 @@ func (s *ReviveStatus) StatusType() reviveStatusType {
43
47
return s .statusType
44
48
}
45
49
50
+ // TombstoneAddress returns the tombstone address.
51
+ func (s * ReviveStatus ) TombstoneAddress () oid.Address {
52
+ return s .tombstoneAddr
53
+ }
54
+
46
55
func (s * ReviveStatus ) setStatusGraveyard (tomb string ) {
47
56
s .statusType = ReviveStatusGraveyard
48
57
s .message = fmt .Sprintf ("successful revival from graveyard, tomb: %s" , tomb )
@@ -73,6 +82,7 @@ func (db *DB) ReviveObject(addr oid.Address) (res ReviveStatus, err error) {
73
82
}
74
83
75
84
currEpoch := db .epochState .CurrentEpoch ()
85
+ cnr := addr .Container ()
76
86
77
87
err = db .boltDB .Update (func (tx * bbolt.Tx ) error {
78
88
garbageObjectsBKT := tx .Bucket (garbageObjectsBucketName )
@@ -101,6 +111,7 @@ func (db *DB) ReviveObject(addr oid.Address) (res ReviveStatus, err error) {
101
111
return err
102
112
}
103
113
res .setStatusGraveyard (tombAddress .EncodeToString ())
114
+ res .tombstoneAddr = tombAddress
104
115
} else {
105
116
val = garbageContainersBKT .Get (targetKey [:cidSize ])
106
117
if val != nil {
@@ -116,6 +127,20 @@ func (db *DB) ReviveObject(addr oid.Address) (res ReviveStatus, err error) {
116
127
// nor was marked with GC mark
117
128
return ErrObjectWasNotRemoved
118
129
}
130
+
131
+ metaBucket := tx .Bucket (metaBucketKey (cnr ))
132
+ var metaCursor * bbolt.Cursor
133
+ if metaBucket != nil {
134
+ metaCursor = metaBucket .Cursor ()
135
+ }
136
+
137
+ tombID , err := getTombstoneByAssociatedObject (metaCursor , addr .Object ())
138
+ if err != nil {
139
+ return fmt .Errorf ("iterate covered by tombstones: %w" , err )
140
+ }
141
+ if ! tombID .IsZero () {
142
+ res .tombstoneAddr = oid .NewAddress (cnr , tombID )
143
+ }
119
144
}
120
145
121
146
if err := garbageObjectsBKT .Delete (targetKey ); err != nil {
@@ -126,7 +151,7 @@ func (db *DB) ReviveObject(addr oid.Address) (res ReviveStatus, err error) {
126
151
// if object is stored, and it is regular object then update bucket
127
152
// with container size estimations
128
153
if obj .Type () == object .TypeRegular {
129
- if err := changeContainerInfo (tx , addr . Container () , int (obj .PayloadSize ()), 1 ); err != nil {
154
+ if err := changeContainerInfo (tx , cnr , int (obj .PayloadSize ()), 1 ); err != nil {
130
155
return err
131
156
}
132
157
}
@@ -145,3 +170,42 @@ func (db *DB) ReviveObject(addr oid.Address) (res ReviveStatus, err error) {
145
170
146
171
return
147
172
}
173
+
174
+ // getTombstoneByAssociatedObject iterates meta index to find tombstone object
175
+ // associated with the provided object ID. If found, returns its ID, otherwise
176
+ // returns zero ID and nil error.
177
+ func getTombstoneByAssociatedObject (metaCursor * bbolt.Cursor , idObj oid.ID ) (oid.ID , error ) {
178
+ var id oid.ID
179
+ if metaCursor == nil {
180
+ return id , fmt .Errorf ("nil meta cursor" )
181
+ }
182
+
183
+ var (
184
+ typString = object .TypeTombstone .String ()
185
+ idStr = idObj .EncodeToString ()
186
+ accPrefix = make ([]byte , 1 + len (object .AttributeAssociatedObject )+ 1 + len (idStr )+ 1 )
187
+ typeKey = make ([]byte , metaIDTypePrefixSize + len (typString ))
188
+ expirationPrefix = make ([]byte , attrIDFixedLen + len (object .AttributeExpirationEpoch ))
189
+ )
190
+
191
+ expirationPrefix [0 ] = metaPrefixIDAttr
192
+ copy (expirationPrefix [1 + oid .Size :], object .AttributeExpirationEpoch )
193
+
194
+ accPrefix [0 ] = metaPrefixAttrIDPlain
195
+ copy (accPrefix [1 :], object .AttributeAssociatedObject )
196
+ copy (accPrefix [1 + len (object .AttributeAssociatedObject )+ 1 :], idStr )
197
+
198
+ fillIDTypePrefix (typeKey )
199
+ copy (typeKey [metaIDTypePrefixSize :], typString )
200
+
201
+ for k , _ := metaCursor .Seek (accPrefix ); bytes .HasPrefix (k , accPrefix ); k , _ = metaCursor .Next () {
202
+ mainObj := k [len (accPrefix ):]
203
+ copy (typeKey [1 :], mainObj )
204
+
205
+ if metaCursor .Bucket ().Get (typeKey ) != nil {
206
+ return id , id .Decode (mainObj )
207
+ }
208
+ }
209
+
210
+ return id , nil
211
+ }
0 commit comments