diff --git a/mux.js b/mux.js index 0fecd03..39e9263 100644 --- a/mux.js +++ b/mux.js @@ -4,6 +4,7 @@ var inherits = require('inherits') var events = require('events') var debug = require('debug')('multifeed') var once = require('once') +var AbstractExtension = require('abstract-extension') // constants var MULTIFEED = 'MULTIFEED' @@ -19,6 +20,14 @@ var EXT_REPLICATE_FEEDS = 'MULTIFEED_REPLICATE_FEEDS' var ERR_VERSION_MISMATCH = 'ERR_VERSION_MISMATCH' var ERR_CLIENT_MISMATCH = 'ERR_CLIENT_MISMATCH' +var DEFAULT_TIMEOUT = 10000 + +class Extension extends AbstractExtension { + send (message) { + this.local.handlers.send(this.id, this.encode(message)) + } +} + // `key` - protocol encryption key function Multiplexer (isInitiator, key, opts) { if (!(this instanceof Multiplexer)) return new Multiplexer(isInitiator, key, opts) @@ -34,76 +43,88 @@ function Multiplexer (isInitiator, key, opts) { self._remoteOffer = [] self._activeFeedStreams = {} - var onFirstKey = true - var stream = this.stream = new Protocol(isInitiator, Object.assign({}, opts, { - ondiscoverykey: function (key) { - if (onFirstKey) { - onFirstKey = false - if (!self.stream.remoteVerified(key)) { - self._finalize(new Error('Exchange key did not match remote')) - } - } - } - })) - - this._handshakeExt = this.stream.registerExtension(EXT_HANDSHAKE, { - onmessage: onHandshake, - onerror: function (err) { - self._finalize(err) - }, - encoding: 'json' - }) - - function onHandshake (header) { - debug(self._id + ' [REPLICATION] recv\'d handshake: ', JSON.stringify(header)) - var err + if (opts.stream) { + self.stream = opts.stream + } else { + self.stream = new Protocol(isInitiator, Object.assign({}, opts)) + } - if (!compatibleVersions(header.version, PROTOCOL_VERSION)) { - debug(self._id + ' [REPLICATION] aborting; version mismatch (us=' + PROTOCOL_VERSION + ')') - err = new Error('protocol version mismatch! us=' + PROTOCOL_VERSION + ' them=' + header.version) - err.code = ERR_VERSION_MISMATCH - err.usVersion = PROTOCOL_VERSION - err.themVersion = header.version - self._finalize(err) - return + // Prepare the extension handlers. + self._extensions = Extension.createLocal({ + send (id, message) { + self._channel.extension(id, message) } + }) + self._remoteExtensions = self._extensions.remote() - if (header.client !== MULTIFEED) { - debug(self._id + ' [REPLICATION] aborting; Client mismatch! expected ', MULTIFEED, 'but got', header.client) - err = new Error('Client mismatch! expected ' + MULTIFEED + ' but got ' + header.client) - err.code = ERR_CLIENT_MISMATCH - err.usClient = MULTIFEED - err.themClient = header.client - self._finalize(err) - return - } - - // Wait a tick, otherwise the _ready handler below won't be listening for this event yet. - process.nextTick(function () { - self.emit('ready', header) - }) - } - - // Open a virtual feed that has the key set to the shared key. - this._feed = stream.open(key, { + // Open a protocol channel on the shared key. + self._channel = self.stream.open(key, { onopen: function () { - onFirstKey = false - if (!stream.remoteVerified(key)) { + if (!self.stream.remoteVerified(key)) { debug(self._id + ' [REPLICATION] aborting; shared key mismatch') - self._finalize(new Error('shared key version mismatch!')) + self._finalize(new Error('shared key mismatch!')) return } + self._registerExtensions() + // send handshake - self._handshakeExt.send(Object.assign({}, opts, { + self._handshakeExt.send(Object.assign({}, { client: MULTIFEED, version: PROTOCOL_VERSION, userData: opts.userData })) + }, + onextension (id, message) { + self._remoteExtensions.onmessage(id, message, self._channel) + }, + onoptions (options) { + self._remoteExtensions.update(options.extensions) } }) - this._manifestExt = stream.registerExtension(EXT_MANIFEST, { + if (!self._opts.live) { + self.stream.on('prefinalize', function () { + self._channel.close() + debug(self._id + ' [REPLICATION] feed finish/prefinalize (' + self.stream.prefinalize._tick + ')') + }) + } + + // The timeout will occur if the remote does not open a channel for + // the shared key. + if (opts.timeout !== false) { + this.timeout = setTimeout(() => { + this._finalize(new Error('Multifeed handshake remote timeout')) + }, opts.timeout || DEFAULT_TIMEOUT) + } + + this._ready = readify(function (done) { + self.on('ready', function (remote) { + if (this.timeout) clearTimeout(this.timeout) + debug(self._id + ' [REPLICATION] remote connected and ready') + done(remote) + }) + }) +} + +inherits(Multiplexer, events.EventEmitter) + +Multiplexer.prototype.ready = function (cb) { + this._ready(cb) +} + +Multiplexer.prototype._registerExtensions = function () { + var self = this + + self._handshakeExt = self._extensions.add(EXT_HANDSHAKE, { + onmessage: onHandshake, + onerror: function (err) { + self._finalize(err) + }, + encoding: 'json' + }) + + self._manifestExt = self._extensions.add(EXT_MANIFEST, { onmessage: function (msg) { debug(self._id, 'RECV\'D Ext MANIFEST:', JSON.stringify(msg)) self._remoteOffer = uniq(self._remoteOffer.concat(msg.keys)) @@ -115,7 +136,7 @@ function Multiplexer (isInitiator, key, opts) { encoding: 'json' }) - this._requestFeedsExt = stream.registerExtension(EXT_REQUEST_FEEDS, { + self._requestFeedsExt = self._extensions.add(EXT_REQUEST_FEEDS, { onmessage: function (msg) { debug(self._id, 'RECV\'D Ext REQUEST_FEEDS:', msg) self._onRequestFeeds(msg) @@ -126,7 +147,7 @@ function Multiplexer (isInitiator, key, opts) { encoding: 'json' }) - this._replicateFeedsExt = stream.registerExtension(EXT_REPLICATE_FEEDS, { + self._replicateFeedsExt = self._extensions.add(EXT_REPLICATE_FEEDS, { onmessage: function (msg) { debug(self._id, 'RECV\'D Ext REPLICATE_FEEDS:', msg) self._onRemoteReplicate(msg) @@ -137,25 +158,38 @@ function Multiplexer (isInitiator, key, opts) { encoding: 'json' }) - if (!self._opts.live) { - self.stream.on('prefinalize', function () { - self._feed.close() - debug(self._id + ' [REPLICATION] feed finish/prefinalize (' + self.stream.prefinalize._tick + ')') - }) - } + // Send the extension names to our remote peer. + self._channel.options({ extensions: self._extensions.names() }) - this._ready = readify(function (done) { - self.on('ready', function (remote) { - debug(self._id + ' [REPLICATION] remote connected and ready') - done(remote) - }) - }) -} + function onHandshake (header) { + debug(self._id + ' [REPLICATION] recv\'d handshake: ', JSON.stringify(header)) + var err -inherits(Multiplexer, events.EventEmitter) + if (!compatibleVersions(header.version, PROTOCOL_VERSION)) { + debug(self._id + ' [REPLICATION] aborting; version mismatch (us=' + PROTOCOL_VERSION + ')') + err = new Error('protocol version mismatch! us=' + PROTOCOL_VERSION + ' them=' + header.version) + err.code = ERR_VERSION_MISMATCH + err.usVersion = PROTOCOL_VERSION + err.themVersion = header.version + self._finalize(err) + return + } -Multiplexer.prototype.ready = function (cb) { - this._ready(cb) + if (header.client !== MULTIFEED) { + debug(self._id + ' [REPLICATION] aborting; Client mismatch! expected ', MULTIFEED, 'but got', header.client) + err = new Error('Client mismatch! expected ' + MULTIFEED + ' but got ' + header.client) + err.code = ERR_CLIENT_MISMATCH + err.usClient = MULTIFEED + err.themClient = header.client + self._finalize(err) + return + } + + // Wait a tick, otherwise the _ready handler below won't be listening for this event yet. + process.nextTick(function () { + self.emit('ready', header) + }) + } } Multiplexer.prototype._finalize = function (err) { @@ -291,7 +325,7 @@ Multiplexer.prototype._replicateFeeds = function (keys, cb) { // Bail on replication entirely if there were no feeds to add, and none are pending or active. if (feeds.length === 0 && Object.keys(self._activeFeedStreams).length === 0) { debug('[REPLICATION] terminating mux: no feeds to sync') - self._feed.close() + self._channel.close() process.nextTick(cb) } } diff --git a/package.json b/package.json index 0a58413..f255816 100644 --- a/package.json +++ b/package.json @@ -15,9 +15,10 @@ }, "keywords": [], "dependencies": { + "abstract-extension": "^3.1.0", "debug": "^4.1.0", - "hypercore": "^8.3.0", - "hypercore-protocol": "^7.7.1", + "hypercore": "^9", + "hypercore-protocol": "^8", "inherits": "^2.0.3", "mutexify": "^1.2.0", "once": "^1.4.0", @@ -26,13 +27,16 @@ "through2": "^3.0.0" }, "devDependencies": { - "hypercore-crypto": "^1.0.0", + "@hyperswarm/dht": "^3.6.5", + "corestore": "^5.3.2", + "corestore-swarm-networking": "^5.4.3", + "hypercore-crypto": "^2.0.0", "pump": "^3.0.0", "pumpify": "^1.5.1", "random-access-latency": "^1.0.0", "rimraf": "^2.6.3", "standard": "~10.0.0", - "tape": "~4.6.2", + "tape": "^5", "tmp": "0.0.33" }, "license": "ISC" diff --git a/test/corestore.js b/test/corestore.js new file mode 100644 index 0000000..51ed88b --- /dev/null +++ b/test/corestore.js @@ -0,0 +1,184 @@ +const test = require('tape') +const crypto = require('hypercore-crypto') +const hyperswarm = require('hyperswarm') +const ram = require('random-access-memory') +const Corestore = require('corestore') +const dht = require('@hyperswarm/dht') +const SwarmNetworker = require('corestore-swarm-networking') + +const Muxer = require('./lib/corestore') +const multifeed = require('..') + +const BOOTSTRAP_PORT = 3100 +var bootstrap = null + +const KEY_A = Buffer.alloc(32, 1) +const KEY_B = Buffer.alloc(32, 2) + +test('corestore networker example', async function (t) { + // Create two distinct corestores and networkers. + // They will communicate over a localhost DHT. + const { store: store1, networker: networker1 } = await create() + const { store: store2, networker: networker2 } = await create() + + // Init some cores. + const core1 = store1.get() + const core2a = store2.get() + const core2b = store2.get() + + await append(core1, 'hello') + const data = await get(core1, 0) + t.same(data, Buffer.from('hello')) + + // For each networker, setup the mulitfeed mux wrapper. + const muxer1 = new Muxer(networker1) + const muxer2 = new Muxer(networker2) + + // For each mux wrapper, join on two different multifeed rootkeys. + const mux1a = muxer1.join(KEY_A, { live: true, name: 'm1a' }) + const mux1b = muxer1.join(KEY_B, { live: true, name: 'm1b' }) + + const mux2a = muxer2.join(KEY_A, { live: true, name: 'mux2a' }) + const mux2b = muxer2.join(KEY_B, { live: true, name: 'mux2b' }) + + // Person 1 adds the same feed to both multifeeds. + mux1a.addFeed(core1.key) + mux1b.addFeed(core1.key) + + // Person2 adds two different feeds to each multifeed. + mux2a.addFeed(core2a.key) + mux2b.addFeed(core2b.key) + + // Wait for things to sync. + // TODO: Remove timeout, wait for event instead. + await timeout(500) + + // Check that the muxers for the same keys arrived at the same set of feeds. + t.deepEqual(toKeys(mux1a.feeds()), toKeys([core1, core2a])) + t.deepEqual(toKeys(mux2a.feeds()), toKeys([core1, core2a])) + t.deepEqual(toKeys(mux1b.feeds()), toKeys([core1, core2b])) + t.deepEqual(toKeys(mux2b.feeds()), toKeys([core1, core2b])) + + // Check that the cores actually replicated. + let checked = false + for (const feed of mux2b.feeds()) { + if (feed.key.toString('hex') === core1.key.toString('hex')) { + const data = await get(store2.get(core1.key), 0) + t.same(data, Buffer.from('hello')) + checked = true + } + } + if (!checked) t.fail('missing data check') + + // Cleanup. + await cleanup([networker1, networker2]) + t.end() +}) + +test('corestore to multifeed over hyperswarm', async t => { + const muxkey = KEY_A + + // setup a corestore muxer + const { store, networker } = await create() + const core = store.get() + const muxer = new Muxer(networker) + const mux = muxer.join(muxkey) + mux.addFeed(core.key) + + // setup a multifeed muxer plus network + const multi = multifeed(ram, { encryptionKey: muxkey }) + let multifeedWriter + await new Promise(resolve => { + multi.writer('local', (err, feed) => { + multifeedWriter = feed + t.error(err) + feed.append('hello', resolve) + }) + }) + const swarm2 = hyperswarm({ + bootstrap: `localhost:${BOOTSTRAP_PORT}` + }) + swarm2.join(crypto.discoveryKey(muxkey), { announce: true, lookup: true }) + let didConnect = false + // Just pipe all connections directly into the multifeed replication stream. + // This is what e.g. cabal currently does, and would need a seperate hyperswarm + // instance to replicate multiple multifeeds in parallel (before this patch). + swarm2.on('connection', (socket, details) => { + if (didConnect) return + didConnect = true + const isInitiator = !!details.client + const stream = multi.replicate(isInitiator, { live: true }) + stream.pipe(socket).pipe(stream) + }) + + // wait and see + await timeout(500) + t.deepEqual(toKeys(multi.feeds()), toKeys([multifeedWriter, core])) + t.deepEqual(toKeys(mux.feeds()), toKeys([multifeedWriter, core])) + swarm2.destroy() + await cleanup([networker]) +}) + +function toKeys (feeds) { + return feeds.map(f => f.key.toString('hex')).sort() +} + +function append (core, data) { + return new Promise((resolve, reject) => { + core.append(data, err => { + if (err) return reject(err) + return resolve() + }) + }) +} +function get (core, idx, opts = {}) { + return new Promise((resolve, reject) => { + core.get(idx, opts, (err, data) => { + if (err) return reject(err) + return resolve(data) + }) + }) +} + +function timeout (ms) { + return new Promise(resolve => setTimeout(resolve, ms)) +} + +async function create (opts = {}) { + if (!bootstrap) { + bootstrap = dht({ + bootstrap: false + }) + bootstrap.listen(BOOTSTRAP_PORT) + await new Promise(resolve => { + return bootstrap.once('listening', resolve) + }) + } + const store = new Corestore(ram) + await store.ready() + const networker = new SwarmNetworker(store, { + ...opts, + bootstrap: `localhost:${BOOTSTRAP_PORT}` + }) + // logEvents(networker, 'networker') + return { store, networker } +} + +async function cleanup (networkers) { + for (let networker of networkers) { + await networker.close() + } + if (bootstrap) { + await bootstrap.destroy() + bootstrap = null + } +} + +function logEvents (emitter, name) { + const emit = emitter.emit.bind(emitter) + emitter.emit = function (event, ...args) { + console.log(name, event) + if (event === 'replication-error') console.log(args) + emit(event, ...args) + } +} diff --git a/test/lib/corestore.js b/test/lib/corestore.js new file mode 100644 index 0000000..b049c4f --- /dev/null +++ b/test/lib/corestore.js @@ -0,0 +1,127 @@ +const crypto = require('hypercore-crypto') +const Multiplexer = require('../../mux') +const debug = require('debug')('multifeed') +const { EventEmitter } = require('events') + +class CorestoreMuxerTopic extends EventEmitter { + constructor (corestore, rootKey, opts = {}) { + super() + this.corestore = corestore + this.rootKey = rootKey + this._feeds = new Map() + this.streams = new Map() + this.opts = opts + } + + addStream (stream, opts) { + const self = this + opts = { ...this.opts, ...opts, stream } + + const mux = new Multiplexer(null, this.rootKey, opts) + + mux.on('manifest', onmanifest) + mux.on('replicate', onreplicate) + mux.ready(onready) + + stream.once('end', cleanup) + stream.once('error', cleanup) + this.streams.set(stream, { mux, cleanup }) + + function onready () { + const keys = Array.from(self._feeds.keys()) + mux.offerFeeds(keys) + } + + function onmanifest (manifest) { + mux.requestFeeds(manifest.keys) + } + + function onreplicate (keys, repl) { + for (const key of keys) { + if (self._feeds.has(key)) continue + self.addFeed(key) + self.emit('feed', self._feeds.get(key)) + } + const feeds = keys.map(key => self._feeds.get(key)) + repl(feeds) + } + + function cleanup (_err) { + mux.removeListener('manifest', onmanifest) + mux.removeListener('replicate', onreplicate) + self.streams.delete(stream) + } + } + + removeStream (stream) { + if (!this.streams.has(stream)) return + const { cleanup } = this.streams.get(stream) + cleanup() + } + + feeds () { + return Array.from(this._feeds.values()) + } + + addFeed (key) { + if (!Buffer.isBuffer(key)) key = Buffer.from(key, 'hex') + const feed = this.corestore.get(key) + const hkey = feed.key.toString('hex') + this._feeds.set(hkey, feed) + for (const { mux } of this.streams.values()) { + mux.ready(() => { + if (mux.knownFeeds().indexOf(hkey) === -1) { + debug('Forwarding new feed to existing peer:', hkey) + mux.offerFeeds([hkey]) + } + }) + } + } +} + +module.exports = class CorestoreMuxer { + constructor (networker) { + this.networker = networker + this.corestore = networker.corestore + this.muxers = new Map() + this.streamsByKey = new Map() + + this._joinListener = this._onjoin.bind(this) + this._leaveListener = this._onleave.bind(this) + this.networker.on('handshake', this._joinListener) + this.networker.on('stream-closed', this._leaveListener) + } + + _onjoin (stream, info) { + const remoteKey = stream.remotePublicKey + const keyString = remoteKey.toString('hex') + this.streamsByKey.set(keyString, { stream, info }) + for (const mux of this.muxers.values()) { + mux.addStream(stream, this.opts) + } + } + + _onleave (stream, info, finishedHandshake) { + if (!finishedHandshake || (info && info.duplicate)) return + for (const mux of this.muxers.values()) { + mux.removeStream(stream) + } + } + + join (rootKey, opts) { + if (!Buffer.isBuffer(rootKey)) rootKey = Buffer.from(rootKey, 'hex') + const hkey = rootKey.toString('hex') + if (this.muxers.has(hkey)) return this.muxers.get(hkey) + + const discoveryKey = crypto.discoveryKey(rootKey) + const mux = new CorestoreMuxerTopic(this.corestore, rootKey, opts) + this.networker.join(discoveryKey) + this.muxers.set(hkey, mux) + for (const { stream } of this.streamsByKey.values()) { + mux.addStream(stream) + } + return mux + } +} + +module.exports.CorestoreMuxerTopic = CorestoreMuxerTopic diff --git a/test/regression.js b/test/regression.js index 18d5dd9..e51502f 100644 --- a/test/regression.js +++ b/test/regression.js @@ -251,7 +251,7 @@ test('regression: MFs with different root keys cannot replicate', function (t) { var r = m1.replicate(true) var s = m2.replicate(false) pump(r, s, r, function (err) { - t.same(err.toString(), 'Error: Exchange key did not match remote') + t.same(err.toString(), 'Error: Multifeed handshake remote timeout') t.end() }) })