From 5057771ea2534953e12fb90233fb2b9bcc3ab554 Mon Sep 17 00:00:00 2001 From: phy Date: Fri, 15 Nov 2019 10:26:10 +0800 Subject: [PATCH 1/2] Realize room and broadcast function --- server.go | 13 +++++++++++++ socket.go | 46 +++++++++++++++++++++++++++++++++++++++------- 2 files changed, 52 insertions(+), 7 deletions(-) diff --git a/server.go b/server.go index 317db96..04f1319 100644 --- a/server.go +++ b/server.go @@ -1,6 +1,7 @@ package socketio import ( + "log" "net/http" "sync" "time" @@ -15,12 +16,14 @@ type Server struct { sockLock sync.RWMutex onError func(err error) nsps map[string]*namespace + rooms map[string]map[string]*socket } // NewServer creates a socket.io server instance upon underlying engine.io transport func NewServer(interval, timeout time.Duration, parser Parser) (server *Server, err error) { e, err := engine.NewServer(interval, timeout, func(ß *engine.Socket) { socket := newSocket(ß, parser) + socket.server = server socket.attachnsp("/") nsp := server.creatensp("/") if err := socket.emitPacket(&Packet{ @@ -108,6 +111,7 @@ func (s *Server) OnError(fn func(err error)) { s.onError = fn } // process is the Packet process handle on server side func (s *Server) process(sock *socket, p *Packet) { + sock.namespace = p.Namespace nsp, ok := s.getnsp(p.Namespace) if !ok { if p.Type > PacketTypeDisconnect { @@ -189,6 +193,15 @@ func (s *Server) process(sock *socket, p *Packet) { } } +func (s *Server) BroadcastToRoom(room string, event string, args ...interface{}) { + for sid := range s.rooms[room] { + so := s.rooms[room][sid] + if err := so.Emit(event, args); err != nil { + log.Println("[BroadcastToRoom]", room, event, args, err) + } + } +} + var ( WebsocketTransport = engine.WebsocketTransport ) diff --git a/socket.go b/socket.go index 2c0743c..a367aaf 100644 --- a/socket.go +++ b/socket.go @@ -25,6 +25,10 @@ type Socket interface { GetHeader(key string) string Sid() string io.Closer + + Join(room string) + Leave(room string) + BroadcastToRoom(room string, event string, args ...interface{}) } type nspSock struct { @@ -46,11 +50,39 @@ func (n *nspSock) EmitError(arg interface{}) (err error) { } type socket struct { - ß *engine.Socket - encoder Encoder - decoder Decoder - acks map[string]*ackHandle - mutex sync.RWMutex + ß *engine.Socket + encoder Encoder + decoder Decoder + acks map[string]*ackHandle + mutex sync.RWMutex + server *Server + namespace string +} + +func (s *socket) Join(room string) { + server := s.server + server.sockLock.Lock() + if server.rooms == nil { + server.rooms = map[string]map[string]*socket{} + } + if server.rooms[room] == nil { + server.rooms[room] = map[string]*socket{} + } + server.rooms[room][s.Sid()] = s + server.sockLock.Unlock() +} + +func (s *socket) Leave(room string) { + server := s.server + server.sockLock.Lock() + if server.rooms[room] != nil { + delete(server.rooms[room], s.Sid()) + } + server.sockLock.Unlock() +} + +func (s *socket) BroadcastToRoom(room string, event string, args ...interface{}) { + s.server.BroadcastToRoom(room, event, args) } func newSocket(ß *engine.Socket, parser Parser) *socket { @@ -106,14 +138,14 @@ func (s *socket) fireAck(nsp string, id uint64, data []byte, buffer [][]byte, au // Emit implements Socket.Emit func (s *socket) Emit(event string, args ...interface{}) (err error) { - return s.emit("/", event, args...) + return s.emit(s.namespace, event, args...) } // EmitError implements Socket.EmitError func (s *socket) EmitError(arg interface{}) (err error) { return s.emitError("/", arg) } // Namespace implements Socket.Namespace -func (*socket) Namespace() string { return "/" } +func (s *socket) Namespace() string { return s.namespace } func (s *socket) emit(nsp string, event string, args ...interface{}) (err error) { s.mutex.RLock() From 7c88c34548acc07e0b2703f7de3f17379a0de21c Mon Sep 17 00:00:00 2001 From: phy Date: Wed, 27 Nov 2019 18:33:39 +0800 Subject: [PATCH 2/2] Room broadcast function realization --- server.go | 17 ++++++++++++----- socket.go | 12 +++++++++++- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/server.go b/server.go index 04f1319..4468381 100644 --- a/server.go +++ b/server.go @@ -1,11 +1,11 @@ package socketio import ( - "log" "net/http" "sync" "time" + "github.com/sirupsen/logrus" "github.com/zyxar/socketio/engine" ) @@ -194,10 +194,17 @@ func (s *Server) process(sock *socket, p *Packet) { } func (s *Server) BroadcastToRoom(room string, event string, args ...interface{}) { - for sid := range s.rooms[room] { - so := s.rooms[room][sid] - if err := so.Emit(event, args); err != nil { - log.Println("[BroadcastToRoom]", room, event, args, err) + for sid, so := range s.rooms[room] { + if sid == "" || so == nil { + continue + } + + if err := so.Emit(event, args...); err != nil { + logrus.Error("[BroadcastToRoom] sid="+sid+", ", err, " ,args=", args) + if err == ErrorNamespaceUnavaialble { + so.LeaveAll() + so.Close() + } } } } diff --git a/socket.go b/socket.go index a367aaf..bc605c8 100644 --- a/socket.go +++ b/socket.go @@ -28,6 +28,7 @@ type Socket interface { Join(room string) Leave(room string) + LeaveAll() BroadcastToRoom(room string, event string, args ...interface{}) } @@ -81,8 +82,17 @@ func (s *socket) Leave(room string) { server.sockLock.Unlock() } +func (s *socket) LeaveAll() { + server := s.server + server.sockLock.Lock() + for _, room := range server.rooms { + delete(room, s.Sid()) + } + server.sockLock.Unlock() +} + func (s *socket) BroadcastToRoom(room string, event string, args ...interface{}) { - s.server.BroadcastToRoom(room, event, args) + s.server.BroadcastToRoom(room, event, args...) } func newSocket(ß *engine.Socket, parser Parser) *socket {