From 2d940c7d2006262d1c7dc98d3b0a856f683f3107 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Wed, 10 Jun 2020 15:45:46 +0200 Subject: [PATCH 1/8] Move the mux extensions onto the root channel. Before, the extensions where mounted on the stream and not on a channel. Now, the extensions are mounted on the channel that is opened with the root key. This makes it possible to mount the mux extensions onto any existing hypercore-protocol stream, multifeed does not have to own the stream. This will allow to make multifeed compatible with corestore and corestore-swarm-networker. --- mux.js | 180 +++++++++++++++++++++++++++------------------ package.json | 6 +- test/regression.js | 2 +- 3 files changed, 113 insertions(+), 75 deletions(-) diff --git a/mux.js b/mux.js index 0fecd03..39e9263 100644 --- a/mux.js +++ b/mux.js @@ -4,6 +4,7 @@ var inherits = require('inherits') var events = require('events') var debug = require('debug')('multifeed') var once = require('once') +var AbstractExtension = require('abstract-extension') // constants var MULTIFEED = 'MULTIFEED' @@ -19,6 +20,14 @@ var EXT_REPLICATE_FEEDS = 'MULTIFEED_REPLICATE_FEEDS' var ERR_VERSION_MISMATCH = 'ERR_VERSION_MISMATCH' var ERR_CLIENT_MISMATCH = 'ERR_CLIENT_MISMATCH' +var DEFAULT_TIMEOUT = 10000 + +class Extension extends AbstractExtension { + send (message) { + this.local.handlers.send(this.id, this.encode(message)) + } +} + // `key` - protocol encryption key function Multiplexer (isInitiator, key, opts) { if (!(this instanceof Multiplexer)) return new Multiplexer(isInitiator, key, opts) @@ -34,76 +43,88 @@ function Multiplexer (isInitiator, key, opts) { self._remoteOffer = [] self._activeFeedStreams = {} - var onFirstKey = true - var stream = this.stream = new Protocol(isInitiator, Object.assign({}, opts, { - ondiscoverykey: function (key) { - if (onFirstKey) { - onFirstKey = false - if (!self.stream.remoteVerified(key)) { - self._finalize(new Error('Exchange key did not match remote')) - } - } - } - })) - - this._handshakeExt = this.stream.registerExtension(EXT_HANDSHAKE, { - onmessage: onHandshake, - onerror: function (err) { - self._finalize(err) - }, - encoding: 'json' - }) - - function onHandshake (header) { - debug(self._id + ' [REPLICATION] recv\'d handshake: ', JSON.stringify(header)) - var err + if (opts.stream) { + self.stream = opts.stream + } else { + self.stream = new Protocol(isInitiator, Object.assign({}, opts)) + } - if (!compatibleVersions(header.version, PROTOCOL_VERSION)) { - debug(self._id + ' [REPLICATION] aborting; version mismatch (us=' + PROTOCOL_VERSION + ')') - err = new Error('protocol version mismatch! us=' + PROTOCOL_VERSION + ' them=' + header.version) - err.code = ERR_VERSION_MISMATCH - err.usVersion = PROTOCOL_VERSION - err.themVersion = header.version - self._finalize(err) - return + // Prepare the extension handlers. + self._extensions = Extension.createLocal({ + send (id, message) { + self._channel.extension(id, message) } + }) + self._remoteExtensions = self._extensions.remote() - if (header.client !== MULTIFEED) { - debug(self._id + ' [REPLICATION] aborting; Client mismatch! expected ', MULTIFEED, 'but got', header.client) - err = new Error('Client mismatch! expected ' + MULTIFEED + ' but got ' + header.client) - err.code = ERR_CLIENT_MISMATCH - err.usClient = MULTIFEED - err.themClient = header.client - self._finalize(err) - return - } - - // Wait a tick, otherwise the _ready handler below won't be listening for this event yet. - process.nextTick(function () { - self.emit('ready', header) - }) - } - - // Open a virtual feed that has the key set to the shared key. - this._feed = stream.open(key, { + // Open a protocol channel on the shared key. + self._channel = self.stream.open(key, { onopen: function () { - onFirstKey = false - if (!stream.remoteVerified(key)) { + if (!self.stream.remoteVerified(key)) { debug(self._id + ' [REPLICATION] aborting; shared key mismatch') - self._finalize(new Error('shared key version mismatch!')) + self._finalize(new Error('shared key mismatch!')) return } + self._registerExtensions() + // send handshake - self._handshakeExt.send(Object.assign({}, opts, { + self._handshakeExt.send(Object.assign({}, { client: MULTIFEED, version: PROTOCOL_VERSION, userData: opts.userData })) + }, + onextension (id, message) { + self._remoteExtensions.onmessage(id, message, self._channel) + }, + onoptions (options) { + self._remoteExtensions.update(options.extensions) } }) - this._manifestExt = stream.registerExtension(EXT_MANIFEST, { + if (!self._opts.live) { + self.stream.on('prefinalize', function () { + self._channel.close() + debug(self._id + ' [REPLICATION] feed finish/prefinalize (' + self.stream.prefinalize._tick + ')') + }) + } + + // The timeout will occur if the remote does not open a channel for + // the shared key. + if (opts.timeout !== false) { + this.timeout = setTimeout(() => { + this._finalize(new Error('Multifeed handshake remote timeout')) + }, opts.timeout || DEFAULT_TIMEOUT) + } + + this._ready = readify(function (done) { + self.on('ready', function (remote) { + if (this.timeout) clearTimeout(this.timeout) + debug(self._id + ' [REPLICATION] remote connected and ready') + done(remote) + }) + }) +} + +inherits(Multiplexer, events.EventEmitter) + +Multiplexer.prototype.ready = function (cb) { + this._ready(cb) +} + +Multiplexer.prototype._registerExtensions = function () { + var self = this + + self._handshakeExt = self._extensions.add(EXT_HANDSHAKE, { + onmessage: onHandshake, + onerror: function (err) { + self._finalize(err) + }, + encoding: 'json' + }) + + self._manifestExt = self._extensions.add(EXT_MANIFEST, { onmessage: function (msg) { debug(self._id, 'RECV\'D Ext MANIFEST:', JSON.stringify(msg)) self._remoteOffer = uniq(self._remoteOffer.concat(msg.keys)) @@ -115,7 +136,7 @@ function Multiplexer (isInitiator, key, opts) { encoding: 'json' }) - this._requestFeedsExt = stream.registerExtension(EXT_REQUEST_FEEDS, { + self._requestFeedsExt = self._extensions.add(EXT_REQUEST_FEEDS, { onmessage: function (msg) { debug(self._id, 'RECV\'D Ext REQUEST_FEEDS:', msg) self._onRequestFeeds(msg) @@ -126,7 +147,7 @@ function Multiplexer (isInitiator, key, opts) { encoding: 'json' }) - this._replicateFeedsExt = stream.registerExtension(EXT_REPLICATE_FEEDS, { + self._replicateFeedsExt = self._extensions.add(EXT_REPLICATE_FEEDS, { onmessage: function (msg) { debug(self._id, 'RECV\'D Ext REPLICATE_FEEDS:', msg) self._onRemoteReplicate(msg) @@ -137,25 +158,38 @@ function Multiplexer (isInitiator, key, opts) { encoding: 'json' }) - if (!self._opts.live) { - self.stream.on('prefinalize', function () { - self._feed.close() - debug(self._id + ' [REPLICATION] feed finish/prefinalize (' + self.stream.prefinalize._tick + ')') - }) - } + // Send the extension names to our remote peer. + self._channel.options({ extensions: self._extensions.names() }) - this._ready = readify(function (done) { - self.on('ready', function (remote) { - debug(self._id + ' [REPLICATION] remote connected and ready') - done(remote) - }) - }) -} + function onHandshake (header) { + debug(self._id + ' [REPLICATION] recv\'d handshake: ', JSON.stringify(header)) + var err -inherits(Multiplexer, events.EventEmitter) + if (!compatibleVersions(header.version, PROTOCOL_VERSION)) { + debug(self._id + ' [REPLICATION] aborting; version mismatch (us=' + PROTOCOL_VERSION + ')') + err = new Error('protocol version mismatch! us=' + PROTOCOL_VERSION + ' them=' + header.version) + err.code = ERR_VERSION_MISMATCH + err.usVersion = PROTOCOL_VERSION + err.themVersion = header.version + self._finalize(err) + return + } -Multiplexer.prototype.ready = function (cb) { - this._ready(cb) + if (header.client !== MULTIFEED) { + debug(self._id + ' [REPLICATION] aborting; Client mismatch! expected ', MULTIFEED, 'but got', header.client) + err = new Error('Client mismatch! expected ' + MULTIFEED + ' but got ' + header.client) + err.code = ERR_CLIENT_MISMATCH + err.usClient = MULTIFEED + err.themClient = header.client + self._finalize(err) + return + } + + // Wait a tick, otherwise the _ready handler below won't be listening for this event yet. + process.nextTick(function () { + self.emit('ready', header) + }) + } } Multiplexer.prototype._finalize = function (err) { @@ -291,7 +325,7 @@ Multiplexer.prototype._replicateFeeds = function (keys, cb) { // Bail on replication entirely if there were no feeds to add, and none are pending or active. if (feeds.length === 0 && Object.keys(self._activeFeedStreams).length === 0) { debug('[REPLICATION] terminating mux: no feeds to sync') - self._feed.close() + self._channel.close() process.nextTick(cb) } } diff --git a/package.json b/package.json index 0a58413..a9a0ed8 100644 --- a/package.json +++ b/package.json @@ -15,6 +15,7 @@ }, "keywords": [], "dependencies": { + "abstract-extension": "^3.1.0", "debug": "^4.1.0", "hypercore": "^8.3.0", "hypercore-protocol": "^7.7.1", @@ -26,13 +27,16 @@ "through2": "^3.0.0" }, "devDependencies": { + "@hyperswarm/dht": "^3.6.5", + "corestore": "^5.3.2", + "corestore-swarm-networking": "^5.4.3", "hypercore-crypto": "^1.0.0", "pump": "^3.0.0", "pumpify": "^1.5.1", "random-access-latency": "^1.0.0", "rimraf": "^2.6.3", "standard": "~10.0.0", - "tape": "~4.6.2", + "tape": "^5", "tmp": "0.0.33" }, "license": "ISC" diff --git a/test/regression.js b/test/regression.js index 18d5dd9..e51502f 100644 --- a/test/regression.js +++ b/test/regression.js @@ -251,7 +251,7 @@ test('regression: MFs with different root keys cannot replicate', function (t) { var r = m1.replicate(true) var s = m2.replicate(false) pump(r, s, r, function (err) { - t.same(err.toString(), 'Error: Exchange key did not match remote') + t.same(err.toString(), 'Error: Multifeed handshake remote timeout') t.end() }) }) From ceb91d6f4d999d03ad8d8a0124371575d124946b Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Wed, 10 Jun 2020 17:13:58 +0200 Subject: [PATCH 2/8] Update hypercore and hypercore-protocol --- package.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/package.json b/package.json index a9a0ed8..f255816 100644 --- a/package.json +++ b/package.json @@ -17,8 +17,8 @@ "dependencies": { "abstract-extension": "^3.1.0", "debug": "^4.1.0", - "hypercore": "^8.3.0", - "hypercore-protocol": "^7.7.1", + "hypercore": "^9", + "hypercore-protocol": "^8", "inherits": "^2.0.3", "mutexify": "^1.2.0", "once": "^1.4.0", @@ -30,7 +30,7 @@ "@hyperswarm/dht": "^3.6.5", "corestore": "^5.3.2", "corestore-swarm-networking": "^5.4.3", - "hypercore-crypto": "^1.0.0", + "hypercore-crypto": "^2.0.0", "pump": "^3.0.0", "pumpify": "^1.5.1", "random-access-latency": "^1.0.0", From ce622846b999bce1d032d74e7aa79b83cd00252d Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Wed, 10 Jun 2020 17:15:07 +0200 Subject: [PATCH 3/8] Allow to use the mux extensions with corestore. Exposes a CorestoreMuxer that makes the mux extensions usable on top of corestore and corestore-swarm-networker. See test/corestore.js for test and example. --- corestore.js | 114 +++++++++++++++++++++++++++++++ test/corestore.js | 171 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 285 insertions(+) create mode 100644 corestore.js create mode 100644 test/corestore.js diff --git a/corestore.js b/corestore.js new file mode 100644 index 0000000..a5d93c5 --- /dev/null +++ b/corestore.js @@ -0,0 +1,114 @@ +const crypto = require('hypercore-crypto') +const Multiplexer = require('./mux') +const { EventEmitter } = require('events') + +class CorestoreMuxerTopic extends EventEmitter { + constructor (corestore, rootKey, opts = {}) { + super() + this.corestore = corestore + this.rootKey = rootKey + this._feeds = new Map() + this.streams = new Map() + } + + addStream (stream, info) { + const self = this + const isInitiator = !!info.client + const opts = { stream, live: true } + + const mux = new Multiplexer(isInitiator, this.rootKey, opts) + + mux.on('manifest', onmanifest) + mux.on('replicate', onreplicate) + mux.ready(onready) + + stream.once('end', cleanup) + stream.once('error', cleanup) + this.streams.set(stream, { mux, cleanup }) + + function onready () { + const keys = Array.from(self._feeds.keys()) + if (keys.length) mux.offerFeeds(keys) + } + + function onmanifest (manifest) { + mux.requestFeeds(manifest.keys) + } + + function onreplicate (keys, repl) { + for (const key of keys) { + if (self._feeds.has(key)) continue + self.addFeed(key) + self.emit('feed', self._feeds.get(key)) + } + const feeds = keys.map(key => self._feeds.get(key)) + repl(feeds) + } + + function cleanup (_err) { + mux.removeListener('manifest', onmanifest) + mux.removeListener('replicate', onreplicate) + self.streams.delete(stream) + } + } + + removeStream (stream) { + if (!this.streams.has(stream)) return + const { cleanup } = this.streams.get(stream) + cleanup() + } + + feeds () { + return Array.from(this._feeds.values()) + } + + addFeed (key) { + if (!Buffer.isBuffer(key)) key = Buffer.from(key, 'hex') + const feed = this.corestore.get(key) + const hkey = feed.key.toString('hex') + this._feeds.set(hkey, feed) + for (const { mux } of this.streams.values()) { + if (mux.knownFeeds().indexOf(hkey) === -1) { + // debug('Forwarding new feed to existing peer:', hexKey) + mux.offerFeeds([hkey]) + } + } + } +} + +module.exports = class CorestoreMuxer { + constructor (networker) { + this.networker = networker + this.corestore = networker.corestore + this.muxers = new Map() + + this._joinListener = this._onjoin.bind(this) + this._leaveListener = this._onleave.bind(this) + this.networker.on('handshake', this._joinListener) + this.networker.on('stream-closed', this._leaveListener) + } + + _onjoin (stream, info) { + for (const mux of this.muxers.values()) { + mux.addStream(stream, info) + } + } + + _onleave (stream, info, finishedHandshake) { + for (const mux of this.muxers.values()) { + mux.removeStream(stream) + } + } + + join (rootKey, opts) { + if (!Buffer.isBuffer(rootKey)) rootKey = Buffer.from(rootKey, 'hex') + const hkey = rootKey.toString('hex') + if (this.muxers.has(hkey)) return this.muxers.get(hkey) + + const discoveryKey = crypto.discoveryKey(rootKey) + const mux = new CorestoreMuxerTopic(this.corestore, rootKey, opts) + this.networker.join(discoveryKey) + this.muxers.set(hkey, mux) + return mux + } +} diff --git a/test/corestore.js b/test/corestore.js new file mode 100644 index 0000000..61b4ac6 --- /dev/null +++ b/test/corestore.js @@ -0,0 +1,171 @@ +const test = require('tape') +const crypto = require('hypercore-crypto') +const hyperswarm = require('hyperswarm') +const ram = require('random-access-memory') +const Corestore = require('corestore') +const dht = require('@hyperswarm/dht') +const SwarmNetworker = require('corestore-swarm-networking') + +const Muxer = require('../corestore') +const multifeed = require('..') + +const BOOTSTRAP_PORT = 3100 +var bootstrap = null + +const KEY_A = Buffer.alloc(32, 1) +const KEY_B = Buffer.alloc(32, 2) + +test('corestore networker example', async function (t) { + const { store: store1, networker: networker1 } = await create() + const { store: store2, networker: networker2 } = await create() + + const core1 = store1.get() + const core2a = store2.get() + const core2b = store2.get() + + await append(core1, 'hello') + const data = await get(core1, 0) + t.same(data, Buffer.from('hello')) + + const muxer1 = new Muxer(networker1) + const muxer2 = new Muxer(networker2) + + const mux1a = muxer1.join(KEY_A, { name: 'm1a' }) + const mux1b = muxer1.join(KEY_B, { name: 'm1b' }) + + const mux2a = muxer2.join(KEY_A, { name: 'mux2a' }) + const mux2b = muxer2.join(KEY_B, { name: 'mux2b' }) + + mux1a.addFeed(core1.key) + mux1b.addFeed(core1.key) + + mux2a.addFeed(core2a.key) + mux2b.addFeed(core2b.key) + + // TODO: Remove timeout, wait for event instead. + await timeout(500) + + t.deepEqual(toKeys(mux1a.feeds()), toKeys([core1, core2a])) + t.deepEqual(toKeys(mux2a.feeds()), toKeys([core1, core2a])) + t.deepEqual(toKeys(mux1b.feeds()), toKeys([core1, core2b])) + t.deepEqual(toKeys(mux2b.feeds()), toKeys([core1, core2b])) + let checked = false + for (const feed of mux2b.feeds()) { + if (feed.key.toString('hex') === core1.key.toString('hex')) { + const data = await get(store2.get(core1.key), 0) + t.same(data, Buffer.from('hello')) + checked = true + } + } + if (!checked) t.fail('missing data check') + await cleanup([networker1, networker2]) + t.end() +}) + +test('corestore to multifeed over hyperswarm', async t => { + const muxkey = KEY_A + + // setup a corestore muxer + const { store, networker } = await create() + const core = store.get() + const muxer = new Muxer(networker) + const mux = muxer.join(muxkey) + mux.addFeed(core.key) + + // setup a multifeed muxer plus network + const multi = multifeed(ram, { encryptionKey: muxkey }) + let multifeedWriter + await new Promise(resolve => { + multi.writer('local', (err, feed) => { + multifeedWriter = feed + t.error(err) + feed.append('hello', resolve) + }) + }) + const swarm2 = hyperswarm({ + bootstrap: `localhost:${BOOTSTRAP_PORT}` + }) + swarm2.join(crypto.discoveryKey(muxkey), { announce: true, lookup: true }) + let didConnect = false + // Just pipe all connections directly into the multifeed replication stream. + // This is what e.g. cabal currently does, and would need a seperate hyperswarm + // instance to replicate multiple multifeeds in parallel (before this patch). + swarm2.on('connection', (socket, details) => { + if (didConnect) return + didConnect = true + const isInitiator = !!details.client + const stream = multi.replicate(isInitiator, { live: true }) + stream.pipe(socket).pipe(stream) + }) + + // wait and see + await timeout(500) + t.deepEqual(toKeys(multi.feeds()), toKeys([multifeedWriter, core])) + t.deepEqual(toKeys(mux.feeds()), toKeys([multifeedWriter, core])) + swarm2.destroy() + await cleanup([networker]) +}) + +function toKeys (feeds) { + return feeds.map(f => f.key.toString('hex')).sort() +} + +function append (core, data) { + return new Promise((resolve, reject) => { + core.append(data, err => { + if (err) return reject(err) + return resolve() + }) + }) +} +function get (core, idx, opts = {}) { + return new Promise((resolve, reject) => { + core.get(idx, opts, (err, data) => { + if (err) return reject(err) + return resolve(data) + }) + }) +} + +function timeout (ms) { + return new Promise(resolve => setTimeout(resolve, ms)) +} + +async function create (opts = {}) { + if (!bootstrap) { + bootstrap = dht({ + bootstrap: false + }) + bootstrap.listen(BOOTSTRAP_PORT) + await new Promise(resolve => { + return bootstrap.once('listening', resolve) + }) + } + const store = new Corestore(ram) + await store.ready() + const networker = new SwarmNetworker(store, { + ...opts, + bootstrap: `localhost:${BOOTSTRAP_PORT}` + }) + // logEvents(networker, 'networker') + return { store, networker } +} + +async function cleanup (networkers) { + for (let networker of networkers) { + await networker.close() + } + if (bootstrap) { + await bootstrap.destroy() + bootstrap = null + } +} + +function logEvents (emitter, name) { + const emit = emitter.emit.bind(emitter) + emitter.emit = function (event, ...args) { + console.log(name, event) + if (event === 'replication-error') console.log(args) + emit(event, ...args) + } +} From f83bb591f15aed2cfc270e23cd309c367a27d983 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Fri, 12 Jun 2020 00:14:03 +0200 Subject: [PATCH 4/8] Add comments to corestore test --- test/corestore.js | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/test/corestore.js b/test/corestore.js index 61b4ac6..cd55724 100644 --- a/test/corestore.js +++ b/test/corestore.js @@ -16,9 +16,12 @@ const KEY_A = Buffer.alloc(32, 1) const KEY_B = Buffer.alloc(32, 2) test('corestore networker example', async function (t) { + // Create two distinct corestores and networkers. + // They will communicate over a localhost DHT. const { store: store1, networker: networker1 } = await create() const { store: store2, networker: networker2 } = await create() + // Init some cores. const core1 = store1.get() const core2a = store2.get() const core2b = store2.get() @@ -27,28 +30,36 @@ test('corestore networker example', async function (t) { const data = await get(core1, 0) t.same(data, Buffer.from('hello')) + // For each networker, setup the mulitfeed mux wrapper. const muxer1 = new Muxer(networker1) const muxer2 = new Muxer(networker2) + // For each mux wrapper, join on two different multifeed rootkeys. const mux1a = muxer1.join(KEY_A, { name: 'm1a' }) const mux1b = muxer1.join(KEY_B, { name: 'm1b' }) const mux2a = muxer2.join(KEY_A, { name: 'mux2a' }) const mux2b = muxer2.join(KEY_B, { name: 'mux2b' }) + // Person 1 adds the same feed to both multifeeds. mux1a.addFeed(core1.key) mux1b.addFeed(core1.key) + // Person2 adds two different feeds to each multifeed. mux2a.addFeed(core2a.key) mux2b.addFeed(core2b.key) + // Wait for things to sync. // TODO: Remove timeout, wait for event instead. await timeout(500) + // Check that the muxers for the same keys arrived at the same set of feeds. t.deepEqual(toKeys(mux1a.feeds()), toKeys([core1, core2a])) t.deepEqual(toKeys(mux2a.feeds()), toKeys([core1, core2a])) t.deepEqual(toKeys(mux1b.feeds()), toKeys([core1, core2b])) t.deepEqual(toKeys(mux2b.feeds()), toKeys([core1, core2b])) + + // Check that the cores actually replicated. let checked = false for (const feed of mux2b.feeds()) { if (feed.key.toString('hex') === core1.key.toString('hex')) { @@ -58,6 +69,8 @@ test('corestore networker example', async function (t) { } } if (!checked) t.fail('missing data check') + + // Cleanup. await cleanup([networker1, networker2]) t.end() }) From 69c4c9084e2e9cebaa90659fc12e96e81948434e Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Fri, 12 Jun 2020 00:23:00 +0200 Subject: [PATCH 5/8] corestore networker: properly track streams --- corestore.js | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/corestore.js b/corestore.js index a5d93c5..297706a 100644 --- a/corestore.js +++ b/corestore.js @@ -1,5 +1,6 @@ const crypto = require('hypercore-crypto') const Multiplexer = require('./mux') +const debug = require('debug')('multifeed') const { EventEmitter } = require('events') class CorestoreMuxerTopic extends EventEmitter { @@ -68,10 +69,12 @@ class CorestoreMuxerTopic extends EventEmitter { const hkey = feed.key.toString('hex') this._feeds.set(hkey, feed) for (const { mux } of this.streams.values()) { - if (mux.knownFeeds().indexOf(hkey) === -1) { - // debug('Forwarding new feed to existing peer:', hexKey) - mux.offerFeeds([hkey]) - } + mux.ready(() => { + if (mux.knownFeeds().indexOf(hkey) === -1) { + debug('Forwarding new feed to existing peer:', hkey) + mux.offerFeeds([hkey]) + } + }) } } } @@ -81,6 +84,7 @@ module.exports = class CorestoreMuxer { this.networker = networker this.corestore = networker.corestore this.muxers = new Map() + this.streamsByKey = new Map() this._joinListener = this._onjoin.bind(this) this._leaveListener = this._onleave.bind(this) @@ -89,12 +93,16 @@ module.exports = class CorestoreMuxer { } _onjoin (stream, info) { + const remoteKey = stream.remotePublicKey + const keyString = remoteKey.toString('hex') + this.streamsByKey.set(keyString, { stream, info }) for (const mux of this.muxers.values()) { mux.addStream(stream, info) } } _onleave (stream, info, finishedHandshake) { + if (!finishedHandshake || (info && info.duplicate)) return for (const mux of this.muxers.values()) { mux.removeStream(stream) } @@ -109,6 +117,9 @@ module.exports = class CorestoreMuxer { const mux = new CorestoreMuxerTopic(this.corestore, rootKey, opts) this.networker.join(discoveryKey) this.muxers.set(hkey, mux) + for (const { stream, info } of this.streamsByKey.values()) { + mux.addStream(stream, info) + } return mux } } From b73a7b5b9a9721cfe24f18e50c36f7bd1c5fdac5 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Fri, 12 Jun 2020 01:45:58 +0200 Subject: [PATCH 6/8] Better option handling --- corestore.js | 16 +++++++++------- test/corestore.js | 8 ++++---- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/corestore.js b/corestore.js index 297706a..8d9dd00 100644 --- a/corestore.js +++ b/corestore.js @@ -10,14 +10,14 @@ class CorestoreMuxerTopic extends EventEmitter { this.rootKey = rootKey this._feeds = new Map() this.streams = new Map() + this.opts = opts } - addStream (stream, info) { + addStream (stream, opts) { const self = this - const isInitiator = !!info.client - const opts = { stream, live: true } + opts = { ...this.opts, ...opts, stream } - const mux = new Multiplexer(isInitiator, this.rootKey, opts) + const mux = new Multiplexer(null, this.rootKey, opts) mux.on('manifest', onmanifest) mux.on('replicate', onreplicate) @@ -97,7 +97,7 @@ module.exports = class CorestoreMuxer { const keyString = remoteKey.toString('hex') this.streamsByKey.set(keyString, { stream, info }) for (const mux of this.muxers.values()) { - mux.addStream(stream, info) + mux.addStream(stream, this.opts) } } @@ -117,9 +117,11 @@ module.exports = class CorestoreMuxer { const mux = new CorestoreMuxerTopic(this.corestore, rootKey, opts) this.networker.join(discoveryKey) this.muxers.set(hkey, mux) - for (const { stream, info } of this.streamsByKey.values()) { - mux.addStream(stream, info) + for (const { stream } of this.streamsByKey.values()) { + mux.addStream(stream) } return mux } } + +module.exports.CorestoreMuxerTopic = CorestoreMuxerTopic diff --git a/test/corestore.js b/test/corestore.js index cd55724..6ba5852 100644 --- a/test/corestore.js +++ b/test/corestore.js @@ -35,11 +35,11 @@ test('corestore networker example', async function (t) { const muxer2 = new Muxer(networker2) // For each mux wrapper, join on two different multifeed rootkeys. - const mux1a = muxer1.join(KEY_A, { name: 'm1a' }) - const mux1b = muxer1.join(KEY_B, { name: 'm1b' }) + const mux1a = muxer1.join(KEY_A, { live: true, name: 'm1a' }) + const mux1b = muxer1.join(KEY_B, { live: true, name: 'm1b' }) - const mux2a = muxer2.join(KEY_A, { name: 'mux2a' }) - const mux2b = muxer2.join(KEY_B, { name: 'mux2b' }) + const mux2a = muxer2.join(KEY_A, { live: true, name: 'mux2a' }) + const mux2b = muxer2.join(KEY_B, { live: true, name: 'mux2b' }) // Person 1 adds the same feed to both multifeeds. mux1a.addFeed(core1.key) From 90058bafdb748639cfbfdee86016a4f36e1fd5af Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Fri, 12 Jun 2020 01:53:39 +0200 Subject: [PATCH 7/8] Always call offerFeeds --- corestore.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/corestore.js b/corestore.js index 8d9dd00..5b4a82b 100644 --- a/corestore.js +++ b/corestore.js @@ -29,7 +29,7 @@ class CorestoreMuxerTopic extends EventEmitter { function onready () { const keys = Array.from(self._feeds.keys()) - if (keys.length) mux.offerFeeds(keys) + mux.offerFeeds(keys) } function onmanifest (manifest) { From 62e14ab652ede82428d4c9c55d1c9f2a11008656 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Mon, 29 Jun 2020 18:54:47 +0200 Subject: [PATCH 8/8] Move corestore.js to test. --- test/corestore.js | 2 +- corestore.js => test/lib/corestore.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename corestore.js => test/lib/corestore.js (98%) diff --git a/test/corestore.js b/test/corestore.js index 6ba5852..51ed88b 100644 --- a/test/corestore.js +++ b/test/corestore.js @@ -6,7 +6,7 @@ const Corestore = require('corestore') const dht = require('@hyperswarm/dht') const SwarmNetworker = require('corestore-swarm-networking') -const Muxer = require('../corestore') +const Muxer = require('./lib/corestore') const multifeed = require('..') const BOOTSTRAP_PORT = 3100 diff --git a/corestore.js b/test/lib/corestore.js similarity index 98% rename from corestore.js rename to test/lib/corestore.js index 5b4a82b..b049c4f 100644 --- a/corestore.js +++ b/test/lib/corestore.js @@ -1,5 +1,5 @@ const crypto = require('hypercore-crypto') -const Multiplexer = require('./mux') +const Multiplexer = require('../../mux') const debug = require('debug')('multifeed') const { EventEmitter } = require('events')