diff --git a/database/ffldb/dbcache.go b/database/ffldb/dbcache.go index 7e6a44dcbc..9fe7d4d449 100644 --- a/database/ffldb/dbcache.go +++ b/database/ffldb/dbcache.go @@ -611,19 +611,25 @@ func (c *dbCache) commitTx(tx *transaction) error { c.cacheLock.RUnlock() // Apply every key to add in the database transaction to the cache. + pendingKVs := make([]treap.KVPair, 0, tx.pendingKeys.Len()) tx.pendingKeys.ForEach(func(k, v []byte) bool { + pendingKVs = append(pendingKVs, treap.KVPair{Key: k, Value: v}) + newCachedRemove = newCachedRemove.Delete(k) - newCachedKeys = newCachedKeys.Put(k, v) return true }) + newCachedKeys = newCachedKeys.Put(pendingKVs...) tx.pendingKeys = nil // Apply every key to remove in the database transaction to the cache. + pendingRemoveKVs := make([]treap.KVPair, 0, tx.pendingRemove.Len()) tx.pendingRemove.ForEach(func(k, v []byte) bool { + pendingRemoveKVs = append(pendingRemoveKVs, treap.KVPair{Key: k, Value: v}) + newCachedKeys = newCachedKeys.Delete(k) - newCachedRemove = newCachedRemove.Put(k, nil) return true }) + newCachedRemove = newCachedRemove.Put(pendingRemoveKVs...) tx.pendingRemove = nil // Atomically replace the immutable treaps which hold the cached keys to diff --git a/database/internal/treap/common.go b/database/internal/treap/common.go index 090a7bd5ab..6fd9d2acda 100644 --- a/database/internal/treap/common.go +++ b/database/internal/treap/common.go @@ -6,6 +6,7 @@ package treap import ( "math/rand" + "sync" "time" ) @@ -33,6 +34,14 @@ var ( emptySlice = make([]byte, 0) ) +// treapNodePool defines a concurrent safe free list of treapNode used to +// provide temporary buffers. +var treapNodePool = sync.Pool{ + New: func() any { + return &treapNode{} + }, +} + // treapNode represents a node in the treap. type treapNode struct { key []byte @@ -42,6 +51,16 @@ type treapNode struct { right *treapNode } +// recycle resets and puts the treapNode into the treapNodePool to be recycled. +func (n *treapNode) recycle() { + n.key = nil + n.value = nil + n.priority = 0 + n.left = nil + n.right = nil + treapNodePool.Put(n) +} + // nodeSize returns the number of bytes the specified node occupies including // the struct fields and the contents of the key and value. func nodeSize(node *treapNode) uint64 { @@ -51,7 +70,14 @@ func nodeSize(node *treapNode) uint64 { // newTreapNode returns a new node from the given key, value, and priority. The // node is not initially linked to any others. func newTreapNode(key, value []byte, priority int) *treapNode { - return &treapNode{key: key, value: value, priority: priority} + n := treapNodePool.Get().(*treapNode) + n.key = key + n.value = value + n.priority = priority + n.left = nil + n.right = nil + + return n } // parentStack represents a stack of parent treap nodes that are used during diff --git a/database/internal/treap/immutable.go b/database/internal/treap/immutable.go index a6e13ff4a5..142f4b0b09 100644 --- a/database/internal/treap/immutable.go +++ b/database/internal/treap/immutable.go @@ -11,13 +11,14 @@ import ( // cloneTreapNode returns a shallow copy of the passed node. func cloneTreapNode(node *treapNode) *treapNode { - return &treapNode{ - key: node.key, - value: node.value, - priority: node.priority, - left: node.left, - right: node.right, - } + n := treapNodePool.Get().(*treapNode) + n.key = node.key + n.value = node.value + n.priority = node.priority + n.left = node.left + n.right = node.right + + return n } // Immutable represents a treap data structure which is used to hold ordered @@ -104,8 +105,59 @@ func (t *Immutable) Get(key []byte) []byte { return nil } -// Put inserts the passed key/value pair. -func (t *Immutable) Put(key, value []byte) *Immutable { +// KVPair is just a helper struct for a key-value pair that's going to be +// inserted into the treap. +type KVPair struct { + Key []byte + Value []byte +} + +// Put puts the passed in key/value pairs into the treap. For operations +// requiring many insertions at once, Put is memory efficient as the +// intermediary treap nodes created between each put operation is recycled +// through an internal sync.Pool, reducing overall memory allocation. +func (t *Immutable) Put(kvPairs ...KVPair) *Immutable { + treap := t + var prevTreapNodes [staticDepth]*treapNode + + for _, kvPair := range kvPairs { + newTreap, newTreapNodes := treap.put(kvPair.Key, kvPair.Value) + + // Loop through the prevTreapNodes and check for treapNodes that + // are no longer being utilized. These will be garbaged collected + // and they're better off being recycled in the treapNodePool. + for _, node := range prevTreapNodes { + if node == nil { + break + } + + // Make sure that the node we're going to recycle isn't + // being used by the latest immutable treap by checking + // if the pointer value of the node is the same. + got := newTreap.get(node.key) + if got == node { + continue + } + + // This node is only being used by the previous immutable + // copy and can safely be put into the treapNodePool to be + // recycled. + node.recycle() + } + + // Replace with the latest treap and treap nodes. + treap = newTreap + prevTreapNodes = newTreapNodes + } + + return treap +} + +// put inserts the passed key/value pair and returns all the newly created +// treapNodes that were created during this put operation. The returned +// treapNodes can then be put into data structures like sync.Pool to reduce the +// memory overhead of allocating new treapNodes during multiple put calls. +func (t *Immutable) put(key, value []byte) (*Immutable, [staticDepth]*treapNode) { // Use an empty byte slice for the value when none was provided. This // ultimately allows key existence to be determined from the value since // an empty byte slice is distinguishable from nil. @@ -113,10 +165,19 @@ func (t *Immutable) Put(key, value []byte) *Immutable { value = emptySlice } + // recycle is the treapNodes that are created during this put operation. + // We keep track of the nodes as the caller may be choose to recycle + // them to keep memory allocation low. + var ( + recycle [staticDepth]*treapNode + currentRecycleIndex int + ) + // The node is the root of the tree if there isn't already one. if t.root == nil { root := newTreapNode(key, value, rand.Int()) - return newImmutable(root, 1, nodeSize(root)) + recycle[currentRecycleIndex] = root + return newImmutable(root, 1, nodeSize(root)), recycle } // Find the binary tree insertion point and construct a replaced list of @@ -132,6 +193,16 @@ func (t *Immutable) Put(key, value []byte) *Immutable { for node := t.root; node != nil; { // Clone the node and link its parent to it if needed. nodeCopy := cloneTreapNode(node) + + // Check if we still have space in the recycle for this node. + // It's ok if we don't put every single new node to be recycled + // as there's no guarantee in the sync.Pool that every recycled + // treapNode will be re-utilized. + if currentRecycleIndex < staticDepth { + recycle[currentRecycleIndex] = nodeCopy + currentRecycleIndex++ + } + if oldParent := parents.At(0); oldParent != nil { if oldParent.left == node { oldParent.left = nodeCopy @@ -161,11 +232,20 @@ func (t *Immutable) Put(key, value []byte) *Immutable { newRoot := parents.At(parents.Len() - 1) newTotalSize := t.totalSize - uint64(len(node.value)) + uint64(len(value)) - return newImmutable(newRoot, t.count, newTotalSize) + return newImmutable(newRoot, t.count, newTotalSize), recycle } - // Link the new node into the binary tree in the correct position. + // Check if we still have space in the recycle for this node. + // It's ok if we don't put every single new node to be recycled + // as there's no guarantee in the sync.Pool that every recycled + // treapNode will be re-utilized. node := newTreapNode(key, value, rand.Int()) + if currentRecycleIndex < staticDepth { + recycle[currentRecycleIndex] = node + currentRecycleIndex++ + } + + // Link the new node into the binary tree in the correct position. parent := parents.At(0) if compareResult < 0 { parent.left = node @@ -205,7 +285,7 @@ func (t *Immutable) Put(key, value []byte) *Immutable { } } - return newImmutable(newRoot, t.count+1, t.totalSize+nodeSize(node)) + return newImmutable(newRoot, t.count+1, t.totalSize+nodeSize(node)), recycle } // Delete removes the passed key from the treap and returns the resulting treap diff --git a/database/internal/treap/immutable_test.go b/database/internal/treap/immutable_test.go index e0a1cb4af6..950974e85c 100644 --- a/database/internal/treap/immutable_test.go +++ b/database/internal/treap/immutable_test.go @@ -61,31 +61,43 @@ func TestImmutableSequential(t *testing.T) { // functions work as expected. expectedSize := uint64(0) numItems := 1000 + keyCount := 100 testTreap := NewImmutable() - for i := 0; i < numItems; i++ { - key := serializeUint32(uint32(i)) - testTreap = testTreap.Put(key, key) + for i := 0; i < numItems/keyCount; i++ { + keys := make([][]byte, 0, keyCount) + kvPairs := make([]KVPair, 0, keyCount) + for j := 0; j < keyCount; j++ { + n := i*keyCount + j + key := serializeUint32(uint32(n)) + keys = append(keys, key) + kvPairs = append(kvPairs, KVPair{key, key}) + } + + testTreap = testTreap.Put(kvPairs...) // Ensure the treap length is the expected value. - if gotLen := testTreap.Len(); gotLen != i+1 { + if gotLen := testTreap.Len(); gotLen != (i+1)*keyCount { t.Fatalf("Len #%d: unexpected length - got %d, want %d", i, gotLen, i+1) } - // Ensure the treap has the key. - if !testTreap.Has(key) { - t.Fatalf("Has #%d: key %q is not in treap", i, key) - } + for j, key := range keys { + // Ensure the treap has the key. + if !testTreap.Has(key) { + t.Fatalf("Has #%d#%d: key %q is not in treap", i, j, key) + } - // Get the key from the treap and ensure it is the expected - // value. - if gotVal := testTreap.Get(key); !bytes.Equal(gotVal, key) { - t.Fatalf("Get #%d: unexpected value - got %x, want %x", - i, gotVal, key) + // Get the key from the treap and ensure it is the expected + // value. + if gotVal := testTreap.Get(key); !bytes.Equal(gotVal, key) { + t.Fatalf("Get #%d#%d: unexpected value - got %x, want %x", + i, j, gotVal, key) + } + + expectedSize += (nodeFieldsSize + 8) } // Ensure the expected size is reported. - expectedSize += (nodeFieldsSize + 8) if gotSize := testTreap.Size(); gotSize != expectedSize { t.Fatalf("Size #%d: unexpected byte size - got %d, "+ "want %d", i, gotSize, expectedSize) @@ -161,31 +173,43 @@ func TestImmutableReverseSequential(t *testing.T) { // functions work as expected. expectedSize := uint64(0) numItems := 1000 + keyCount := 100 testTreap := NewImmutable() - for i := 0; i < numItems; i++ { - key := serializeUint32(uint32(numItems - i - 1)) - testTreap = testTreap.Put(key, key) + for i := 0; i < numItems/keyCount; i++ { + keys := make([][]byte, 0, keyCount) + kvPairs := make([]KVPair, 0, keyCount) + for j := 0; j < keyCount; j++ { + n := numItems - (i * keyCount) - j - 1 + key := serializeUint32(uint32(n)) + keys = append(keys, key) + kvPairs = append(kvPairs, KVPair{key, key}) + } + + testTreap = testTreap.Put(kvPairs...) // Ensure the treap length is the expected value. - if gotLen := testTreap.Len(); gotLen != i+1 { + if gotLen := testTreap.Len(); gotLen != (i+1)*keyCount { t.Fatalf("Len #%d: unexpected length - got %d, want %d", i, gotLen, i+1) } - // Ensure the treap has the key. - if !testTreap.Has(key) { - t.Fatalf("Has #%d: key %q is not in treap", i, key) - } + for j, key := range keys { + // Ensure the treap has the key. + if !testTreap.Has(key) { + t.Fatalf("Has #%d#%d: key %q is not in treap", i, j, key) + } - // Get the key from the treap and ensure it is the expected - // value. - if gotVal := testTreap.Get(key); !bytes.Equal(gotVal, key) { - t.Fatalf("Get #%d: unexpected value - got %x, want %x", - i, gotVal, key) + // Get the key from the treap and ensure it is the expected + // value. + if gotVal := testTreap.Get(key); !bytes.Equal(gotVal, key) { + t.Fatalf("Get #%d#%d: unexpected value - got %x, want %x", + i, j, gotVal, key) + } + + expectedSize += (nodeFieldsSize + 8) } // Ensure the expected size is reported. - expectedSize += (nodeFieldsSize + 8) if gotSize := testTreap.Size(); gotSize != expectedSize { t.Fatalf("Size #%d: unexpected byte size - got %d, "+ "want %d", i, gotSize, expectedSize) @@ -262,33 +286,45 @@ func TestImmutableUnordered(t *testing.T) { // treap functions work as expected. expectedSize := uint64(0) numItems := 1000 + keyCount := 100 testTreap := NewImmutable() - for i := 0; i < numItems; i++ { + for i := 0; i < numItems/keyCount; i++ { // Hash the serialized int to generate out-of-order keys. - hash := sha256.Sum256(serializeUint32(uint32(i))) - key := hash[:] - testTreap = testTreap.Put(key, key) + keys := make([][]byte, 0, keyCount) + kvPairs := make([]KVPair, 0, keyCount) + for j := 0; j < keyCount; j++ { + n := i*keyCount + j + hash := sha256.Sum256(serializeUint32(uint32(n))) + key := hash[:] + keys = append(keys, key) + kvPairs = append(kvPairs, KVPair{key, key}) + } + + testTreap = testTreap.Put(kvPairs...) // Ensure the treap length is the expected value. - if gotLen := testTreap.Len(); gotLen != i+1 { + if gotLen := testTreap.Len(); gotLen != (i+1)*keyCount { t.Fatalf("Len #%d: unexpected length - got %d, want %d", i, gotLen, i+1) } - // Ensure the treap has the key. - if !testTreap.Has(key) { - t.Fatalf("Has #%d: key %q is not in treap", i, key) - } + for j, key := range keys { + // Ensure the treap has the key. + if !testTreap.Has(key) { + t.Fatalf("Has #%d#%d: key %q is not in treap", i, j, key) + } - // Get the key from the treap and ensure it is the expected - // value. - if gotVal := testTreap.Get(key); !bytes.Equal(gotVal, key) { - t.Fatalf("Get #%d: unexpected value - got %x, want %x", - i, gotVal, key) + // Get the key from the treap and ensure it is the expected + // value. + if gotVal := testTreap.Get(key); !bytes.Equal(gotVal, key) { + t.Fatalf("Get #%d#%d: unexpected value - got %x, want %x", + i, j, gotVal, key) + } + + expectedSize += nodeFieldsSize + uint64(len(key)+len(key)) } // Ensure the expected size is reported. - expectedSize += nodeFieldsSize + uint64(len(key)+len(key)) if gotSize := testTreap.Size(); gotSize != expectedSize { t.Fatalf("Size #%d: unexpected byte size - got %d, "+ "want %d", i, gotSize, expectedSize) @@ -335,31 +371,53 @@ func TestImmutableUnordered(t *testing.T) { func TestImmutableDuplicatePut(t *testing.T) { t.Parallel() + keyCount := 100 expectedVal := []byte("testval") + expectedSize := uint64(0) numItems := 1000 testTreap := NewImmutable() - for i := 0; i < numItems; i++ { - key := serializeUint32(uint32(i)) - testTreap = testTreap.Put(key, key) - expectedSize += nodeFieldsSize + uint64(len(key)+len(key)) + for i := 0; i < numItems/keyCount; i++ { + keys := make([][]byte, 0, keyCount) + kvPairs := make([]KVPair, 0, keyCount) + for j := 0; j < keyCount; j++ { + n := i*keyCount + j + key := serializeUint32(uint32(n)) + keys = append(keys, key) + kvPairs = append(kvPairs, KVPair{key, key}) + } - // Put a duplicate key with the expected final value. - testTreap = testTreap.Put(key, expectedVal) + testTreap = testTreap.Put(kvPairs...) - // Ensure the key still exists and is the new value. - if gotVal := testTreap.Has(key); !gotVal { - t.Fatalf("Has: unexpected result - got %v, want true", - gotVal) + // Get expectedSize. + for _, key := range keys { + expectedSize += nodeFieldsSize + uint64(len(key)+len(key)) } - if gotVal := testTreap.Get(key); !bytes.Equal(gotVal, expectedVal) { - t.Fatalf("Get: unexpected result - got %x, want %x", - gotVal, expectedVal) + + // Put duplicate keys with the expected final values. + expectedPairs := make([]KVPair, keyCount) + for i := range expectedPairs { + expectedPairs[i] = KVPair{keys[i], expectedVal} + } + + testTreap = testTreap.Put(expectedPairs...) + + // Ensure the keys still exist and is the new value. + for _, key := range keys { + if gotVal := testTreap.Has(key); !gotVal { + t.Fatalf("Has: unexpected result - got %v, want true", + gotVal) + } + if gotVal := testTreap.Get(key); !bytes.Equal(gotVal, expectedVal) { + t.Fatalf("Get: unexpected result - got %x, want %x", + gotVal, expectedVal) + } + + expectedSize -= uint64(len(key)) + expectedSize += uint64(len(expectedVal)) } // Ensure the expected size is reported. - expectedSize -= uint64(len(key)) - expectedSize += uint64(len(expectedVal)) if gotSize := testTreap.Size(); gotSize != expectedSize { t.Fatalf("Size: unexpected byte size - got %d, want %d", gotSize, expectedSize) @@ -376,7 +434,7 @@ func TestImmutableNilValue(t *testing.T) { // Put the key with a nil value. testTreap := NewImmutable() - testTreap = testTreap.Put(key, nil) + testTreap = testTreap.Put(KVPair{key, nil}) // Ensure the key exists and is an empty byte slice. if gotVal := testTreap.Has(key); !gotVal { @@ -399,10 +457,12 @@ func TestImmutableForEachStopIterator(t *testing.T) { // Insert a few keys. numItems := 10 testTreap := NewImmutable() + kvPairs := make([]KVPair, 0, numItems) for i := 0; i < numItems; i++ { key := serializeUint32(uint32(i)) - testTreap = testTreap.Put(key, key) + kvPairs = append(kvPairs, KVPair{key, key}) } + testTreap = testTreap.Put(kvPairs...) // Ensure ForEach exits early on false return by caller. var numIterated int @@ -426,38 +486,50 @@ func TestImmutableSnapshot(t *testing.T) { // functions work as expected. expectedSize := uint64(0) numItems := 1000 + keyCount := 100 testTreap := NewImmutable() - for i := 0; i < numItems; i++ { + for i := 0; i < numItems/keyCount; i++ { treapSnap := testTreap - key := serializeUint32(uint32(i)) - testTreap = testTreap.Put(key, key) + keys := make([][]byte, 0, keyCount) + kvPairs := make([]KVPair, 0, keyCount) + for j := 0; j < keyCount; j++ { + n := i*keyCount + j + key := serializeUint32(uint32(n)) + keys = append(keys, key) + kvPairs = append(kvPairs, KVPair{key, key}) + } + + testTreap = testTreap.Put(kvPairs...) // Ensure the length of the treap snapshot is the expected // value. - if gotLen := treapSnap.Len(); gotLen != i { + if gotLen := treapSnap.Len(); gotLen != i*keyCount { t.Fatalf("Len #%d: unexpected length - got %d, want %d", i, gotLen, i) } - // Ensure the treap snapshot does not have the key. - if treapSnap.Has(key) { - t.Fatalf("Has #%d: key %q is in treap", i, key) - } + for j, key := range keys { + // Ensure the treap snapshot does not have the key. + if treapSnap.Has(key) { + t.Fatalf("Has #%d#%d: key %q is in treap", i, j, key) + } - // Get the key that doesn't exist in the treap snapshot and - // ensure it is nil. - if gotVal := treapSnap.Get(key); gotVal != nil { - t.Fatalf("Get #%d: unexpected value - got %x, want nil", - i, gotVal) - } + // Get the key that doesn't exist in the treap snapshot and + // ensure it is nil. + if gotVal := treapSnap.Get(key); gotVal != nil { + t.Fatalf("Get #%d#%d: unexpected value - got %x, want nil", + i, j, gotVal) + } - // Ensure the expected size is reported. - if gotSize := treapSnap.Size(); gotSize != expectedSize { - t.Fatalf("Size #%d: unexpected byte size - got %d, "+ - "want %d", i, gotSize, expectedSize) + // Ensure the expected size is reported. + if gotSize := treapSnap.Size(); gotSize != expectedSize { + t.Fatalf("Size #%d#%d: unexpected byte size - got %d, "+ + "want %d", i, j, gotSize, expectedSize) + } } - expectedSize += (nodeFieldsSize + 8) + + expectedSize += (nodeFieldsSize + 8) * uint64(keyCount) } // Delete the keys one-by-one while checking several of the treap diff --git a/database/internal/treap/treapiter_test.go b/database/internal/treap/treapiter_test.go index 08b4335ebc..a6d12b51ff 100644 --- a/database/internal/treap/treapiter_test.go +++ b/database/internal/treap/treapiter_test.go @@ -496,10 +496,14 @@ testLoop: for i, test := range tests { // Insert a bunch of keys. testTreap := NewImmutable() + keys := make([][]byte, 0, test.numKeys) + kvPairs := make([]KVPair, 0, test.numKeys) for i := 0; i < test.numKeys; i += test.step { key := serializeUint32(uint32(i)) - testTreap = testTreap.Put(key, key) + keys = append(keys, key) + kvPairs = append(kvPairs, KVPair{key, key}) } + testTreap = testTreap.Put(kvPairs...) // Create new iterator limited by the test params. iter := testTreap.Iterator(test.startKey, test.limitKey)