Skip to content

Commit 0fa7b41

Browse files
realloveleilbbniu
authored andcommitted
async
1 parent 2b38744 commit 0fa7b41

File tree

10 files changed

+177
-67
lines changed

10 files changed

+177
-67
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
module StressTest{
22
interface ContextTest{
33

4-
int Add(int a,int b,out int c); // Some example function
4+
int Add(int a,int b,out int c); // Some example function
55
int Sub(int a,int b,out int c); // Some example function
66
};
77
};
Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
11
#!/bin/bash
2-
#make
3-
go build -o ContextTestServer
2+
make
43
./ContextTestServer --config=ContextTestServer.conf

tars/adapter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ func (c *AdapterProxy) doKeepAlive() {
258258
IRequestId: reqId,
259259
SServantName: c.obj.name,
260260
SFuncName: "tars_ping",
261-
ITimeout: int32(c.obj.timeout),
261+
ITimeout: int32(c.obj.asyncTimeout),
262262
}
263263
msg := &Message{Req: &req, Ser: c.obj}
264264
msg.Init()

tars/application.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ func initConfig() {
196196
cltCfg.Stat = cMap["stat"]
197197
cltCfg.Property = cMap["property"]
198198
cltCfg.ModuleName = cMap["modulename"]
199+
cltCfg.SyncInvokeTimeout = c.GetIntWithDef("/tars/application/client<sync-invoke-timeout>", SyncInvokeTimeout)
199200
cltCfg.AsyncInvokeTimeout = c.GetIntWithDef("/tars/application/client<async-invoke-timeout>", AsyncInvokeTimeout)
200201
cltCfg.RefreshEndpointInterval = c.GetIntWithDef("/tars/application/client<refresh-endpoint-interval>", refreshEndpointInterval)
201202
cltCfg.ReportInterval = c.GetIntWithDef("/tars/application/client<report-interval>", reportInterval)

tars/config.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,9 @@ type clientConfig struct {
9595
ReportInterval int
9696
CheckStatusInterval int
9797
KeepAliveInterval int
98-
AsyncInvokeTimeout int
9998
// add client timeout
99+
SyncInvokeTimeout int
100+
AsyncInvokeTimeout int
100101
ClientQueueLen int
101102
ClientIdleTimeout time.Duration
102103
ClientReadTimeout time.Duration
@@ -157,6 +158,7 @@ func newClientConfig() *clientConfig {
157158
ReportInterval: reportInterval,
158159
CheckStatusInterval: checkStatusInterval,
159160
KeepAliveInterval: keepAliveInterval,
161+
SyncInvokeTimeout: SyncInvokeTimeout,
160162
AsyncInvokeTimeout: AsyncInvokeTimeout,
161163
ClientQueueLen: ClientQueueLen,
162164
ClientIdleTimeout: tools.ParseTimeOut(ClientIdleTimeout),

tars/message.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
package tars
22

33
import (
4+
"context"
45
"time"
56

7+
"github.com/TarsCloud/TarsGo/tars/model"
8+
"github.com/TarsCloud/TarsGo/tars/protocol/res/basef"
69
"github.com/TarsCloud/TarsGo/tars/protocol/res/requestf"
710
"github.com/TarsCloud/TarsGo/tars/selector"
11+
"github.com/TarsCloud/TarsGo/tars/util/current"
12+
"github.com/TarsCloud/TarsGo/tars/util/tools"
813
)
914

1015
// HashType is the hash type
@@ -31,6 +36,8 @@ type Message struct {
3136
hashCode uint32
3237
hashType HashType
3338
isHash bool
39+
Async bool
40+
Callback model.Callback
3441
}
3542

3643
// Init define the beginTime
@@ -66,3 +73,57 @@ func (m *Message) HashType() selector.HashType {
6673
func (m *Message) IsHash() bool {
6774
return m.isHash
6875
}
76+
77+
func buildMessage(ctx context.Context, cType byte,
78+
sFuncName string,
79+
buf []byte,
80+
status map[string]string,
81+
reqContext map[string]string,
82+
resp *requestf.ResponsePacket,
83+
s *ServantProxy) *Message {
84+
85+
// 将ctx中的dyeing信息传入到request中
86+
var msgType int32
87+
if dyeingKey, ok := current.GetDyeingKey(ctx); ok {
88+
TLOG.Debug("dyeing debug: find dyeing key:", dyeingKey)
89+
if status == nil {
90+
status = make(map[string]string)
91+
}
92+
status[current.StatusDyedKey] = dyeingKey
93+
msgType |= basef.TARSMESSAGETYPEDYED
94+
}
95+
96+
// 将ctx中的trace信息传入到request中
97+
if traceData, ok := current.GetTraceData(ctx); ok && traceData.TraceCall {
98+
traceKey := traceData.GetTraceKeyFull(false)
99+
TLOG.Debug("trace debug: find trace key:", traceKey)
100+
if status == nil {
101+
status = make(map[string]string)
102+
}
103+
status[current.StatusTraceKey] = traceKey
104+
msgType |= basef.TARSMESSAGETYPETRACE
105+
}
106+
107+
req := requestf.RequestPacket{
108+
IVersion: s.version,
109+
CPacketType: int8(cType),
110+
IMessageType: msgType,
111+
IRequestId: s.genRequestID(),
112+
SServantName: s.name,
113+
SFuncName: sFuncName,
114+
ITimeout: int32(s.syncTimeout),
115+
SBuffer: tools.ByteToInt8(buf),
116+
Context: reqContext,
117+
Status: status,
118+
}
119+
msg := &Message{Req: &req, Ser: s, Resp: resp}
120+
msg.Init()
121+
122+
if ok, hashType, hashCode, isHash := current.GetClientHash(ctx); ok {
123+
msg.isHash = isHash
124+
msg.hashType = HashType(hashType)
125+
msg.hashCode = hashCode
126+
}
127+
128+
return msg
129+
}

tars/model/Servant.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ import (
66
"github.com/TarsCloud/TarsGo/tars/protocol/res/requestf"
77
)
88

9+
type Callback interface {
10+
Dispatch(context.Context, *requestf.RequestPacket, *requestf.ResponsePacket, error) (int32, error)
11+
}
12+
913
// Servant is interface for call the remote server.
1014
type Servant interface {
1115
TarsInvoke(ctx context.Context, cType byte,
@@ -14,6 +18,15 @@ type Servant interface {
1418
status map[string]string,
1519
context map[string]string,
1620
resp *requestf.ResponsePacket) error
21+
22+
TarsInvokeAsync(ctx context.Context, cType byte,
23+
sFuncName string,
24+
buf []byte,
25+
status map[string]string,
26+
context map[string]string,
27+
resp *requestf.ResponsePacket,
28+
callback Callback) error
29+
1730
TarsSetTimeout(t int)
1831
TarsSetProtocol(Protocol)
1932
Name() string

tars/servant.go

Lines changed: 92 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"github.com/TarsCloud/TarsGo/tars/util/current"
1616
"github.com/TarsCloud/TarsGo/tars/util/endpoint"
1717
"github.com/TarsCloud/TarsGo/tars/util/rtimer"
18-
"github.com/TarsCloud/TarsGo/tars/util/tools"
1918
)
2019

2120
var (
@@ -31,13 +30,14 @@ const (
3130

3231
// ServantProxy tars servant proxy instance
3332
type ServantProxy struct {
34-
name string
35-
comm *Communicator
36-
manager EndpointManager
37-
timeout int
38-
version int16
39-
proto model.Protocol
40-
queueLen int32
33+
name string
34+
comm *Communicator
35+
manager EndpointManager
36+
syncTimeout int
37+
asyncTimeout int
38+
version int16
39+
proto model.Protocol
40+
queueLen int32
4141

4242
pushCallback func([]byte)
4343
}
@@ -64,7 +64,8 @@ func newServantProxy(comm *Communicator, objName string, opts ...EndpointManager
6464
s.manager = GetManager(comm, objName, opts...)
6565
s.comm = comm
6666
s.proto = &protocol.TarsProtocol{}
67-
s.timeout = s.comm.Client.AsyncInvokeTimeout
67+
s.syncTimeout = s.comm.Client.SyncInvokeTimeout
68+
s.asyncTimeout = s.comm.Client.AsyncInvokeTimeout
6869
s.version = basef.TARSVERSION
6970
return s
7071
}
@@ -76,7 +77,7 @@ func (s *ServantProxy) Name() string {
7677

7778
// TarsSetTimeout sets the timeout for client calling the server , which is in ms.
7879
func (s *ServantProxy) TarsSetTimeout(t int) {
79-
s.timeout = t
80+
s.syncTimeout = t
8081
}
8182

8283
// TarsSetVersion set tars version
@@ -116,53 +117,45 @@ func (s *ServantProxy) TarsInvoke(ctx context.Context, cType byte,
116117
resp *requestf.ResponsePacket) error {
117118
defer CheckPanic()
118119

119-
// 将ctx中的dyeing信息传入到request中
120-
var msgType int32
121-
if dyeingKey, ok := current.GetDyeingKey(ctx); ok {
122-
TLOG.Debug("dyeing debug: find dyeing key:", dyeingKey)
123-
if status == nil {
124-
status = make(map[string]string)
125-
}
126-
status[current.StatusDyedKey] = dyeingKey
127-
msgType |= basef.TARSMESSAGETYPEDYED
128-
}
120+
msg := buildMessage(ctx, cType, sFuncName, buf, status, reqContext, resp, s)
129121

130-
// 将ctx中的trace信息传入到request中
131-
if traceData, ok := current.GetTraceData(ctx); ok && traceData.TraceCall {
132-
traceKey := traceData.GetTraceKeyFull(false)
133-
TLOG.Debug("trace debug: find trace key:", traceKey)
134-
if status == nil {
135-
status = make(map[string]string)
136-
}
137-
status[current.StatusTraceKey] = traceKey
138-
msgType |= basef.TARSMESSAGETYPETRACE
139-
}
122+
timeout := time.Duration(s.syncTimeout) * time.Millisecond
123+
err := s.invokeFilters(ctx, msg, timeout)
140124

141-
req := requestf.RequestPacket{
142-
IVersion: s.version,
143-
CPacketType: int8(cType),
144-
IRequestId: s.genRequestID(),
145-
SServantName: s.name,
146-
SFuncName: sFuncName,
147-
SBuffer: tools.ByteToInt8(buf),
148-
ITimeout: int32(s.timeout),
149-
Context: reqContext,
150-
Status: status,
151-
IMessageType: msgType,
125+
if err != nil {
126+
return err
152127
}
153-
msg := &Message{Req: &req, Ser: s, Resp: resp}
154-
msg.Init()
155-
156-
timeout := time.Duration(s.timeout) * time.Millisecond
157-
if ok, hashType, hashCode, isHash := current.GetClientHash(ctx); ok {
158-
msg.isHash = isHash
159-
msg.hashType = HashType(hashType)
160-
msg.hashCode = hashCode
128+
*resp = *msg.Resp
129+
return nil
130+
}
131+
132+
// TarsInvokeAsync is used for client invoking server.
133+
func (s *ServantProxy) TarsInvokeAsync(ctx context.Context, cType byte,
134+
sFuncName string,
135+
buf []byte,
136+
status map[string]string,
137+
reqContext map[string]string,
138+
resp *requestf.ResponsePacket,
139+
callback model.Callback) error {
140+
defer CheckPanic()
141+
142+
msg := buildMessage(ctx, cType, sFuncName, buf, status, reqContext, resp, s)
143+
msg.Req.ITimeout = int32(s.asyncTimeout)
144+
if callback == nil {
145+
msg.Req.CPacketType = basef.TARSONEWAY
146+
} else {
147+
msg.Async = true
148+
msg.Callback = callback
161149
}
162150

151+
timeout := time.Duration(s.asyncTimeout) * time.Millisecond
152+
return s.invokeFilters(ctx, msg, timeout)
153+
}
154+
155+
func (s *ServantProxy) invokeFilters(ctx context.Context, msg *Message, timeout time.Duration) error {
163156
if ok, to, isTimeout := current.GetClientTimeout(ctx); ok && isTimeout {
164157
timeout = time.Duration(to) * time.Millisecond
165-
req.ITimeout = int32(to)
158+
msg.Req.ITimeout = int32(to)
166159
}
167160

168161
var err error
@@ -189,27 +182,32 @@ func (s *ServantProxy) TarsInvoke(ctx context.Context, cType byte,
189182
}
190183
}
191184
}
192-
s.manager.postInvoke()
185+
// no async rpc call
186+
if !msg.Async {
187+
s.manager.postInvoke()
188+
msg.End()
189+
s.reportStat(msg, err)
190+
}
193191

192+
return err
193+
}
194+
195+
func (s *ServantProxy) reportStat(msg *Message, err error) {
194196
if err != nil {
195-
msg.End()
196-
TLOG.Errorf("Invoke error: %s, %s, %v, cost:%d", s.name, sFuncName, err.Error(), msg.Cost())
197+
TLOG.Errorf("Invoke error: %s, %s, %v, cost:%d", s.name, msg.Req.SFuncName, err.Error(), msg.Cost())
197198
if msg.Resp == nil {
198199
ReportStat(msg, StatSuccess, StatSuccess, StatFailed)
199200
} else if msg.Status == basef.TARSINVOKETIMEOUT {
200201
ReportStat(msg, StatSuccess, StatFailed, StatSuccess)
201202
} else {
202203
ReportStat(msg, StatSuccess, StatSuccess, StatFailed)
203204
}
204-
return err
205+
return
205206
}
206-
msg.End()
207-
*resp = *msg.Resp
208207
ReportStat(msg, StatFailed, StatSuccess, StatSuccess)
209-
return err
210208
}
211209

212-
func (s *ServantProxy) doInvoke(ctx context.Context, msg *Message, timeout time.Duration) error {
210+
func (s *ServantProxy) doInvoke(ctx context.Context, msg *Message, timeout time.Duration) (err error) {
213211
adp, needCheck := s.manager.SelectAdapterProxy(msg)
214212
if adp == nil {
215213
return errors.New("no adapter Proxy selected:" + msg.Req.SServantName)
@@ -232,19 +230,53 @@ func (s *ServantProxy) doInvoke(ctx context.Context, msg *Message, timeout time.
232230
atomic.AddInt32(&s.queueLen, 1)
233231
readCh := make(chan *requestf.ResponsePacket)
234232
adp.resp.Store(msg.Req.IRequestId, readCh)
235-
defer func() {
233+
var releaseFunc = func() {
236234
CheckPanic()
237235
atomic.AddInt32(&s.queueLen, -1)
238236
adp.resp.Delete(msg.Req.IRequestId)
237+
}
238+
defer func() {
239+
if !msg.Async || err != nil {
240+
releaseFunc()
241+
}
239242
}()
240-
if err := adp.Send(msg.Req); err != nil {
243+
244+
if err = adp.Send(msg.Req); err != nil {
241245
adp.failAdd()
242246
return err
243247
}
248+
244249
if msg.Req.CPacketType == basef.TARSONEWAY {
245250
adp.successAdd()
246251
return nil
247252
}
253+
254+
// async call rpc
255+
if msg.Async {
256+
go func() {
257+
defer releaseFunc()
258+
err := s.waitInvoke(msg, adp, timeout, needCheck)
259+
s.manager.postInvoke()
260+
msg.End()
261+
s.reportStat(msg, err)
262+
if msg.Status != basef.TARSINVOKETIMEOUT {
263+
current.SetResponseContext(ctx, msg.Resp.Context)
264+
current.SetResponseStatus(ctx, msg.Resp.Status)
265+
}
266+
if _, err := msg.Callback.Dispatch(ctx, msg.Req, msg.Resp, err); err != nil {
267+
TLOG.Errorf("Callback error: %s, %s, %+v", s.name, msg.Req.SFuncName, err)
268+
}
269+
}()
270+
return nil
271+
}
272+
273+
return s.waitInvoke(msg, adp, timeout, needCheck)
274+
}
275+
276+
func (s *ServantProxy) waitInvoke(msg *Message, adp *AdapterProxy, timeout time.Duration, needCheck bool) error {
277+
ch, _ := adp.resp.Load(msg.Req.IRequestId)
278+
readCh := ch.(chan *requestf.ResponsePacket)
279+
248280
select {
249281
case <-rtimer.After(timeout):
250282
msg.Status = basef.TARSINVOKETIMEOUT

0 commit comments

Comments
 (0)