Skip to content

Commit 1c5f876

Browse files
authored
Merge pull request #3645 from withchao/3.8.3-patch
feat: replace LongConn with ClientConn interface and simplify message handling
2 parents f4171a2 + 4445718 commit 1c5f876

File tree

4 files changed

+268
-363
lines changed

4 files changed

+268
-363
lines changed

internal/msggateway/client.go

Lines changed: 9 additions & 149 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ package msggateway
1616

1717
import (
1818
"context"
19-
"encoding/json"
2019
"fmt"
2120
"sync"
2221
"sync/atomic"
@@ -64,7 +63,7 @@ type PingPongHandler func(string) error
6463

6564
type Client struct {
6665
w *sync.Mutex
67-
conn LongConn
66+
conn ClientConn
6867
PlatformID int `json:"platformID"`
6968
IsCompress bool `json:"isCompress"`
7069
UserID string `json:"userID"`
@@ -83,7 +82,7 @@ type Client struct {
8382
}
8483

8584
// ResetClient updates the client's state with new connection and context information.
86-
func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer LongConnServer) {
85+
func (c *Client) ResetClient(ctx *UserConnContext, conn ClientConn, longConnServer LongConnServer) {
8786
c.w = new(sync.Mutex)
8887
c.conn = conn
8988
c.PlatformID = stringutil.StringToInt(ctx.GetPlatformID())
@@ -110,22 +109,6 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer
110109
c.subUserIDs = make(map[string]struct{})
111110
}
112111

113-
func (c *Client) pingHandler(appData string) error {
114-
if err := c.conn.SetReadDeadline(pongWait); err != nil {
115-
return err
116-
}
117-
118-
log.ZDebug(c.ctx, "ping Handler Success.", "appData", appData)
119-
return c.writePongMsg(appData)
120-
}
121-
122-
func (c *Client) pongHandler(_ string) error {
123-
if err := c.conn.SetReadDeadline(pongWait); err != nil {
124-
return err
125-
}
126-
return nil
127-
}
128-
129112
// readMessage continuously reads messages from the connection.
130113
func (c *Client) readMessage() {
131114
defer func() {
@@ -136,52 +119,25 @@ func (c *Client) readMessage() {
136119
c.close()
137120
}()
138121

139-
c.conn.SetReadLimit(maxMessageSize)
140-
_ = c.conn.SetReadDeadline(pongWait)
141-
c.conn.SetPongHandler(c.pongHandler)
142-
c.conn.SetPingHandler(c.pingHandler)
143-
c.activeHeartbeat(c.hbCtx)
144-
145122
for {
146123
log.ZDebug(c.ctx, "readMessage")
147-
messageType, message, returnErr := c.conn.ReadMessage()
124+
message, returnErr := c.conn.ReadMessage()
148125
if returnErr != nil {
149-
log.ZWarn(c.ctx, "readMessage", returnErr, "messageType", messageType)
126+
log.ZWarn(c.ctx, "readMessage", returnErr)
150127
c.closedErr = returnErr
151128
return
152129
}
153130

154-
log.ZDebug(c.ctx, "readMessage", "messageType", messageType)
155131
if c.closed.Load() {
156132
// The scenario where the connection has just been closed, but the coroutine has not exited
157133
c.closedErr = ErrConnClosed
158134
return
159135
}
160136

161-
switch messageType {
162-
case MessageBinary:
163-
_ = c.conn.SetReadDeadline(pongWait)
164-
parseDataErr := c.handleMessage(message)
165-
if parseDataErr != nil {
166-
c.closedErr = parseDataErr
167-
return
168-
}
169-
case MessageText:
170-
_ = c.conn.SetReadDeadline(pongWait)
171-
parseDataErr := c.handlerTextMessage(message)
172-
if parseDataErr != nil {
173-
c.closedErr = parseDataErr
174-
return
175-
}
176-
case PingMessage:
177-
err := c.writePongMsg("")
178-
log.ZError(c.ctx, "writePongMsg", err)
179-
180-
case CloseMessage:
181-
c.closedErr = ErrClientClosed
137+
parseDataErr := c.handleMessage(message)
138+
if parseDataErr != nil {
139+
c.closedErr = parseDataErr
182140
return
183-
184-
default:
185141
}
186142
}
187143
}
@@ -356,109 +312,13 @@ func (c *Client) writeBinaryMsg(resp Resp) error {
356312
c.w.Lock()
357313
defer c.w.Unlock()
358314

359-
err = c.conn.SetWriteDeadline(writeWait)
360-
if err != nil {
361-
return err
362-
}
363-
364315
if c.IsCompress {
365316
resultBuf, compressErr := c.longConnServer.CompressWithPool(encodedBuf)
366317
if compressErr != nil {
367318
return compressErr
368319
}
369-
return c.conn.WriteMessage(MessageBinary, resultBuf)
370-
}
371-
372-
return c.conn.WriteMessage(MessageBinary, encodedBuf)
373-
}
374-
375-
// Actively initiate Heartbeat when platform in Web.
376-
func (c *Client) activeHeartbeat(ctx context.Context) {
377-
if c.PlatformID == constant.WebPlatformID {
378-
go func() {
379-
defer func() {
380-
if r := recover(); r != nil {
381-
log.ZPanic(ctx, "activeHeartbeat Panic", errs.ErrPanic(r))
382-
}
383-
}()
384-
log.ZDebug(ctx, "server initiative send heartbeat start.")
385-
ticker := time.NewTicker(pingPeriod)
386-
defer ticker.Stop()
387-
388-
for {
389-
select {
390-
case <-ticker.C:
391-
if err := c.writePingMsg(); err != nil {
392-
log.ZWarn(c.ctx, "send Ping Message error.", err)
393-
return
394-
}
395-
case <-c.hbCtx.Done():
396-
return
397-
}
398-
}
399-
}()
400-
}
401-
}
402-
func (c *Client) writePingMsg() error {
403-
if c.closed.Load() {
404-
return nil
320+
return c.conn.WriteMessage(resultBuf)
405321
}
406322

407-
c.w.Lock()
408-
defer c.w.Unlock()
409-
410-
err := c.conn.SetWriteDeadline(writeWait)
411-
if err != nil {
412-
return err
413-
}
414-
415-
return c.conn.WriteMessage(PingMessage, nil)
416-
}
417-
418-
func (c *Client) writePongMsg(appData string) error {
419-
log.ZDebug(c.ctx, "write Pong Msg in Server", "appData", appData)
420-
if c.closed.Load() {
421-
log.ZWarn(c.ctx, "is closed in server", nil, "appdata", appData, "closed err", c.closedErr)
422-
return nil
423-
}
424-
425-
c.w.Lock()
426-
defer c.w.Unlock()
427-
428-
err := c.conn.SetWriteDeadline(writeWait)
429-
if err != nil {
430-
log.ZWarn(c.ctx, "SetWriteDeadline in Server have error", errs.Wrap(err), "writeWait", writeWait, "appData", appData)
431-
return errs.Wrap(err)
432-
}
433-
err = c.conn.WriteMessage(PongMessage, []byte(appData))
434-
if err != nil {
435-
log.ZWarn(c.ctx, "Write Message have error", errs.Wrap(err), "Pong msg", PongMessage)
436-
}
437-
438-
return errs.Wrap(err)
439-
}
440-
441-
func (c *Client) handlerTextMessage(b []byte) error {
442-
var msg TextMessage
443-
if err := json.Unmarshal(b, &msg); err != nil {
444-
return err
445-
}
446-
switch msg.Type {
447-
case TextPong:
448-
return nil
449-
case TextPing:
450-
msg.Type = TextPong
451-
msgData, err := json.Marshal(msg)
452-
if err != nil {
453-
return err
454-
}
455-
c.w.Lock()
456-
defer c.w.Unlock()
457-
if err := c.conn.SetWriteDeadline(writeWait); err != nil {
458-
return err
459-
}
460-
return c.conn.WriteMessage(MessageText, msgData)
461-
default:
462-
return fmt.Errorf("not support message type %s", msg.Type)
463-
}
323+
return c.conn.WriteMessage(encodedBuf)
464324
}

0 commit comments

Comments
 (0)