Skip to content

Commit fd9c332

Browse files
committed
refact(storage): remove dnode dependency and implement our own RPC storage protocol
Refs screeps/screeps#66
1 parent fecd509 commit fd9c332

File tree

5 files changed

+187
-106
lines changed

5 files changed

+187
-106
lines changed

index.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
const dnode = require('dnode');
21
const q = require('q');
32
const net = require('net');
43
const path = require('path');
54
const _ = require('lodash');
65

76
exports.configManager = require('./lib/config-manager');
87
exports.storage = require('./lib/storage');
8+
exports.rpc = require('./lib/rpc');
99

1010
exports.findPort = function findPort(port) {
1111
var defer = q.defer();

lib/rpc.js

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
const { Writable } = require('stream');
2+
const q = require('q');
3+
const { EventEmitter } = require('events');
4+
5+
class JSONFrameStream extends Writable {
6+
7+
constructor(handler, options) {
8+
super(options);
9+
this.handler = handler;
10+
this.frame = null;
11+
}
12+
_write(chunk, encoding, callback) {
13+
this._parse(chunk);
14+
callback();
15+
}
16+
_parse(buffer) {
17+
if(!buffer.length) {
18+
return;
19+
}
20+
if(!this.frame) {
21+
this.frame = {
22+
data: Buffer.alloc(4),
23+
pointer: 0
24+
};
25+
}
26+
if(!this.frame.size) {
27+
const length = Math.min(buffer.length, 4 - this.frame.pointer);
28+
buffer.copy(this.frame.data, this.frame.pointer, 0, length);
29+
this.frame.pointer += length;
30+
if(this.frame.pointer === 4) {
31+
this.frame.size = this.frame.data.readUInt32BE();
32+
this.frame.data = Buffer.alloc(this.frame.size);
33+
this.frame.pointer = 0;
34+
}
35+
return this._parse(buffer.slice(length));
36+
}
37+
else {
38+
const length = Math.min(buffer.length, this.frame.size - this.frame.pointer);
39+
buffer.copy(this.frame.data, this.frame.pointer, 0, length);
40+
this.frame.pointer += length;
41+
if (this.frame.pointer == this.frame.size) {
42+
this.handler(JSON.parse(this.frame.data.toString('utf8')));
43+
this.frame = null;
44+
}
45+
return this._parse(buffer.slice(length));
46+
}
47+
}
48+
static makeFrame(obj) {
49+
var data = Buffer.from(JSON.stringify(obj), 'utf8');
50+
var length = Buffer.alloc(4);
51+
length.writeUInt32BE(data.length);
52+
return Buffer.concat([length, data]);
53+
}
54+
}
55+
56+
class RpcServer {
57+
constructor(socket, methods) {
58+
this.socket = socket;
59+
this.socket.pipe(new JSONFrameStream(this._processFrame.bind(this)));
60+
this.methods = methods;
61+
}
62+
63+
_processFrame(obj) {
64+
let args = obj.args || [];
65+
if(obj.method == 'subscribe') {
66+
this.methods.subscribe(obj.channel, (pubsub) => {
67+
this.socket.write(JSONFrameStream.makeFrame({pubsub}));
68+
});
69+
return;
70+
}
71+
this.methods[obj.method].apply(null, args.concat([(error, result) => {
72+
let response = {id: obj.id};
73+
if(error) {
74+
response.error = error;
75+
}
76+
else {
77+
response.result = result;
78+
}
79+
this.socket.write(JSONFrameStream.makeFrame(response));
80+
}]));
81+
}
82+
}
83+
84+
class RpcClient {
85+
constructor(socket) {
86+
this.socket = socket;
87+
this.socket.pipe(new JSONFrameStream(this._processFrame.bind(this)));
88+
this.requestId = 0;
89+
this.defers = new Map();
90+
this.pubsub = new EventEmitter();
91+
}
92+
_processFrame(obj) {
93+
if(obj.pubsub) {
94+
this.pubsub.emit(obj.pubsub.channel, obj.pubsub.channel, obj.pubsub.data);
95+
this.pubsub.emit('*', obj.pubsub.channel, obj.pubsub.data);
96+
return;
97+
}
98+
if(!this.defers.has(obj.id)) {
99+
console.error('invalid request id',obj.id);
100+
return;
101+
}
102+
if(obj.error) {
103+
this.defers.get(obj.id).reject(obj.error);
104+
}
105+
else {
106+
this.defers.get(obj.id).resolve(obj.result);
107+
}
108+
this.defers.delete(obj.id);
109+
}
110+
request(method, ...args) {
111+
this.requestId++;
112+
let request = {
113+
id: this.requestId,
114+
method,
115+
args
116+
};
117+
this.socket.write(JSONFrameStream.makeFrame(request));
118+
let defer = q.defer();
119+
this.defers.set(this.requestId, defer);
120+
return defer.promise;
121+
}
122+
subscribe(channel, cb) {
123+
let request = {
124+
method: 'subscribe',
125+
channel
126+
};
127+
this.socket.write(JSONFrameStream.makeFrame(request));
128+
this.pubsub.addListener(channel, cb);
129+
}
130+
}
131+
132+
exports.JSONFrameStream = JSONFrameStream;
133+
exports.RpcServer = RpcServer;
134+
exports.RpcClient = RpcClient;

lib/storage.js

Lines changed: 52 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
const dnode = require('dnode');
21
const q = require('q');
32
const net = require('net');
43
const config = require('./config-manager').config;
4+
const { RpcClient } = require('./rpc');
55

66
config.common.dbCollections = [
77
'leaderboard.power',
@@ -69,8 +69,7 @@ exports._connect = function storageConnect() {
6969
console.log('Connecting to storage');
7070

7171
var socket = net.connect(process.env.STORAGE_PORT, process.env.STORAGE_HOST);
72-
var d = dnode();
73-
socket.pipe(d).pipe(socket);
72+
var rpcClient = new RpcClient(socket);
7473

7574
var defer = q.defer();
7675
var resetDefer = q.defer();
@@ -85,64 +84,64 @@ exports._connect = function storageConnect() {
8584
return fn;
8685
}
8786

88-
d.on('remote', remote => {
89-
90-
function wrapCollection(collectionName) {
91-
var wrap = {};
92-
['find', 'findOne','by','clear','count','ensureIndex','removeWhere','insert'].forEach(method => {
93-
wrap[method] = function() {
94-
return q.ninvoke(remote, 'dbRequest', collectionName, method, Array.prototype.slice.call(arguments));
95-
}
96-
});
97-
wrap.update = resetInterceptor(function(query, update, params) {
98-
return q.ninvoke(remote, 'dbUpdate', collectionName, query, update, params);
99-
});
100-
wrap.bulk = resetInterceptor(function(bulk) {
101-
return q.ninvoke(remote, 'dbBulk', collectionName, bulk);
102-
});
103-
wrap.findEx = resetInterceptor(function(query, opts) {
104-
return q.ninvoke(remote, 'dbFindEx', collectionName, query, opts);
105-
});
106-
return wrap;
107-
}
108-
109-
config.common.dbCollections.forEach(i => exports.db[i] = wrapCollection(i));
11087

111-
exports.resetAllData = () => q.ninvoke(remote, 'dbResetAllData');
112-
113-
Object.assign(exports.queue, {
114-
fetch: resetInterceptor(q.nbind(remote.queueFetch, remote)),
115-
add: resetInterceptor(q.nbind(remote.queueAdd, remote)),
116-
addMulti: resetInterceptor(q.nbind(remote.queueAddMulti, remote)),
117-
markDone: resetInterceptor(q.nbind(remote.queueMarkDone, remote)),
118-
whenAllDone: resetInterceptor(q.nbind(remote.queueWhenAllDone, remote)),
119-
reset: resetInterceptor(q.nbind(remote.queueReset, remote))
88+
function wrapCollection(collectionName) {
89+
var wrap = {};
90+
['find', 'findOne','by','clear','count','ensureIndex','removeWhere','insert'].forEach(method => {
91+
wrap[method] = function() {
92+
return rpcClient.request('dbRequest', collectionName, method, Array.prototype.slice.call(arguments));
93+
}
12094
});
121-
122-
Object.assign(exports.env, {
123-
get: resetInterceptor(q.nbind(remote.dbEnvGet, remote)),
124-
mget: resetInterceptor(q.nbind(remote.dbEnvMget, remote)),
125-
set: resetInterceptor(q.nbind(remote.dbEnvSet, remote)),
126-
setex: resetInterceptor(q.nbind(remote.dbEnvSetex, remote)),
127-
expire: resetInterceptor(q.nbind(remote.dbEnvExpire, remote)),
128-
ttl: resetInterceptor(q.nbind(remote.dbEnvTtl, remote)),
129-
del: resetInterceptor(q.nbind(remote.dbEnvDel, remote)),
130-
hmget: resetInterceptor(q.nbind(remote.dbEnvHmget, remote)),
131-
hmset: resetInterceptor(q.nbind(remote.dbEnvHmset, remote)),
132-
hget: resetInterceptor(q.nbind(remote.dbEnvHget, remote)),
133-
hset: resetInterceptor(q.nbind(remote.dbEnvHset, remote))
95+
wrap.update = resetInterceptor(function(query, update, params) {
96+
return rpcClient.request('dbUpdate', collectionName, query, update, params);
13497
});
135-
136-
Object.assign(exports.pubsub, {
137-
publish: resetInterceptor(q.nbind(remote.publish, remote)),
138-
subscribe: (channel, cb) => remote.subscribe(channel, cb)
98+
wrap.bulk = resetInterceptor(function(bulk) {
99+
return rpcClient.request('dbBulk', collectionName, bulk);
100+
});
101+
wrap.findEx = resetInterceptor(function(query, opts) {
102+
return rpcClient.request('dbFindEx', collectionName, query, opts);
139103
});
104+
return wrap;
105+
}
106+
107+
config.common.dbCollections.forEach(i => exports.db[i] = wrapCollection(i));
140108

141-
exports._connected = true;
109+
exports.resetAllData = () => rpcClient.request('dbResetAllData');
142110

143-
defer.resolve();
111+
Object.assign(exports.queue, {
112+
fetch: resetInterceptor(rpcClient.request.bind(rpcClient, 'queueFetch')),
113+
add: resetInterceptor(rpcClient.request.bind(rpcClient, 'queueAdd')),
114+
addMulti: resetInterceptor(rpcClient.request.bind(rpcClient, 'queueAddMulti')),
115+
markDone: resetInterceptor(rpcClient.request.bind(rpcClient, 'queueMarkDone')),
116+
whenAllDone: resetInterceptor(rpcClient.request.bind(rpcClient, 'queueWhenAllDone')),
117+
reset: resetInterceptor(rpcClient.request.bind(rpcClient, 'queueReset'))
144118
});
145119

120+
Object.assign(exports.env, {
121+
get: resetInterceptor(rpcClient.request.bind(rpcClient, 'dbEnvGet')),
122+
mget: resetInterceptor(rpcClient.request.bind(rpcClient, 'dbEnvMget')),
123+
set: resetInterceptor(rpcClient.request.bind(rpcClient, 'dbEnvSet')),
124+
setex: resetInterceptor(rpcClient.request.bind(rpcClient, 'dbEnvSetex')),
125+
expire: resetInterceptor(rpcClient.request.bind(rpcClient, 'dbEnvExpire')),
126+
ttl: resetInterceptor(rpcClient.request.bind(rpcClient, 'dbEnvTtl')),
127+
del: resetInterceptor(rpcClient.request.bind(rpcClient, 'dbEnvDel')),
128+
hmget: resetInterceptor(rpcClient.request.bind(rpcClient, 'dbEnvHmget')),
129+
hmset: resetInterceptor(rpcClient.request.bind(rpcClient, 'dbEnvHmset')),
130+
hget: resetInterceptor(rpcClient.request.bind(rpcClient, 'dbEnvHget')),
131+
hset: resetInterceptor(rpcClient.request.bind(rpcClient, 'dbEnvHset'))
132+
});
133+
134+
Object.assign(exports.pubsub, {
135+
publish: resetInterceptor(rpcClient.request.bind(rpcClient, 'publish')),
136+
subscribe(channel, cb) {
137+
rpcClient.subscribe(channel, cb);
138+
}
139+
});
140+
141+
exports._connected = true;
142+
143+
defer.resolve();
144+
146145
socket.on('error', err => {
147146
console.error('Storage connection lost', err);
148147
resetDefer.resolve('reset');

package-lock.json

Lines changed: 0 additions & 51 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
"url": "https://github.com/screeps/common.git"
1111
},
1212
"dependencies": {
13-
"dnode": "^1.2.2",
1413
"eslint": "^5.12.1",
1514
"lodash": "^3.10.1",
1615
"q": "^1.4.1"

0 commit comments

Comments
 (0)