Skip to content

Commit ddc9a38

Browse files
authored
Merge pull request #13 from jsonjoy-com/text-demo
Text demo
2 parents e610af1 + cc43257 commit ddc9a38

File tree

9 files changed

+223
-16
lines changed

9 files changed

+223
-16
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
"demo:e2e:json-crdt-server:http1": "ts-node src/__demos__/json-crdt-server/main-http1.ts",
5757
"demo:e2e:json-crdt-server:uws": "ts-node src/__demos__/json-crdt-server/main-uws.ts",
5858
"demo:ui:json": "webpack serve --config ./src/__demos__/ui-json/webpack.config.js",
59+
"demo:ui:text": "webpack serve --config ./src/__demos__/ui-text/webpack.config.js",
5960
"start:json-crdt-server:http1": "NODE_ENV=production PORT=80 JSON_CRDT_STORE=level pm2 start lib/__demos__/json-crdt-server/main-http1.js",
6061
"start": "NODE_ENV=production PORT=80 JSON_CRDT_STORE=level pm2 start lib/__demos__/json-crdt-server/main-http1.js --exp-backoff-restart-delay=100",
6162
"coverage": "yarn test --collectCoverage",

src/__demos__/ui-json/main.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import {Model, Patch} from 'json-joy/lib/json-crdt';
99
const repo = new JsonCrdtRepo({
1010
wsUrl: 'wss://demo-iasd8921ondk0.jsonjoy.com/rpc',
1111
});
12-
const id = 'block-sync-ui-demo-id';
12+
const id = 'block-sync-ui-demo-json';
1313
const session = repo.make(id);
1414

1515
const model = session.model;

src/__demos__/ui-text/main.tsx

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import * as React from 'react';
2+
import * as ReactDOM from 'react-dom/client';
3+
import {JsonCrdtRepo} from '../../json-crdt-repo/JsonCrdtRepo';
4+
import {bind} from 'collaborative-input';
5+
import {Model, Patch} from 'json-joy/lib/json-crdt';
6+
7+
/* tslint:disable no-console */
8+
9+
const main = async () => {
10+
const repo = new JsonCrdtRepo({
11+
wsUrl: 'wss://demo-iasd8921ondk0.jsonjoy.com/rpc',
12+
});
13+
const id = 'block-sync-ui-demo-text-3';
14+
const session = await repo.sessions.load({id: [id], make: {}, remote: {timeout: 1000}});
15+
const model = session.model;
16+
const view = model.view();
17+
if (typeof view !== 'string') model.api.root('');
18+
19+
const Demo: React.FC = () => {
20+
const [remote, setRemote] = React.useState<Model | null>(null);
21+
const ref = React.useRef<HTMLTextAreaElement | null>(null);
22+
React.useLayoutEffect(() => {
23+
if (!ref.current) return;
24+
const unbind = bind(() => model.api.str([]), ref.current);
25+
return () => unbind();
26+
}, []);
27+
28+
return (
29+
<div style={{padding: 32}}>
30+
<textarea ref={ref} style={{width: 600, height: 300}}></textarea>
31+
<hr />
32+
<button
33+
onClick={async () => {
34+
const {block} = await repo.remote.read(id);
35+
const model = Model.fromBinary(block.snapshot.blob);
36+
for (const batch of block.tip)
37+
for (const patch of batch.patches) model.applyPatch(Patch.fromBinary(patch.blob));
38+
setRemote(model);
39+
}}
40+
>
41+
Load remote state
42+
</button>
43+
<br />
44+
{!!remote && (
45+
<code style={{fontSize: 8}}>
46+
<pre>{remote.toString()}</pre>
47+
</code>
48+
)}
49+
</div>
50+
);
51+
};
52+
53+
const div = document.createElement('div');
54+
document.body.appendChild(div);
55+
const root = ReactDOM.createRoot(div);
56+
root.render(<Demo />);
57+
};
58+
59+
main().catch(() => {});
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
const path = require('path');
2+
const HtmlWebpackPlugin = require('html-webpack-plugin');
3+
4+
module.exports = {
5+
mode: 'development',
6+
devtool: 'inline-source-map',
7+
entry: {
8+
bundle: __dirname + '/main',
9+
},
10+
plugins: [
11+
new HtmlWebpackPlugin({
12+
title: 'Development',
13+
}),
14+
],
15+
module: {
16+
rules: [
17+
{
18+
test: /\.tsx?$/,
19+
exclude: /node_modules/,
20+
loader: 'ts-loader',
21+
},
22+
],
23+
},
24+
resolve: {
25+
extensions: ['.tsx', '.ts', '.js'],
26+
},
27+
output: {
28+
filename: '[name].js',
29+
path: path.resolve(__dirname, '../../dist'),
30+
},
31+
devServer: {
32+
port: 9949,
33+
hot: false,
34+
},
35+
};

