From 1987f89ce2534a87758f02e892558f41aa481033 Mon Sep 17 00:00:00 2001 From: okdistribute <633012+okdistribute@users.noreply.github.com> Date: Mon, 3 Aug 2020 21:01:35 -0700 Subject: [PATCH 1/3] test: add a memory leak test --- test/memory-leak.js | 62 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 test/memory-leak.js diff --git a/test/memory-leak.js b/test/memory-leak.js new file mode 100644 index 0000000..0cc216a --- /dev/null +++ b/test/memory-leak.js @@ -0,0 +1,62 @@ +var test = require('tape') +var multifeed = require('..') +var ram = require('random-access-memory') +var parallel = require('run-parallel') + +test('so many feeds', function (t) { + function createMultifeed (cb) { + var multi = multifeed(ram, { valueEncoding: 'json' }) + multi.writer(function (err, feed) { + if (err) throw err + multi.ready(function () { + if ((Math.random() * 100) < 30) feed.append({'foo': 'bar'}, done) + else done() + }) + }) + + function done (err) { + cb(err, multi) + } + } + + var total = 500 + var tasks = [] + + for (var i = 0; i < total; i++) { + tasks.push((done) => { + createMultifeed((err, m) => { + if (err) return done(err) + done(null, m) + }) + }) + } + + parallel(tasks, (err, feeds) => { + if (err) throw err + console.log(feeds.length, 'feeds') + console.log('ready!') + var replications = [] + + feeds.sort((a, b) => { + if (a && b) { + replications.push((done) => replicate(a, b, done)) + } + return -1 + }) + + parallel(replications, (err) => { + if (err) throw err + console.log('replicated! everything!') + t.end() + }) + }) + + function replicate (m1, m2, cb) { + var r = m1.replicate(true) + r.pipe(m2.replicate(false)).pipe(r) + .once('end', cb) + .once('remote-feeds', (feeds) => { + console.log('got remote feeds') + }) + } +}) From 947e191e8d26cf1c98b4a8342704bd83d419152b Mon Sep 17 00:00:00 2001 From: okdistribute <633012+okdistribute@users.noreply.github.com> Date: Mon, 3 Aug 2020 21:04:22 -0700 Subject: [PATCH 2/3] fix: clean up listeners for 'discovery-key' and 'error' --- index.js | 15 +++++++++------ mux.js | 23 +++++++++++++---------- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/index.js b/index.js index 5d2236a..265066c 100644 --- a/index.js +++ b/index.js @@ -37,6 +37,10 @@ function Multifeed (storage, opts) { this._close = readyify(_close.bind(this), true) this.closed = false + this._onFeedError = function (err) { + this.emit('error', err) + }.bind(this) + // random-access-storage wrapper that wraps all hypercores in a directory // structures. (dir/0, dir/1, ...) this._storage = function (dir) { @@ -61,9 +65,8 @@ function Multifeed (storage, opts) { var feed = hypercore(ram, encryptionKey) - feed.on('error', function (err) { - self.emit('error', err) - }) + feed.on('error', self._onFeedError) + feed.on('close', () => feed.removeListener('error', self._onFeedError)) feed.ready(function () { self._root = feed @@ -218,9 +221,9 @@ Multifeed.prototype.writer = function (name, opts, cb) { var feed = keypair ? self._hypercore(storage, keypair.publicKey, Object.assign({}, self._opts, { secretKey: keypair.secretKey })) : self._hypercore(storage, self._opts) - feed.on('error', function (err) { - self.emit('error', err) - }) + + feed.on('error', self._onFeedError) + feed.on('close', () => feed.removeListener('error', self._onFeedError)) feed.ready(function () { self._addFeed(feed, String(idx)) diff --git a/mux.js b/mux.js index 7bc2cf5..d656311 100644 --- a/mux.js +++ b/mux.js @@ -35,22 +35,24 @@ function Multiplexer (isInitiator, key, opts) { self._activeFeedStreams = {} var onFirstKey = true + + self._ondiscoverykey = function (key) { + if (onFirstKey) { + onFirstKey = false + if (!this.stream.remoteVerified(key)) { + this._finalize(new Error('Exchange key did not match remote')) + } + } + }.bind(self) + if (Protocol.isProtocolStream(isInitiator)) { var stream = this.stream = isInitiator - stream.on('discovery-key', ondiscoverykey) + stream.on('discovery-key', self._ondiscoverykey) } else { var stream = this.stream = new Protocol(isInitiator, Object.assign({}, opts, { - ondiscoverykey + ondiscoverykey: self._ondiscoverykey })) } - function ondiscoverykey (key) { - if (onFirstKey) { - onFirstKey = false - if (!self.stream.remoteVerified(key)) { - self._finalize(new Error('Exchange key did not match remote')) - } - } - } this._handshakeExt = this.stream.registerExtension(EXT_HANDSHAKE, { onmessage: onHandshake, @@ -175,6 +177,7 @@ Multiplexer.prototype._finalize = function (err) { debug(this._id + ' [REPLICATION] finalized', err) this.stream.finalize() } + this.stream.removeListener('discovery-key', this._ondiscoverykey) } // Calls to this method results in the creation of a 'manifest' From f3780de5433e1dddcafcb05e00840c220d7019c0 Mon Sep 17 00:00:00 2001 From: Kira Oakley Date: Sat, 22 Aug 2020 16:29:10 -0700 Subject: [PATCH 3/3] print memory use; try to not overwhelm old computers --- package.json | 1 + test/memory-leak.js | 40 ++++++++++++++++++++++++++-------------- 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/package.json b/package.json index c054c89..e7599bc 100644 --- a/package.json +++ b/package.json @@ -15,6 +15,7 @@ }, "keywords": [], "dependencies": { + "async-series": "0.0.1", "debug": "^4.1.0", "hypercore": "^8.3.0", "hypercore-protocol": "^7.7.1", diff --git a/test/memory-leak.js b/test/memory-leak.js index 0cc216a..a22f7f0 100644 --- a/test/memory-leak.js +++ b/test/memory-leak.js @@ -1,7 +1,7 @@ var test = require('tape') var multifeed = require('..') var ram = require('random-access-memory') -var parallel = require('run-parallel') +var series = require('async-series') test('so many feeds', function (t) { function createMultifeed (cb) { @@ -19,34 +19,49 @@ test('so many feeds', function (t) { } } - var total = 500 + var total = 20 var tasks = [] + var feeds = [] for (var i = 0; i < total; i++) { - tasks.push((done) => { - createMultifeed((err, m) => { - if (err) return done(err) - done(null, m) + ;(function (n) { + tasks.push((done) => { + // console.log('making', n) + createMultifeed((err, m) => { + if (err) return done(err) + // console.log('made', n) + feeds.push(m) + m.n = n + done(null, m) + }) }) - }) + })(i) } - parallel(tasks, (err, feeds) => { + series(tasks, (err) => { if (err) throw err - console.log(feeds.length, 'feeds') + console.log('created', feeds.length, 'feeds') console.log('ready!') var replications = [] feeds.sort((a, b) => { if (a && b) { - replications.push((done) => replicate(a, b, done)) + replications.push((done) => { + // console.log('syncing', a.n, 'and', b.n) + replicate(a, b, err => { + // console.log('SYNCED', a.n, 'and', b.n) + done(err) + }) + }) } return -1 }) - parallel(replications, (err) => { + console.log('pre-sync heap', process.memoryUsage().heapUsed / 1000000, 'mb') + series(replications, (err) => { if (err) throw err console.log('replicated! everything!') + console.log('post-sync heap', process.memoryUsage().heapUsed / 1000000, 'mb') t.end() }) }) @@ -55,8 +70,5 @@ test('so many feeds', function (t) { var r = m1.replicate(true) r.pipe(m2.replicate(false)).pipe(r) .once('end', cb) - .once('remote-feeds', (feeds) => { - console.log('got remote feeds') - }) } })