Skip to content

Commit acd6638

Browse files
authored
Replace language server WebSocket connection with Ydoc (#14438)
- close #14142 PR replaces binary and JSON-RPC WebSockets with Yjs based communication. ``` +------------------+ +--------------------------------------------------------------+ | | | | | +--------------+ | WebSocket | +-----------------+ YjsChannel.Server +-----------------+ | | | IDE Client | |<-------------->| | Ydoc Server |<-------------------->| Language Server | | | +--------------+ | | +-----------------+ +-----------------+ | | Electron | | GraalVM | +------------------+ +--------------------------------------------------------------+ ``` Workflow: - IDE client creates a `YjsChannel` backed by Yjs `Y.Array` allowing bidirectional message passing - IDE client connects to the Ydoc server using the [Yjs WebSocket provider](https://docs.yjs.dev/ecosystem/connection-provider/y-websocket) - Ydoc server creates a corresponding `YjsChannel` that is synchronized with the client's channel and passes it to the Language server via `YjsChannelCallbacks` callback This way IDE client and the Language Server establish `YjsChannel` connection and start exchanging messages.
1 parent abe8c88 commit acd6638

File tree

101 files changed

+4656
-702
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

101 files changed

+4656
-702
lines changed

.bazelignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ app/lezer-markdown/node_modules
88
app/project-manager-shim/node_modules
99
app/table-expression/node_modules
1010
app/rust-ffi/node_modules
11+
app/ydoc-channel/node_modules
1112
app/ydoc-server/node_modules
1213
app/ydoc-server-nodejs/node_modules
1314
app/ydoc-server-polyglot/node_modules

BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ write_source_files(
5353
"//app/lezer-markdown:write_tsconfigs",
5454
"//app/project-manager-shim:write_tsconfigs",
5555
"//app/table-expression:write_tsconfigs",
56+
"//app/ydoc-channel:write_tsconfigs",
5657
"//app/ydoc-server:write_tsconfigs",
5758
"//app/ydoc-shared:write_tsconfigs",
5859
],
@@ -176,6 +177,7 @@ ENGINE_DIST_SOURCES = STDLIB_SOURCES + SBT_PROJECT_FILES + [
176177
"lib/java/poi-wrapper/src/**",
177178
"lib/java/runtime-utils/src/**",
178179
"lib/java/scala-libs-wrapper/src/**",
180+
"lib/java/ydoc-api/src/**",
179181
"lib/java/ydoc-polyfill/src/**",
180182
"lib/java/ydoc-server/src/**",
181183
"lib/java/ydoc-server-registration/src/**",

app/gui/integration-test/mock/dataServer.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,16 @@ export function mockDataWSHandler(
106106
response = createError(builder, LanguageServerErrorCode.NotFile, 'Invalid Path')
107107
break
108108
}
109-
const file = await readFile(pathSegments(path))
109+
let segments: (string | null)[]
110+
try {
111+
segments = pathSegments(path)
112+
} catch {
113+
// If path parsing fails, return file not found
114+
// This can happen with malformed messages (e.g., visualization data being misinterpreted)
115+
response = createError(builder, LanguageServerErrorCode.FileNotFound, 'File not found')
116+
break
117+
}
118+
const file = await readFile(segments)
110119
if (!file) {
111120
response = createError(builder, LanguageServerErrorCode.FileNotFound, 'File not found')
112121
break

app/gui/integration-test/mock/localApi.ts

Lines changed: 68 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import { test } from 'integration-test/base'
2626
import { uuidv4 } from 'lib0/random.js'
2727
import { join } from 'node:path'
2828
import type { Page, WebSocketRoute } from 'playwright'
29+
import { YjsChannel } from 'ydoc-channel'
2930
import { WSSharedDoc, YjsConnection, type YjsSocket } from 'ydoc-server'
3031
import { makeVisUpdates, mockDataHandler, mockLSHandler, mockYdocProvider } from './lsHandler'
3132

@@ -37,6 +38,8 @@ const ROOT_PARENT_PATH = Path('/home/user/enso')
3738
const ROOT_PATH = Path('/home/user/enso/enso-projects')
3839
const DOWNLOAD_PATH = Path('/home/user/enso/Downloads')
3940

41+
// These addresses are kept for backward compatibility in OpenProject response
42+
// but the actual communication now flows through YjsChannels
4043
const languageServerJsonAddress = { host: '127.0.0.1', port: 1235 }
4144
const languageServerBinaryAddress = { host: '127.0.0.1', port: 1234 }
4245
const languageServerYdocAddress = { host: '127.0.0.1', port: 1233 }
@@ -241,7 +244,8 @@ export async function mockLocalApi(page: Page) {
241244
addDirectory({ path: ROOT_PATH })
242245
addDirectory({ path: DOWNLOAD_PATH })
243246

244-
let languageServerBinaryWs: WebSocketRoute | null = null
247+
// Data channel for visualization updates, set up when client connects
248+
let dataChannel: YjsChannel<Uint8Array> | null = null
245249

246250
await test.step('Mock Local API', async () => {
247251
const toJSONRPCResult = (result: unknown): JSONRPCResponse<unknown> => ({
@@ -340,40 +344,6 @@ export async function mockLocalApi(page: Page) {
340344
})
341345
})
342346

343-
await page.routeWebSocket(
344-
`ws://${languageServerBinaryAddress.host}:${languageServerBinaryAddress.port}/`,
345-
(ws) => {
346-
languageServerBinaryWs = ws
347-
ws.onMessage(async (messageRaw) => {
348-
const response = await mockDataHandler(new Uint8Array(Buffer.from(messageRaw)).buffer)
349-
if (response) {
350-
ws.send(Buffer.from(response))
351-
}
352-
})
353-
},
354-
)
355-
await page.routeWebSocket(
356-
`ws://${languageServerJsonAddress.host}:${languageServerJsonAddress.port}/`,
357-
(ws) => {
358-
ws.onMessage(async (messageRaw) => {
359-
const { method, params, jsonrpc, id } = JSON.parse(messageRaw.toString())
360-
try {
361-
const result =
362-
(await mockLSHandler(
363-
method,
364-
params,
365-
(message) => ws.send(JSON.stringify({ jsonrpc, ...message })),
366-
(binaryData?: ArrayBuffer) => {
367-
if (binaryData) languageServerBinaryWs?.send(Buffer.from(binaryData))
368-
},
369-
)) ?? null
370-
ws.send(JSON.stringify({ jsonrpc, id, result }))
371-
} catch (error) {
372-
ws.send(JSON.stringify({ jsonrpc, id, error }))
373-
}
374-
})
375-
},
376-
)
377347
const ydocAddressBase = `ws://${languageServerYdocAddress.host}:${languageServerYdocAddress.port}`
378348

379349
class MockWs implements YjsSocket {
@@ -413,14 +383,76 @@ export async function mockLocalApi(page: Page) {
413383
}
414384
}
415385

386+
/** Set up the mock JSON-RPC channel handler with its associated data channel */
387+
function setupMockLsChannel(
388+
channel: YjsChannel<string>,
389+
associatedDataChannel: YjsChannel<Uint8Array> | null,
390+
) {
391+
channel.subscribe(async (messageRaw) => {
392+
const { method, params, jsonrpc, id } = JSON.parse(messageRaw)
393+
try {
394+
const result =
395+
(await mockLSHandler(
396+
method,
397+
params,
398+
(message) => channel.send(JSON.stringify({ jsonrpc, ...message })),
399+
(binaryData?: ArrayBuffer) => {
400+
// Use the associated data channel for this connection
401+
if (binaryData && associatedDataChannel) {
402+
associatedDataChannel.send(new Uint8Array(binaryData))
403+
}
404+
},
405+
)) ?? null
406+
channel.send(JSON.stringify({ jsonrpc, id, result }))
407+
} catch (error) {
408+
channel.send(JSON.stringify({ jsonrpc, id, error }))
409+
}
410+
})
411+
}
412+
413+
/** Set up the mock binary data channel handler */
414+
function setupMockDataChannel(channel: YjsChannel<Uint8Array>) {
415+
// Also set the global dataChannel for updateVisualization API
416+
dataChannel = channel
417+
channel.subscribe(async (messageRaw) => {
418+
// Important: Use slice to get a copy of just the relevant portion
419+
// because messageRaw.buffer may include data beyond the Uint8Array's view
420+
const data = messageRaw.buffer.slice(
421+
messageRaw.byteOffset,
422+
messageRaw.byteOffset + messageRaw.byteLength,
423+
) as ArrayBuffer
424+
const response = await mockDataHandler(data)
425+
if (response) {
426+
channel.send(new Uint8Array(response))
427+
}
428+
})
429+
}
430+
416431
await page.routeWebSocket(`${ydocAddressBase}/**`, (wsRoute) => {
417432
const parsedUrl = new URL(wsRoute.url())
418433
const room = parsedUrl.pathname.substring('/project/'.length)
434+
const lsChannelName = parsedUrl.searchParams.get('ls')
435+
const dataChannelName = parsedUrl.searchParams.get('data')
419436

420437
const mockWs = new MockWs(wsRoute)
421438
const wsDoc = new WSSharedDoc()
422439
const _connection = new YjsConnection(mockWs, wsDoc)
423440
mockYdocProvider(room, wsDoc.doc)
441+
442+
let binaryChannel: YjsChannel<Uint8Array> | null = null
443+
if (dataChannelName) {
444+
binaryChannel = new YjsChannel<Uint8Array>(wsDoc.doc, dataChannelName)
445+
// Only set the global dataChannel for the main 'index' room connection.
446+
// Subdoc connections should not overwrite it, as the client's DataServer
447+
// only listens on the main document's channel.
448+
if (room === 'index') {
449+
setupMockDataChannel(binaryChannel)
450+
}
451+
}
452+
if (lsChannelName) {
453+
const lsChannel = new YjsChannel<string>(wsDoc.doc, lsChannelName)
454+
setupMockLsChannel(lsChannel, binaryChannel)
455+
}
424456
})
425457

426458
await page.route('/api/root-directory-path', async (route, request) => {
@@ -638,7 +670,7 @@ export async function mockLocalApi(page: Page) {
638670

639671
async function updateVisualization(preprocessor: string, data: unknown) {
640672
for (const update of makeVisUpdates(preprocessor, data)) {
641-
languageServerBinaryWs?.send(Buffer.from(update))
673+
dataChannel?.send(new Uint8Array(update))
642674
}
643675
}
644676

app/gui/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@
126126
"y-protocols": "^1.0.6",
127127
"y-textarea": "^1.0.2",
128128
"y-websocket": "^1.5.4",
129+
"ydoc-channel": "workspace:*",
129130
"ydoc-shared": "workspace:*",
130131
"yjs": "^13.6.21",
131132
"zod": "catalog:",

app/gui/src/project-view/util/crdt.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ interface SubdocsEvent {
4747
export type ProviderParams = {
4848
/** URL for the project's language server RPC connection. */
4949
ls: string
50+
/** URL for the project's data connection. */
51+
data: string
5052
}
5153

5254
/** TODO: Add docs */

app/gui/src/project-view/util/net.ts

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,19 @@
11
import { onScopeDispose } from 'vue'
2+
import { YjsChannel } from 'ydoc-channel'
23
import { AbortScope } from 'ydoc-shared/util/net'
3-
import {
4-
ReconnectingWebSocket,
5-
ReconnectingWebSocketTransport,
6-
} from 'ydoc-shared/util/net/ReconnectingWSTransport'
4+
import { YjsTransport } from 'ydoc-shared/util/net/YjsTransport'
5+
import * as Y from 'yjs'
76

87
export { AbortScope }
98

10-
const WS_OPTIONS = {
11-
// We do not want to enqueue any messages, because after reconnecting we have to initProtocol again.
12-
maxEnqueuedMessages: 0,
13-
}
14-
159
/** TODO: Add docs */
16-
export function createRpcTransport(url: string): ReconnectingWebSocketTransport {
17-
return new ReconnectingWebSocketTransport(url, WS_OPTIONS)
10+
export function createRpcTransport(indexDoc: Y.Doc, url: string): YjsTransport {
11+
return new YjsTransport(indexDoc, url)
1812
}
1913

2014
/** TODO: Add docs */
21-
export function createDataWebsocket(url: string, binaryType: 'arraybuffer' | 'blob'): WebSocket {
22-
const websocket = new ReconnectingWebSocket(url, undefined, WS_OPTIONS)
23-
websocket.binaryType = binaryType
24-
return websocket as WebSocket
15+
export function createDataSocket(indexDoc: Y.Doc, url: string): YjsChannel {
16+
return new YjsChannel(indexDoc, url)
2517
}
2618

2719
export interface WebSocketHandler {

app/gui/src/project-view/util/net/dataServer.ts

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { Err, Ok, type Result } from 'enso-common/src/utilities/data/result'
22
import { ObservableV2 } from 'lib0/observable'
3+
import type { YjsChannel } from 'ydoc-channel'
34
import {
45
Builder,
56
ByteBuffer,
@@ -57,23 +58,30 @@ export class DataServer extends ObservableV2<DataServerEvents> {
5758
private initializationScheduled = false
5859
resolveCallbacks = new Map<string, (data: any) => void>()
5960

60-
/** `websocket.binaryType` should be `ArrayBuffer`. */
61+
/** TODO: Add docs */
6162
constructor(
6263
public clientId: string,
63-
public websocket: WebSocket,
64+
public channel: YjsChannel,
6465
abort: AbortScope,
6566
) {
6667
super()
6768
abort.handleDispose(this)
6869

69-
websocket.addEventListener('message', ({ data: rawPayload }) => {
70-
if (!(rawPayload instanceof ArrayBuffer)) {
70+
channel.addEventListener('message', ({ data: rawPayload }) => {
71+
if (!ArrayBuffer.isView(rawPayload)) {
7172
console.warn('Data Server: Data type was invalid:', rawPayload)
7273
// Ignore all non-binary messages. If the messages are `Blob`s instead, this is a
7374
// misconfiguration and should also be ignored.
7475
return
7576
}
76-
const binaryMessage = OutboundMessage.getRootAsOutboundMessage(new ByteBuffer(rawPayload))
77+
const binaryMessage = OutboundMessage.getRootAsOutboundMessage(
78+
new ByteBuffer(
79+
rawPayload.buffer.slice(
80+
rawPayload.byteOffset,
81+
rawPayload.byteOffset + rawPayload.byteLength,
82+
),
83+
),
84+
)
7785
const payloadType = binaryMessage.payloadType()
7886
const payload = binaryMessage.payload(new PAYLOAD_CONSTRUCTOR[payloadType]())
7987
if (!payload) return
@@ -94,20 +102,19 @@ export class DataServer extends ObservableV2<DataServerEvents> {
94102
callback?.(payload)
95103
}
96104
})
97-
websocket.addEventListener('error', (error) =>
105+
channel.addEventListener('error', (error) =>
98106
console.error('Language Server Binary socket error:', error),
99107
)
100-
websocket.addEventListener('close', () => {
108+
channel.addEventListener('close', () => {
101109
this.scheduleInitializationAfterConnect()
102110
})
103111

104-
if (websocket.readyState === WebSocket.OPEN) this.initialized = this.initialize()
105-
else this.initialized = this.scheduleInitializationAfterConnect()
112+
this.initialized = this.initialize()
106113
}
107114

108115
/** TODO: Add docs */
109116
dispose() {
110-
this.websocket.close()
117+
this.channel.close()
111118
this.resolveCallbacks.clear()
112119
}
113120

@@ -116,11 +123,11 @@ export class DataServer extends ObservableV2<DataServerEvents> {
116123
this.initializationScheduled = true
117124
this.initialized = new Promise((resolve) => {
118125
const cb = () => {
119-
this.websocket.removeEventListener('open', cb)
126+
this.channel.removeEventListener('open', cb)
120127
this.initializationScheduled = false
121128
resolve(this.initialize())
122129
}
123-
this.websocket.addEventListener('open', cb)
130+
this.channel.addEventListener('open', cb)
124131
})
125132
return this.initialized
126133
}
@@ -166,7 +173,7 @@ export class DataServer extends ObservableV2<DataServerEvents> {
166173
this.resolveCallbacks.set(messageUuid, resolve)
167174
})
168175
try {
169-
this.websocket.send(builder.finish(rootTable).toArrayBuffer())
176+
this.channel.send(builder.finish(rootTable).toArrayBuffer())
170177
} catch (e: unknown) {
171178
this.resolveCallbacks.delete(messageUuid)
172179
throw e

0 commit comments

Comments
 (0)