Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
3b448d9
Initial transaction log API
cb1kenobi Sep 15, 2025
85be3ed
Merge branch 'main' into core-2698-transaction-log-store
cb1kenobi Sep 16, 2025
ed8cbd1
Merge
cb1kenobi Sep 16, 2025
8aa2ea1
Init TransactionLog and add constructor
cb1kenobi Sep 16, 2025
2a17e10
Add option to disable the write ahead log
cb1kenobi Sep 16, 2025
ae7b2cf
Clean up tests
cb1kenobi Sep 18, 2025
f5238af
GitHub action cleanup, other cleanup
cb1kenobi Sep 18, 2025
04a6f93
Re-enable getSync()
cb1kenobi Sep 18, 2025
756257a
Pass GitHub token to setup deps action
cb1kenobi Sep 18, 2025
4ce9763
Check needs node
cb1kenobi Sep 18, 2025
c21c150
Setup deps
cb1kenobi Sep 18, 2025
c7b6547
Have bun run tests instead of coverage
cb1kenobi Sep 18, 2025
2af1735
Try to get statuses to be happy
cb1kenobi Sep 18, 2025
2a3ad4e
Try step summary
cb1kenobi Sep 18, 2025
90e4bf3
Just let bun fail
cb1kenobi Sep 18, 2025
8cc9d81
Fix some tests
cb1kenobi Sep 18, 2025
7039b6e
Added note
cb1kenobi Sep 18, 2025
500e67f
Don't cancel workflow
cb1kenobi Sep 18, 2025
60a5b3b
Actually build for bun
cb1kenobi Sep 18, 2025
20fe900
Fix seg fault and get tests passing on Bun
cb1kenobi Sep 18, 2025
e9e06ef
Fix and wire up deno
cb1kenobi Sep 19, 2025
6d46769
Bench scripts for deno and bun
cb1kenobi Sep 19, 2025
c027257
Merge
cb1kenobi Sep 23, 2025
82ce49a
Work
cb1kenobi Sep 23, 2025
c53e44c
Revert debug
cb1kenobi Sep 25, 2025
fc0f7df
Merge branch 'main' into core-2698-transaction-log-store
cb1kenobi Sep 25, 2025
c8959d3
Merge branch 'main' into core-2698-transaction-log-store
cb1kenobi Sep 25, 2025
baf9353
Fix int formats
cb1kenobi Sep 25, 2025
842af36
Reorg cleanup
cb1kenobi Sep 30, 2025
ced2b2d
Fix listLogs()
cb1kenobi Sep 30, 2025
326d3a7
Back to native
cb1kenobi Sep 30, 2025
373f990
Major reorg to move transaction log state into shared TransactionLogS…
cb1kenobi Oct 1, 2025
6ab3972
Merge branch 'main' into core-2698-transaction-log-store
cb1kenobi Oct 1, 2025
afcc882
Work on GC test
cb1kenobi Oct 2, 2025
8db667f
Fixed test
cb1kenobi Oct 2, 2025
77dbb8f
Add transactionLogMaxSize setting
cb1kenobi Oct 3, 2025
72adbb6
Document transactionLogsPath
cb1kenobi Oct 3, 2025
130266b
Add support for transaction log subdirs and start wiring up addEntry
cb1kenobi Oct 3, 2025
5f233fb
Shuffled log discovery code, work on transaction file loading
cb1kenobi Oct 7, 2025
08105d5
Remove isOpen
cb1kenobi Oct 7, 2025
09745ed
Remove unnecessary includes
cb1kenobi Oct 7, 2025
e16cf73
Windows support
cb1kenobi Oct 7, 2025
531198c
Merge branch 'main' into core-2698-transaction-log-store
cb1kenobi Oct 7, 2025
e13ee90
Fix semicolon
cb1kenobi Oct 7, 2025
7d96cee
Read/write bytes
cb1kenobi Oct 7, 2025
5c73764
read/write data
cb1kenobi Oct 7, 2025
28b1342
read/write
cb1kenobi Oct 7, 2025
d93f1c9
More Windows fixes
cb1kenobi Oct 7, 2025
8d35f1a
undef near/far
cb1kenobi Oct 7, 2025
e31f771
More guessing
cb1kenobi Oct 7, 2025
c88e9a1
Fixes
cb1kenobi Oct 7, 2025
a858819
More guessing
cb1kenobi Oct 7, 2025
8bf7b1d
Break up platform specific stuff
cb1kenobi Oct 7, 2025
baa173b
No minmax
cb1kenobi Oct 8, 2025
f6fbbc0
Wire up closing log store
cb1kenobi Oct 8, 2025
8948ba3
Merge branch 'main' into core-2698-transaction-log-store
cb1kenobi Oct 8, 2025
e67f63e
Log file concurrency, stub query, remove memory map (for now)
cb1kenobi Oct 9, 2025
6743226
Wire up transaction log purging and write some data
cb1kenobi Oct 10, 2025
69a7b28
Fix paths for Windows
cb1kenobi Oct 10, 2025
2d6d380
Merge branch 'main' into core-2698-transaction-log-store
cb1kenobi Oct 10, 2025
4dd11c1
Docs, tests, tried to fix thread synchronization
cb1kenobi Oct 10, 2025
a374f84
Add test worker
cb1kenobi Oct 10, 2025
8f1bcb5
Fix merge conflicts
cb1kenobi Oct 20, 2025
de9f5c5
Reorg import
cb1kenobi Oct 20, 2025
f42ff16
Merge branch 'main' into core-2698-transaction-log-store
cb1kenobi Oct 20, 2025
cdef72a
Rename fixtures to workers
cb1kenobi Oct 20, 2025
b0658d7
Fixed lots of race conditions, add docs
cb1kenobi Oct 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 102 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ Creates a new database instance.
- `path: string` The path to write the database files to. This path does not
need to exist, but the parent directories do.
- `options: object` [optional]
- `name:string` The column family name. Defaults to `"default"`.
- `disableWAL: boolean` Whether to disable the RocksDB write ahead log.
- `name: string` The column family name. Defaults to `"default"`.
- `noBlockCache: boolean` When `true`, disables the block cache. Block
caching is enabled by default and the cache is shared across all database
instances.
Expand All @@ -52,6 +53,12 @@ Creates a new database instance.
- `store: Store` A custom store that handles all interaction between the
`RocksDatabase` or `Transaction` instances and the native database
interface. See [Custom Store](#custom-store) for more information.
- `transactionLogMaxSize: number` The maximum size of a transaction log before
it is rotated to the next sequence number. Defaults to 16 MB.
- `transactionLogRetention: string | number` The number of minutes to retain
transaction logs before purging. Defaults to `'3d'` (3 days).
- `transactionLogsPath: string` The path to store transaction logs. Defaults
to `"${db.path}/transaction_logs"`.

### `db.close()`

Expand Down Expand Up @@ -597,6 +604,100 @@ Note: If the `callback` throws an error, Node.js suppress the error. Node.js
will cause errors to emit the `'uncaughtException'` event. Future Node.js
releases will enable this flag by default.

## Transaction Log

A user controlled API for logging transactions. This API is designed to be
generic so that you can log gets, puts, and deletes, but also arbitrary entries.

### `db.listLogs(): string[]`

Returns an array of log store names.

```typescript
const names = db.listLogs();
```

### `db.purgeLogs(options?): string[]`

Deletes transaction log files older than the `transactionLogRetention` (defaults
to 3 days).

- `options: object`
- `destroy?: boolean` When `true`, deletes transaction log stores including
all log sequence files on disk.
- `name?: string` The name of a store to limit the purging to.

Returns an array with the full path of each log file deleted.

```typescript
const removed = db.purgeLogs();
console.log(`Removed ${removed.length} log files`);
```

### `db.useLog(name): TransactionLog`

Gets or creates a `TransactionLog` instance. Internally, the `TransactionLog`
interfaces with a shared transaction log store that is used by all threads.
Multiple worker threads can use the same log at the same time.

- `name: string | number` The name of the log. Numeric log names are converted
to a string.

```typescript
const log1 = db.useLog('foo');
const log2 = db.useLog('foo'); // gets the exist instance (e.g. log1 === log2)
const log3 = db.useLog(123);
```

### Class: `TransactionLog`

The transaction callback is passed in a `Transaction` instance which contains
all of the same data operations methods as the `RocksDatabase` instance plus:

- `log.addEntry()`
- `log.commit()`
- `log.query()`

#### `log.addEntry(timestamp, data, options?): void`

Adds an entry to the log.

- `timestamp: number` A numeric timestamp in the form as the number of
milliseconds elapsed since the epoch.
- `data: Buffer | UInt8Array` The entry data to store. There is no inherent
limit beyond what Node.js can handle.
- `options?: LogEntryOptions` An optional object containing log settings.
- `transaction?: Transaction` A related transaction used to group entries
together.

```typescript
const log = db.useLog('foo');
log.addEntry(Date.now(), Buffer.from('hello'));
```

#### `log.commit()`

Writes the queued entries to disk.

```typescript
const log = db.useLog('foo');
log.addEntry(Date.now(), Buffer.from('hello'), { transaction });
log.addEntry(Date.now(), Buffer.from('world'), { transaction });
log.commit();
```

#### `log.query()`

Returns an iterator that retreives all entries for the given filter.

```typescript
const log = db.useLog('foo');
const iter = log.query();
for (const entry of iter) {
console.log(entry);
}
```

## Custom Store

The store is a class that sits between the `RocksDatabase` or `Transaction`
Expand Down
4 changes: 4 additions & 0 deletions binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
'src/binding/db_settings.cpp',
'src/binding/transaction_handle.cpp',
'src/binding/transaction.cpp',
'src/binding/transaction_log.cpp',
'src/binding/transaction_log_file.cpp',
'src/binding/transaction_log_handle.cpp',
'src/binding/transaction_log_store.cpp',
'src/binding/util.cpp',
],
'cflags!': [ '-fno-exceptions', '-std=c++17' ],
Expand Down
14 changes: 9 additions & 5 deletions src/binding/binding.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "macros.h"
#include "rocksdb/db.h"
#include "transaction.h"
#include "transaction_log.h"
#include "util.h"
#include <atomic>

