diff --git a/index.js b/index.js index 5d2236a..265066c 100644 --- a/index.js +++ b/index.js @@ -37,6 +37,10 @@ function Multifeed (storage, opts) { this._close = readyify(_close.bind(this), true) this.closed = false + this._onFeedError = function (err) { + this.emit('error', err) + }.bind(this) + // random-access-storage wrapper that wraps all hypercores in a directory // structures. (dir/0, dir/1, ...) this._storage = function (dir) { @@ -61,9 +65,8 @@ function Multifeed (storage, opts) { var feed = hypercore(ram, encryptionKey) - feed.on('error', function (err) { - self.emit('error', err) - }) + feed.on('error', self._onFeedError) + feed.on('close', () => feed.removeListener('error', self._onFeedError)) feed.ready(function () { self._root = feed @@ -218,9 +221,9 @@ Multifeed.prototype.writer = function (name, opts, cb) { var feed = keypair ? self._hypercore(storage, keypair.publicKey, Object.assign({}, self._opts, { secretKey: keypair.secretKey })) : self._hypercore(storage, self._opts) - feed.on('error', function (err) { - self.emit('error', err) - }) + + feed.on('error', self._onFeedError) + feed.on('close', () => feed.removeListener('error', self._onFeedError)) feed.ready(function () { self._addFeed(feed, String(idx)) diff --git a/mux.js b/mux.js index 7bc2cf5..d656311 100644 --- a/mux.js +++ b/mux.js @@ -35,22 +35,24 @@ function Multiplexer (isInitiator, key, opts) { self._activeFeedStreams = {} var onFirstKey = true + + self._ondiscoverykey = function (key) { + if (onFirstKey) { + onFirstKey = false + if (!this.stream.remoteVerified(key)) { + this._finalize(new Error('Exchange key did not match remote')) + } + } + }.bind(self) + if (Protocol.isProtocolStream(isInitiator)) { var stream = this.stream = isInitiator - stream.on('discovery-key', ondiscoverykey) + stream.on('discovery-key', self._ondiscoverykey) } else { var stream = this.stream = new Protocol(isInitiator, Object.assign({}, opts, { - ondiscoverykey + ondiscoverykey: self._ondiscoverykey })) } - function ondiscoverykey (key) { - if (onFirstKey) { - onFirstKey = false - if (!self.stream.remoteVerified(key)) { - self._finalize(new Error('Exchange key did not match remote')) - } - } - } this._handshakeExt = this.stream.registerExtension(EXT_HANDSHAKE, { onmessage: onHandshake, @@ -175,6 +177,7 @@ Multiplexer.prototype._finalize = function (err) { debug(this._id + ' [REPLICATION] finalized', err) this.stream.finalize() } + this.stream.removeListener('discovery-key', this._ondiscoverykey) } // Calls to this method results in the creation of a 'manifest' diff --git a/package.json b/package.json index c054c89..e7599bc 100644 --- a/package.json +++ b/package.json @@ -15,6 +15,7 @@ }, "keywords": [], "dependencies": { + "async-series": "0.0.1", "debug": "^4.1.0", "hypercore": "^8.3.0", "hypercore-protocol": "^7.7.1", diff --git a/test/memory-leak.js b/test/memory-leak.js new file mode 100644 index 0000000..a22f7f0 --- /dev/null +++ b/test/memory-leak.js @@ -0,0 +1,74 @@ +var test = require('tape') +var multifeed = require('..') +var ram = require('random-access-memory') +var series = require('async-series') + +test('so many feeds', function (t) { + function createMultifeed (cb) { + var multi = multifeed(ram, { valueEncoding: 'json' }) + multi.writer(function (err, feed) { + if (err) throw err + multi.ready(function () { + if ((Math.random() * 100) < 30) feed.append({'foo': 'bar'}, done) + else done() + }) + }) + + function done (err) { + cb(err, multi) + } + } + + var total = 20 + var tasks = [] + var feeds = [] + + for (var i = 0; i < total; i++) { + ;(function (n) { + tasks.push((done) => { + // console.log('making', n) + createMultifeed((err, m) => { + if (err) return done(err) + // console.log('made', n) + feeds.push(m) + m.n = n + done(null, m) + }) + }) + })(i) + } + + series(tasks, (err) => { + if (err) throw err + console.log('created', feeds.length, 'feeds') + console.log('ready!') + var replications = [] + + feeds.sort((a, b) => { + if (a && b) { + replications.push((done) => { + // console.log('syncing', a.n, 'and', b.n) + replicate(a, b, err => { + // console.log('SYNCED', a.n, 'and', b.n) + done(err) + }) + }) + } + return -1 + }) + + console.log('pre-sync heap', process.memoryUsage().heapUsed / 1000000, 'mb') + series(replications, (err) => { + if (err) throw err + console.log('replicated! everything!') + console.log('post-sync heap', process.memoryUsage().heapUsed / 1000000, 'mb') + t.end() + }) + }) + + function replicate (m1, m2, cb) { + var r = m1.replicate(true) + r.pipe(m2.replicate(false)).pipe(r) + .once('end', cb) + } +})