diff --git a/Tests/TransactionTests.cs b/Tests/TransactionTests.cs new file mode 100644 index 0000000..d1ca00c --- /dev/null +++ b/Tests/TransactionTests.cs @@ -0,0 +1,157 @@ +using System; +using System.IO; + +namespace RocksDbSharp.Tests; + +[TestClass] +public class TransactionTests +{ + private string? _path; + private TransactionDb? _db; + + [TestInitialize] + public void TestInitialize() + { + _path = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString()); + + var dbOptions = new DbOptions().SetCreateIfMissing(); + var transactionDbOptions = new TransactionDbOptions(); + _db = TransactionDb.Open(dbOptions, transactionDbOptions, _path); + } + + [TestCleanup] + public void TestCleanup() + { + _db?.Dispose(); + _db = null; + + if (Directory.Exists(_path)) + Directory.Delete(_path, true); + } + + [TestMethod] + public void key_should_be_visible_inside_transaction_while_uncommitted() + { + // Arrange + const string key = "key1"; + const string value = "value1"; + + using var tran = _db!.BeginTransaction(); + + // Act + tran.Put(key, value); + var exists = tran.HasKey(key); + + // Assert + Assert.IsTrue(exists); + } + + [TestMethod] + public void key_should_not_be_visible_outside_transaction_while_uncommitted() + { + // Arrange + const string key = "key1"; + const string value = "value1"; + + using var tran = _db!.BeginTransaction(); + + // Act + tran.Put(key, value); + var exists = _db.HasKey(key); + + // Assert + Assert.IsFalse(exists); + } + + [TestMethod] + public void key_should_be_visible_outside_transaction_after_commit() + { + // Arrange + const string key = "key1"; + const string value = "value1"; + + using var tran = _db!.BeginTransaction(); + + // Act + tran.Put(key, value); + tran.Commit(); + var exists = _db.HasKey(key); + + // Assert + Assert.IsTrue(exists); + } + + [TestMethod] + public void key_should_not_be_iterable_outside_transaction_while_uncommitted() + { + // Arrange + const string key = "key1"; + const string value = "value1"; + + using var tran = _db!.BeginTransaction(); + + // Act + tran.Put(key, value); + using var iterator = _db.NewIterator().SeekToFirst(); + + // Assert + Assert.AreNotEqual(key, iterator.StringKey()); + } + + [TestMethod] + public void key_should_be_iterable_outside_transaction_after_commit() + { + // Arrange + const string key = "key1"; + const string value = "value1"; + + using var tran = _db!.BeginTransaction(); + + // Act + tran.Put(key, value); + tran.Commit(); + using var iterator = _db.NewIterator().SeekToFirst(); + + // Assert + Assert.AreEqual(key, iterator.StringKey()); + } + + [TestMethod] + public void old_value_should_be_visible_outside_transaction_while_uncommitted() + { + // Arrange + const string key = "key"; + const string value1 = "value1"; + const string value2 = "value2"; + _db!.Put(key, value1); + + using var tran = _db.BeginTransaction(); + + // Act + tran.Put(key, value2); + var result = _db.Get(key); + + // Assert + Assert.AreEqual(value1, result); + } + + [TestMethod] + public void new_value_should_be_visible_outside_transaction_after_commit() + { + // Arrange + const string key = "key"; + const string value1 = "value1"; + const string value2 = "value2"; + _db!.Put(key, value1); + + using var tran = _db.BeginTransaction(); + + // Act + tran.Put(key, value2); + tran.Commit(); + var result = _db.Get(key); + + // Assert + Assert.AreEqual(value2, result); + } +} \ No newline at end of file diff --git a/Tests/Usings.cs b/Tests/Usings.cs new file mode 100644 index 0000000..ab67c7e --- /dev/null +++ b/Tests/Usings.cs @@ -0,0 +1 @@ +global using Microsoft.VisualStudio.TestTools.UnitTesting; \ No newline at end of file diff --git a/csharp/src/Native.Marshaled.cs b/csharp/src/Native.Marshaled.cs index 80b447f..0fa858e 100644 --- a/csharp/src/Native.Marshaled.cs +++ b/csharp/src/Native.Marshaled.cs @@ -1,4 +1,4 @@ -/* +/* The functions in this file provide some wrappers around the lowest level C API to aid in marshalling. This is kept separate so that the lowest level imports can be kept as close as possible to c.h from rocksdb. See Native.Raw.cs for more information. @@ -104,6 +104,51 @@ public void rocksdb_put( } } + public void rocksdb_transaction_put( + /*rocksdb_t**/ IntPtr db, + string key, + string val, + out IntPtr errptr, + ColumnFamilyHandle cf = null, + Encoding encoding = null) + { + unsafe + { + if (encoding is null) + { + encoding = Encoding.UTF8; + } + + fixed (char* k = key, v = val) + { + int klength = key.Length; + int vlength = val.Length; + int bklength = encoding.GetByteCount(k, klength); + int bvlength = encoding.GetByteCount(v, vlength); + var buffer = Marshal.AllocHGlobal(bklength + bvlength); + byte* bk = (byte*)buffer.ToPointer(); + encoding.GetBytes(k, klength, bk, bklength); + byte* bv = bk + bklength; + encoding.GetBytes(v, vlength, bv, bvlength); + UIntPtr sklength = (UIntPtr)bklength; + UIntPtr svlength = (UIntPtr)bvlength; + + if (cf is null) + { + rocksdb_transaction_put(db, bk, sklength, bv, svlength, out errptr); + } + else + { + rocksdb_transaction_put_cf(db, cf.Handle, bk, sklength, bv, svlength, out errptr); + } +#if DEBUG + Zero(bk, bklength); +#endif + Marshal.FreeHGlobal(buffer); + } + } + } + public string rocksdb_get( /*rocksdb_t**/ IntPtr db, /*const rocksdb_readoptions_t**/ IntPtr read_options, @@ -151,6 +196,53 @@ public string rocksdb_get( } } + public string rocksdb_transaction_get( + /*rocksdb_t**/ IntPtr db, + /*const rocksdb_readoptions_t**/ IntPtr read_options, + string key, + out IntPtr errptr, + ColumnFamilyHandle cf = null, + Encoding encoding = null) + { + if (encoding is null) + { + encoding = Encoding.UTF8; + } + + unsafe + { + fixed (char* k = key) + { + int klength = key.Length; + int bklength = encoding.GetByteCount(k, klength); + var buffer = Marshal.AllocHGlobal(bklength); + byte* bk = (byte*)buffer.ToPointer(); + encoding.GetBytes(k, klength, bk, bklength); + UIntPtr sklength = (UIntPtr)bklength; + + var resultPtr = cf is null + ? rocksdb_transaction_get(db, read_options, bk, sklength, out UIntPtr bvlength, out errptr) + : rocksdb_transaction_get_cf(db, read_options, cf.Handle, bk, sklength, out bvlength, out errptr); +#if DEBUG + Zero(bk, bklength); +#endif + Marshal.FreeHGlobal(buffer); + + if (errptr != IntPtr.Zero) + { + return null; + } + + if (resultPtr == IntPtr.Zero) + { + return null; + } + + return MarshalAndFreeRocksDbString(resultPtr, (long)bvlength, encoding); + } + } + } + private unsafe string MarshalAndFreeRocksDbString(IntPtr resultPtr, long resultLength, Encoding encoding) { var result = CurrentFramework.CreateString((sbyte*)resultPtr.ToPointer(), 0, (int)resultLength, encoding); @@ -188,6 +280,32 @@ public bool rocksdb_has_key( return true; } + public bool rocksdb_transaction_has_key( + IntPtr db, + IntPtr read_options, + byte[] key, + long keyLength, + out IntPtr errptr, + ColumnFamilyHandle cf = null) + { + UIntPtr skLength = (UIntPtr)keyLength; + var resultPtr = cf is null + ? rocksdb_transaction_get(db, read_options, key, skLength, out _, out errptr) + : rocksdb_transaction_get_cf(db, read_options, cf.Handle, key, skLength, out _, out errptr); + if (errptr != IntPtr.Zero) + { + return false; + } + + if (resultPtr == IntPtr.Zero) + { + return false; + } + + rocksdb_free(resultPtr); + return true; + } + public byte[] rocksdb_get( IntPtr db, IntPtr read_options, @@ -216,6 +334,34 @@ public byte[] rocksdb_get( return result; } + public byte[] rocksdb_transaction_get( + IntPtr db, + IntPtr read_options, + byte[] key, + long keyLength, + out IntPtr errptr, + ColumnFamilyHandle cf = null) + { + UIntPtr skLength = (UIntPtr)keyLength; + var resultPtr = cf is null + ? rocksdb_transaction_get(db, read_options, key, skLength, out UIntPtr valueLength, out errptr) + : rocksdb_transaction_get_cf(db, read_options, cf.Handle, key, skLength, out valueLength, out errptr); + if (errptr != IntPtr.Zero) + { + return null; + } + + if (resultPtr == IntPtr.Zero) + { + return null; + } + + var result = new byte[(ulong)valueLength]; + Marshal.Copy(resultPtr, result, 0, (int)valueLength); + rocksdb_free(resultPtr); + return result; + } + #if !NETSTANDARD2_0 public unsafe byte[] rocksdb_get( IntPtr db, @@ -356,6 +502,146 @@ public unsafe T rocksdb_get( rocksdb_free(resultPtr); } } + + public unsafe byte[] rocksdb_transaction_get( + IntPtr db, + IntPtr read_options, + ReadOnlySpan key, + out IntPtr errptr, + ColumnFamilyHandle cf = null) + { + UIntPtr skLength = (UIntPtr)key.Length; + IntPtr resultPtr; + UIntPtr valueLength; + fixed (byte* ptr = &MemoryMarshal.GetReference(key)) + { + resultPtr = cf is null + ? rocksdb_transaction_get(db, read_options, ptr, skLength, out valueLength, out errptr) + : rocksdb_transaction_get_cf(db, read_options, cf.Handle, ptr, skLength, out valueLength, out errptr); + } + if (errptr != IntPtr.Zero) + { + return null; + } + + if (resultPtr == IntPtr.Zero) + { + return null; + } + + var result = new byte[(ulong)valueLength]; + Marshal.Copy(resultPtr, result, 0, (int)valueLength); + rocksdb_free(resultPtr); + return result; + } + + public unsafe bool rocksdb_transaction_has_key( + IntPtr db, + IntPtr read_options, + ReadOnlySpan key, + out IntPtr errptr, + ColumnFamilyHandle cf = null) + { + UIntPtr skLength = (UIntPtr)key.Length; + IntPtr resultPtr; + UIntPtr valueLength; + fixed (byte* ptr = &MemoryMarshal.GetReference(key)) + { + resultPtr = cf is null + ? rocksdb_transaction_get(db, read_options, ptr, skLength, out valueLength, out errptr) + : rocksdb_transaction_get_cf(db, read_options, cf.Handle, ptr, skLength, out valueLength, out errptr); + } + + if (errptr != IntPtr.Zero) + { + return false; + } + + if (resultPtr == IntPtr.Zero) + { + return false; + } + + rocksdb_free(resultPtr); + + return true; + } + + public unsafe T rocksdb_transaction_get( + IntPtr db, + IntPtr read_options, + ReadOnlySpan key, + ISpanDeserializer deserializer, + out IntPtr errptr, + ColumnFamilyHandle cf = null) + { + UIntPtr skLength = (UIntPtr)key.Length; + IntPtr resultPtr; + UIntPtr valueLength; + fixed (byte* ptr = &MemoryMarshal.GetReference(key)) + { + resultPtr = cf is null + ? rocksdb_transaction_get(db, read_options, ptr, skLength, out valueLength, out errptr) + : rocksdb_transaction_get_cf(db, read_options, cf.Handle, ptr, skLength, out valueLength, out errptr); + } + if (errptr != IntPtr.Zero) + { + return default(T); + } + + if (resultPtr == IntPtr.Zero) + { + return default(T); + } + + var span = new ReadOnlySpan((void*)resultPtr, (int)valueLength); + try + { + return deserializer.Deserialize(span); + } + finally + { + rocksdb_free(resultPtr); + } + } + + public unsafe T rocksdb_transaction_get( + IntPtr db, + IntPtr read_options, + ReadOnlySpan key, + Func deserializer, + out IntPtr errptr, + ColumnFamilyHandle cf = null) + { + UIntPtr skLength = (UIntPtr)key.Length; + IntPtr resultPtr; + UIntPtr valueLength; + fixed (byte* ptr = &MemoryMarshal.GetReference(key)) + { + resultPtr = cf is null + ? rocksdb_transaction_get(db, read_options, ptr, skLength, out valueLength, out errptr) + : rocksdb_transaction_get_cf(db, read_options, cf.Handle, ptr, skLength, out valueLength, out errptr); + } + if (errptr != IntPtr.Zero) + { + return default(T); + } + + if (resultPtr == IntPtr.Zero) + { + return default(T); + } + + try + { + using var stream = new UnmanagedMemoryStream((byte*)resultPtr, (long)valueLength); + return deserializer(stream); + } + finally + { + rocksdb_free(resultPtr); + } + } #endif /// @@ -556,6 +842,204 @@ public unsafe KeyValuePair[] rocksdb_multi_get( return values; } + /// + /// Executes a multi_get with automatic marshalling + /// + /// + /// + /// + /// when non-zero, specifies the number of keys in the array to fetch + /// when non-null specifies the lengths of each key to fetch + /// when non-null, must be an array that will be populated with error codes + /// when non-null is a pre-allocated array to put the resulting values in + /// + /// + public unsafe KeyValuePair[] rocksdb_transaction_multi_get( + IntPtr db, + IntPtr read_options, + byte[][] keys, + IntPtr[] errptrs, + ulong numKeys = 0, + ulong[] keyLengths = null, + KeyValuePair[] values = null, + ColumnFamilyHandle[] cf = null) + { + uint count = numKeys == 0 ? (uint)keys.Length : (uint)numKeys; + UIntPtr sCount = (UIntPtr)count; + GCHandle[] pinned = new GCHandle[count]; + IntPtr[] keyPtrs = new IntPtr[count]; + IntPtr[] valuePtrs = new IntPtr[count]; + UIntPtr[] valueLengths = new UIntPtr[count]; + UIntPtr[] keyLengthsConverted = new UIntPtr[count]; + + if (values is null) + { + values = new KeyValuePair[count]; + } + + if (errptrs is null) + { + errptrs = new IntPtr[count]; + } + + if (keyLengths is null) + { + for (int i = 0; i < count; i++) + { + keyLengthsConverted[i] = new UIntPtr((uint)keys[i].Length); + } + } + else + { + for (int i = 0; i < count; i++) + { + keyLengthsConverted[i] = new UIntPtr((uint)keyLengths[i]); + } + } + + // first we have to pin and take the address of each key + for (int i = 0; i < count; i++) + { + var gch = GCHandle.Alloc(keys[i], GCHandleType.Pinned); + pinned[i] = gch; + keyPtrs[i] = gch.AddrOfPinnedObject(); + } + if (cf is null) + { + rocksdb_transaction_multi_get(db, read_options, sCount, keyPtrs, keyLengthsConverted, valuePtrs, valueLengths, errptrs); + } + else + { + IntPtr[] cfhs = new IntPtr[cf.Length]; + for (int i = 0; i < count; i++) + { + cfhs[i] = cf[i].Handle; + } + + rocksdb_transaction_multi_get_cf(db, read_options, cfhs, sCount, keyPtrs, keyLengthsConverted, valuePtrs, valueLengths, errptrs); + } + // unpin the keys + foreach (var gch in pinned) + { + gch.Free(); + } + + // now marshal all of the values + for (int i = 0; i < count; i++) + { + var valuePtr = valuePtrs[i]; + if (valuePtr != IntPtr.Zero) + { + var valueLength = (ulong)valueLengths[i]; + byte[] value = new byte[valueLength]; + Marshal.Copy(valuePtr, value, 0, (int)valueLength); + values[i] = new KeyValuePair(keys[i], value); + rocksdb_free(valuePtr); + } + else + { + values[i] = new KeyValuePair(keys[i], null); + } + } + return values; + } + /// + /// Executes a multi_get with automatic marshalling + /// + /// + /// + /// + /// when non-zero, specifies the number of keys in the array to fetch + /// when non-null specifies the lengths of each key to fetch + /// when non-null, must be an array that will be populated with error codes + /// when non-null is a pre-allocated array to put the resulting values in + /// + /// + public unsafe KeyValuePair[] rocksdb_transaction_multi_get( + IntPtr db, + IntPtr read_options, + string[] keys, + IntPtr[] errptrs, + ulong numKeys = 0, + KeyValuePair[] values = null, + ColumnFamilyHandle[] cf = null, + Encoding encoding = null) + { + if (encoding is null) + { + encoding = Encoding.UTF8; + } + + uint count = numKeys == 0 ? (uint)keys.Length : (uint)numKeys; + UIntPtr sCount = (UIntPtr)count; + IntPtr[] keyPtrs = new IntPtr[count]; + UIntPtr[] keyLengths = new UIntPtr[count]; + IntPtr[] valuePtrs = new IntPtr[count]; + UIntPtr[] valueLengths = new UIntPtr[count]; + + if (values is null) + { + values = new KeyValuePair[count]; + } + + if (errptrs is null) + { + errptrs = new IntPtr[count]; + } + + // first we have to encode each key + for (int i = 0; i < count; i++) + { + var key = keys[i]; + fixed (char* k = key) + { + var klength = key.Length; + int bklength = encoding.GetByteCount(k, klength); + var bk = Marshal.AllocHGlobal(bklength); + encoding.GetBytes(k, klength, (byte*)bk.ToPointer(), bklength); + keyPtrs[i] = bk; + keyLengths[i] = new UIntPtr((uint)bklength); + } + } + if (cf is null) + { + rocksdb_transaction_multi_get(db, read_options, sCount, keyPtrs, keyLengths, valuePtrs, valueLengths, errptrs); + } + else + { + IntPtr[] cfhs = new IntPtr[cf.Length]; + for (int i = 0; i < count; i++) + { + cfhs[i] = cf[i].Handle; + } + + rocksdb_transaction_multi_get_cf(db, read_options, cfhs, sCount, keyPtrs, keyLengths, valuePtrs, valueLengths, errptrs); + } + // free the buffers allocated for each encoded key + foreach (var keyPtr in keyPtrs) + { + Marshal.FreeHGlobal(keyPtr); + } + + // now marshal all of the values + for (int i = 0; i < count; i++) + { + var resultPtr = valuePtrs[i]; + if (resultPtr != IntPtr.Zero) + { + var bv = (sbyte*)resultPtr.ToPointer(); + var bvLength = valueLengths[i]; + values[i] = new KeyValuePair(keys[i], CurrentFramework.CreateString(bv, 0, (int)bvLength, encoding)); + rocksdb_free(resultPtr); + } + else + { + values[i] = new KeyValuePair(keys[i], null); + } + } + return values; + } + public void rocksdb_delete( /*rocksdb_t**/ IntPtr db, /*const rocksdb_writeoptions_t**/ IntPtr writeOptions, @@ -576,6 +1060,25 @@ public void rocksdb_delete( } } + public void rocksdb_transaction_delete( + /*rocksdb_t**/ IntPtr db, + /*const*/ string key, + out IntPtr errptr, + ColumnFamilyHandle cf, + Encoding encoding = null) + { + var bkey = (encoding ?? Encoding.UTF8).GetBytes(key); + UIntPtr kLength = (UIntPtr)bkey.GetLongLength(0); + if (cf is null) + { + rocksdb_transaction_delete(db, bkey, kLength, out errptr); + } + else + { + rocksdb_transaction_delete_cf(db, cf.Handle, bkey, kLength, out errptr); + } + } + public string rocksdb_options_statistics_get_string_marshaled(IntPtr opts) { return MarshalNullTermAsciiStr(rocksdb_options_statistics_get_string(opts)); diff --git a/csharp/src/Native.Wrap.cs b/csharp/src/Native.Wrap.cs index 5a00a68..119327b 100644 --- a/csharp/src/Native.Wrap.cs +++ b/csharp/src/Native.Wrap.cs @@ -56,6 +56,46 @@ public void rocksdb_put( } } + public void rocksdb_transaction_put( + /*rocksdb_t**/ IntPtr db, + string key, + string val, + ColumnFamilyHandle cf = null, + System.Text.Encoding encoding = null) + { + rocksdb_transaction_put(db, key, val, out IntPtr errptr, cf, encoding); + if (errptr != IntPtr.Zero) + { + throw new RocksDbException(errptr); + } + } + + public void rocksdb_transaction_put( + IntPtr db, + byte[] key, + long keyLength, + byte[] value, + long valueLength, + ColumnFamilyHandle cf) + { + IntPtr errptr; + UIntPtr sklength = (UIntPtr)keyLength; + UIntPtr svlength = (UIntPtr)valueLength; + if (cf is null) + { + rocksdb_transaction_put(db, key, sklength, value, svlength, out errptr); + } + else + { + rocksdb_transaction_put_cf(db, cf.Handle, key, sklength, value, svlength, out errptr); + } + + if (errptr != IntPtr.Zero) + { + throw new RocksDbException(errptr); + } + } + #if !NETSTANDARD2_0 public unsafe void rocksdb_put( IntPtr db, @@ -86,6 +126,35 @@ public unsafe void rocksdb_put( } } } + + public unsafe void rocksdb_transaction_put( + IntPtr db, + ReadOnlySpan key, + ReadOnlySpan value, + ColumnFamilyHandle cf) + { + IntPtr errptr; + UIntPtr sklength = (UIntPtr)key.Length; + UIntPtr svlength = (UIntPtr)value.Length; + + fixed (byte* keyPtr = &MemoryMarshal.GetReference(key)) + fixed (byte* valuePtr = &MemoryMarshal.GetReference(value)) + { + if (cf is null) + { + rocksdb_transaction_put(db, keyPtr, sklength, valuePtr, svlength, out errptr); + } + else + { + rocksdb_transaction_put_cf(db, cf.Handle, keyPtr, sklength, valuePtr, svlength, out errptr); + } + + if (errptr != IntPtr.Zero) + { + throw new RocksDbException(errptr); + } + } + } #endif public void rocksdb_merge( @@ -200,6 +269,60 @@ public byte[] rocksdb_get( return result; } + public string rocksdb_transaction_get( + /*rocksdb_t**/ IntPtr db, + /*const rocksdb_readoptions_t**/ IntPtr read_options, + string key, + ColumnFamilyHandle cf, + System.Text.Encoding encoding = null) + { + var result = rocksdb_transaction_get(db, read_options, key, out IntPtr errptr, cf, encoding); + if (errptr != IntPtr.Zero) + { + throw new RocksDbException(errptr); + } + + return result; + } + + public IntPtr rocksdb_transaction_get( + IntPtr db, + IntPtr read_options, + byte[] key, + long keyLength, + out long vallen, + ColumnFamilyHandle cf) + { + UIntPtr sklength = (UIntPtr)keyLength; + var result = cf is null + ? rocksdb_transaction_get(db, read_options, key, sklength, out UIntPtr valLength, out IntPtr errptr) + : rocksdb_transaction_get_cf(db, read_options, cf.Handle, key, sklength, out valLength, out errptr); + if (errptr != IntPtr.Zero) + { + throw new RocksDbException(errptr); + } + + vallen = (long)valLength; + return result; + } + + public byte[] rocksdb_transaction_get( + IntPtr db, + IntPtr read_options, + byte[] key, + long keyLength = 0, + ColumnFamilyHandle cf = null) + { + var result = rocksdb_transaction_get(db, read_options, key, keyLength == 0 ? key.Length : keyLength, out IntPtr errptr, cf); + if (errptr != IntPtr.Zero) + { + throw new RocksDbException(errptr); + } + + return result; + } + + public bool rocksdb_has_key( IntPtr db, IntPtr read_options, @@ -260,6 +383,65 @@ internal bool rocksdb_has_key(IntPtr db, IntPtr read_options, string key, Column } } + public bool rocksdb_transaction_has_key( + IntPtr db, + IntPtr read_options, + byte[] key, + long keyLength, + ColumnFamilyHandle cf = null) + { + var result = rocksdb_transaction_has_key(db, read_options, key, keyLength == 0 ? key.Length : keyLength, out IntPtr errptr, cf); + + if (errptr != IntPtr.Zero) + { + throw new RocksDbException(errptr); + } + + return result; + } + + internal bool rocksdb_transaction_has_key(IntPtr db, IntPtr read_options, string key, ColumnFamilyHandle cf, Encoding encoding) + { + if (encoding is null) + { + encoding = Encoding.UTF8; + } + + IntPtr errptr; + + unsafe + { + fixed (char* k = key) + { + int klength = key.Length; + int bklength = encoding.GetByteCount(k, klength); + var buffer = Marshal.AllocHGlobal(bklength); + byte* bk = (byte*)buffer.ToPointer(); + encoding.GetBytes(k, klength, bk, bklength); + UIntPtr sklength = (UIntPtr)bklength; + + var resultPtr = cf is null + ? rocksdb_transaction_get(db, read_options, bk, sklength, out UIntPtr bvlength, out errptr) + : rocksdb_transaction_get_cf(db, read_options, cf.Handle, bk, sklength, out bvlength, out errptr); + + Marshal.FreeHGlobal(buffer); + + if (errptr != IntPtr.Zero) + { + throw new RocksDbException(errptr); + } + + if (resultPtr == IntPtr.Zero) + { + return false; + } + + rocksdb_free(resultPtr); + + return true; + } + } + } #if !NETSTANDARD2_0 public byte[] rocksdb_get( @@ -323,6 +505,68 @@ public T rocksdb_get( return result; } + + public byte[] rocksdb_transaction_get( + IntPtr db, + IntPtr read_options, + ReadOnlySpan key, + ColumnFamilyHandle cf = null) + { + var result = rocksdb_transaction_get(db, read_options, key, out IntPtr errptr, cf); + if (errptr != IntPtr.Zero) + { + throw new RocksDbException(errptr); + } + + return result; + } + + public bool rocksdb_transaction_has_key( + IntPtr db, + IntPtr read_options, + ReadOnlySpan key, + ColumnFamilyHandle cf = null) + { + var result = rocksdb_transaction_has_key(db, read_options, key, out IntPtr errptr, cf); + + if (errptr != IntPtr.Zero) + { + throw new RocksDbException(errptr); + } + + return result; + } + public T rocksdb_transaction_get( + IntPtr db, + IntPtr read_options, + ReadOnlySpan key, + ISpanDeserializer deserializer, + ColumnFamilyHandle cf = null) + { + var result = rocksdb_transaction_get(db, read_options, key, deserializer, out IntPtr errptr, cf); + if (errptr != IntPtr.Zero) + { + throw new RocksDbException(errptr); + } + + return result; + } + + public T rocksdb_transaction_get( + IntPtr db, + IntPtr read_options, + ReadOnlySpan key, + Func deserializer, + ColumnFamilyHandle cf = null) + { + var result = rocksdb_transaction_get(db, read_options, key, deserializer, out IntPtr errptr, cf); + if (errptr != IntPtr.Zero) + { + throw new RocksDbException(errptr); + } + + return result; + } #endif public System.Collections.Generic.KeyValuePair[] rocksdb_multi_get( @@ -371,6 +615,51 @@ public System.Collections.Generic.KeyValuePair[] rocksdb_multi_g return result; } + public System.Collections.Generic.KeyValuePair[] rocksdb_transaction_multi_get( + IntPtr db, + IntPtr read_options, + string[] keys, + ColumnFamilyHandle[] cf = null, + System.Text.Encoding encoding = null) + { + if (encoding == null) + { + encoding = System.Text.Encoding.UTF8; + } + + IntPtr[] errptrs = new IntPtr[keys.Length]; + var result = rocksdb_transaction_multi_get(db, read_options, keys, cf: cf, errptrs: errptrs, encoding: encoding); + foreach (var errptr in errptrs) + { + if (errptr != IntPtr.Zero) + { + throw new RocksDbException(errptr); + } + } + + return result; + } + + public System.Collections.Generic.KeyValuePair[] rocksdb_transaction_multi_get( + IntPtr db, + IntPtr read_options, + byte[][] keys, + ulong[] keyLengths = null, + ColumnFamilyHandle[] cf = null) + { + IntPtr[] errptrs = new IntPtr[keys.Length]; + var result = rocksdb_transaction_multi_get(db, read_options, keys, keyLengths: keyLengths, cf: cf, errptrs: errptrs); + foreach (var errptr in errptrs) + { + if (errptr != IntPtr.Zero) + { + throw new RocksDbException(errptr); + } + } + + return result; + } + public void rocksdb_delete( /*rocksdb_t**/ IntPtr db, /*const rocksdb_writeoptions_t**/ IntPtr writeOptions, @@ -384,6 +673,18 @@ public void rocksdb_delete( } } + public void rocksdb_transaction_delete( + /*rocksdb_t**/ IntPtr db, + /*const*/ string key, + ColumnFamilyHandle cf) + { + rocksdb_transaction_delete(db, key, out IntPtr errptr, cf); + if (errptr != IntPtr.Zero) + { + throw new RocksDbException(errptr); + } + } + [Obsolete("Use UIntPtr version instead")] public void rocksdb_delete( /*rocksdb_t**/ IntPtr db, diff --git a/csharp/src/Options/TransactionDbOptions.cs b/csharp/src/Options/TransactionDbOptions.cs new file mode 100644 index 0000000..93d2f73 --- /dev/null +++ b/csharp/src/Options/TransactionDbOptions.cs @@ -0,0 +1,81 @@ +using System; + +namespace RocksDbSharp +{ + public class TransactionDbOptions + { + public TransactionDbOptions() + { + Handle = Native.Instance.rocksdb_transactiondb_options_create(); + } + + public IntPtr Handle { get; private set; } + + ~TransactionDbOptions() + { + if (Handle == IntPtr.Zero) + return; + +#if !NODESTROY + Native.Instance.rocksdb_transactiondb_options_destroy(Handle); +#endif + Handle = IntPtr.Zero; + } + + /// + /// Specifies the maximum number of keys that can be locked at the same time per column family. + /// + /// The maximum number of keys that can be locked; If this value is not positive, no limit will be enforced. + /// + public TransactionDbOptions SetMaxNumLocks(long value) + { + Native.Instance.rocksdb_transactiondb_options_set_max_num_locks(Handle, value); + return this; + } + + /// + /// Increasing this value will increase the concurrency by dividing the lock table (per column family) into more sub-tables, each with their own separate mutex. + /// Default: 16 + /// + /// The number of sub-tables + /// + public TransactionDbOptions SetNumStripes(ulong value = 16) + { + Native.Instance.rocksdb_transactiondb_options_set_num_stripes(Handle, (UIntPtr)value); + return this; + } + + /// + /// If positive, specifies the default wait timeout in milliseconds when a transaction attempts to lock a key if not specified by TransactionOptionsSetLockTimeout(long). + /// If 0, no waiting is done if a lock cannot instantly be acquired. + /// If negative, there is no timeout. Not using a timeout is not recommended as it can lead to deadlocks. + /// Currently, there is no deadlock-detection to recover from a deadlock. + /// Default: 1000 + /// + /// The default wait timeout in milliseconds + /// + public TransactionDbOptions SetTransactionLockTimeout(long value = 1000) + { + Native.Instance.rocksdb_transactiondb_options_set_transaction_lock_timeout(Handle, value); + return this; + } + + /// + /// If positive, specifies the wait timeout in milliseconds when writing a key OUTSIDE of a transaction (ie by calling put, merge, delete or write directly). + /// If 0, no waiting is done if a lock cannot instantly be acquired. + /// If negative, there is no timeout and will block indefinitely when acquiring a lock. + /// Not using a timeout can lead to deadlocks. + /// Currently, there is no deadlock-detection to recover from a deadlock. + /// While DB writes cannot deadlock with other DB writes, they can deadlock with a transaction. + /// A negative timeout should only be used if all transactions have a small expiration set. + /// Default: 1000 + /// + /// The timeout in milliseconds when writing a key OUTSIDE of a transaction. + /// + public TransactionDbOptions SetDefaultLockTimeout(long value = 1000) + { + Native.Instance.rocksdb_transactiondb_options_set_default_lock_timeout(Handle, value); + return this; + } + } +} \ No newline at end of file diff --git a/csharp/src/Options/TransactionOptions.cs b/csharp/src/Options/TransactionOptions.cs new file mode 100644 index 0000000..0c1165d --- /dev/null +++ b/csharp/src/Options/TransactionOptions.cs @@ -0,0 +1,102 @@ +using System; + +namespace RocksDbSharp +{ + public class TransactionOptions + { + public TransactionOptions() + { + Handle = Native.Instance.rocksdb_transaction_options_create(); + } + + public IntPtr Handle { get; private set; } + + ~TransactionOptions() + { + if (Handle == IntPtr.Zero) + return; + +#if !NODESTROY + Native.Instance.rocksdb_transaction_options_destroy(Handle); +#endif + Handle = IntPtr.Zero; + } + + /// + /// Setting the setSnapshot to true is the same as calling Transaction.SetSnapshot(). + /// Default: false + /// + /// Whether to set a snapshot + /// + public TransactionOptions SetSetSnapshot(bool value) + { + Native.Instance.rocksdb_transaction_options_set_set_snapshot(Handle, value); + return this; + } + + /// + /// Setting to true means that before acquiring locks, this transaction will check if doing so will cause a deadlock. + /// If so, it will return with Status.Code.Busy. + /// The user should retry their transaction. + /// + /// true if we should detect deadlocks. + /// + public TransactionOptions SetDeadlockDetect(bool value) + { + Native.Instance.rocksdb_transaction_options_set_deadlock_detect(Handle, value); + return this; + } + + /// + /// If positive, specifies the wait timeout in milliseconds when a transaction attempts to lock a key. + /// If 0, no waiting is done if a lock cannot instantly be acquired. + /// If negative, TransactionDBOptions.GetTransactionLockTimeout(long) will be used + /// Default: -1 + /// + /// The lock timeout in milliseconds + /// + public TransactionOptions SetLockTimeout(long value) + { + Native.Instance.rocksdb_transaction_options_set_lock_timeout(Handle, value); + return this; + } + + /// + /// Expiration duration in milliseconds. + /// If non-negative, transactions that last longer than this many milliseconds will fail to commit. + /// If not set, a forgotten transaction that is never committed, rolled back, or deleted will never relinquish any locks it holds. + /// This could prevent keys from being written by other writers. + /// Default: -1 + /// + /// The expiration duration in milliseconds + /// + public TransactionOptions SetExpiration(long value) + { + Native.Instance.rocksdb_transaction_options_set_expiration(Handle, value); + return this; + } + + /// + /// Sets the number of traversals to make during deadlock detection. + /// Default: 50 + /// + /// The number of traversals to make during deadlock detection + /// + public TransactionOptions SetDeadlockDetectDepth(long value) + { + Native.Instance.rocksdb_transaction_options_set_deadlock_detect_depth(Handle, value); + return this; + } + + /// + /// Set the maximum number of bytes that may be used for the write batch. + /// + /// The maximum number of bytes, 0 means no limit. + /// + public TransactionOptions SetMaxWriteBatchSize(ulong value) + { + Native.Instance.rocksdb_transaction_options_set_max_write_batch_size(Handle, (UIntPtr)value); + return this; + } + } +} \ No newline at end of file diff --git a/csharp/src/RocksDb.cs b/csharp/src/RocksDb.cs index 6fbd4a8..924e2c2 100644 --- a/csharp/src/RocksDb.cs +++ b/csharp/src/RocksDb.cs @@ -1,74 +1,26 @@ using System; using System.Collections.Generic; -using System.Dynamic; -using System.IO; using System.Linq; -using System.Runtime.InteropServices; -using System.Text; -using Transitional; namespace RocksDbSharp { - - public sealed class RocksDb : IDisposable + public sealed class RocksDb : RocksDbBase { - private bool _disposed; - internal static ReadOptions DefaultReadOptions { get; } = new ReadOptions(); - internal static OptionsHandle DefaultOptions { get; } = new DbOptions(); - internal static WriteOptions DefaultWriteOptions { get; } = new WriteOptions(); - internal static Encoding DefaultEncoding => Encoding.UTF8; - private Dictionary columnFamilies; - - // Managed references to unmanaged resources that need to live at least as long as the db - internal dynamic References { get; } = new ExpandoObject(); - - public IntPtr Handle { get; internal set; } - private RocksDb(IntPtr handle, dynamic optionsReferences, dynamic cfOptionsRefs, Dictionary columnFamilies = null) + : base(handle, (object)optionsReferences, (object)cfOptionsRefs, columnFamilies) { - this.Handle = handle; - References.Options = optionsReferences; - References.CfOptions = cfOptionsRefs; - this.columnFamilies = columnFamilies; } - ~RocksDb() + protected override void ReleaseUnmanagedResources() { - ReleaseUnmanagedResources(); - } - - public void Dispose() - { - if (_disposed) return; + base.ReleaseUnmanagedResources(); - try - { - ReleaseUnmanagedResources(); - GC.SuppressFinalize(this); - } - finally - { - _disposed = true; - } - } - - private void ReleaseUnmanagedResources() - { - if (columnFamilies is object) - { - foreach (var cfh in columnFamilies.Values) - { - cfh.Dispose(); - } - columnFamilies = null; - } + if (Handle == IntPtr.Zero) + return; - if(Handle != IntPtr.Zero) - { - var handle = Handle; - Handle = IntPtr.Zero; - Native.Instance.rocksdb_close(handle); - } + var handle = Handle; + Handle = IntPtr.Zero; + Native.Instance.rocksdb_close(handle); } public static RocksDb Open(OptionsHandle options, string path) @@ -170,7 +122,7 @@ public static RocksDb OpenAsSecondary(DbOptions options, string path, string sec columnFamilies: cfHandleMap); } } - + /// /// Usage: /// > options) - { - var keys = options.Select(e => e.Key).ToArray(); - var values = options.Select(e => e.Value).ToArray(); - Native.Instance.rocksdb_set_options(Handle, keys.Length, keys, values); - } - - public string Get(string key, ColumnFamilyHandle cf = null, ReadOptions readOptions = null, Encoding encoding = null) - { - return Native.Instance.rocksdb_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, cf, encoding ?? DefaultEncoding); - } - - public byte[] Get(byte[] key, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) - { - return Get(key, key.GetLongLength(0), cf, readOptions); - } - -#if !NETSTANDARD2_0 - public byte[] Get(ReadOnlySpan key, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) - { - return Native.Instance.rocksdb_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, cf); - } - - public bool HasKey(ReadOnlySpan key, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) - { - return Native.Instance.rocksdb_has_key(Handle, (readOptions ?? DefaultReadOptions).Handle, key, cf); - } - - public T Get(ReadOnlySpan key, ISpanDeserializer deserializer, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) - { - return Native.Instance.rocksdb_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, deserializer, cf); - } - - public T Get(ReadOnlySpan key, Func deserializer, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) - { - return Native.Instance.rocksdb_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, deserializer, cf); - } -#endif - - public byte[] Get(byte[] key, long keyLength, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) - { - return Native.Instance.rocksdb_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, keyLength, cf); - } - - public bool HasKey(byte[] key, long keyLength, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) - { - return Native.Instance.rocksdb_has_key(Handle, (readOptions ?? DefaultReadOptions).Handle, key, keyLength, cf); - } - - public bool HasKey(string key, ColumnFamilyHandle cf = null, ReadOptions readOptions = null, Encoding encoding = null) - { - return Native.Instance.rocksdb_has_key(Handle, (readOptions ?? DefaultReadOptions).Handle, key, cf, encoding ?? DefaultEncoding); - } - - /// - /// Reads the contents of the database value associated with , if present, into the supplied - /// at up to bytes, returning the - /// length of the value in the database, or -1 if the key is not present. - /// - /// - /// - /// - /// - /// - /// - /// The actual length of the database field if it exists, otherwise -1 - public long Get(byte[] key, byte[] buffer, long offset, long length, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) - { - return Get(key, key.GetLongLength(0), buffer, offset, length, cf, readOptions); - } - - /// - /// Reads the contents of the database value associated with , if present, into the supplied - /// at up to bytes, returning the - /// length of the value in the database, or -1 if the key is not present. - /// - /// - /// - /// - /// - /// - /// - /// - /// The actual length of the database field if it exists, otherwise -1 - public long Get(byte[] key, long keyLength, byte[] buffer, long offset, long length, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) - { - unsafe - { - var ptr = Native.Instance.rocksdb_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, keyLength, out long valLength, cf); - if (ptr == IntPtr.Zero) - { - return -1; - } - - var copyLength = Math.Min(length, valLength); - Marshal.Copy(ptr, buffer, (int)offset, (int)copyLength); - Native.Instance.rocksdb_free(ptr); - return valLength; - } - } - - public KeyValuePair[] MultiGet(byte[][] keys, ColumnFamilyHandle[] cf = null, ReadOptions readOptions = null) - { - return Native.Instance.rocksdb_multi_get(Handle, (readOptions ?? DefaultReadOptions).Handle, keys, null, cf); - } - - public KeyValuePair[] MultiGet(string[] keys, ColumnFamilyHandle[] cf = null, ReadOptions readOptions = null) - { - return Native.Instance.rocksdb_multi_get(Handle, (readOptions ?? DefaultReadOptions).Handle, keys, cf); - } - - public void Write(WriteBatch writeBatch, WriteOptions writeOptions = null) - { - Native.Instance.rocksdb_write(Handle, (writeOptions ?? DefaultWriteOptions).Handle, writeBatch.Handle); - } - - public void Write(WriteBatchWithIndex writeBatch, WriteOptions writeOptions = null) - { - Native.Instance.rocksdb_write_writebatch_wi(Handle, (writeOptions ?? DefaultWriteOptions).Handle, writeBatch.Handle); - } - - public void Remove(string key, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) - { - Native.Instance.rocksdb_delete(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, cf); - } - - public void Remove(byte[] key, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) - { - Remove(key, key.Length, cf, writeOptions); - } - -#if !NETSTANDARD2_0 - public unsafe void Remove(ReadOnlySpan key, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) - { - fixed (byte* keyPtr = &MemoryMarshal.GetReference(key)) - { - if (cf is null) - { - Native.Instance.rocksdb_delete(Handle, (writeOptions ?? DefaultWriteOptions).Handle, keyPtr, (UIntPtr)key.Length); - } - else - { - Native.Instance.rocksdb_delete_cf(Handle, (writeOptions ?? DefaultWriteOptions).Handle, cf.Handle, keyPtr, (UIntPtr)key.Length); - } - } - } -#endif - - public void Remove(byte[] key, long keyLength, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) - { - if (cf is null) - { - Native.Instance.rocksdb_delete(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, (UIntPtr)keyLength); - } - else - { - Native.Instance.rocksdb_delete_cf(Handle, (writeOptions ?? DefaultWriteOptions).Handle, cf.Handle, key, (UIntPtr)keyLength); - } - } - - public void Put(string key, string value, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null, Encoding encoding = null) - { - Native.Instance.rocksdb_put(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, value, cf, encoding ?? DefaultEncoding); - } - - public void Put(byte[] key, byte[] value, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) - { - Put(key, key.GetLongLength(0), value, value.GetLongLength(0), cf, writeOptions); - } - -#if !NETSTANDARD2_0 - public void Put(ReadOnlySpan key, ReadOnlySpan value, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) - { - Native.Instance.rocksdb_put(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, value, cf); - } -#endif - - public void Put(byte[] key, long keyLength, byte[] value, long valueLength, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) - { - Native.Instance.rocksdb_put(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, keyLength, value, valueLength, cf); - } - - public void Merge(string key, string value, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null, Encoding encoding = null) - { - Native.Instance.rocksdb_put(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, value, cf, encoding ?? DefaultEncoding); - } - - public void Merge(byte[] key, byte[] value, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) - { - Merge(key, key.GetLongLength(0), value, value.GetLongLength(0), cf, writeOptions); - } - -#if !NETSTANDARD2_0 - public void Merge(ReadOnlySpan key, ReadOnlySpan value, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) - { - Native.Instance.rocksdb_merge(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, value, cf); - } -#endif - - public void Merge(byte[] key, long keyLength, byte[] value, long valueLength, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) - { - Native.Instance.rocksdb_merge(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, keyLength, value, valueLength, cf); - } - - public Iterator NewIterator(ColumnFamilyHandle cf = null, ReadOptions readOptions = null) - { - IntPtr iteratorHandle = cf is null - ? Native.Instance.rocksdb_create_iterator(Handle, (readOptions ?? DefaultReadOptions).Handle) - : Native.Instance.rocksdb_create_iterator_cf(Handle, (readOptions ?? DefaultReadOptions).Handle, cf.Handle); - // Note: passing in read options here only to ensure that it is not collected before the iterator - return new Iterator(iteratorHandle, readOptions); - } - - public Iterator[] NewIterators(ColumnFamilyHandle[] cfs, ReadOptions[] readOptions) - { - throw new NotImplementedException("TODO: Implement NewIterators()"); - // See rocksdb_create_iterators - } - - public Snapshot CreateSnapshot() - { - IntPtr snapshotHandle = Native.Instance.rocksdb_create_snapshot(Handle); - return new Snapshot(Handle, snapshotHandle); - } - - public static IEnumerable ListColumnFamilies(DbOptions options, string name) - { - return Native.Instance.rocksdb_list_column_families(options.Handle, name); - } - - public static bool TryListColumnFamilies(DbOptions options, string name, out string[] columnFamilies) - { - var result = Native.Instance.rocksdb_list_column_families(options.Handle, name, out UIntPtr lencf, out IntPtr errptr); - if (errptr != IntPtr.Zero) - { - columnFamilies = Array.Empty(); - return false; - } - - IntPtr[] ptrs = new IntPtr[(ulong)lencf]; - Marshal.Copy(result, ptrs, 0, (int)lencf); - columnFamilies = new string[(ulong)lencf]; - for (ulong i = 0; i < (ulong)lencf; i++) - { - columnFamilies[i] = Marshal.PtrToStringAnsi(ptrs[i]); - } - - Native.Instance.rocksdb_list_column_families_destroy(result, lencf); - return true; - } - - public ColumnFamilyHandle CreateColumnFamily(ColumnFamilyOptions cfOptions, string name) - { - var cfh = Native.Instance.rocksdb_create_column_family(Handle, cfOptions.Handle, name); - var cfhw = new ColumnFamilyHandleInternal(cfh); - columnFamilies.Add(name, cfhw); - return cfhw; - } - - public void DropColumnFamily(string name) - { - var cf = GetColumnFamily(name); - Native.Instance.rocksdb_drop_column_family(Handle, cf.Handle); - columnFamilies.Remove(name); - } - - public ColumnFamilyHandle GetDefaultColumnFamily() - { - return GetColumnFamily(ColumnFamilies.DefaultName); - } - - public ColumnFamilyHandle GetColumnFamily(string name) - { - if (columnFamilies is null) - { - throw new RocksDbSharpException("Database not opened for column families"); - } - - return columnFamilies[name]; - } - - public bool TryGetColumnFamily(string name, out ColumnFamilyHandle handle) - { - if (columnFamilies is null) - { - throw new RocksDbSharpException("Database not opened for column families"); - } - - if (columnFamilies.TryGetValue(name, out var internalHandle)) - { - handle = internalHandle; - return true; - } - - handle = null; - return false; - } - - public string GetProperty(string propertyName) - { - return Native.Instance.rocksdb_property_value_string(Handle, propertyName); - } - - public string GetProperty(string propertyName, ColumnFamilyHandle cf) - { - return Native.Instance.rocksdb_property_value_cf_string(Handle, cf.Handle, propertyName); - } - - public void IngestExternalFiles(string[] files, IngestExternalFileOptions ingestOptions, ColumnFamilyHandle cf = null) - { - if (cf is null) - { - Native.Instance.rocksdb_ingest_external_file(Handle, files, (UIntPtr)files.GetLongLength(0), ingestOptions.Handle); - } - else - { - Native.Instance.rocksdb_ingest_external_file_cf(Handle, cf.Handle, files, (UIntPtr)files.GetLongLength(0), ingestOptions.Handle); - } - } - - public void CompactRange(byte[] start, byte[] limit, ColumnFamilyHandle cf = null) - { - if (cf is null) - { - Native.Instance.rocksdb_compact_range(Handle, start, (UIntPtr)(start?.GetLongLength(0) ?? 0L), limit, (UIntPtr)(limit?.GetLongLength(0) ?? 0L)); - } - else - { - Native.Instance.rocksdb_compact_range_cf(Handle, cf.Handle, start, (UIntPtr)(start?.GetLongLength(0) ?? 0L), limit, (UIntPtr)(limit?.GetLongLength(0) ?? 0L)); - } - } - - public void CompactRange(string start, string limit, ColumnFamilyHandle cf = null, Encoding encoding = null) - { - if (encoding is null) - { - encoding = Encoding.UTF8; - } - - CompactRange(start is null ? null : encoding.GetBytes(start), limit is null ? null : encoding.GetBytes(limit), cf); - } - - public void TryCatchUpWithPrimary() - { - Native.Instance.rocksdb_try_catch_up_with_primary(Handle); - } - - public void Flush(FlushOptions flushOptions) - { - Native.Instance.rocksdb_flush(Handle, flushOptions.Handle); - } - - - /// - /// Returns metadata about the file and data in the file. - /// - /// setting it to true only populates FileName, - /// Filesize and filelevel; By default it is false - /// LiveFilesMetadata or null in case of failure - public List GetLiveFilesMetadata(bool populateFileMetadataOnly=false) - { - IntPtr buffer = Native.Instance.rocksdb_livefiles(Handle); - if (buffer == IntPtr.Zero) - { - return null; - } - - try - { - List filesMetadata = new List(); - - int fileCount = Native.Instance.rocksdb_livefiles_count(buffer); - for (int index = 0; index < fileCount; index++) - { - LiveFileMetadata liveFileMetadata = new LiveFileMetadata(); - - FileMetadata metadata = new FileMetadata(); - IntPtr fileMetadata = Native.Instance.rocksdb_livefiles_name(buffer, index); - string fileName = Marshal.PtrToStringAnsi(fileMetadata); - - int level = Native.Instance.rocksdb_livefiles_level(buffer, index); - - UIntPtr fS = Native.Instance.rocksdb_livefiles_size(buffer, index); - ulong fileSize = fS.ToUInt64(); - - metadata.FileName = fileName; - metadata.FileLevel = level; - metadata.FileSize = fileSize; - - liveFileMetadata.FileMetadata = metadata; - - if (!populateFileMetadataOnly) - { - FileDataMetadata fileDataMetadata = new FileDataMetadata(); - var smallestKeyPtr = Native.Instance.rocksdb_livefiles_smallestkey(buffer, - index, - out var smallestKeySize); - string smallestKey = Marshal.PtrToStringAnsi(smallestKeyPtr); - - var largestKeyPtr = Native.Instance.rocksdb_livefiles_largestkey(buffer, - index, - out var largestKeySize); - string largestKey = Marshal.PtrToStringAnsi(largestKeyPtr); - - ulong entries = Native.Instance.rocksdb_livefiles_entries(buffer, index); - ulong deletions = Native.Instance.rocksdb_livefiles_deletions(buffer, index); - - fileDataMetadata.SmallestKeyInFile = smallestKey; - fileDataMetadata.LargestKeyInFile = largestKey; - fileDataMetadata.NumEntriesInFile = entries; - fileDataMetadata.NumDeletionsInFile = deletions; - - liveFileMetadata.FileDataMetadata = fileDataMetadata; - } - - filesMetadata.Add(liveFileMetadata); - } - - return filesMetadata; - } - finally - { - Native.Instance.rocksdb_livefiles_destroy(buffer); - buffer = IntPtr.Zero; - } - } - - /// - /// Lean API to just get Live file names. - /// Refer to GetLiveFilesMetadata() for the complete metadata - /// - /// - public List GetLiveFileNames() - { - IntPtr buffer = Native.Instance.rocksdb_livefiles(Handle); - if (buffer == IntPtr.Zero) - { - return new List(); - } - - try - { - List liveFiles = new List(); - - int fileCount = Native.Instance.rocksdb_livefiles_count(buffer); - - for (int index = 0; index < fileCount; index++) - { - IntPtr fileMetadata = Native.Instance.rocksdb_livefiles_name(buffer, index); - string fileName = Marshal.PtrToStringAnsi(fileMetadata); - liveFiles.Add(fileName); - } - - return liveFiles; - } - finally - { - Native.Instance.rocksdb_livefiles_destroy(buffer); - buffer = IntPtr.Zero; - } - } } } diff --git a/csharp/src/RocksDbBase.cs b/csharp/src/RocksDbBase.cs new file mode 100644 index 0000000..aaff3ff --- /dev/null +++ b/csharp/src/RocksDbBase.cs @@ -0,0 +1,530 @@ +using System; +using System.Collections.Generic; +using System.Dynamic; +using System.IO; +using System.Linq; +using System.Runtime.InteropServices; +using System.Text; +using Transitional; + +namespace RocksDbSharp +{ + + public abstract class RocksDbBase : IDisposable + { + private bool _disposed; + internal static ReadOptions DefaultReadOptions { get; } = new ReadOptions(); + internal static OptionsHandle DefaultOptions { get; } = new DbOptions(); + internal static WriteOptions DefaultWriteOptions { get; } = new WriteOptions(); + internal static Encoding DefaultEncoding => Encoding.UTF8; + private Dictionary columnFamilies; + + // Managed references to unmanaged resources that need to live at least as long as the db + internal dynamic References { get; } = new ExpandoObject(); + + public IntPtr Handle { get; internal set; } + + internal RocksDbBase(IntPtr handle, dynamic optionsReferences, dynamic cfOptionsRefs, Dictionary columnFamilies = null) + { + this.Handle = handle; + References.Options = optionsReferences; + References.CfOptions = cfOptionsRefs; + this.columnFamilies = columnFamilies; + } + + ~RocksDbBase() + { + ReleaseUnmanagedResources(); + } + + public void Dispose() + { + if (_disposed) return; + + try + { + ReleaseUnmanagedResources(); + GC.SuppressFinalize(this); + } + finally + { + _disposed = true; + } + } + + protected virtual void ReleaseUnmanagedResources() + { + if (columnFamilies is object) + { + foreach (var cfh in columnFamilies.Values) + { + cfh.Dispose(); + } + columnFamilies = null; + } + } + + + public void SetOptions(IEnumerable> options) + { + var keys = options.Select(e => e.Key).ToArray(); + var values = options.Select(e => e.Value).ToArray(); + Native.Instance.rocksdb_set_options(Handle, keys.Length, keys, values); + } + + public string Get(string key, ColumnFamilyHandle cf = null, ReadOptions readOptions = null, Encoding encoding = null) + { + return Native.Instance.rocksdb_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, cf, encoding ?? DefaultEncoding); + } + + public byte[] Get(byte[] key, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + return Get(key, key.GetLongLength(0), cf, readOptions); + } + +#if !NETSTANDARD2_0 + public byte[] Get(ReadOnlySpan key, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + return Native.Instance.rocksdb_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, cf); + } + + public bool HasKey(ReadOnlySpan key, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + return Native.Instance.rocksdb_has_key(Handle, (readOptions ?? DefaultReadOptions).Handle, key, cf); + } + + public T Get(ReadOnlySpan key, ISpanDeserializer deserializer, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + return Native.Instance.rocksdb_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, deserializer, cf); + } + + public T Get(ReadOnlySpan key, Func deserializer, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + return Native.Instance.rocksdb_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, deserializer, cf); + } +#endif + + public byte[] Get(byte[] key, long keyLength, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + return Native.Instance.rocksdb_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, keyLength, cf); + } + + public bool HasKey(byte[] key, long keyLength, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + return Native.Instance.rocksdb_has_key(Handle, (readOptions ?? DefaultReadOptions).Handle, key, keyLength, cf); + } + + public bool HasKey(string key, ColumnFamilyHandle cf = null, ReadOptions readOptions = null, Encoding encoding = null) + { + return Native.Instance.rocksdb_has_key(Handle, (readOptions ?? DefaultReadOptions).Handle, key, cf, encoding ?? DefaultEncoding); + } + + /// + /// Reads the contents of the database value associated with , if present, into the supplied + /// at up to bytes, returning the + /// length of the value in the database, or -1 if the key is not present. + /// + /// + /// + /// + /// + /// + /// + /// The actual length of the database field if it exists, otherwise -1 + public long Get(byte[] key, byte[] buffer, long offset, long length, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + return Get(key, key.GetLongLength(0), buffer, offset, length, cf, readOptions); + } + + /// + /// Reads the contents of the database value associated with , if present, into the supplied + /// at up to bytes, returning the + /// length of the value in the database, or -1 if the key is not present. + /// + /// + /// + /// + /// + /// + /// + /// + /// The actual length of the database field if it exists, otherwise -1 + public long Get(byte[] key, long keyLength, byte[] buffer, long offset, long length, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + unsafe + { + var ptr = Native.Instance.rocksdb_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, keyLength, out long valLength, cf); + if (ptr == IntPtr.Zero) + { + return -1; + } + + var copyLength = Math.Min(length, valLength); + Marshal.Copy(ptr, buffer, (int)offset, (int)copyLength); + Native.Instance.rocksdb_free(ptr); + return valLength; + } + } + + public KeyValuePair[] MultiGet(byte[][] keys, ColumnFamilyHandle[] cf = null, ReadOptions readOptions = null) + { + return Native.Instance.rocksdb_multi_get(Handle, (readOptions ?? DefaultReadOptions).Handle, keys, null, cf); + } + + public KeyValuePair[] MultiGet(string[] keys, ColumnFamilyHandle[] cf = null, ReadOptions readOptions = null) + { + return Native.Instance.rocksdb_multi_get(Handle, (readOptions ?? DefaultReadOptions).Handle, keys, cf); + } + + public void Write(WriteBatch writeBatch, WriteOptions writeOptions = null) + { + Native.Instance.rocksdb_write(Handle, (writeOptions ?? DefaultWriteOptions).Handle, writeBatch.Handle); + } + + public void Write(WriteBatchWithIndex writeBatch, WriteOptions writeOptions = null) + { + Native.Instance.rocksdb_write_writebatch_wi(Handle, (writeOptions ?? DefaultWriteOptions).Handle, writeBatch.Handle); + } + + public void Remove(string key, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) + { + Native.Instance.rocksdb_delete(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, cf); + } + + public void Remove(byte[] key, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) + { + Remove(key, key.Length, cf, writeOptions); + } + +#if !NETSTANDARD2_0 + public unsafe void Remove(ReadOnlySpan key, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) + { + fixed (byte* keyPtr = &MemoryMarshal.GetReference(key)) + { + if (cf is null) + { + Native.Instance.rocksdb_delete(Handle, (writeOptions ?? DefaultWriteOptions).Handle, keyPtr, (UIntPtr)key.Length); + } + else + { + Native.Instance.rocksdb_delete_cf(Handle, (writeOptions ?? DefaultWriteOptions).Handle, cf.Handle, keyPtr, (UIntPtr)key.Length); + } + } + } +#endif + + public void Remove(byte[] key, long keyLength, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) + { + if (cf is null) + { + Native.Instance.rocksdb_delete(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, (UIntPtr)keyLength); + } + else + { + Native.Instance.rocksdb_delete_cf(Handle, (writeOptions ?? DefaultWriteOptions).Handle, cf.Handle, key, (UIntPtr)keyLength); + } + } + + public void Put(string key, string value, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null, Encoding encoding = null) + { + Native.Instance.rocksdb_put(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, value, cf, encoding ?? DefaultEncoding); + } + + public void Put(byte[] key, byte[] value, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) + { + Put(key, key.GetLongLength(0), value, value.GetLongLength(0), cf, writeOptions); + } + +#if !NETSTANDARD2_0 + public void Put(ReadOnlySpan key, ReadOnlySpan value, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) + { + Native.Instance.rocksdb_put(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, value, cf); + } +#endif + + public void Put(byte[] key, long keyLength, byte[] value, long valueLength, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) + { + Native.Instance.rocksdb_put(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, keyLength, value, valueLength, cf); + } + + public void Merge(string key, string value, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null, Encoding encoding = null) + { + Native.Instance.rocksdb_put(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, value, cf, encoding ?? DefaultEncoding); + } + + public void Merge(byte[] key, byte[] value, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) + { + Merge(key, key.GetLongLength(0), value, value.GetLongLength(0), cf, writeOptions); + } + +#if !NETSTANDARD2_0 + public void Merge(ReadOnlySpan key, ReadOnlySpan value, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) + { + Native.Instance.rocksdb_merge(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, value, cf); + } +#endif + + public void Merge(byte[] key, long keyLength, byte[] value, long valueLength, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) + { + Native.Instance.rocksdb_merge(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, keyLength, value, valueLength, cf); + } + + public Iterator NewIterator(ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + IntPtr iteratorHandle = cf is null + ? Native.Instance.rocksdb_create_iterator(Handle, (readOptions ?? DefaultReadOptions).Handle) + : Native.Instance.rocksdb_create_iterator_cf(Handle, (readOptions ?? DefaultReadOptions).Handle, cf.Handle); + // Note: passing in read options here only to ensure that it is not collected before the iterator + return new Iterator(iteratorHandle, readOptions); + } + + public Iterator[] NewIterators(ColumnFamilyHandle[] cfs, ReadOptions[] readOptions) + { + throw new NotImplementedException("TODO: Implement NewIterators()"); + // See rocksdb_create_iterators + } + + public Snapshot CreateSnapshot() + { + IntPtr snapshotHandle = Native.Instance.rocksdb_create_snapshot(Handle); + return new Snapshot(Handle, snapshotHandle); + } + + public static IEnumerable ListColumnFamilies(DbOptions options, string name) + { + return Native.Instance.rocksdb_list_column_families(options.Handle, name); + } + + public static bool TryListColumnFamilies(DbOptions options, string name, out string[] columnFamilies) + { + var result = Native.Instance.rocksdb_list_column_families(options.Handle, name, out UIntPtr lencf, out IntPtr errptr); + if (errptr != IntPtr.Zero) + { + columnFamilies = Array.Empty(); + return false; + } + + IntPtr[] ptrs = new IntPtr[(ulong)lencf]; + Marshal.Copy(result, ptrs, 0, (int)lencf); + columnFamilies = new string[(ulong)lencf]; + for (ulong i = 0; i < (ulong)lencf; i++) + { + columnFamilies[i] = Marshal.PtrToStringAnsi(ptrs[i]); + } + + Native.Instance.rocksdb_list_column_families_destroy(result, lencf); + return true; + } + + public ColumnFamilyHandle CreateColumnFamily(ColumnFamilyOptions cfOptions, string name) + { + var cfh = Native.Instance.rocksdb_create_column_family(Handle, cfOptions.Handle, name); + var cfhw = new ColumnFamilyHandleInternal(cfh); + columnFamilies.Add(name, cfhw); + return cfhw; + } + + public void DropColumnFamily(string name) + { + var cf = GetColumnFamily(name); + Native.Instance.rocksdb_drop_column_family(Handle, cf.Handle); + columnFamilies.Remove(name); + } + + public ColumnFamilyHandle GetDefaultColumnFamily() + { + return GetColumnFamily(ColumnFamilies.DefaultName); + } + + public ColumnFamilyHandle GetColumnFamily(string name) + { + if (columnFamilies is null) + { + throw new RocksDbSharpException("Database not opened for column families"); + } + + return columnFamilies[name]; + } + + public bool TryGetColumnFamily(string name, out ColumnFamilyHandle handle) + { + if (columnFamilies is null) + { + throw new RocksDbSharpException("Database not opened for column families"); + } + + if (columnFamilies.TryGetValue(name, out var internalHandle)) + { + handle = internalHandle; + return true; + } + + handle = null; + return false; + } + + public string GetProperty(string propertyName) + { + return Native.Instance.rocksdb_property_value_string(Handle, propertyName); + } + + public string GetProperty(string propertyName, ColumnFamilyHandle cf) + { + return Native.Instance.rocksdb_property_value_cf_string(Handle, cf.Handle, propertyName); + } + + public void IngestExternalFiles(string[] files, IngestExternalFileOptions ingestOptions, ColumnFamilyHandle cf = null) + { + if (cf is null) + { + Native.Instance.rocksdb_ingest_external_file(Handle, files, (UIntPtr)files.GetLongLength(0), ingestOptions.Handle); + } + else + { + Native.Instance.rocksdb_ingest_external_file_cf(Handle, cf.Handle, files, (UIntPtr)files.GetLongLength(0), ingestOptions.Handle); + } + } + + public void CompactRange(byte[] start, byte[] limit, ColumnFamilyHandle cf = null) + { + if (cf is null) + { + Native.Instance.rocksdb_compact_range(Handle, start, (UIntPtr)(start?.GetLongLength(0) ?? 0L), limit, (UIntPtr)(limit?.GetLongLength(0) ?? 0L)); + } + else + { + Native.Instance.rocksdb_compact_range_cf(Handle, cf.Handle, start, (UIntPtr)(start?.GetLongLength(0) ?? 0L), limit, (UIntPtr)(limit?.GetLongLength(0) ?? 0L)); + } + } + + public void CompactRange(string start, string limit, ColumnFamilyHandle cf = null, Encoding encoding = null) + { + if (encoding is null) + { + encoding = Encoding.UTF8; + } + + CompactRange(start is null ? null : encoding.GetBytes(start), limit is null ? null : encoding.GetBytes(limit), cf); + } + + public void TryCatchUpWithPrimary() + { + Native.Instance.rocksdb_try_catch_up_with_primary(Handle); + } + + public void Flush(FlushOptions flushOptions) + { + Native.Instance.rocksdb_flush(Handle, flushOptions.Handle); + } + + + /// + /// Returns metadata about the file and data in the file. + /// + /// setting it to true only populates FileName, + /// Filesize and filelevel; By default it is false + /// LiveFilesMetadata or null in case of failure + public List GetLiveFilesMetadata(bool populateFileMetadataOnly=false) + { + IntPtr buffer = Native.Instance.rocksdb_livefiles(Handle); + if (buffer == IntPtr.Zero) + { + return null; + } + + try + { + List filesMetadata = new List(); + + int fileCount = Native.Instance.rocksdb_livefiles_count(buffer); + for (int index = 0; index < fileCount; index++) + { + LiveFileMetadata liveFileMetadata = new LiveFileMetadata(); + + FileMetadata metadata = new FileMetadata(); + IntPtr fileMetadata = Native.Instance.rocksdb_livefiles_name(buffer, index); + string fileName = Marshal.PtrToStringAnsi(fileMetadata); + + int level = Native.Instance.rocksdb_livefiles_level(buffer, index); + + UIntPtr fS = Native.Instance.rocksdb_livefiles_size(buffer, index); + ulong fileSize = fS.ToUInt64(); + + metadata.FileName = fileName; + metadata.FileLevel = level; + metadata.FileSize = fileSize; + + liveFileMetadata.FileMetadata = metadata; + + if (!populateFileMetadataOnly) + { + FileDataMetadata fileDataMetadata = new FileDataMetadata(); + var smallestKeyPtr = Native.Instance.rocksdb_livefiles_smallestkey(buffer, + index, + out var smallestKeySize); + string smallestKey = Marshal.PtrToStringAnsi(smallestKeyPtr); + + var largestKeyPtr = Native.Instance.rocksdb_livefiles_largestkey(buffer, + index, + out var largestKeySize); + string largestKey = Marshal.PtrToStringAnsi(largestKeyPtr); + + ulong entries = Native.Instance.rocksdb_livefiles_entries(buffer, index); + ulong deletions = Native.Instance.rocksdb_livefiles_deletions(buffer, index); + + fileDataMetadata.SmallestKeyInFile = smallestKey; + fileDataMetadata.LargestKeyInFile = largestKey; + fileDataMetadata.NumEntriesInFile = entries; + fileDataMetadata.NumDeletionsInFile = deletions; + + liveFileMetadata.FileDataMetadata = fileDataMetadata; + } + + filesMetadata.Add(liveFileMetadata); + } + + return filesMetadata; + } + finally + { + Native.Instance.rocksdb_livefiles_destroy(buffer); + buffer = IntPtr.Zero; + } + } + + /// + /// Lean API to just get Live file names. + /// Refer to GetLiveFilesMetadata() for the complete metadata + /// + /// + public List GetLiveFileNames() + { + IntPtr buffer = Native.Instance.rocksdb_livefiles(Handle); + if (buffer == IntPtr.Zero) + { + return new List(); + } + + try + { + List liveFiles = new List(); + + int fileCount = Native.Instance.rocksdb_livefiles_count(buffer); + + for (int index = 0; index < fileCount; index++) + { + IntPtr fileMetadata = Native.Instance.rocksdb_livefiles_name(buffer, index); + string fileName = Marshal.PtrToStringAnsi(fileMetadata); + liveFiles.Add(fileName); + } + + return liveFiles; + } + finally + { + Native.Instance.rocksdb_livefiles_destroy(buffer); + buffer = IntPtr.Zero; + } + } + } +} diff --git a/csharp/src/Transaction.cs b/csharp/src/Transaction.cs new file mode 100644 index 0000000..069941d --- /dev/null +++ b/csharp/src/Transaction.cs @@ -0,0 +1,282 @@ +using System; +using System.Collections.Generic; +using System.Dynamic; +using System.IO; +using System.Runtime.InteropServices; +using System.Text; + +namespace RocksDbSharp +{ + public class Transaction : IDisposable + { + // Managed references to unmanaged resources that need to live at least as long as the db + private dynamic References { get; } = new ExpandoObject(); + + private bool _disposed; + + public IntPtr Handle { get; private set; } + + public Snapshot Snapshot { get; private set; } = null; + + internal Transaction(TransactionDb parent, WriteOptions writeOptions, TransactionOptions transactionOptions) + { + References.Parent = parent; + References.WriteOptions = writeOptions; + References.TransactionOptions = transactionOptions; + + Handle = Native.Instance.rocksdb_transaction_begin( + parent.Handle, + writeOptions.Handle, + transactionOptions.Handle, IntPtr.Zero); + } + + ~Transaction() + { + ReleaseUnmanagedResources(); + } + + public void Dispose() + { + if (_disposed) return; + + try + { + ReleaseUnmanagedResources(); + GC.SuppressFinalize(this); + } + finally + { + _disposed = true; + } + } + + private void ReleaseUnmanagedResources() + { + if (Handle == IntPtr.Zero) + return; + +#if !NODESTROY + Native.Instance.rocksdb_transaction_destroy(Handle); +#endif + Handle = IntPtr.Zero; + } + + /// + /// Prepare the current transaction for 2PC. + /// + /// if an error occurs when preparing the transaction + public void Prepare() + { + Native.Instance.rocksdb_transaction_prepare(Handle); + } + + /// + /// Write all batched keys to the db atomically. + /// + /// if an error occurs when committing the transaction + public void Commit() + { + Native.Instance.rocksdb_transaction_commit(Handle); + } + + /// + /// Discard all batched writes in this transaction. + /// + /// if an error occurs when rolling back the transaction + public void Rollback() + { + Native.Instance.rocksdb_transaction_rollback(Handle); + } + + /// + /// Records the state of the transaction for future calls to . + /// May be called multiple times to set multiple save points. + /// + public void SetSavePoint() + { + Native.Instance.rocksdb_transaction_set_savepoint(Handle); + } + + /// + /// Undo all operations in this transaction (put, merge, delete, putLogData) since the most recent call to and removes the most recent . + /// + public void RollbackToSavePoint() + { + Native.Instance.rocksdb_transaction_rollback_to_savepoint(Handle); + } + + public string Get(string key, ColumnFamilyHandle cf = null, ReadOptions readOptions = null, Encoding encoding = null) + { + return Native.Instance.rocksdb_transaction_get(Handle, (readOptions ?? RocksDbBase.DefaultReadOptions).Handle, key, cf, encoding ?? RocksDbBase.DefaultEncoding); + } + + public byte[] Get(byte[] key, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + return Get(key, key.GetLongLength(0), cf, readOptions); + } + +#if !NETSTANDARD2_0 + public byte[] Get(ReadOnlySpan key, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + return Native.Instance.rocksdb_transaction_get(Handle, (readOptions ?? RocksDbBase.DefaultReadOptions).Handle, key, cf); + } + + public bool HasKey(ReadOnlySpan key, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + return Native.Instance.rocksdb_transaction_has_key(Handle, (readOptions ?? RocksDbBase.DefaultReadOptions).Handle, key, cf); + } + + public T Get(ReadOnlySpan key, ISpanDeserializer deserializer, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + return Native.Instance.rocksdb_transaction_get(Handle, (readOptions ?? RocksDbBase.DefaultReadOptions).Handle, key, deserializer, cf); + } + + public T Get(ReadOnlySpan key, Func deserializer, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + return Native.Instance.rocksdb_transaction_get(Handle, (readOptions ?? RocksDbBase.DefaultReadOptions).Handle, key, deserializer, cf); + } +#endif + + public byte[] Get(byte[] key, long keyLength, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + return Native.Instance.rocksdb_transaction_get(Handle, (readOptions ?? RocksDbBase.DefaultReadOptions).Handle, key, keyLength, cf); + } + + public bool HasKey(byte[] key, long keyLength, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + return Native.Instance.rocksdb_transaction_has_key(Handle, (readOptions ?? RocksDbBase.DefaultReadOptions).Handle, key, keyLength, cf); + } + + public bool HasKey(string key, ColumnFamilyHandle cf = null, ReadOptions readOptions = null, Encoding encoding = null) + { + return Native.Instance.rocksdb_transaction_has_key(Handle, (readOptions ?? RocksDbBase.DefaultReadOptions).Handle, key, cf, encoding ?? RocksDbBase.DefaultEncoding); + } + + /// + /// Reads the contents of the database value associated with , if present, into the supplied + /// at up to bytes, returning the + /// length of the value in the database, or -1 if the key is not present. + /// + /// + /// + /// + /// + /// + /// + /// The actual length of the database field if it exists, otherwise -1 + public long Get(byte[] key, byte[] buffer, long offset, long length, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + return Get(key, key.GetLongLength(0), buffer, offset, length, cf, readOptions); + } + + /// + /// Reads the contents of the database value associated with , if present, into the supplied + /// at up to bytes, returning the + /// length of the value in the database, or -1 if the key is not present. + /// + /// + /// + /// + /// + /// + /// + /// + /// The actual length of the database field if it exists, otherwise -1 + public long Get(byte[] key, long keyLength, byte[] buffer, long offset, long length, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + unsafe + { + var ptr = Native.Instance.rocksdb_transaction_get(Handle, (readOptions ?? RocksDbBase.DefaultReadOptions).Handle, key, keyLength, out long valLength, cf); + if (ptr == IntPtr.Zero) + { + return -1; + } + + var copyLength = Math.Min(length, valLength); + Marshal.Copy(ptr, buffer, (int)offset, (int)copyLength); + Native.Instance.rocksdb_free(ptr); + return valLength; + } + } + + public KeyValuePair[] MultiGet(byte[][] keys, ColumnFamilyHandle[] cf = null, ReadOptions readOptions = null) + { + return Native.Instance.rocksdb_transaction_multi_get(Handle, (readOptions ?? RocksDbBase.DefaultReadOptions).Handle, keys, null, cf); + } + + public KeyValuePair[] MultiGet(string[] keys, ColumnFamilyHandle[] cf = null, ReadOptions readOptions = null) + { + return Native.Instance.rocksdb_transaction_multi_get(Handle, (readOptions ?? RocksDbBase.DefaultReadOptions).Handle, keys, cf); + } + + public void Remove(string key, ColumnFamilyHandle cf = null) + { + Native.Instance.rocksdb_transaction_delete(Handle, key, cf); + } + + public void Remove(byte[] key, ColumnFamilyHandle cf = null) + { + Remove(key, key.Length, cf); + } + +#if !NETSTANDARD2_0 + public unsafe void Remove(ReadOnlySpan key, ColumnFamilyHandle cf = null) + { + fixed (byte* keyPtr = &MemoryMarshal.GetReference(key)) + { + if (cf is null) + { + Native.Instance.rocksdb_transaction_delete(Handle, keyPtr, (UIntPtr)key.Length); + } + else + { + Native.Instance.rocksdb_transaction_delete_cf(Handle, cf.Handle, keyPtr, (UIntPtr)key.Length); + } + } + } +#endif + + public void Remove(byte[] key, long keyLength, ColumnFamilyHandle cf = null) + { + if (cf is null) + { + Native.Instance.rocksdb_transaction_delete(Handle, key, (UIntPtr)keyLength); + } + else + { + Native.Instance.rocksdb_transaction_delete_cf(Handle, cf.Handle, key, (UIntPtr)keyLength); + } + } + + public void Put(string key, string value, ColumnFamilyHandle cf = null, Encoding encoding = null) + { + Native.Instance.rocksdb_transaction_put(Handle, key, value, cf, encoding ?? RocksDbBase.DefaultEncoding); + } + + public void Put(byte[] key, byte[] value, ColumnFamilyHandle cf = null) + { + Put(key, key.GetLongLength(0), value, value.GetLongLength(0), cf); + } + +#if !NETSTANDARD2_0 + public void Put(ReadOnlySpan key, ReadOnlySpan value, ColumnFamilyHandle cf = null) + { + Native.Instance.rocksdb_transaction_put(Handle, key, value, cf); + } +#endif + + public void Put(byte[] key, long keyLength, byte[] value, long valueLength, ColumnFamilyHandle cf = null) + { + Native.Instance.rocksdb_transaction_put(Handle, key, keyLength, value, valueLength, cf); + } + + public Iterator NewIterator(ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + IntPtr iteratorHandle = cf is null + ? Native.Instance.rocksdb_transaction_create_iterator(Handle, (readOptions ?? RocksDbBase.DefaultReadOptions).Handle) + : Native.Instance.rocksdb_transaction_create_iterator_cf(Handle, (readOptions ?? RocksDbBase.DefaultReadOptions).Handle, cf.Handle); + // Note: passing in read options here only to ensure that it is not collected before the iterator + return new Iterator(iteratorHandle, readOptions); + } + } +} \ No newline at end of file diff --git a/csharp/src/TransactionDb.cs b/csharp/src/TransactionDb.cs new file mode 100644 index 0000000..869a014 --- /dev/null +++ b/csharp/src/TransactionDb.cs @@ -0,0 +1,83 @@ +using System; +using System.Collections.Generic; +using System.Linq; + +namespace RocksDbSharp +{ + public sealed class TransactionDb : RocksDbBase + { + internal static TransactionOptions DefaultTransactionOptions { get; set; } = new TransactionOptions(); + + private TransactionDb(IntPtr handle, dynamic optionsReferences, dynamic cfOptionsRefs, TransactionDbOptions transactionDbOptions, Dictionary columnFamilies = null) + : base(handle, (object)optionsReferences, (object)cfOptionsRefs, columnFamilies) + { + References.TransactionDbOptions = transactionDbOptions; + } + + protected override void ReleaseUnmanagedResources() + { + base.ReleaseUnmanagedResources(); + + if (Handle == IntPtr.Zero) + return; + + var handle = Handle; + Handle = IntPtr.Zero; + Native.Instance.rocksdb_transactiondb_close(handle); + } + + public static TransactionDb Open(OptionsHandle options, TransactionDbOptions transactionDbOptions, string path) + { + using (var pathSafe = new RocksSafePath(path)) + { + IntPtr db = Native.Instance.rocksdb_transactiondb_open(options.Handle, transactionDbOptions.Handle, pathSafe.Handle); + return new TransactionDb(db, optionsReferences: options, cfOptionsRefs: null, transactionDbOptions: transactionDbOptions); + } + } + + public static TransactionDb Open(DbOptions options, TransactionDbOptions transactionDbOptions, string path, ColumnFamilies columnFamilies) + { + using (var pathSafe = new RocksSafePath(path)) + { + string[] cfnames = columnFamilies.Names.ToArray(); + IntPtr[] cfoptions = columnFamilies.OptionHandles.ToArray(); + IntPtr[] cfhandles = new IntPtr[cfnames.Length]; + IntPtr db = Native.Instance.rocksdb_transactiondb_open_column_families(options.Handle, transactionDbOptions.Handle, pathSafe.Handle, cfnames.Length, cfnames, cfoptions, cfhandles); + var cfHandleMap = new Dictionary(); + foreach (var pair in cfnames.Zip(cfhandles.Select(cfh => new ColumnFamilyHandleInternal(cfh)), (name, cfh) => new { Name = name, Handle = cfh })) + { + cfHandleMap.Add(pair.Name, pair.Handle); + } + + return new TransactionDb(db, + optionsReferences: options.References, + cfOptionsRefs: columnFamilies.Select(cfd => cfd.Options.References).ToArray(), + transactionDbOptions: transactionDbOptions, + columnFamilies: cfHandleMap); + } + } + + public Transaction BeginTransaction(WriteOptions writeOptions = null, TransactionOptions transactionOptions = null) + { + return new Transaction(this, + writeOptions ?? DefaultWriteOptions, + transactionOptions ?? DefaultTransactionOptions); + } + + /// + /// Usage: + /// + /// + /// + public Checkpoint Checkpoint() + { + var checkpoint = Native.Instance.rocksdb_transactiondb_checkpoint_object_create(Handle); + return new Checkpoint(checkpoint); + } + } +} \ No newline at end of file