@@ -3,7 +3,10 @@ package mainline
33import (
44 "crypto/rand"
55 "crypto/sha1"
6+ "github.com/boramalper/magnetico/pkg/util"
67 "net"
8+ "sort"
9+ "strconv"
710 "sync"
811 "time"
912
@@ -16,6 +19,8 @@ type Protocol struct {
1619 transport * Transport
1720 eventHandlers ProtocolEventHandlers
1821 started bool
22+
23+ stats protocolStats
1924}
2025
2126type ProtocolEventHandlers struct {
@@ -38,6 +43,9 @@ func NewProtocol(laddr string, eventHandlers ProtocolEventHandlers) (p *Protocol
3843 p = new (Protocol )
3944 p .eventHandlers = eventHandlers
4045 p .transport = NewTransport (laddr , p .onMessage , p .eventHandlers .OnCongestion )
46+ p .stats = protocolStats {
47+ messageTypeCount : make (map [string ]map [string ]int ),
48+ }
4149
4250 p .currentTokenSecret , p .previousTokenSecret = make ([]byte , 20 ), make ([]byte , 20 )
4351 _ , err := rand .Read (p .currentTokenSecret )
@@ -56,6 +64,7 @@ func (p *Protocol) Start() {
5664 p .started = true
5765
5866 p .transport .Start ()
67+ go p .printStats ()
5968 go p .updateTokenSecret ()
6069}
6170
@@ -67,7 +76,113 @@ func (p *Protocol) Terminate() {
6776 p .transport .Terminate ()
6877}
6978
79+ //statistics
80+ type protocolStats struct {
81+ sync.RWMutex
82+ messageTypeCount map [string ]map [string ]int //type=>subtype=>count
83+ }
84+
85+ func (ps * protocolStats ) Reset () {
86+ ps .Lock ()
87+ defer ps .Unlock ()
88+ ps .messageTypeCount = make (map [string ]map [string ]int )
89+ }
90+
91+ type messageTypeCountOrdered struct {
92+ messageType string
93+ messageCount int
94+ percentageOverTotal float64
95+ subMessages orderedMessagesCount
96+ }
97+ type orderedMessagesCount []* messageTypeCountOrdered
98+
99+ func (omc orderedMessagesCount ) Len () int {
100+ return len (omc )
101+ }
102+ func (omc orderedMessagesCount ) Swap (i , j int ) {
103+ omc [i ], omc [j ] = omc [j ], omc [i ]
104+ }
105+ func (omc orderedMessagesCount ) Less (i , j int ) bool {
106+ return omc [i ].messageCount > omc [j ].messageCount
107+ }
108+ func (omc orderedMessagesCount ) CalculatePercentagesOverTotal (totalMessages int ) {
109+ for _ , mtco := range omc {
110+ if mtco .subMessages != nil && len (mtco .subMessages ) > 0 {
111+ mtco .subMessages .CalculatePercentagesOverTotal (totalMessages )
112+ }
113+ mtco .percentageOverTotal = util .RoundToDecimal (
114+ (float64 (mtco .messageCount )/ float64 (totalMessages ))* 100 , 2 )
115+ }
116+ }
117+ func (omc orderedMessagesCount ) Sort () {
118+ for _ , mtco := range omc {
119+ if mtco .subMessages != nil && len (mtco .subMessages ) > 0 {
120+ mtco .subMessages .Sort ()
121+ }
122+ }
123+ sort .Sort (omc )
124+ }
125+ func (omc orderedMessagesCount ) String () string {
126+ /*
127+ string concatenation is slow, so a bytes.Buffer would be better. But, this is called once every few seconds, so this won't
128+ be a problem and it will be much easier to write down and read
129+ */
130+ mostReceivedMessageTypes := ""
131+ for mIdx , m := range omc {
132+ if mIdx > 0 {
133+ mostReceivedMessageTypes += ", "
134+ }
135+ mostReceivedMessageTypes += m .messageType
136+ mostReceivedMessageTypes +=
137+ " (" + strconv .Itoa (m .messageCount ) + ", " + strconv .FormatFloat (m .percentageOverTotal , 'f' , - 1 , 64 ) + "%)"
138+
139+ if m .subMessages != nil && len (m .subMessages ) > 0 {
140+ //add stats for submessages unless there is only 1 submessage with len 0 (empty)
141+ if ! (len (m .subMessages ) == 1 && len (m .subMessages [0 ].messageType ) == 0 ) {
142+ mostReceivedMessageTypes += "[ " + m .subMessages .String () + " ]"
143+ }
144+ }
145+ }
146+ return mostReceivedMessageTypes
147+ }
148+ func (p * Protocol ) printStats () {
149+ for {
150+ time .Sleep (StatsPrintClock )
151+ p .stats .RLock ()
152+ orderedMessages := make (orderedMessagesCount , 0 , len (p .stats .messageTypeCount ))
153+ totalMessages := 0
154+ for mType , mSubTypes := range p .stats .messageTypeCount {
155+ mCount := 0
156+ orderedSubMessages := make (orderedMessagesCount , 0 , len (mSubTypes ))
157+ for mSubType , mSubCount := range mSubTypes {
158+ mCount += mSubCount
159+ totalMessages += mSubCount
160+ orderedSubMessages = append (orderedSubMessages , & messageTypeCountOrdered {
161+ messageType : mSubType ,
162+ messageCount : mSubCount ,
163+ })
164+ }
165+ orderedMessages = append (orderedMessages , & messageTypeCountOrdered {
166+ messageType : mType ,
167+ messageCount : mCount ,
168+ subMessages : orderedSubMessages ,
169+ })
170+ }
171+ p .stats .RUnlock ()
172+ orderedMessages .CalculatePercentagesOverTotal (totalMessages )
173+ orderedMessages .Sort ()
174+
175+ zap .L ().Info ("Protocol stats (on " + StatsPrintClock .String ()+ "):" ,
176+ zap .String ("message type" , orderedMessages .String ()),
177+ )
178+
179+ p .stats .Reset ()
180+ }
181+ }
182+
70183func (p * Protocol ) onMessage (msg * Message , addr * net.UDPAddr ) {
184+ temporaryQ := msg .Q
185+
71186 switch msg .Y {
72187 case "q" :
73188 switch msg .Q {
@@ -140,6 +255,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
140255 //
141256 // sample_infohashes > get_peers > find_node > ping / announce_peer
142257 if len (msg .R .Samples ) != 0 { // The message should be a sample_infohashes response.
258+ temporaryQ = "sample_infohashes"
143259 if ! validateSampleInfohashesResponseMessage (msg ) {
144260 // zap.L().Debug("An invalid sample_infohashes response received!")
145261 return
@@ -148,6 +264,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
148264 p .eventHandlers .OnSampleInfohashesResponse (msg , addr )
149265 }
150266 } else if len (msg .R .Token ) != 0 { // The message should be a get_peers response.
267+ temporaryQ = "get_peers"
151268 if ! validateGetPeersResponseMessage (msg ) {
152269 // zap.L().Debug("An invalid get_peers response received!")
153270 return
@@ -156,6 +273,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
156273 p .eventHandlers .OnGetPeersResponse (msg , addr )
157274 }
158275 } else if len (msg .R .Nodes ) != 0 { // The message should be a find_node response.
276+ temporaryQ = "find_node"
159277 if ! validateFindNodeResponseMessage (msg ) {
160278 // zap.L().Debug("An invalid find_node response received!")
161279 return
@@ -164,6 +282,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
164282 p .eventHandlers .OnFindNodeResponse (msg , addr )
165283 }
166284 } else { // The message should be a ping or an announce_peer response.
285+ temporaryQ = "ping_or_announce"
167286 if ! validatePingORannouncePeerResponseMessage (msg ) {
168287 // zap.L().Debug("An invalid ping OR announce_peer response received!")
169288 return
@@ -182,6 +301,14 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
182301 zap.String("type", msg.Y))
183302 */
184303 }
304+
305+ //let's update stats at the end so that in case of an "r" message the previous switch case can update the temporaryQ field
306+ p .stats .Lock ()
307+ if _ , ok := p .stats .messageTypeCount [msg .Y ]; ! ok {
308+ p .stats .messageTypeCount [msg .Y ] = make (map [string ]int )
309+ }
310+ p .stats .messageTypeCount [msg .Y ][temporaryQ ]++
311+ p .stats .Unlock ()
185312}
186313
187314func (p * Protocol ) SendMessage (msg * Message , addr * net.UDPAddr ) {
0 commit comments