Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ function Multifeed (storage, opts) {
this._close = readyify(_close.bind(this), true)
this.closed = false

this._onFeedError = function (err) {
this.emit('error', err)
}.bind(this)

// random-access-storage wrapper that wraps all hypercores in a directory
// structures. (dir/0, dir/1, ...)
this._storage = function (dir) {
Expand All @@ -61,9 +65,8 @@ function Multifeed (storage, opts) {

var feed = hypercore(ram, encryptionKey)

feed.on('error', function (err) {
self.emit('error', err)
})
feed.on('error', self._onFeedError)
feed.on('close', () => feed.removeListener('error', self._onFeedError))

feed.ready(function () {
self._root = feed
Expand Down Expand Up @@ -218,9 +221,9 @@ Multifeed.prototype.writer = function (name, opts, cb) {
var feed = keypair
? self._hypercore(storage, keypair.publicKey, Object.assign({}, self._opts, { secretKey: keypair.secretKey }))
: self._hypercore(storage, self._opts)
feed.on('error', function (err) {
self.emit('error', err)
})

feed.on('error', self._onFeedError)
feed.on('close', () => feed.removeListener('error', self._onFeedError))

feed.ready(function () {
self._addFeed(feed, String(idx))
Expand Down
23 changes: 13 additions & 10 deletions mux.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,24 @@ function Multiplexer (isInitiator, key, opts) {
self._activeFeedStreams = {}

var onFirstKey = true

self._ondiscoverykey = function (key) {
if (onFirstKey) {
onFirstKey = false
if (!this.stream.remoteVerified(key)) {
this._finalize(new Error('Exchange key did not match remote'))
}
}
}.bind(self)

if (Protocol.isProtocolStream(isInitiator)) {
var stream = this.stream = isInitiator
stream.on('discovery-key', ondiscoverykey)
stream.on('discovery-key', self._ondiscoverykey)
} else {
var stream = this.stream = new Protocol(isInitiator, Object.assign({}, opts, {
ondiscoverykey
ondiscoverykey: self._ondiscoverykey
}))
}
function ondiscoverykey (key) {
if (onFirstKey) {
onFirstKey = false
if (!self.stream.remoteVerified(key)) {
self._finalize(new Error('Exchange key did not match remote'))
}
}
}

this._handshakeExt = this.stream.registerExtension(EXT_HANDSHAKE, {
onmessage: onHandshake,
Expand Down Expand Up @@ -175,6 +177,7 @@ Multiplexer.prototype._finalize = function (err) {
debug(this._id + ' [REPLICATION] finalized', err)
this.stream.finalize()
}
this.stream.removeListener('discovery-key', this._ondiscoverykey)
}

// Calls to this method results in the creation of a 'manifest'
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
},
"keywords": [],
"dependencies": {
"async-series": "0.0.1",
"debug": "^4.1.0",
"hypercore": "^8.3.0",
"hypercore-protocol": "^7.7.1",
Expand Down
74 changes: 74 additions & 0 deletions test/memory-leak.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
var test = require('tape')
var multifeed = require('..')
var ram = require('random-access-memory')
var series = require('async-series')

test('so many feeds', function (t) {
function createMultifeed (cb) {
var multi = multifeed(ram, { valueEncoding: 'json' })
multi.writer(function (err, feed) {
if (err) throw err
multi.ready(function () {
if ((Math.random() * 100) < 30) feed.append({'foo': 'bar'}, done)
else done()
})
})

function done (err) {
cb(err, multi)
}
}

var total = 20
var tasks = []
var feeds = []

for (var i = 0; i < total; i++) {
;(function (n) {
tasks.push((done) => {
// console.log('making', n)
createMultifeed((err, m) => {
if (err) return done(err)
// console.log('made', n)
feeds.push(m)
m.n = n
done(null, m)
})
})
})(i)
}

series(tasks, (err) => {
if (err) throw err
console.log('created', feeds.length, 'feeds')
console.log('ready!')
var replications = []

feeds.sort((a, b) => {
if (a && b) {
replications.push((done) => {
// console.log('syncing', a.n, 'and', b.n)
replicate(a, b, err => {
// console.log('SYNCED', a.n, 'and', b.n)
done(err)
})
})
}
return -1
})

console.log('pre-sync heap', process.memoryUsage().heapUsed / 1000000, 'mb')
series(replications, (err) => {
if (err) throw err
console.log('replicated! everything!')
console.log('post-sync heap', process.memoryUsage().heapUsed / 1000000, 'mb')
t.end()
})
})

function replicate (m1, m2, cb) {
var r = m1.replicate(true)
r.pipe(m2.replicate(false)).pipe(r)
.once('end', cb)
}
})