Skip to content

Commit 5bc2e16

Browse files
committed
Allow setting TTL and User metadata at the same time.
We expose a public struct Entry, which can be used to set the key, value, user metadata and TTL all at the same time.
1 parent bc45f5c commit 5bc2e16

File tree

7 files changed

+58
-56
lines changed

7 files changed

+58
-56
lines changed

backup.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,12 @@ func (db *DB) Backup(w io.Writer, since uint64) (uint64, error) {
7575
func (db *DB) Load(r io.Reader) error {
7676
br := bufio.NewReaderSize(r, 16<<10)
7777
unmarshalBuf := make([]byte, 1<<10)
78-
var entries []*entry
78+
var entries []*Entry
7979
var wg sync.WaitGroup
8080
errChan := make(chan error, 1)
8181

8282
// func to check for pending error before sending off a batch for writing
83-
batchSetAsyncIfNoErr := func(entries []*entry) error {
83+
batchSetAsyncIfNoErr := func(entries []*Entry) error {
8484
select {
8585
case err := <-errChan:
8686
return err
@@ -118,7 +118,7 @@ func (db *DB) Load(r io.Reader) error {
118118
if err = e.Unmarshal(unmarshalBuf[:sz]); err != nil {
119119
return err
120120
}
121-
entries = append(entries, &entry{
121+
entries = append(entries, &Entry{
122122
Key: y.KeyWithTs(e.Key, e.Version),
123123
Value: e.Value,
124124
UserMeta: e.UserMeta[0],

db.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ const (
7878
kvWriteChCapacity = 1000
7979
)
8080

81-
func replayFunction(out *DB) func(entry, valuePointer) error {
81+
func replayFunction(out *DB) func(Entry, valuePointer) error {
8282
type txnEntry struct {
8383
nk []byte
8484
v y.ValueStruct
@@ -96,7 +96,7 @@ func replayFunction(out *DB) func(entry, valuePointer) error {
9696
}
9797

9898
first := true
99-
return func(e entry, vp valuePointer) error { // Function for replaying.
99+
return func(e Entry, vp valuePointer) error { // Function for replaying.
100100
if first {
101101
out.elog.Printf("First key=%s\n", e.Key)
102102
}
@@ -482,7 +482,7 @@ var requestPool = sync.Pool{
482482
},
483483
}
484484

485-
func (db *DB) shouldWriteValueToLSM(e entry) bool {
485+
func (db *DB) shouldWriteValueToLSM(e Entry) bool {
486486
return len(e.Value) < db.opt.ValueThreshold
487487
}
488488

@@ -567,7 +567,7 @@ func (db *DB) writeRequests(reqs []*request) error {
567567
return nil
568568
}
569569

570-
func (db *DB) sendToWriteCh(entries []*entry) (*request, error) {
570+
func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {
571571
var count, size int64
572572
for _, e := range entries {
573573
size += int64(e.estimateSize(db.opt.ValueThreshold))
@@ -652,7 +652,7 @@ func (db *DB) doWrites(lc *y.Closer) {
652652
// batchSet applies a list of badger.Entry. If a request level error occurs it
653653
// will be returned.
654654
// Check(kv.BatchSet(entries))
655-
func (db *DB) batchSet(entries []*entry) error {
655+
func (db *DB) batchSet(entries []*Entry) error {
656656
req, err := db.sendToWriteCh(entries)
657657
if err != nil {
658658
return err
@@ -671,7 +671,7 @@ func (db *DB) batchSet(entries []*entry) error {
671671
// err := kv.BatchSetAsync(entries, func(err error)) {
672672
// Check(err)
673673
// }
674-
func (db *DB) batchSetAsync(entries []*entry, f func(error)) error {
674+
func (db *DB) batchSetAsync(entries []*Entry, f func(error)) error {
675675
req, err := db.sendToWriteCh(entries)
676676
if err != nil {
677677
return err
@@ -881,7 +881,7 @@ func (db *DB) purgeVersionsBelow(txn *Txn, key []byte, ts uint64) error {
881881
opts.PrefetchValues = false
882882
it := txn.NewIterator(opts)
883883

884-
var entries []*entry
884+
var entries []*Entry
885885

886886
for it.Seek(key); it.ValidForPrefix(key); it.Next() {
887887
item := it.Item()
@@ -891,7 +891,7 @@ func (db *DB) purgeVersionsBelow(txn *Txn, key []byte, ts uint64) error {
891891

892892
// Found an older version. Mark for deletion
893893
entries = append(entries,
894-
&entry{
894+
&Entry{
895895
Key: y.KeyWithTs(key, item.version),
896896
meta: bitDelete,
897897
})
@@ -913,14 +913,14 @@ func (db *DB) PurgeOlderVersions() error {
913913
opts.PrefetchValues = false
914914
it := txn.NewIterator(opts)
915915

916-
var entries []*entry
916+
var entries []*Entry
917917
var lastKey []byte
918918
var count int
919919
var wg sync.WaitGroup
920920
errChan := make(chan error, 1)
921921

922922
// func to check for pending error before sending off a batch for writing
923-
batchSetAsyncIfNoErr := func(entries []*entry) error {
923+
batchSetAsyncIfNoErr := func(entries []*Entry) error {
924924
select {
925925
case err := <-errChan:
926926
return err
@@ -946,7 +946,7 @@ func (db *DB) PurgeOlderVersions() error {
946946
}
947947
// Found an older version. Mark for deletion
948948
entries = append(entries,
949-
&entry{
949+
&Entry{
950950
Key: y.KeyWithTs(lastKey, item.version),
951951
meta: bitDelete,
952952
})
@@ -959,7 +959,7 @@ func (db *DB) PurgeOlderVersions() error {
959959
return err
960960
}
961961
count = 0
962-
entries = []*entry{}
962+
entries = []*Entry{}
963963
}
964964
}
965965

structs.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ func (h *header) Decode(buf []byte) {
7676
h.userMeta = buf[17]
7777
}
7878

79-
// entry provides Key, Value, UserMeta and ExpiresAt. This struct can be used by the user to set data.
80-
type entry struct {
79+
// Entry provides Key, Value, UserMeta and ExpiresAt. This struct can be used by the user to set data.
80+
type Entry struct {
8181
Key []byte
8282
Value []byte
8383
UserMeta byte
@@ -88,15 +88,15 @@ type entry struct {
8888
offset uint32
8989
}
9090

91-
func (e *entry) estimateSize(threshold int) int {
91+
func (e *Entry) estimateSize(threshold int) int {
9292
if len(e.Value) < threshold {
9393
return len(e.Key) + len(e.Value) + 2 // Meta, UserMeta
9494
}
9595
return len(e.Key) + 12 + 2 // 12 for ValuePointer, 2 for metas.
9696
}
9797

9898
// Encodes e to buf. Returns number of bytes written.
99-
func encodeEntry(e *entry, buf *bytes.Buffer) (int, error) {
99+
func encodeEntry(e *Entry, buf *bytes.Buffer) (int, error) {
100100
h := header{
101101
klen: uint32(len(e.Key)),
102102
vlen: uint32(len(e.Value)),
@@ -126,7 +126,7 @@ func encodeEntry(e *entry, buf *bytes.Buffer) (int, error) {
126126
return len(headerEnc) + len(e.Key) + len(e.Value) + len(crcBuf), nil
127127
}
128128

129-
func (e entry) print(prefix string) {
129+
func (e Entry) print(prefix string) {
130130
fmt.Printf("%s Key: %s Meta: %d UserMeta: %d Offset: %d len(val)=%d",
131131
prefix, e.Key, e.meta, e.UserMeta, e.offset, len(e.Value))
132132
}

transaction.go

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ type Txn struct {
185185
reads []uint64 // contains fingerprints of keys read.
186186
writes []uint64 // contains fingerprints of keys written.
187187

188-
pendingWrites map[string]*entry // cache stores any writes done by txn.
188+
pendingWrites map[string]*Entry // cache stores any writes done by txn.
189189

190190
db *DB
191191
callbacks []func()
@@ -196,7 +196,7 @@ type Txn struct {
196196
}
197197

198198
type pendingWritesIterator struct {
199-
entries []*entry
199+
entries []*Entry
200200
nextIdx int
201201
readTs uint64
202202
reversed bool
@@ -251,7 +251,7 @@ func (txn *Txn) newPendingWritesIterator(reversed bool) *pendingWritesIterator {
251251
if !txn.update || len(txn.pendingWrites) == 0 {
252252
return nil
253253
}
254-
entries := make([]*entry, 0, len(txn.pendingWrites))
254+
entries := make([]*Entry, 0, len(txn.pendingWrites))
255255
for _, e := range txn.pendingWrites {
256256
entries = append(entries, e)
257257
}
@@ -270,7 +270,7 @@ func (txn *Txn) newPendingWritesIterator(reversed bool) *pendingWritesIterator {
270270
}
271271
}
272272

273-
func (txn *Txn) checkSize(e *entry) error {
273+
func (txn *Txn) checkSize(e *Entry) error {
274274
count := txn.count + 1
275275
// Extra bytes for version in key.
276276
size := txn.size + int64(e.estimateSize(txn.db.opt.ValueThreshold)) + 10
@@ -286,32 +286,34 @@ func (txn *Txn) checkSize(e *entry) error {
286286
// It will return ErrReadOnlyTxn if update flag was set to false when creating the
287287
// transaction.
288288
func (txn *Txn) Set(key, val []byte) error {
289-
e := &entry{
289+
e := &Entry{
290290
Key: key,
291291
Value: val,
292292
}
293-
return txn.setEntry(e)
293+
return txn.SetEntry(e)
294294
}
295295

296296
// SetWithMeta adds a key-value pair to the database, along with a metadata
297297
// byte. This byte is stored alongside the key, and can be used as an aid to
298298
// interpret the value or store other contextual bits corresponding to the
299299
// key-value pair.
300300
func (txn *Txn) SetWithMeta(key, val []byte, meta byte) error {
301-
e := &entry{Key: key, Value: val, UserMeta: meta}
302-
return txn.setEntry(e)
301+
e := &Entry{Key: key, Value: val, UserMeta: meta}
302+
return txn.SetEntry(e)
303303
}
304304

305305
// SetWithTTL adds a key-value pair to the database, along with a time-to-live
306306
// (TTL) setting. A key stored with with a TTL would automatically expire after
307307
// the time has elapsed , and be eligible for garbage collection.
308308
func (txn *Txn) SetWithTTL(key, val []byte, dur time.Duration) error {
309309
expire := time.Now().Add(dur).Unix()
310-
e := &entry{Key: key, Value: val, ExpiresAt: uint64(expire)}
311-
return txn.setEntry(e)
310+
e := &Entry{Key: key, Value: val, ExpiresAt: uint64(expire)}
311+
return txn.SetEntry(e)
312312
}
313313

314-
func (txn *Txn) setEntry(e *entry) error {
314+
// SetEntry takes an Entry struct and adds the key-value pair in the struct, along
315+
// with other metadata to the database.
316+
func (txn *Txn) SetEntry(e *Entry) error {
315317
switch {
316318
case !txn.update:
317319
return ErrReadOnlyTxn
@@ -348,7 +350,7 @@ func (txn *Txn) Delete(key []byte) error {
348350
return exceedsMaxKeySizeError(key)
349351
}
350352

351-
e := &entry{
353+
e := &Entry{
352354
Key: key,
353355
meta: bitDelete,
354356
}
@@ -474,15 +476,15 @@ func (txn *Txn) Commit(callback func(error)) error {
474476
return ErrConflict
475477
}
476478

477-
entries := make([]*entry, 0, len(txn.pendingWrites)+1)
479+
entries := make([]*Entry, 0, len(txn.pendingWrites)+1)
478480
for _, e := range txn.pendingWrites {
479481
// Suffix the keys with commit ts, so the key versions are sorted in
480482
// descending order of commit timestamp.
481483
e.Key = y.KeyWithTs(e.Key, commitTs)
482484
e.meta |= bitTxn
483485
entries = append(entries, e)
484486
}
485-
e := &entry{
487+
e := &Entry{
486488
Key: y.KeyWithTs(txnKey, commitTs),
487489
Value: []byte(strconv.FormatUint(commitTs, 10)),
488490
meta: bitFinTxn,
@@ -532,7 +534,7 @@ func (db *DB) NewTransaction(update bool) *Txn {
532534
size: int64(len(txnKey) + 10), // Some buffer for the extra entry.
533535
}
534536
if update {
535-
txn.pendingWrites = make(map[string]*entry)
537+
txn.pendingWrites = make(map[string]*Entry)
536538
txn.db.orc.addRef()
537539
}
538540
return txn

transaction_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -584,7 +584,7 @@ func TestIteratorAllVersionsButDeleted(t *testing.T) {
584584
err = db.View(func(txn *Txn) error {
585585
item, err := txn.Get([]byte("answer1"))
586586
require.NoError(t, err)
587-
err = txn.db.batchSet([]*entry{
587+
err = txn.db.batchSet([]*Entry{
588588
{
589589
Key: y.KeyWithTs(item.key, item.version),
590590
meta: bitDelete,

value.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func (lf *logFile) sync() error {
151151

152152
var errStop = errors.New("Stop iteration")
153153

154-
type logEntry func(e entry, vp valuePointer) error
154+
type logEntry func(e Entry, vp valuePointer) error
155155

156156
// iterate iterates over log file. It doesn't not allocate new memory for every kv pair.
157157
// Therefore, the kv pair is only valid for the duration of fn call.
@@ -184,7 +184,7 @@ func (vlog *valueLog) iterate(lf *logFile, offset uint32, fn logEntry) error {
184184
return err
185185
}
186186

187-
var e entry
187+
var e Entry
188188
e.offset = recordOffset
189189
h.Decode(hbuf[:])
190190
if h.klen > maxKeySize {
@@ -268,12 +268,12 @@ func (vlog *valueLog) rewrite(f *logFile) error {
268268
defer elog.Finish()
269269
elog.Printf("Rewriting fid: %d", f.fid)
270270

271-
wb := make([]*entry, 0, 1000)
271+
wb := make([]*Entry, 0, 1000)
272272
var size int64
273273

274274
y.AssertTrue(vlog.kv != nil)
275275
var count int
276-
fe := func(e entry) error {
276+
fe := func(e Entry) error {
277277
count++
278278
if count%10000 == 0 {
279279
elog.Printf("Processing entry %d", count)
@@ -302,7 +302,7 @@ func (vlog *valueLog) rewrite(f *logFile) error {
302302
}
303303
if vp.Fid == f.fid && vp.Offset == e.offset {
304304
// This new entry only contains the key, and a pointer to the value.
305-
ne := new(entry)
305+
ne := new(Entry)
306306
ne.meta = 0 // Remove all bits.
307307
ne.UserMeta = e.UserMeta
308308
ne.Key = make([]byte, len(e.Key))
@@ -325,7 +325,7 @@ func (vlog *valueLog) rewrite(f *logFile) error {
325325
return nil
326326
}
327327

328-
err := vlog.iterate(f, 0, func(e entry, vp valuePointer) error {
328+
err := vlog.iterate(f, 0, func(e Entry, vp valuePointer) error {
329329
return fe(e)
330330
})
331331
if err != nil {
@@ -631,7 +631,7 @@ func (vlog *valueLog) Replay(ptr valuePointer, fn logEntry) error {
631631

632632
type request struct {
633633
// Input values
634-
Entries []*entry
634+
Entries []*Entry
635635
// Output values and wait group stuff below
636636
Ptrs []valuePointer
637637
Wg sync.WaitGroup
@@ -786,7 +786,7 @@ func (vlog *valueLog) readValueBytes(vp valuePointer) ([]byte, func(), error) {
786786
}
787787

788788
// Test helper
789-
func valueBytesToEntry(buf []byte) (e entry) {
789+
func valueBytesToEntry(buf []byte) (e Entry) {
790790
var h header
791791
h.Decode(buf)
792792
n := uint32(headerBufSize)
@@ -841,7 +841,7 @@ func (vlog *valueLog) pickLog(head valuePointer) *logFile {
841841
return vlog.filesMap[fids[idx]]
842842
}
843843

844-
func discardEntry(e entry, vs y.ValueStruct) bool {
844+
func discardEntry(e Entry, vs y.ValueStruct) bool {
845845
if vs.Version != y.ParseTs(e.Key) {
846846
// Version not found. Discard.
847847
return true
@@ -892,7 +892,7 @@ func (vlog *valueLog) doRunGC(gcThreshold float64, head valuePointer) (err error
892892

893893
start := time.Now()
894894
y.AssertTrue(vlog.kv != nil)
895-
err = vlog.iterate(lf, 0, func(e entry, vp valuePointer) error {
895+
err = vlog.iterate(lf, 0, func(e Entry, vp valuePointer) error {
896896
esz := float64(vp.Len) / (1 << 20) // in MBs. +4 for the CAS stuff.
897897
skipped += esz
898898
if skipped < skipFirstM {

0 commit comments

Comments
 (0)