src/json-crdt-repo/local/level/LevelLocalRepo.ts

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,7 @@ export class LevelLocalRepo implements LocalRepo {
715715
} catch (error) {
716716
if (error instanceof Error && error.message === 'EXISTS')
717717
// TODO: make sure reset does not happen, if models are the same.
718+
// TODO: Check for `req.time` in `_syncRead`.
718719
return await this._syncRead(id);
719720
throw error;
720721
}
@@ -732,6 +733,16 @@ export class LevelLocalRepo implements LocalRepo {
732733

733734
private async _syncMerge(req: LocalRepoSyncRequest): Promise<LocalRepoSyncResponse> {
734735
const {id, patches} = req;
736+
let lastKnownTime: number = 0;
737+
const reqTime = req.time;
738+
if (typeof reqTime === 'number') {
739+
lastKnownTime = reqTime;
740+
const firstPatch = patches?.[0];
741+
if (firstPatch?.getId()?.sid === SESSION.GLOBAL) lastKnownTime = firstPatch.getId()!.time + firstPatch.span() - 1;
742+
} else if (patches?.length) {
743+
const firstPatchTime = patches?.[0]?.getId()?.time;
744+
if (typeof firstPatchTime === 'number') lastKnownTime = firstPatchTime - 1;
745+
}
735746
const keyBase = await this.blockKeyBase(id);
736747
if (!patches || !patches.length) throw new Error('EMPTY_BATCH');
737748
const writtenPatches: Uint8Array[] = [];
@@ -740,10 +751,12 @@ export class LevelLocalRepo implements LocalRepo {
740751
// TODO: Return correct response.
741752
// TODO: Check that remote state is in sync, too.
742753
let needsReset = false;
754+
let cursorBehind = false;
743755
const didPush = await this.lockBlock(keyBase, async () => {
744756
const [tip, meta] = await Promise.all([this.readFrontierTip(keyBase), this.readMeta(keyBase)]);
745-
if (meta.seq > -1 && (typeof req.cursor !== 'number' || req.cursor < meta.seq)) needsReset = true;
757+
if (meta.seq > -1 && (typeof req.cursor !== 'number' || req.cursor < meta.seq)) cursorBehind = true;
746758
let nextTick = meta.time + 1;
759+
if (lastKnownTime < meta.time) needsReset = true;
747760
cursor = meta.seq;
748761
if (tip) {
749762
const tipTime = tip.getId()?.time ?? 0;
@@ -775,18 +788,18 @@ export class LevelLocalRepo implements LocalRepo {
775788
const op: BinStrLevelOperation = {type: 'put', key: patchKey, value: uint8};
776789
ops.push(op);
777790
}
791+
if (writtenPatches.length) {
792+
this.pubsub.pub({type: 'rebase', id, patches: writtenPatches, session: req.session});
793+
}
778794
if (ops.length) {
779795
await this.kv.batch(ops);
780796
return true;
781797
}
782798
return false;
783799
});
784-
if (writtenPatches.length) {
785-
this.pubsub.pub({type: 'rebase', id, patches: writtenPatches, session: req.session});
786-
}
787800
if (!didPush && !needsReset) {
788801
const merge = await this.readFrontier0(keyBase);
789-
return {cursor, merge};
802+
return {cursor, merge, cursorBehind};
790803
}
791804
const remote = this.markDirtyAndSync(keyBase, id)
792805
.then(() => {})
@@ -796,9 +809,9 @@ export class LevelLocalRepo implements LocalRepo {
796809
});
797810
if (needsReset) {
798811
const {cursor, model} = await this._syncRead0(keyBase);
799-
return {cursor, model, remote};
812+
return {cursor, model, remote, cursorBehind};
800813
}
801-
return {cursor, remote};
814+
return {cursor, remote, cursorBehind};
802815
}
803816

804817
protected async readLocal0(keyBase: string): Promise<[model: Model, cursor: LevelLocalRepoCursor]> {

src/json-crdt-repo/local/types.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,12 @@ export interface LocalRepoSyncRequest {
8787
*/
8888
id: BlockId;
8989

90+
/**
91+
* Logical clock time of the local operations which the client has caught up
92+
* to.
93+
*/
94+
time?: number;
95+
9096
/**
9197
* The last known cursor returned in the `.sync()` call response. The cursor
9298
* should be omitted in the first `.sync()` call, and then set to the value
@@ -115,6 +121,13 @@ export interface LocalRepoSyncResponse {
115121
*/
116122
cursor: undefined | unknown;
117123

124+
/**
125+
* Set to true if the client is behind the remote. When true, the client
126+
* should call `.getIf()` after a short wait period to check if the remote
127+
* is indeed ahead.
128+
*/
129+
cursorBehind?: boolean;
130+
118131
/**
119132
* Model snapshot that the client should reset its "start" state to. The
120133
* `Model` is sent when the *sync* call detects that the client is behind the

src/json-crdt-repo/session/EditSession.ts

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,14 @@ export class EditSession<N extends JsonNode = JsonNode<any>> {
8787
const length = patches.length;
8888
// TODO: After async call check that sync state is still valid. New patches, might have been added.
8989
if (length || this.cursor === undefined) {
90-
const res = await this.repo.sync({id: this.id, patches, cursor: this.cursor, session: this.session});
90+
const time = this.start.clock.time - 1;
91+
const res = await this.repo.sync({
92+
id: this.id,
93+
patches,
94+
time,
95+
cursor: this.cursor,
96+
session: this.session,
97+
});
9198
if (this._stopped) return null;
9299
// TODO: After sync call succeeds, remove the patches from the log.
93100
if (length) {
@@ -96,16 +103,33 @@ export class EditSession<N extends JsonNode = JsonNode<any>> {
96103
if (lastId) this.log.advanceTo(lastId);
97104
this.start.applyBatch(patches);
98105
}
99-
if (typeof res.cursor !== undefined) this.cursor = res.cursor;
106+
// "cursor" shall not be returned from .sync() call. The cursor shall update
107+
// only when remote model changes are received, during local .sync() write
108+
// only the local model is updated.
109+
if (typeof res.cursor !== undefined) {
110+
this.cursor = res.cursor;
111+
}
100112
if (res.model) {
101113
this._syncRace(() => {
102114
this.reset(<any>res.model!);
103115
});
104-
} else if (res.merge) {
116+
} else if (res.merge && res.merge) {
105117
this._syncRace(() => {
106118
this.merge(<any>res.merge!);
107119
});
108120
}
121+
if (res.cursorBehind) {
122+
setTimeout(async () => {
123+
if (this._stopped) return;
124+
const get = await this.repo.getIf({
125+
id: this.id,
126+
cursor: this.cursor,
127+
});
128+
if (this._stopped) return;
129+
if (!get) return;
130+
this.reset(<any>get.model);
131+
}, 50);
132+
}
109133
return {remote: res.remote};
110134
} else {
111135
const res = await this.repo.getIf({id: this.id, time: this.model.clock.time - 1, cursor: this.cursor});
@@ -125,9 +149,13 @@ export class EditSession<N extends JsonNode = JsonNode<any>> {
125149
public syncLog(): void {
126150
if (!this.log.patches.size()) return;
127151
this._syncRace(() => {
128-
this.sync().then((error) => {
129-
this.onsyncerror?.(error);
130-
});
152+
this.sync()
153+
.then((error) => {
154+
this.onsyncerror?.(error);
155+
})
156+
.catch((error) => {
157+
this.onsyncerror?.(error);
158+
});
131159
});
132160
}
133161

@@ -209,6 +237,12 @@ export class EditSession<N extends JsonNode = JsonNode<any>> {
209237

210238
private onEvent = (event: LocalRepoEvent): void => {
211239
if (this._stopped) return;
240+
if ((event as LocalRepoMergeEvent).merge) {
241+
const cursor = (event as LocalRepoMergeEvent).cursor;
242+
if (cursor !== undefined) {
243+
this.cursor = cursor;
244+
}
245+
}
212246
if ((event as LocalRepoRebaseEvent).rebase) {
213247
if ((event as LocalRepoRebaseEvent).session === this.session) return;
214248
}

src/json-crdt-repo/session/EditSessionFactory.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,14 @@ export class EditSessionFactory {
6666
session.log.end.api.autoFlush();
6767
return session;
6868
} catch (error) {
69-
if (error instanceof Error && error.message === 'TIMEOUT') {
69+
if (!!error && typeof error === 'object' && (error as Record<string, unknown>).message === 'TIMEOUT') {
7070
if (!opts.make) throw error;
71-
} else if (error instanceof Error && error.message === 'NOT_FOUND') {
71+
} else if (
72+
!!error &&
73+
typeof error === 'object' &&
74+
((error as Record<string, unknown>).message === 'NOT_FOUND' ||
75+
(error as Record<string, unknown>).code === 'NOT_FOUND')
76+
) {
7277
if (remote.throwIf === 'missing') throw error;
7378
} else throw error;
7479
}

src/json-crdt-repo/session/__tests__/EditSession.sync.spec.ts

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,38 @@ describe('sync', () => {
118118
await kit.stop();
119119
});
120120

121+
test('sessions created just after the first one, converges in state', async () => {
122+
const kit = await setup();
123+
const schema = s.obj({a: s.con('a')});
124+
const {session: session1} = kit.sessions.make({id: kit.blockId, schema, session: 1});
125+
const {session: session2} = kit.sessions.make({id: kit.blockId, schema, session: 2});
126+
await session1.sync();
127+
await session2.sync();
128+
expect(session1.log.patches.size()).toBe(0);
129+
session1.model.api.obj([]).set({b: 'b'});
130+
session1.model.api.obj([]).set({c: 'c'});
131+
await tick(5);
132+
session1.model.api.obj([]).set({d: 'd'});
133+
const {session: session3} = kit.sessions.make({id: kit.blockId, pull: true, schema, session: 3});
134+
await tick(5);
135+
session1.model.api.obj([]).set({e: 'e'});
136+
await tick(5);
137+
session1.model.api.obj([]).set({f: 'f'});
138+
await session1.sync();
139+
await until(async () => {
140+
try {
141+
expect(session1.model.view()).toEqual(session3.model.view());
142+
return true;
143+
} catch {
144+
return false;
145+
}
146+
});
147+
await session1.dispose();
148+
await session2.dispose();
149+
await session3.dispose();
150+
await kit.stop();
151+
});
152+
121153
test('sessions converge to the same view', async () => {
122154
const kit = await setup();
123155
const schema = s.obj({a: s.con('a')});
@@ -141,7 +173,21 @@ describe('sync', () => {
141173
await until(async () => {
142174
try {
143175
expect(session1.model.view()).toEqual(session2.model.view());
176+
return true;
177+
} catch {
178+
return false;
179+
}
180+
});
181+
await until(async () => {
182+
try {
144183
expect(session1.model.view()).toEqual(session3.model.view());
184+
return true;
185+
} catch {
186+
return false;
187+
}
188+
});
189+
await until(async () => {
190+
try {
145191
expect(session1.model.view()).toEqual(session4.model.view());
146192
return true;
147193
} catch {
@@ -155,6 +201,7 @@ describe('sync', () => {
155201
await session1.dispose();
156202
await session2.dispose();
157203
await session3.dispose();
204+
await session4.dispose();
158205
await kit.stop();
159206
});
160207
});

0 commit comments

Comments
 (0)