Skip to content

Commit c7c0515

Browse files
committed
adds frames reassembly support
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 3f41137 commit c7c0515

File tree

3 files changed

+156
-1
lines changed

3 files changed

+156
-1
lines changed

packages/rsocket-core/src/RSocketClient.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import {createClientMachine} from './RSocketMachine';
3535
import {Lease, Leases} from './RSocketLease';
3636
import {RequesterLeaseHandler, ResponderLeaseHandler} from './RSocketLease';
3737
import {IdentitySerializers} from './RSocketSerialization';
38+
import {ReassemblyDuplexConnection} from './ReassemblyDuplexConnection';
3839

3940
export type ClientConfig<D, M> = {|
4041
serializers?: PayloadSerializers<D, M>,
@@ -92,7 +93,10 @@ export default class RSocketClient<D, M> {
9293
if (status.kind === 'CONNECTED') {
9394
subscription && subscription.cancel();
9495
subscriber.onComplete(
95-
new RSocketClientSocket(this._config, transport),
96+
new RSocketClientSocket(
97+
this._config,
98+
new ReassemblyDuplexConnection(transport),
99+
),
96100
);
97101
} else if (status.kind === 'ERROR') {
98102
subscription && subscription.cancel();

packages/rsocket-core/src/RSocketServer.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import {IdentitySerializers} from './RSocketSerialization';
4545
import {createServerMachine} from './RSocketMachine';
4646
import {Leases} from './RSocketLease';
4747
import {RequesterLeaseHandler, ResponderLeaseHandler} from './RSocketLease';
48+
import {ReassemblyDuplexConnection} from './ReassemblyDuplexConnection';
4849

4950
export interface TransportServer {
5051
start(): Flowable<DuplexConnection>,
@@ -121,6 +122,7 @@ export default class RSocketServer<D, M> {
121122
_handleTransportConnection = (connection: DuplexConnection): void => {
122123
const swapper: SubscriberSwapper<Frame> = new SubscriberSwapper();
123124
let subscription;
125+
connection = new ReassemblyDuplexConnection(connection);
124126
connection.receive().subscribe(
125127
swapper.swap({
126128
onError: error => console.error(error),
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
import type {
2+
DuplexConnection,
3+
Frame,
4+
ISubscriber,
5+
ISubscription,
6+
Encodable,
7+
ConnectionStatus,
8+
} from 'rsocket-types';
9+
import {LiteBuffer as Buffer} from './LiteBuffer';
10+
import {Flowable} from 'rsocket-flowable';
11+
import {
12+
CONNECTION_STREAM_ID,
13+
isComplete,
14+
isFollows,
15+
FRAME_TYPES,
16+
FLAGS,
17+
} from './RSocketFrame';
18+
19+
export class ReassemblyDuplexConnection implements DuplexConnection {
20+
_source: DuplexConnection;
21+
22+
constructor(source: DuplexConnection) {
23+
this._source = source;
24+
}
25+
26+
sendOne(frame: Frame): void {
27+
this._source.sendOne(frame);
28+
}
29+
30+
send(input: Flowable<Frame>): void {
31+
this._source.send(input);
32+
}
33+
34+
receive(): Flowable<Frame> {
35+
return this._source
36+
.receive()
37+
.lift(actual => new ReassemblySubscriber(actual));
38+
}
39+
40+
close(): void {
41+
this._source.close();
42+
}
43+
44+
connect(): void {
45+
this._source.connect();
46+
}
47+
48+
connectionStatus(): Flowable<ConnectionStatus> {
49+
return this._source.connectionStatus();
50+
}
51+
}
52+
53+
class ReassemblySubscriber implements ISubscriber<Frame>, ISubscription {
54+
_framesReassemblyMap: Map<number, Frame> = new Map();
55+
56+
_actual: ISubscriber<Frame>;
57+
_subscription: ISubscription;
58+
59+
constructor(actual: ISubscriber<Frame>) {
60+
this._actual = actual;
61+
}
62+
63+
request(n: number) {
64+
this._subscription.request(n);
65+
}
66+
67+
cancel() {
68+
this._subscription.cancel();
69+
this._framesReassemblyMap.clear();
70+
}
71+
72+
onSubscribe(s: ISubscription): void {
73+
if (this._subscription == null) {
74+
this._subscription = s;
75+
this._actual.onSubscribe(this);
76+
} else {
77+
s.cancel();
78+
}
79+
}
80+
81+
onComplete(): void {
82+
this._actual.onComplete();
83+
}
84+
85+
onError(error: Error): void {
86+
this._actual.onError(error);
87+
}
88+
89+
onNext(frame: Frame): void {
90+
const streamId = frame.streamId;
91+
if (streamId !== CONNECTION_STREAM_ID) {
92+
const hasFollowsFlag = isFollows(frame.flags);
93+
const hasCompleteFlag = isComplete(frame.flags);
94+
const isCancelOrError =
95+
frame.type === FRAME_TYPES.ERROR || frame.type === FRAME_TYPES.CANCEL;
96+
97+
const storedFrame = this._framesReassemblyMap.get(streamId);
98+
if (storedFrame) {
99+
if (isCancelOrError) {
100+
this._framesReassemblyMap.delete(streamId);
101+
} else {
102+
if (storedFrame.metadata && frame.metadata) {
103+
storedFrame.metadata = concatContent(
104+
storedFrame.metadata,
105+
frame.metadata,
106+
);
107+
}
108+
109+
if (storedFrame.data && frame.data) {
110+
storedFrame.data = concatContent(storedFrame.data, frame.data);
111+
} else if (!storedFrame.data && frame.data) {
112+
storedFrame.data = frame.data;
113+
}
114+
115+
if (!hasFollowsFlag || hasCompleteFlag) {
116+
if (hasCompleteFlag) {
117+
storedFrame.flags |= FLAGS.COMPLETE;
118+
}
119+
120+
this._framesReassemblyMap.delete(streamId);
121+
this._actual.onNext(storedFrame);
122+
}
123+
124+
return;
125+
}
126+
} else if (hasFollowsFlag && !hasCompleteFlag && !isCancelOrError) {
127+
this._framesReassemblyMap.set(streamId, frame);
128+
129+
return;
130+
}
131+
}
132+
133+
this._actual.onNext(frame);
134+
}
135+
}
136+
137+
const concatContent = (a: Encodable, b: Encodable): Encodable => {
138+
switch (a.constructor.name) {
139+
case 'String':
140+
return a + b;
141+
case 'Uint8Array':
142+
const result = new Uint8Array(a.length + b.length);
143+
result.set(a);
144+
result.set(b, a.length);
145+
return result;
146+
default:
147+
return Buffer.concat([a, b]);
148+
}
149+
};

0 commit comments

Comments
 (0)