@@ -21,6 +21,7 @@ public class ClusterWS {
21
21
private PingHandler mPingHandler ;
22
22
private List <Channel > mChannels ;
23
23
private ReconnectionParams mReconnectionParams ;
24
+ private static final byte [] PONG = "A" .getBytes ();
24
25
25
26
public ClusterWS (String url ) {
26
27
if (url == null ) {
@@ -130,7 +131,10 @@ private void createSocket() {
130
131
mSocket = new Socket (URI .create (mUrl ), new ISocketEvents () {
131
132
@ Override
132
133
public void onOpen () {
133
- mClusterWSListener .onConnected ();
134
+ for (Channel channel :
135
+ mChannels ) {
136
+ channel .subscribe ();
137
+ }
134
138
}
135
139
136
140
@ Override
@@ -145,8 +149,10 @@ public void onClose(int code, String reason) {
145
149
if (mPingHandler .getPingTimer () != null ) {
146
150
mPingHandler .getPingTimer ().cancel ();
147
151
}
148
- if (mReconnectionParams .isAutoReconnect () && code != 1000 && (mReconnectionParams .getReconnectionAttempts () == 0 || mReconnectionParams .getReconnectionsAttempted () < mReconnectionParams .getReconnectionAttempts ())) {
149
- if (mSocket .getReadyState () == WebSocket .READYSTATE .CLOSED || mSocket .getReadyState () == WebSocket .READYSTATE .NOT_YET_CONNECTED ) {
152
+ if (mReconnectionParams .isAutoReconnect ()
153
+ && code != 1000
154
+ && (mReconnectionParams .getReconnectionAttempts () == 0 || mReconnectionParams .getReconnectionsAttempted () < mReconnectionParams .getReconnectionAttempts ())) {
155
+ if (mSocket .getReadyState () == WebSocket .READYSTATE .CLOSED || mSocket .getReadyState () == WebSocket .READYSTATE .NOT_YET_CONNECTED || mSocket .getReadyState () == WebSocket .READYSTATE .CLOSING ) {
150
156
mReconnectionParams .incrementReconnectionsAttempted ();
151
157
int randomDelay = ThreadLocalRandom .current ().nextInt (1 ,
152
158
mReconnectionParams .getReconnectionIntervalMax () -
@@ -165,8 +171,17 @@ public void run() {
165
171
166
172
@ Override
167
173
public void onBinaryMessage (ByteBuffer bytes ) {
168
- String message = StandardCharsets .UTF_8 .decode (bytes ).toString ();
169
- onMessageReceived (message );
174
+ System .out .println ("GOT MESSAGE" );
175
+ byte [] arr = new byte [bytes .remaining ()];
176
+ bytes .get (arr );
177
+ if (arr .length == 1 && arr [0 ] == 57 ) {
178
+ mPingHandler .setMissedPingToZero ();
179
+ mSocket .send (PONG );
180
+ } else {
181
+ String message = new String (arr , StandardCharsets .UTF_8 );
182
+ onMessageReceived (message );
183
+ }
184
+
170
185
}
171
186
172
187
@ Override
@@ -181,11 +196,7 @@ public void onMessage(String message) {
181
196
}
182
197
183
198
private void onMessageReceived (String message ) {
184
- if (message .equals ("#0" )) {
185
- mPingHandler .setMissedPingToZero ();
186
- send ("#1" , null , "ping" );
187
- } else {
188
- mMessageHandler .messageDecode (ClusterWS .this , message );
189
- }
199
+ System .out .println ("MESSAGE IS " + message );
200
+ mMessageHandler .messageDecode (ClusterWS .this , message );
190
201
}
191
202
}
0 commit comments