Skip to content

Commit 829be55

Browse files
authored
Make Remoting select commandFactory base on Protocol (#330)
* make Remoting select commandFactory base on Protocol
1 parent 9b98a47 commit 829be55

File tree

3 files changed

+174
-20
lines changed

3 files changed

+174
-20
lines changed

src/main/java/com/alipay/remoting/BaseRemoting.java

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,10 @@ public abstract class BaseRemoting {
4141
.getLogger("CommonDefault");
4242
private final static long ABANDONING_REQUEST_THRESHOLD = 0L;
4343

44-
protected CommandFactory commandFactory;
44+
private CommandFactory defalutCommandFactory;
4545

4646
public BaseRemoting(CommandFactory commandFactory) {
47-
this.commandFactory = commandFactory;
47+
this.defalutCommandFactory = commandFactory;
4848
}
4949

5050
/**
@@ -69,7 +69,7 @@ protected RemotingCommand invokeSync(final Connection conn, final RemotingComman
6969
request.getId(),
7070
conn.getUrl() != null ? conn.getUrl() : RemotingUtil.parseRemoteAddress(conn
7171
.getChannel()));
72-
return this.commandFactory.createTimeoutResponse(conn.getRemoteAddress());
72+
return this.getCommandFactory(conn).createTimeoutResponse(conn.getRemoteAddress());
7373
}
7474

7575
final InvokeFuture future = createInvokeFuture(request, request.getInvokeContext());
@@ -86,7 +86,7 @@ protected RemotingCommand invokeSync(final Connection conn, final RemotingComman
8686
public void operationComplete(ChannelFuture f) throws Exception {
8787
if (!f.isSuccess()) {
8888
conn.removeInvokeFuture(requestId);
89-
future.putResponse(commandFactory.createSendFailedResponse(
89+
future.putResponse(getCommandFactory(conn).createSendFailedResponse(
9090
conn.getRemoteAddress(), f.cause()));
9191
LOGGER.error("Invoke send failed, id={}", requestId, f.cause());
9292
}
@@ -98,7 +98,8 @@ public void operationComplete(ChannelFuture f) throws Exception {
9898
}
9999
} catch (Exception e) {
100100
conn.removeInvokeFuture(requestId);
101-
future.putResponse(commandFactory.createSendFailedResponse(conn.getRemoteAddress(), e));
101+
future.putResponse(getCommandFactory(conn).createSendFailedResponse(
102+
conn.getRemoteAddress(), e));
102103
LOGGER.error("Exception caught when sending invocation, id={}", requestId, e);
103104
}
104105
RemotingCommand response = future.waitResponse(remainingTime);
@@ -109,7 +110,7 @@ public void operationComplete(ChannelFuture f) throws Exception {
109110

110111
if (response == null) {
111112
conn.removeInvokeFuture(requestId);
112-
response = this.commandFactory.createTimeoutResponse(conn.getRemoteAddress());
113+
response = this.getCommandFactory(conn).createTimeoutResponse(conn.getRemoteAddress());
113114
LOGGER.warn("Wait response, request id={} timeout!", requestId);
114115
}
115116

@@ -137,7 +138,8 @@ protected void invokeWithCallback(final Connection conn, final RemotingCommand r
137138
request.getId(),
138139
conn.getUrl() != null ? conn.getUrl() : RemotingUtil.parseRemoteAddress(conn
139140
.getChannel()));
140-
future.putResponse(commandFactory.createTimeoutResponse(conn.getRemoteAddress()));
141+
future.putResponse(getCommandFactory(conn).createTimeoutResponse(
142+
conn.getRemoteAddress()));
141143
future.tryAsyncExecuteInvokeCallbackAbnormally();
142144
return;
143145
}
@@ -149,8 +151,8 @@ protected void invokeWithCallback(final Connection conn, final RemotingCommand r
149151
public void run(Timeout timeout) throws Exception {
150152
InvokeFuture future = conn.removeInvokeFuture(requestId);
151153
if (future != null) {
152-
future.putResponse(commandFactory.createTimeoutResponse(conn
153-
.getRemoteAddress()));
154+
future.putResponse(getCommandFactory(conn).createTimeoutResponse(
155+
conn.getRemoteAddress()));
154156
future.tryAsyncExecuteInvokeCallbackAbnormally();
155157
}
156158
}
@@ -165,7 +167,7 @@ public void operationComplete(ChannelFuture cf) throws Exception {
165167
InvokeFuture f = conn.removeInvokeFuture(requestId);
166168
if (f != null) {
167169
f.cancelTimeout();
168-
f.putResponse(commandFactory.createSendFailedResponse(
170+
f.putResponse(getCommandFactory(conn).createSendFailedResponse(
169171
conn.getRemoteAddress(), cf.cause()));
170172
f.tryAsyncExecuteInvokeCallbackAbnormally();
171173
}
@@ -179,7 +181,8 @@ public void operationComplete(ChannelFuture cf) throws Exception {
179181
InvokeFuture f = conn.removeInvokeFuture(requestId);
180182
if (f != null) {
181183
f.cancelTimeout();
182-
f.putResponse(commandFactory.createSendFailedResponse(conn.getRemoteAddress(), e));
184+
f.putResponse(getCommandFactory(conn).createSendFailedResponse(
185+
conn.getRemoteAddress(), e));
183186
f.tryAsyncExecuteInvokeCallbackAbnormally();
184187
}
185188
LOGGER.error("Exception caught when sending invocation. The address is {}",
@@ -208,7 +211,8 @@ protected InvokeFuture invokeWithFuture(final Connection conn, final RemotingCom
208211
request.getId(),
209212
conn.getUrl() != null ? conn.getUrl() : RemotingUtil.parseRemoteAddress(conn
210213
.getChannel()));
211-
future.putResponse(commandFactory.createTimeoutResponse(conn.getRemoteAddress()));
214+
future.putResponse(getCommandFactory(conn).createTimeoutResponse(
215+
conn.getRemoteAddress()));
212216
return future;
213217
}
214218

@@ -219,8 +223,8 @@ protected InvokeFuture invokeWithFuture(final Connection conn, final RemotingCom
219223
public void run(Timeout timeout) throws Exception {
220224
InvokeFuture future = conn.removeInvokeFuture(requestId);
221225
if (future != null) {
222-
future.putResponse(commandFactory.createTimeoutResponse(conn
223-
.getRemoteAddress()));
226+
future.putResponse(getCommandFactory(conn).createTimeoutResponse(
227+
conn.getRemoteAddress()));
224228
}
225229
}
226230

@@ -235,7 +239,7 @@ public void operationComplete(ChannelFuture cf) throws Exception {
235239
InvokeFuture f = conn.removeInvokeFuture(requestId);
236240
if (f != null) {
237241
f.cancelTimeout();
238-
f.putResponse(commandFactory.createSendFailedResponse(
242+
f.putResponse(getCommandFactory(conn).createSendFailedResponse(
239243
conn.getRemoteAddress(), cf.cause()));
240244
}
241245
LOGGER.error("Invoke send failed. The address is {}",
@@ -248,7 +252,8 @@ public void operationComplete(ChannelFuture cf) throws Exception {
248252
InvokeFuture f = conn.removeInvokeFuture(requestId);
249253
if (f != null) {
250254
f.cancelTimeout();
251-
f.putResponse(commandFactory.createSendFailedResponse(conn.getRemoteAddress(), e));
255+
f.putResponse(getCommandFactory(conn).createSendFailedResponse(
256+
conn.getRemoteAddress(), e));
252257
}
253258
LOGGER.error("Exception caught when sending invocation. The address is {}",
254259
RemotingUtil.parseRemoteAddress(conn.getChannel()), e);
@@ -323,8 +328,27 @@ protected abstract InvokeFuture createInvokeFuture(final Connection conn,
323328
final InvokeContext invokeContext,
324329
final InvokeCallback invokeCallback);
325330

331+
@Deprecated
326332
protected CommandFactory getCommandFactory() {
327-
return commandFactory;
333+
LOGGER
334+
.warn("The method getCommandFactory() is deprecated. Please use getCommandFactory(ProtocolCode/Connection) instead.");
335+
return defalutCommandFactory;
336+
}
337+
338+
protected CommandFactory getCommandFactory(Connection conn) {
339+
ProtocolCode protocolCode = conn.getChannel().attr(Connection.PROTOCOL).get();
340+
return getCommandFactory(protocolCode);
341+
}
342+
343+
protected CommandFactory getCommandFactory(ProtocolCode protocolCode) {
344+
if (protocolCode == null) {
345+
return getCommandFactory();
346+
}
347+
Protocol protocol = ProtocolManager.getProtocol(protocolCode);
348+
if (protocol == null) {
349+
return getCommandFactory();
350+
}
351+
return protocol.getCommandFactory();
328352
}
329353

330354
private int remainingTime(RemotingCommand request, int timeout) {

src/main/java/com/alipay/remoting/rpc/RpcRemoting.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ public void invokeWithCallback(final Connection conn, final Object request,
319319
protected RemotingCommand toRemotingCommand(Object request, Connection conn,
320320
InvokeContext invokeContext, int timeoutMillis)
321321
throws SerializationException {
322-
RpcRequestCommand command = this.getCommandFactory().createRequestCommand(request);
322+
RpcRequestCommand command = this.getCommandFactory(conn).createRequestCommand(request);
323323

324324
if (null != invokeContext) {
325325
// set client custom serializer for request command if not null
@@ -370,7 +370,7 @@ private void logDebugInfo(RemotingCommand requestCommand) {
370370
@Override
371371
protected InvokeFuture createInvokeFuture(RemotingCommand request, InvokeContext invokeContext) {
372372
return new DefaultInvokeFuture(request.getId(), null, null, request.getProtocolCode()
373-
.getFirstByte(), this.getCommandFactory(), invokeContext);
373+
.getFirstByte(), this.getCommandFactory(request.getProtocolCode()), invokeContext);
374374
}
375375

376376
/**
@@ -382,6 +382,6 @@ protected InvokeFuture createInvokeFuture(Connection conn, RemotingCommand reque
382382
InvokeCallback invokeCallback) {
383383
return new DefaultInvokeFuture(request.getId(), new RpcInvokeCallbackListener(
384384
RemotingUtil.parseRemoteAddress(conn.getChannel())), invokeCallback, request
385-
.getProtocolCode().getFirstByte(), this.getCommandFactory(), invokeContext);
385+
.getProtocolCode().getFirstByte(), this.getCommandFactory(conn), invokeContext);
386386
}
387387
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.alipay.remoting;
18+
19+
import com.alipay.remoting.rpc.RpcCommandFactory;
20+
import io.netty.channel.local.LocalChannel;
21+
import org.junit.Test;
22+
23+
import static org.junit.Assert.assertSame;
24+
25+
public class BaseRemotingTest {
26+
27+
@Test
28+
public void getCommandFactory() {
29+
RpcCommandFactory commandFactory = new RpcCommandFactory();
30+
BaseRemoting baseRemoting = new EmptyRemoting(commandFactory);
31+
assertSame(commandFactory, baseRemoting.getCommandFactory());
32+
}
33+
34+
@Test
35+
public void getCommandFactoryFromProtocolCode() {
36+
RpcCommandFactory defaultCommand = new RpcCommandFactory();
37+
BaseRemoting baseRemoting = new EmptyRemoting(defaultCommand);
38+
39+
// no 3a protocol
40+
CommandFactory commandFactory = baseRemoting.getCommandFactory(ProtocolCode
41+
.fromBytes((byte) 0x3a));
42+
assertSame(defaultCommand, commandFactory);
43+
44+
// register 3a protocol
45+
RpcCommandFactory my3aCommandFactory = new RpcCommandFactory();
46+
ProtocolManager.registerProtocol(new MyProtocol(my3aCommandFactory), (byte) 0x3a);
47+
// get 3a
48+
commandFactory = baseRemoting.getCommandFactory(ProtocolCode.fromBytes((byte) 0x3a));
49+
assertSame(my3aCommandFactory, commandFactory);
50+
51+
ProtocolManager.unRegisterProtocol((byte) 0x3a);
52+
}
53+
54+
@Test
55+
public void getCommandFactoryFromConnection() {
56+
RpcCommandFactory defaultCommand = new RpcCommandFactory();
57+
BaseRemoting baseRemoting = new EmptyRemoting(defaultCommand);
58+
59+
Connection connection = new Connection(new LocalChannel());
60+
61+
// no 3a protocol
62+
CommandFactory commandFactory = baseRemoting.getCommandFactory(connection);
63+
assertSame(defaultCommand, commandFactory);
64+
65+
// register 3a protocol
66+
RpcCommandFactory my3aCommandFactory = new RpcCommandFactory();
67+
ProtocolManager.registerProtocol(new MyProtocol(my3aCommandFactory), (byte) 0x3a);
68+
connection.getChannel().attr(Connection.PROTOCOL).set(ProtocolCode.fromBytes((byte) 0x3a));
69+
// get 3a
70+
commandFactory = baseRemoting.getCommandFactory(connection);
71+
assertSame(my3aCommandFactory, commandFactory);
72+
73+
ProtocolManager.unRegisterProtocol((byte) 0x3a);
74+
}
75+
76+
static class EmptyRemoting extends BaseRemoting {
77+
78+
public EmptyRemoting(CommandFactory commandFactory) {
79+
super(commandFactory);
80+
}
81+
82+
@Override
83+
protected InvokeFuture createInvokeFuture(RemotingCommand request,
84+
InvokeContext invokeContext) {
85+
return null;
86+
}
87+
88+
@Override
89+
protected InvokeFuture createInvokeFuture(Connection conn, RemotingCommand request,
90+
InvokeContext invokeContext,
91+
InvokeCallback invokeCallback) {
92+
return null;
93+
}
94+
}
95+
96+
static class MyProtocol implements Protocol {
97+
98+
private CommandFactory commandFactory;
99+
100+
public MyProtocol(CommandFactory commandFactory) {
101+
this.commandFactory = commandFactory;
102+
}
103+
104+
@Override
105+
public CommandEncoder getEncoder() {
106+
return null;
107+
}
108+
109+
@Override
110+
public CommandDecoder getDecoder() {
111+
return null;
112+
}
113+
114+
@Override
115+
public HeartbeatTrigger getHeartbeatTrigger() {
116+
return null;
117+
}
118+
119+
@Override
120+
public CommandHandler getCommandHandler() {
121+
return null;
122+
}
123+
124+
@Override
125+
public CommandFactory getCommandFactory() {
126+
return commandFactory;
127+
}
128+
}
129+
130+
}

0 commit comments

Comments
 (0)