Skip to content

Commit d5b5b5e

Browse files
authored
Merge pull request #341 from share/handshake
Add basic handshake negotiation
2 parents 03ad2bf + 4b3ff13 commit d5b5b5e

File tree

11 files changed

+220
-73
lines changed

11 files changed

+220
-73
lines changed

lib/agent.js

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@ function Agent(backend, stream) {
2020
this.stream = stream;
2121

2222
this.clientId = hat();
23+
// src is a client-configurable "id" which the client will set in its handshake,
24+
// and attach to its ops. This should take precedence over clientId if set.
25+
// Only legacy clients, or new clients connecting for the first time will use the
26+
// Agent-provided clientId. Ideally we'll deprecate clientId in favour of src
27+
// in the next breaking change.
28+
this.src = null;
2329
this.connectTime = Date.now();
2430

2531
// We need to track which documents are subscribed by the client. This is a
@@ -38,20 +44,15 @@ function Agent(backend, stream) {
3844
// active, and it is passed to each middleware call
3945
this.custom = {};
4046

41-
// Initialize the remote client by sending it its agent Id.
42-
this.send({
43-
a: 'init',
44-
protocol: 1,
45-
id: this.clientId,
46-
type: types.defaultType.uri
47-
});
47+
// Send the legacy message to initialize old clients with the random agent Id
48+
this.send(this._initMessage('init'));
4849
}
4950
module.exports = Agent;
5051

5152
// Close the agent with the client.
5253
Agent.prototype.close = function(err) {
5354
if (err) {
54-
logger.warn('Agent closed due to error', this.clientId, err.stack || err);
55+
logger.warn('Agent closed due to error', this._src(), err.stack || err);
5556
}
5657
if (this.closed) return;
5758
// This will end the writable stream and emit 'finish'
@@ -190,7 +191,7 @@ Agent.prototype._isOwnOp = function(collection, op) {
190191
// Detect ops from this client on the same projection. Since the client sent
191192
// these in, the submit reply will be sufficient and we can silently ignore
192193
// them in the streams for subscribed documents or queries
193-
return (this.clientId === op.src) && (collection === (op.i || op.c));
194+
return (this._src() === op.src) && (collection === (op.i || op.c));
194195
};
195196

196197
Agent.prototype.send = function(message) {
@@ -332,6 +333,9 @@ Agent.prototype._handleMessage = function(request, callback) {
332333
if (errMessage) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, errMessage));
333334

334335
switch (request.a) {
336+
case 'hs':
337+
if (request.id) this.src = request.id;
338+
return callback(null, this._initMessage('hs'));
335339
case 'qf':
336340
return this._queryFetch(request.id, request.c, request.q, getQueryOptions(request), callback);
337341
case 'qs':
@@ -352,7 +356,13 @@ Agent.prototype._handleMessage = function(request, callback) {
352356
return this._unsubscribe(request.c, request.d, callback);
353357
case 'op':
354358
// Normalize the properties submitted
355-
var op = createClientOp(request, this.clientId);
359+
var op = createClientOp(request, this._src());
360+
if (op.seq >= util.MAX_SAFE_INTEGER) {
361+
return callback(new ShareDBError(
362+
ERROR_CODE.ERR_CONNECTION_SEQ_INTEGER_OVERFLOW,
363+
'Connection seq has exceeded the max safe integer, maybe from being open for too long'
364+
));
365+
}
356366
if (!op) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid op message'));
357367
return this._submit(request.c, request.d, op, callback);
358368
case 'nf':
@@ -645,6 +655,20 @@ Agent.prototype._fetchSnapshotByTimestamp = function(collection, id, timestamp,
645655
this.backend.fetchSnapshotByTimestamp(this, collection, id, timestamp, callback);
646656
};
647657

658+
Agent.prototype._initMessage = function(action) {
659+
return {
660+
a: action,
661+
protocol: 1,
662+
protocolMinor: 1,
663+
id: this._src(),
664+
type: types.defaultType.uri
665+
};
666+
};
667+
668+
Agent.prototype._src = function() {
669+
return this.src || this.clientId;
670+
};
671+
648672

649673
function createClientOp(request, clientId) {
650674
// src can be provided if it is not the same as the current agent,

lib/backend.js

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ Backend.prototype.close = function(callback) {
100100
finish();
101101
};
102102

103-
Backend.prototype.connect = function(connection, req) {
103+
Backend.prototype.connect = function(connection, req, callback) {
104104
var socket = new StreamSocket();
105105
if (connection) {
106106
connection.bindToSocket(socket);
@@ -113,6 +113,13 @@ Backend.prototype.connect = function(connection, req) {
113113
// not used internal to ShareDB, but it is handy for server-side only user
114114
// code that may cache state on the agent and read it in middleware
115115
connection.agent = agent;
116+
117+
if (typeof callback === 'function') {
118+
connection.once('connected', function() {
119+
callback(connection);
120+
});
121+
}
122+
116123
return connection;
117124
};
118125

lib/client/connection.js

Lines changed: 45 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ function Connection(socket) {
5252
// A unique message number for the given id
5353
this.seq = 1;
5454

55-
// Equals agent.clientId on the server
55+
// Equals agent.src on the server
5656
this.id = null;
5757

5858
// This direct reference from connection to agent is not used internal to
@@ -141,6 +141,7 @@ Connection.prototype.bindToSocket = function(socket) {
141141

142142
socket.onopen = function() {
143143
connection._setState('connecting');
144+
connection._initializeHandshake();
144145
};
145146

146147
socket.onerror = function(err) {
@@ -183,29 +184,9 @@ Connection.prototype.handleMessage = function(message) {
183184
switch (message.a) {
184185
case 'init':
185186
// Client initialization packet
186-
if (message.protocol !== 1) {
187-
err = new ShareDBError(
188-
ERROR_CODE.ERR_PROTOCOL_VERSION_NOT_SUPPORTED,
189-
'Unsupported protocol version: ' + message.protocol
190-
);
191-
return this.emit('error', err);
192-
}
193-
if (types.map[message.type] !== types.defaultType) {
194-
err = new ShareDBError(
195-
ERROR_CODE.ERR_DEFAULT_TYPE_MISMATCH,
196-
message.type + ' does not match the server default type'
197-
);
198-
return this.emit('error', err);
199-
}
200-
if (typeof message.id !== 'string') {
201-
err = new ShareDBError(ERROR_CODE.ERR_CLIENT_ID_BADLY_FORMED, 'Client id must be a string');
202-
return this.emit('error', err);
203-
}
204-
this.id = message.id;
205-
206-
this._setState('connected');
207-
return;
208-
187+
return this._handleLegacyInit(message);
188+
case 'hs':
189+
return this._handleHandshake(err, message);
209190
case 'qf':
210191
var query = this.queries[message.id];
211192
if (query) query._handleFetch(err, message.data, message.extra);
@@ -305,8 +286,6 @@ Connection.prototype._handleBulkMessage = function(err, message, method) {
305286
};
306287

307288
Connection.prototype._reset = function() {
308-
this.seq = 1;
309-
this.id = null;
310289
this.agent = null;
311290
};
312291

@@ -714,3 +693,43 @@ Connection.prototype._handleSnapshotFetch = function(error, message) {
714693
delete this._snapshotRequests[message.id];
715694
snapshotRequest._handleResponse(error, message);
716695
};
696+
697+
Connection.prototype._handleLegacyInit = function(message) {
698+
// If the minor protocol version has been set, we can ignore this legacy
699+
// init message, and wait for a response to our handshake message.
700+
if (message.protocolMinor) return;
701+
this._initialize(message);
702+
};
703+
704+
Connection.prototype._initializeHandshake = function() {
705+
this.send({a: 'hs', id: this.id});
706+
};
707+
708+
Connection.prototype._handleHandshake = function(error, message) {
709+
if (error) return this.emit('error', error);
710+
this._initialize(message);
711+
};
712+
713+
Connection.prototype._initialize = function(message) {
714+
if (message.protocol !== 1) {
715+
return this.emit('error', new ShareDBError(
716+
ERROR_CODE.ERR_PROTOCOL_VERSION_NOT_SUPPORTED,
717+
'Unsupported protocol version: ' + message.protocol
718+
));
719+
}
720+
if (types.map[message.type] !== types.defaultType) {
721+
return this.emit('error', new ShareDBError(
722+
ERROR_CODE.ERR_DEFAULT_TYPE_MISMATCH,
723+
message.type + ' does not match the server default type'
724+
));
725+
}
726+
if (typeof message.id !== 'string') {
727+
return this.emit('error', new ShareDBError(
728+
ERROR_CODE.ERR_CLIENT_ID_BADLY_FORMED,
729+
'Client id must be a string'
730+
));
731+
}
732+
this.id = message.id;
733+
734+
this._setState('connected');
735+
};

lib/client/doc.js

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ var emitter = require('../emitter');
22
var logger = require('../logger');
33
var ShareDBError = require('../error');
44
var types = require('../types');
5+
var util = require('../util');
56

67
var ERROR_CODE = ShareDBError.CODES;
78

@@ -615,9 +616,8 @@ Doc.prototype._otApply = function(op, source) {
615616

616617
// Actually send op to the server.
617618
Doc.prototype._sendOp = function() {
618-
// Wait until we have a src id from the server
619+
if (!this.connection.canSend) return;
619620
var src = this.connection.id;
620-
if (!src) return;
621621

622622
// When there is no inflightOp, send the first item in pendingOps. If
623623
// there is inflightOp, try sending it again
@@ -642,7 +642,16 @@ Doc.prototype._sendOp = function() {
642642
// reconnect, since an op may still be pending after the reconnection and
643643
// this.connection.id will change. In case an op is sent multiple times, we
644644
// also need to be careful not to override the original seq value.
645-
if (op.seq == null) op.seq = this.connection.seq++;
645+
if (op.seq == null) {
646+
if (this.connection.seq >= util.MAX_SAFE_INTEGER) {
647+
return this.emit('error', new ShareDBError(
648+
ERROR_CODE.ERR_CONNECTION_SEQ_INTEGER_OVERFLOW,
649+
'Connection seq has exceeded the max safe integer, maybe from being open for too long'
650+
));
651+
}
652+
653+
op.seq = this.connection.seq++;
654+
}
646655

647656
this.connection.sendOp(this, op);
648657

lib/error.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ ShareDBError.CODES = {
1616
ERR_APPLY_OP_VERSION_DOES_NOT_MATCH_SNAPSHOT: 'ERR_APPLY_OP_VERSION_DOES_NOT_MATCH_SNAPSHOT',
1717
ERR_APPLY_SNAPSHOT_NOT_PROVIDED: 'ERR_APPLY_SNAPSHOT_NOT_PROVIDED',
1818
ERR_CLIENT_ID_BADLY_FORMED: 'ERR_CLIENT_ID_BADLY_FORMED',
19+
ERR_CONNECTION_SEQ_INTEGER_OVERFLOW: 'ERR_CONNECTION_SEQ_INTEGER_OVERFLOW',
1920
ERR_CONNECTION_STATE_TRANSITION_INVALID: 'ERR_CONNECTION_STATE_TRANSITION_INVALID',
2021
ERR_DATABASE_ADAPTER_NOT_FOUND: 'ERR_DATABASE_ADAPTER_NOT_FOUND',
2122
ERR_DATABASE_DOES_NOT_SUPPORT_SUBSCRIBE: 'ERR_DATABASE_DOES_NOT_SUPPORT_SUBSCRIBE',

lib/util.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,5 @@ exports.isValidVersion = function(version) {
2222
exports.isValidTimestamp = function(timestamp) {
2323
return exports.isValidVersion(timestamp);
2424
};
25+
26+
exports.MAX_SAFE_INTEGER = 9007199254740991;

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
"lolex": "^5.1.1",
1919
"mocha": "^6.2.2",
2020
"nyc": "^14.1.1",
21+
"sharedb-legacy": "npm:sharedb@=1.1.0",
2122
"sinon": "^7.5.0"
2223
},
2324
"scripts": {

test/client/connection.js

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
var expect = require('chai').expect;
22
var Backend = require('../../lib/backend');
33
var Connection = require('../../lib/client/connection');
4+
var LegacyConnection = require('sharedb-legacy/lib/client').Connection;
5+
var StreamSocket = require('../../lib/stream-socket');
46

57
describe('client connection', function() {
68
beforeEach(function() {
@@ -197,4 +199,66 @@ describe('client connection', function() {
197199
});
198200
});
199201
});
202+
203+
it('persists its id and seq when reconnecting', function(done) {
204+
var backend = this.backend;
205+
backend.connect(null, null, function(connection) {
206+
var id = connection.id;
207+
expect(id).to.be.ok;
208+
var doc = connection.get('test', '123');
209+
doc.create({foo: 'bar'}, function(error) {
210+
if (error) return done(error);
211+
expect(connection.seq).to.equal(2);
212+
connection.close();
213+
backend.connect(connection, null, function() {
214+
expect(connection.id).to.equal(id);
215+
expect(connection.seq).to.equal(2);
216+
done();
217+
});
218+
});
219+
});
220+
});
221+
222+
it('still connects to legacy clients, whose ID changes on reconnection', function(done) {
223+
var currentBackend = this.backend;
224+
var socket = new StreamSocket();
225+
var legacyClient = new LegacyConnection(socket);
226+
currentBackend.connect(legacyClient);
227+
228+
var doc = legacyClient.get('test', '123');
229+
doc.create({foo: 'bar'}, function(error) {
230+
if (error) return done(error);
231+
var initialId = legacyClient.id;
232+
expect(initialId).to.equal(legacyClient.agent.clientId);
233+
expect(legacyClient.agent.src).to.be.null;
234+
legacyClient.close();
235+
currentBackend.connect(legacyClient);
236+
doc.submitOp({p: ['baz'], oi: 'qux'}, function(error) {
237+
if (error) return done(error);
238+
var newId = legacyClient.id;
239+
expect(newId).not.to.equal(initialId);
240+
expect(newId).to.equal(legacyClient.agent.clientId);
241+
expect(legacyClient.agent.src).to.be.null;
242+
done();
243+
});
244+
});
245+
});
246+
247+
it('errors when submitting an op with a very large seq', function(done) {
248+
this.backend.connect(null, null, function(connection) {
249+
var doc = connection.get('test', '123');
250+
doc.create({foo: 'bar'}, function(error) {
251+
if (error) return done(error);
252+
connection.sendOp(doc, {
253+
op: {p: ['foo'], od: 'bar'},
254+
src: connection.id,
255+
seq: Number.MAX_SAFE_INTEGER
256+
});
257+
doc.once('error', function(error) {
258+
expect(error.code).to.equal('ERR_CONNECTION_SEQ_INTEGER_OVERFLOW');
259+
done();
260+
});
261+
});
262+
});
263+
});
200264
});

test/client/doc.js

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,17 @@ describe('Doc', function() {
4949
doc.destroy();
5050
});
5151

52+
it('errors when trying to set a very large seq', function(done) {
53+
var connection = this.connection;
54+
connection.seq = Number.MAX_SAFE_INTEGER;
55+
var doc = connection.get('dogs', 'fido');
56+
doc.create({name: 'fido'});
57+
doc.once('error', function(error) {
58+
expect(error.code).to.equal('ERR_CONNECTION_SEQ_INTEGER_OVERFLOW');
59+
done();
60+
});
61+
});
62+
5263
describe('when connection closed', function() {
5364
beforeEach(function(done) {
5465
this.op1 = [{p: ['snacks'], oi: true}];

0 commit comments

Comments
 (0)