Expand All @@ -18,7 +19,7 @@ namespace rocksdb_js {
* (main thread + worker threads) and we only want to cleanup after the last
* instance exits.
*/
static std::atomic<int> moduleRefCount{0};
static std::atomic<uint32_t> moduleRefCount{0};

NAPI_MODULE_INIT() {
#ifdef DEBUG
Expand All @@ -30,23 +31,23 @@ NAPI_MODULE_INIT() {
napi_create_string_utf8(env, rocksdb::GetRocksVersionAsString().c_str(), NAPI_AUTO_LENGTH, &version);
napi_set_named_property(env, exports, "version", version);

[[maybe_unused]] int refCount = ++moduleRefCount;
DEBUG_LOG("Binding::Init Module ref count: %d\n", refCount);
[[maybe_unused]] uint32_t refCount = ++moduleRefCount;
DEBUG_LOG("Binding::Init Module ref count: %u\n", refCount);

// initialize the registry
DBRegistry::Init();

// registry cleanup
NAPI_STATUS_THROWS(::napi_add_env_cleanup_hook(env, [](void* data) {
int newRefCount = --moduleRefCount;
uint32_t newRefCount = --moduleRefCount;
if (newRefCount == 0) {
DEBUG_LOG("Binding::Init Cleaning up last instance, purging all databases\n")
rocksdb_js::DBRegistry::PurgeAll();
DEBUG_LOG("Binding::Init env cleanup done\n")
} else if (newRefCount < 0) {
DEBUG_LOG("Binding::Init WARNING: Module ref count went negative!\n")
} else {
DEBUG_LOG("Binding::Init Skipping cleanup, %d remaining instances\n", newRefCount)
DEBUG_LOG("Binding::Init Skipping cleanup, %u remaining instances\n", newRefCount)
}
}, nullptr));

Expand All @@ -56,6 +57,9 @@ NAPI_MODULE_INIT() {
// transaction
rocksdb_js::Transaction::Init(env, exports);

// transaction log
rocksdb_js::TransactionLog::Init(env, exports);

// db iterator
rocksdb_js::DBIterator::Init(env, exports);

Expand Down
93 changes: 77 additions & 16 deletions src/binding/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ namespace rocksdb_js {
* ```
*/
napi_value Database::Constructor(napi_env env, napi_callback_info info) {
NAPI_CONSTRUCTOR("Database")
NAPI_CONSTRUCTOR_WITH_DATA("Database")

// create shared_ptr on heap so it persists after function returns
auto* dbHandle = new std::shared_ptr<DBHandle>(std::make_shared<DBHandle>());
napi_ref exportsRef = reinterpret_cast<napi_ref>(data);
auto* dbHandle = new std::shared_ptr<DBHandle>(std::make_shared<DBHandle>(env, exportsRef));

DEBUG_LOG("Database::Constructor Creating NativeDatabase DBHandle=%p\n", dbHandle->get())

Expand Down Expand Up @@ -488,7 +489,7 @@ napi_value Database::GetUserSharedBuffer(napi_env env, napi_callback_info info)
NAPI_STATUS_THROWS(::napi_typeof(env, argv[2], &type))
if (type != napi_undefined) {
if (type == napi_function) {
DEBUG_LOG("Database::GetUserSharedBuffer key start=%d end=%d:\n", keyStart, keyEnd)
DEBUG_LOG("Database::GetUserSharedBuffer key start=%u end=%u:\n", keyStart, keyEnd)
DEBUG_LOG_KEY_LN(keyStr)
callbackRef = (*dbHandle)->descriptor->addListener(env, keyStr, argv[2], *dbHandle);
} else {
Expand Down Expand Up @@ -538,6 +539,15 @@ napi_value Database::IsOpen(napi_env env, napi_callback_info info) {
return result;
}

/**
* Lists all transaction logs in the database.
*/
napi_value Database::ListLogs(napi_env env, napi_callback_info info) {
NAPI_METHOD()
UNWRAP_DB_HANDLE_AND_OPEN()
return (*dbHandle)->descriptor->listTransactionLogStores(env);
}

/**
* Opens the RocksDB database. This must be called before any data methods are called.
*/
Expand All @@ -553,24 +563,45 @@ napi_value Database::Open(napi_env env, napi_callback_info info) {
NAPI_GET_STRING(argv[0], path, "Database path is required")
const napi_value options = argv[1];

bool disableWAL = false;
NAPI_STATUS_THROWS(rocksdb_js::getProperty(env, options, "disableWAL", disableWAL));

std::string name;
NAPI_STATUS_THROWS(rocksdb_js::getProperty(env, options, "name", name));

bool noBlockCache = false;
NAPI_STATUS_THROWS(rocksdb_js::getProperty(env, options, "noBlockCache", noBlockCache));

int parallelismThreads = std::max<int>(1, std::thread::hardware_concurrency() / 2);
NAPI_STATUS_THROWS(rocksdb_js::getProperty(env, options, "parallelismThreads", parallelismThreads));

std::string modeName;
NAPI_STATUS_THROWS(rocksdb_js::getProperty(env, options, "mode", modeName));

uint32_t parallelismThreads = std::max<uint32_t>(1, std::thread::hardware_concurrency() / 2);
NAPI_STATUS_THROWS(rocksdb_js::getProperty(env, options, "parallelismThreads", parallelismThreads));

uint32_t transactionLogRetentionMs = 3 * 24 * 60 * 60 * 1000; // 3 days
NAPI_STATUS_THROWS(rocksdb_js::getProperty(env, options, "transactionLogRetentionMs", transactionLogRetentionMs));

std::string transactionLogsPath;
NAPI_STATUS_THROWS(rocksdb_js::getProperty(env, options, "transactionLogsPath", transactionLogsPath));

uint32_t transactionLogMaxSize = 16 * 1024 * 1024; // 16MB
NAPI_STATUS_THROWS(rocksdb_js::getProperty(env, options, "transactionLogMaxSize", transactionLogMaxSize));

DBMode mode = DBMode::Optimistic;
if (modeName == "pessimistic") {
mode = DBMode::Pessimistic;
}

DBOptions dbHandleOptions { mode, name, noBlockCache, parallelismThreads };
DBOptions dbHandleOptions {
disableWAL,
mode,
name,
noBlockCache,
parallelismThreads,
transactionLogMaxSize,
transactionLogRetentionMs,
transactionLogsPath
};

try {
(*dbHandle)->open(path, dbHandleOptions);
Expand All @@ -583,6 +614,16 @@ napi_value Database::Open(napi_env env, napi_callback_info info) {
NAPI_RETURN_UNDEFINED()
}

/**
* Purges transaction logs.
*/
napi_value Database::PurgeLogs(napi_env env, napi_callback_info info) {
NAPI_METHOD_ARGV(1)
UNWRAP_DB_HANDLE_AND_OPEN()

return (*dbHandle)->descriptor->purgeTransactionLogs(env, argv[0]);
}

/**
* Puts a key-value pair into the RocksDB database.
*/
Expand Down Expand Up @@ -622,8 +663,10 @@ napi_value Database::PutSync(napi_env env, napi_callback_info info) {
*dbHandle
);
} else {
rocksdb::WriteOptions writeOptions;
writeOptions.disableWAL = (*dbHandle)->disableWAL;
status = (*dbHandle)->descriptor->db->Put(
rocksdb::WriteOptions(),
writeOptions,
(*dbHandle)->column.get(),
keySlice,
valueSlice
Expand Down Expand Up @@ -666,8 +709,10 @@ napi_value Database::RemoveSync(napi_env env, napi_callback_info info) {
}
status = txnHandle->removeSync(keySlice, *dbHandle);
} else {
rocksdb::WriteOptions writeOptions;
writeOptions.disableWAL = (*dbHandle)->disableWAL;
status = (*dbHandle)->descriptor->db->Delete(
rocksdb::WriteOptions(),
writeOptions,
(*dbHandle)->column.get(),
keySlice
);
Expand Down Expand Up @@ -752,6 +797,16 @@ napi_value Database::Unlock(napi_env env, napi_callback_info info) {
return result;
}

/**
* Get or create a transaction log.
*/
napi_value Database::UseLog(napi_env env, napi_callback_info info) {
NAPI_METHOD_ARGV(1)
NAPI_GET_STRING(argv[0], name, "Name is required")
UNWRAP_DB_HANDLE_AND_OPEN()
return (*dbHandle)->useLog(env, jsThis, name);
}

/**
* Mutually exclusive execution of a function across threads for a given key.
*/
Expand Down Expand Up @@ -804,31 +859,37 @@ void Database::Init(napi_env env, napi_value exports) {
{ "getSync", nullptr, GetSync, nullptr, nullptr, nullptr, napi_default, nullptr },
{ "getUserSharedBuffer", nullptr, GetUserSharedBuffer, nullptr, nullptr, nullptr, napi_default, nullptr },
{ "hasLock", nullptr, HasLock, nullptr, nullptr, nullptr, napi_default, nullptr },
{ "listeners", nullptr, Listeners, nullptr, nullptr, nullptr, napi_default, nullptr },
{ "listeners", nullptr, Listeners, nullptr, nullptr, nullptr, napi_default, nullptr },\
{ "listLogs", nullptr, ListLogs, nullptr, nullptr, nullptr, napi_default, nullptr },
{ "notify", nullptr, Notify, nullptr, nullptr, nullptr, napi_default, nullptr },
{ "open", nullptr, Open, nullptr, nullptr, nullptr, napi_default, nullptr },
{ "opened", nullptr, nullptr, IsOpen, nullptr, nullptr, napi_default, nullptr },
{ "purgeLogs", nullptr, PurgeLogs, nullptr, nullptr, nullptr, napi_default, nullptr },
{ "putSync", nullptr, PutSync, nullptr, nullptr, nullptr, napi_default, nullptr },
{ "removeListener", nullptr, RemoveListener, nullptr, nullptr, nullptr, napi_default, nullptr },
{ "removeSync", nullptr, RemoveSync, nullptr, nullptr, nullptr, napi_default, nullptr },
{ "tryLock", nullptr, TryLock, nullptr, nullptr, nullptr, napi_default, nullptr },
{ "unlock", nullptr, Unlock, nullptr, nullptr, nullptr, napi_default, nullptr },
{ "useLog", nullptr, UseLog, nullptr, nullptr, nullptr, napi_default, nullptr },
{ "withLock", nullptr, WithLock, nullptr, nullptr, nullptr, napi_default, nullptr }
};

auto className = "Database";
constexpr size_t len = sizeof("Database") - 1;

napi_ref exportsRef;
NAPI_STATUS_THROWS_VOID(::napi_create_reference(env, exports, 1, &exportsRef))

napi_value ctor;
NAPI_STATUS_THROWS_VOID(::napi_define_class(
env,
className, // className
len, // length of class name
Constructor, // constructor
nullptr, // constructor arg
className, // className
len, // length of class name
Database::Constructor, // constructor
reinterpret_cast<void*>(exportsRef), // constructor arg
sizeof(properties) / sizeof(napi_property_descriptor), // number of properties
properties, // properties array
&ctor // [out] constructor
properties, // properties array
&ctor // [out] constructor
))

NAPI_STATUS_THROWS_VOID(::napi_set_named_property(env, exports, className, ctor))
Expand Down
3 changes: 3 additions & 0 deletions src/binding/database.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,16 @@ struct Database final {
static napi_value HasLock(napi_env env, napi_callback_info info);
static napi_value IsOpen(napi_env env, napi_callback_info info);
static napi_value Listeners(napi_env env, napi_callback_info info);
static napi_value ListLogs(napi_env env, napi_callback_info info);
static napi_value Notify(napi_env env, napi_callback_info info);
static napi_value Open(napi_env env, napi_callback_info info);
static napi_value PurgeLogs(napi_env env, napi_callback_info info);
static napi_value PutSync(napi_env env, napi_callback_info info);
static napi_value RemoveListener(napi_env env, napi_callback_info info);
static napi_value RemoveSync(napi_env env, napi_callback_info info);
static napi_value TryLock(napi_env env, napi_callback_info info);
static napi_value Unlock(napi_env env, napi_callback_info info);
static napi_value UseLog(napi_env env, napi_callback_info info);
static napi_value WithLock(napi_env env, napi_callback_info info);

static void Init(napi_env env, napi_value exports);
Expand Down
Loading
Loading