diff --git a/guest.js b/guest.js index d3abf63..11bac0f 100644 --- a/guest.js +++ b/guest.js @@ -4,12 +4,14 @@ const { AbstractLevel, AbstractIterator } = require('abstract-level') const lpstream = require('@vweevers/length-prefixed-stream') const ModuleError = require('module-error') const { input, output } = require('./tags') -const { Duplex, pipeline, finished } = require('readable-stream') +const { promises: readablePromises, Duplex } = require('readable-stream') +const { pipeline, finished } = readablePromises const kExplicitClose = Symbol('explicitClose') const kAbortRequests = Symbol('abortRequests') const kEnded = Symbol('kEnded') const kRemote = Symbol('remote') +const kCleanup = Symbol('cleanup') const kAckMessage = Symbol('ackMessage') const kEncode = Symbol('encode') const kRef = Symbol('ref') @@ -45,6 +47,7 @@ class ManyLevelGuest extends AbstractLevel { this[kRetry] = !!retry this[kEncode] = lpstream.encode() this[kRemote] = _remote || null + this[kCleanup] = null this[kRpcStream] = null this[kRef] = null this[kDb] = null @@ -108,12 +111,15 @@ class ManyLevelGuest extends AbstractLevel { }) const proxy = Duplex.from({ writable: decode, readable: encode }) - finished(proxy, cleanup) - this[kRpcStream] = proxy - return proxy - - function cleanup () { + self[kCleanup] = (async () => { + await finished(proxy).catch(err => { + // Abort error is expected on close, which is what triggers finished + if (err.code === 'ABORT_ERR') { + // TODO: abort in-flight ops + } + }) self[kRpcStream] = null + // Create a dummy stream to flush pending requests to self[kEncode] = lpstream.encode() if (!self[kRetry]) { @@ -123,19 +129,21 @@ class ManyLevelGuest extends AbstractLevel { } for (const req of self[kRequests].values()) { - self[kWrite](req) + await self[kWrite](req) } for (const req of self[kIterators].values()) { - self[kWrite](req) + await self[kWrite](req) } - } + })() + self[kRpcStream] = proxy + return proxy function oniteratordata (res) { const req = self[kIterators].get(res.id) if (!req || req.iterator[kSeq] !== res.seq) return req.iterator[kPending].push(res) - if (req.iterator[kCallback]) req.iterator._next(req.iterator[kCallback]) + if (req.iterator[kCallback]) req.iterator[kCallback](null, res) } function oniteratorend (res) { @@ -143,19 +151,19 @@ class ManyLevelGuest extends AbstractLevel { if (!req || req.iterator[kSeq] !== res.seq) return // https://github.com/Level/abstract-level/issues/19 req.iterator[kEnded] = true - if (req.iterator[kCallback]) req.iterator._next(req.iterator[kCallback]) + if (req.iterator[kCallback]) req.iterator[kCallback](null, res) } function oncallback (res) { const req = self[kRequests].remove(res.id) - if (!req) return + if (!req || !req.callback) return if (res.error) req.callback(new ModuleError('Could not get value', { code: res.error })) else req.callback(null, normalizeValue(res.value)) } function ongetmanycallback (res) { const req = self[kRequests].remove(res.id) - if (!req) return + if (!req || !req.callback) return if (res.error) req.callback(new ModuleError('Could not get values', { code: res.error })) else req.callback(null, res.values.map(v => normalizeValue(v.value))) } @@ -191,11 +199,13 @@ class ManyLevelGuest extends AbstractLevel { [kAbortRequests] (msg, code) { for (const req of this[kRequests].clear()) { + // TODO: this doesn't actually abort the request, but neither did the old way req.callback(new ModuleError(msg, { code })) } for (const req of this[kIterators].clear()) { // Cancel in-flight operation if any + // TODO: does this need to be refactored to use AbortError to pass back up to the request initiator? const callback = req.iterator[kCallback] req.iterator[kCallback] = null @@ -208,138 +218,175 @@ class ManyLevelGuest extends AbstractLevel { } } - _get (key, opts, cb) { + async _get (key, opts) { // TODO: this and other methods assume db state matches our state - if (this[kDb]) return this[kDb]._get(key, opts, cb) - - const req = { - tag: input.get, - id: 0, - key: key, - callback: cb - } + if (this[kDb]) return this[kDb]._get(key, opts) + + return new Promise((resolve, reject) => { + const req = { + tag: input.get, + id: 0, + key: key, + // This will resolve or reject based on the Host's response + callback: (err, value) => { + if (err) reject(err) + else resolve(value) + } + } - req.id = this[kRequests].add(req) - this[kWrite](req) + req.id = this[kRequests].add(req) + this[kWrite](req) + }) } - _getMany (keys, opts, cb) { - if (this[kDb]) return this[kDb]._getMany(keys, opts, cb) - - const req = { - tag: input.getMany, - id: 0, - keys: keys, - callback: cb - } + async _getMany (keys, opts) { + if (this[kDb]) return this[kDb]._getMany(keys, opts) + + return new Promise((resolve, reject) => { + const req = { + tag: input.getMany, + id: 0, + keys: keys, + // This will resolve or reject based on the Host's response + callback: (err, values) => { + if (err) reject(err) + else resolve(values) + } + } - req.id = this[kRequests].add(req) - this[kWrite](req) + req.id = this[kRequests].add(req) + this[kWrite](req) + }) } - _put (key, value, opts, cb) { - if (this[kDb]) return this[kDb]._put(key, value, opts, cb) - - const req = { - tag: input.put, - id: 0, - key: key, - value: value, - callback: cb - } + async _put (key, value, opts) { + if (this[kDb]) return this[kDb]._put(key, value, opts) + + return new Promise((resolve, reject) => { + const req = { + tag: input.put, + id: 0, + key: key, + value: value, + // This will resolve or reject based on the Host's response + callback: (err) => { + if (err) reject(err) + else resolve() + } + } - req.id = this[kRequests].add(req) - this[kWrite](req) + req.id = this[kRequests].add(req) + this[kWrite](req) + }) } - _del (key, opts, cb) { - if (this[kDb]) return this[kDb]._del(key, opts, cb) - - const req = { - tag: input.del, - id: 0, - key: key, - callback: cb - } + async _del (key, opts) { + if (this[kDb]) return this[kDb]._del(key, opts) + + return new Promise((resolve, reject) => { + const req = { + tag: input.del, + id: 0, + key: key, + // This will resolve or reject based on the Host's response + callback: (err) => { + if (err) reject(err) + else resolve() + } + } - req.id = this[kRequests].add(req) - this[kWrite](req) + req.id = this[kRequests].add(req) + this[kWrite](req) + }) } - _batch (batch, opts, cb) { - if (this[kDb]) return this[kDb]._batch(batch, opts, cb) - - const req = { - tag: input.batch, - id: 0, - ops: batch, - callback: cb - } + async _batch (batch, opts) { + if (this[kDb]) return this[kDb]._batch(batch, opts) + + return new Promise((resolve, reject) => { + const req = { + tag: input.batch, + id: 0, + ops: batch, + // This will resolve or reject based on the Host's response + callback: (err) => { + if (err) reject(err) + else resolve() + } + } - req.id = this[kRequests].add(req) - this[kWrite](req) + req.id = this[kRequests].add(req) + this[kWrite](req) + }) } - _clear (opts, cb) { - if (this[kDb]) return this[kDb]._clear(opts, cb) - - const req = { - tag: input.clear, - id: 0, - options: opts, - callback: cb - } + async _clear (opts) { + if (this[kDb]) return this[kDb]._clear(opts) + + return new Promise((resolve, reject) => { + const req = { + tag: input.clear, + id: 0, + options: opts, + // This will resolve or reject based on the Host's response + callback: (err) => { + if (err) reject(err) + else resolve() + } + } - req.id = this[kRequests].add(req) - this[kWrite](req) + req.id = this[kRequests].add(req) + this[kWrite](req) + }) } - [kWrite] (req) { + async [kWrite] (req) { if (this[kRequests].size + this[kIterators].size === 1) ref(this[kRef]) const enc = input.encoding(req.tag) const buf = Buffer.allocUnsafe(enc.encodingLength(req) + 1) buf[0] = req.tag enc.encode(req, buf, 1) - this[kEncode].write(buf) + return this[kEncode].write(buf) } - _close (cb) { + async _close () { // Even if forward() was used, still need to abort requests made before forward(). this[kExplicitClose] = true this[kAbortRequests]('Aborted on database close()', 'LEVEL_DATABASE_NOT_OPEN') if (this[kRpcStream]) { - finished(this[kRpcStream], () => { - this[kRpcStream] = null - this._close(cb) - }) - this[kRpcStream].destroy() - } else if (this[kDb]) { + const finishedPromise = finished(this[kRpcStream]).catch(() => null) + this[kRpcStream].destroy().catch(() => null) + await finishedPromise + if (this[kCleanup]) await this[kCleanup] + this[kRpcStream] = null + this[kCleanup] = null + } + if (this[kDb]) { // To be safe, use close() not _close(). - this[kDb].close(cb) - } else { - this.nextTick(cb) + return this[kDb].close() } } - _open (options, cb) { + async _open (options) { if (this[kRemote]) { // For tests only so does not need error handling this[kExplicitClose] = false const remote = this[kRemote]() pipeline( remote, - this.connect(), - remote, - () => {} - ) + this.createRpcStream(), + remote + ).catch(err => { + if (err.code === 'ABORT_ERR') { + return this.close() + } + }) } else if (this[kExplicitClose]) { throw new ModuleError('Cannot reopen many-level database after close()', { code: 'LEVEL_NOT_SUPPORTED' }) } - - this.nextTick(cb) } iterator (options) { @@ -352,13 +399,13 @@ class ManyLevelGuest extends AbstractLevel { } _iterator (options) { - return new Iterator(this, options) + return new ManyLevelGuestIterator(this, options) } } exports.ManyLevelGuest = ManyLevelGuest -class Iterator extends AbstractIterator { +class ManyLevelGuestIterator extends AbstractIterator { constructor (db, options) { // Need keys to know where to restart if (db[kRetry]) options.keys = true @@ -419,59 +466,68 @@ class Iterator extends AbstractIterator { } // TODO: implement optimized `nextv()` - _next (callback) { - this[kCallback] = null - + async _next () { if (this[kRequest].consumed >= this.limit || this[kErrored]) { - this.nextTick(callback) - } else if (this[kPending].length !== 0) { - const next = this[kPending][0] - const req = this[kRequest] - - // TODO: document that error ends the iterator - if (next.error) { - this[kErrored] = true - this[kPending] = [] - - return this.nextTick(callback, new ModuleError('Could not read entry', { - code: next.error - })) - } + return + } + // If nothing is pending, wait for the host to send more data + // except if this[kEnded] is true and nothing is pending, then + // don't wait! Return undefined. + if (this[kEnded] && !this[kPending].length) { + return undefined + } + // oniteratordata (in ManyLevelGuest) will use the callback to resolve + // this promise to the data received from the host. + if (!this[kPending].length) { + await new Promise((resolve, reject) => { + this[kCallback] = (err, data) => { + if (err) reject(err) + else resolve(data) + } + }) + } + const next = this[kPending][0] + const req = this[kRequest] + + // If the host iterator has ended and we have no pending data, we are done. + if (!next && this[kEnded]) return + if (next.error) { + this[kErrored] = true + this[kEnded] = true + this[kPending] = [] + + throw new ModuleError('Could not read entry', { + code: next.error + }) + } - const consumed = ++req.consumed - const key = req.options.keys ? next.data.shift() : undefined - const val = req.options.values ? next.data.shift() : undefined + const consumed = ++req.consumed + const key = req.options.keys ? next.data.shift() : undefined + const val = req.options.values ? next.data.shift() : undefined - if (next.data.length === 0) { - this[kPending].shift() + if (next.data.length === 0) { + this[kPending].shift() - // Acknowledge receipt. Not needed if we don't want more data. - if (consumed < this.limit) { - this[kAckMessage].consumed = consumed - this.db[kWrite](this[kAckMessage]) - } + // Acknowledge receipt. Not needed if we don't want more data. + if (consumed < this.limit) { + this[kAckMessage].consumed = consumed + await this.db[kWrite](this[kAckMessage]) } + } - // Once we've consumed the result of a seek() it must not get retried - req.seek = null - - if (this.db[kRetry]) { - req.bookmark = key - } + // Once we've consumed the result of a seek() it must not get retried + req.seek = null - this.nextTick(callback, undefined, key, val) - } else if (this[kEnded]) { - this.nextTick(callback) - } else { - this[kCallback] = callback + if (this.db[kRetry]) { + req.bookmark = key } + return [key, val] } - _close (cb) { - this.db[kWrite]({ tag: input.iteratorClose, id: this[kRequest].id }) + async _close () { + await this.db[kWrite]({ tag: input.iteratorClose, id: this[kRequest].id }) this.db[kIterators].remove(this[kRequest].id) this.db[kFlushed]() - this.nextTick(cb) } } diff --git a/host.js b/host.js index cb0bd78..74f10fd 100644 --- a/host.js +++ b/host.js @@ -2,7 +2,8 @@ const lpstream = require('@vweevers/length-prefixed-stream') const ModuleError = require('module-error') -const { Duplex, finished } = require('readable-stream') +const { Duplex, promises: readablePromises } = require('readable-stream') +const { finished } = readablePromises const { input, output } = require('./tags') const rangeOptions = new Set(['gt', 'gte', 'lt', 'lte']) @@ -23,7 +24,6 @@ const kBusy = Symbol('busy') const kPendingSeek = Symbol('pendingSeek') const kLimit = Symbol('limit') const kReadAhead = Symbol('readAhead') -const noop = () => {} const limbo = Symbol('limbo') // TODO: make use of db.supports manifest @@ -65,7 +65,7 @@ function createRpcStream (db, options, streamOptions) { const predel = options.predel const prebatch = options.prebatch - db.open({ passive: true }, ready) + db.open({ passive: true }).then(() => ready()).catch(ready) // TODO: send events to guest. Challenges: // - Need to know encodings or emit encoded data; current abstract-level events don't suffice @@ -84,13 +84,16 @@ function createRpcStream (db, options, streamOptions) { const iterators = new Map() - finished(stream, function () { + const cleanup = async () => { + await finished(stream).catch(() => null) for (const iterator of iterators.values()) { iterator.close() } iterators.clear() - }) + } + // Don't await + cleanup() decode.on('data', function (data) { if (!data.length) return @@ -131,54 +134,61 @@ function createRpcStream (db, options, streamOptions) { encode.write(encodeMessage(msg, output.getManyCallback)) } - function onput (req) { - preput(req.key, req.value, function (err) { - if (err) return callback(req.id, err) - db.put(req.key, req.value, encodingOptions, function (err) { - callback(req.id, err, null) - }) - }) + async function onput (req) { + preput(req.key, req.value, function (err) { return callback(req.id, err) }) + try { + await db.put(req.key, req.value, encodingOptions) + } catch (err) { + return callback(req.id, err, null) + } } - function onget (req) { - db.get(req.key, encodingOptions, function (err, value) { - callback(req.id, err, value) - }) + async function onget (req) { + try { + const value = await db.get(req.key, encodingOptions) + return callback(req.id, null, value) + } catch (err) { + return callback(req.id, err, null) + } } - function ongetmany (req) { - db.getMany(req.keys, encodingOptions, function (err, values) { - getManyCallback(req.id, err, values.map(value => ({ value }))) - }) + async function ongetmany (req) { + try { + const values = await db.getMany(req.keys, encodingOptions) + return getManyCallback(req.id, null, values.map(value => ({ value }))) + } catch (err) { + return getManyCallback(req.id, err, []) + } } - function ondel (req) { - predel(req.key, function (err) { - if (err) return callback(req.id, err) - db.del(req.key, encodingOptions, function (err) { - callback(req.id, err) - }) - }) + async function ondel (req) { + predel(req.key, function (err) { return callback(req.id, err) }) + try { + await db.del(req.key, encodingOptions) + return callback(req.id, null) + } catch (err) { + return callback(req.id, err) + } } function onreadonly (req) { callback(req.id, new ModuleError('Database is readonly', { code: 'LEVEL_READONLY' })) } - function onbatch (req) { - prebatch(req.ops, function (err) { - if (err) return callback(req.id, err) - - db.batch(req.ops, encodingOptions, function (err) { - callback(req.id, err) - }) - }) + async function onbatch (req) { + prebatch(req.key, function (err) { return callback(req.id, err) }) + try { + await db.batch(req.ops, encodingOptions) + return callback(req.id, null) + } catch (err) { + return callback(req.id, err) + } } function oniterator ({ id, seq, options, consumed, bookmark, seek }) { if (iterators.has(id)) return - const it = new Iterator(db, id, seq, options, consumed, encode) + const it = new ManyLevelHostIterator(db, id, seq, options, consumed, encode) iterators.set(id, it) if (seek) { @@ -211,15 +221,18 @@ function createRpcStream (db, options, streamOptions) { if (it !== undefined) it.close() } - function onclear (req) { - db.clear(cleanRangeOptions(req.options), function (err) { - callback(req.id, err) - }) + async function onclear (req) { + try { + await db.clear(cleanRangeOptions(req.options)) + return callback(req.id, null) + } catch (err) { + return callback(req.id, err) + } } } } -class Iterator { +class ManyLevelHostIterator { constructor (db, id, seq, options, consumed, encode) { options = cleanRangeOptions(options) @@ -245,7 +258,7 @@ class Iterator { this.pendingAcks = 0 } - next (first) { + async next (first) { if (this[kBusy] || this[kClosed]) return if (this[kEnded] || this.pendingAcks > 1) return @@ -269,7 +282,12 @@ class Iterator { if (size <= 0) { process.nextTick(this[kHandleMany], null, []) } else { - this[kIterator].nextv(size, this[kHandleMany]) + try { + const nextVal = await this[kIterator].nextv(size) + this[kHandleMany](null, nextVal) + } catch (err) { + this[kHandleMany](err, []) + } } } @@ -341,10 +359,10 @@ class Iterator { } } - close () { + async close () { if (this[kClosed]) return this[kClosed] = true - this[kIterator].close(noop) + await this[kIterator].close() } } diff --git a/index.d.ts b/index.d.ts index 203fc95..e821809 100644 --- a/index.d.ts +++ b/index.d.ts @@ -1,8 +1,7 @@ import { AbstractLevel, AbstractDatabaseOptions, - AbstractOpenOptions, - NodeCallback + AbstractOpenOptions } from 'abstract-level' // Requires `npm install @types/readable-stream`. @@ -25,8 +24,6 @@ export class ManyLevelGuest open (): Promise open (options: GuestOpenOptions): Promise - open (callback: NodeCallback): void - open (options: GuestOpenOptions, callback: NodeCallback): void /** * Create a duplex guest stream to be piped into a host stream. Until that's done, @@ -158,17 +155,17 @@ declare interface HostOptions { /** * A function to be called before `db.put()` operations. */ - preput?: (key: Buffer, value: Buffer, callback: NodeCallback) => void + preput?: (key: Buffer, value: Buffer) => Promise /** * A function to be called before `db.del()` operations. */ - predel?: (key: Buffer, callback: NodeCallback) => void + predel?: (key: Buffer) => Promise /** * A function to be called before `db.batch()` operations. */ - prebatch?: (operations: HostBatchOperation[], callback: NodeCallback) => void + prebatch?: (operations: HostBatchOperation[]) => Promise } declare type HostBatchOperation = HostBatchPutOperation | HostBatchDelOperation diff --git a/package.json b/package.json index 1366a37..f465642 100644 --- a/package.json +++ b/package.json @@ -8,6 +8,7 @@ "types": "./index.d.ts", "scripts": { "test": "standard && ts-standard *.ts && hallmark && (nyc -s tape test/*.js | faucet) && nyc report", + "test:stacktrace": "nyc -s tape test/*.js && nyc report", "coverage": "nyc report -r lcovonly", "hallmark": "hallmark --fix", "protobuf": "protocol-buffers schema.proto -o messages.js", @@ -26,7 +27,7 @@ ], "dependencies": { "@vweevers/length-prefixed-stream": "^1.0.0", - "abstract-level": "^1.0.3", + "abstract-level": "^2.0.2", "module-error": "^1.0.2", "protocol-buffers-encodings": "^1.1.0", "readable-stream": "^4.0.0" @@ -39,7 +40,7 @@ "faucet": "^0.0.3", "hallmark": "^4.0.0", "level-read-stream": "^1.1.0", - "memory-level": "^1.0.0", + "memory-level": "^2.0.0", "nyc": "^15.1.0", "protocol-buffers": "^5.0.0", "standard": "^16.0.3", @@ -56,7 +57,7 @@ }, "homepage": "https://github.com/Level/many-level", "engines": { - "node": ">=12" + "node": ">=16" }, "standard": { "ignore": [