Skip to content

Commit 511e4ab

Browse files
authored
handle sticky packet (#39)
close #38
1 parent f838e6b commit 511e4ab

File tree

7 files changed

+435
-192
lines changed

7 files changed

+435
-192
lines changed

README_CN.md

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -414,16 +414,19 @@ More examples in `/pkg/spec/encoding/pvarint_test.go|varfloat_test.go`
414414
- [x] v0.4.0 - 支持[yomo-thermometer-plugin](https://github.com/10cella/yomo-thermometer-plugin)插件
415415
- [x] 支持[]Thermometer{}的Mold形式
416416
- [x] [YoMo](https://github.com/yomorun/yomo)框架的正式切换至[Yomo-codec-golang](https://github.com/yomorun/yomo-codec-golang)
417-
- [x] v0.5.0 - 性能压测及优化重构
418-
- [ ] v0.6.0 - 支持UUID及Map类型
417+
- [x] v1.0.0 - 性能压测及优化重构
418+
- [x] YomoCodec的性能测试,与JSON实现版本的对比
419+
- [x] 通过跳KEY解码字节流优化性能
420+
- [x] 粘包处理
421+
- [ ] v1.1.0 - 支持UUID及Map类型
419422
- [ ] encoding
420423
- [ ] PrimitivePacket
421424
- [ ] ProtoCode
422-
- [ ] v0.7.0 - 支持[YoMo](https://github.com/yomorun/yomo)框架的新的解析需求
423-
- [ ] 满足处理流程:解析--监听--读取--处理--写入
424-
- [ ] 支持两种出理流程的切换
425-
- [ ] v0.9.0 - 支持简单的控制指令和错误码
426-
- [ ] v1.0.0 - 支持与QUIC-for-YoMo框架的集成
425+
- [ ] v1.2.0 - 支持[YoMo](https://github.com/yomorun/yomo)框架的新的解析需求
426+
- [ ] 满足处理流程
427+
- [ ] 支持两种处理流程的切换
428+
- [ ] v1.3.0 - 支持简单的控制指令和错误码
429+
- [ ] v2.0.0 - 支持与Quicy框架的集成
427430

428431
## YoMo Codec
429432

docs/report1.jpg

252 Bytes
Loading

docs/report2.jpg

-1.46 KB
Loading

internal/codec/tag.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,7 @@ func NewTag(b byte) *Tag {
3636
func (t *Tag) IsArray() bool {
3737
return t.raw&utils.ArrayFlag == utils.ArrayFlag
3838
}
39+
40+
func (t *Tag) Raw() byte {
41+
return t.raw
42+
}

pkg/codes/yomo_collecting.go

Lines changed: 8 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111

1212
// collectingCodec: Implementation of the YomoCodec Interface
1313
type collectingCodec struct {
14-
//Value []byte
1514
Value *y3.NodePacket
1615

1716
Tag *ycodec.Tag
@@ -20,42 +19,34 @@ type collectingCodec struct {
2019
Length int32
2120
Size int32
2221

23-
//Observe string
2422
Observe byte
2523
Sbuf []byte
2624

27-
//Result [][]byte
2825
Result []*y3.NodePacket
2926
OriginResult [][]byte
3027

31-
//ProtoCodec
3228
proto ProtoCodec
3329
}
3430

3531
func NewCollectingCodec(observe string) YomoCodec {
3632
ob := packetutils.KeyOf(observe)
3733
codec := &collectingCodec{
38-
//Value: make([]byte, 0),
39-
Value: nil,
40-
Tag: nil,
41-
LengthBuf: make([]byte, 0),
42-
Length: 0,
43-
Size: 0,
44-
Sbuf: make([]byte, 0),
45-
Observe: ob,
46-
//Result: make([][]byte, 0),
34+
Value: nil,
35+
Tag: nil,
36+
LengthBuf: make([]byte, 0),
37+
Length: 0,
38+
Size: 0,
39+
Sbuf: make([]byte, 0),
40+
Observe: ob,
4741
Result: make([]*y3.NodePacket, 0),
4842
OriginResult: make([][]byte, 0),
49-
//ProtoCodec
50-
proto: NewProtoCodec(ob),
43+
proto: NewProtoCodec(ob),
5144
}
5245

5346
return codec
5447
}
5548

56-
// #2 [优化] 跳key优化
5749
func (codec *collectingCodec) Decoder(buf []byte) {
58-
//key := packetutils.KeyOf(codec.Observe)
5950
key := codec.Observe
6051
for _, c := range buf {
6152
// tag
@@ -80,33 +71,26 @@ func (codec *collectingCodec) Decoder(buf []byte) {
8071

8172
codec.Sbuf = append(codec.Sbuf, c)
8273

83-
// buf end, then handle Sbuf
8474
if int32(len(codec.Sbuf)) == 1+codec.Size+codec.Length {
85-
// Decode Packet from Sbuf
8675
packet, _, err := y3.DecodeNodePacket(codec.Sbuf)
8776
if err != nil {
8877
logger.Errorf("::Decoder DecodeNodePacket error:%v", err)
8978
codec.reset()
9079
continue
9180
}
9281

93-
// temp save Sbuf and reset
94-
//result := make([]byte, 0)
9582
var result *y3.NodePacket
9683
originResult := codec.Sbuf
9784
codec.reset()
9885

9986
//matching
10087
var matching = false
10188
flag, _, _ := packetutils.MatchingKey(key, packet)
102-
//if flag || []byte("*")[0] == key {
10389
if flag {
10490
matching = true
105-
//result = originResult
10691
result = packet
10792
}
10893

109-
// save to result
11094
if matching {
11195
codec.Result = append(codec.Result, result)
11296
codec.OriginResult = append(codec.OriginResult, placeholder)
@@ -144,16 +128,13 @@ func (codec *collectingCodec) Read(mold interface{}) (interface{}, error) {
144128
result := codec.Result[0]
145129
codec.Result = codec.Result[1:]
146130

147-
//proto := NewProtoCodec(codec.Observe)
148131
proto := codec.proto
149132
if proto.IsStruct(mold) {
150-
//err := proto.UnmarshalStruct(result, mold)
151133
err := proto.UnmarshalStructByNodePacket(result, mold)
152134
if err != nil {
153135
return nil, err
154136
}
155137
} else {
156-
//err := proto.UnmarshalBasic(result, &mold)
157138
err := proto.UnmarshalBasicByNodePacket(result, &mold)
158139
if err != nil {
159140
return nil, err
@@ -168,11 +149,6 @@ func (codec *collectingCodec) Read(mold interface{}) (interface{}, error) {
168149

169150
// Write: write interface to stream
170151
func (codec *collectingCodec) Write(w io.Writer, T interface{}, mold interface{}) (int, error) {
171-
// #1. mold --> NodePacket
172-
// #2. merge NodePacket --> codec.Value NodePacket
173-
// #3. NodePacket --> []byte
174-
// #4. Write []byte
175-
//proto := NewProtoCodec(codec.Observe)
176152
proto := codec.proto
177153
buf, err := proto.MarshalNative(T)
178154
if err != nil {
@@ -195,17 +171,14 @@ func (codec *collectingCodec) Encoder(buf []byte, mold interface{}) ([]byte, err
195171
for _, data := range codec.OriginResult {
196172
index = index + 1
197173
if codec.isDecoder(data) {
198-
//source, _, _ := y3.DecodeNodePacket(codec.Value)
199174
source := codec.Value
200175

201-
//key := packetutils.KeyOf(codec.Observe)
202176
key := codec.Observe
203177
_buf, err := codec.mergePacket(source, key, buf, mold)
204178
if err != nil {
205179
return nil, err
206180
}
207181

208-
//codec.Value = make([]byte, 0)
209182
codec.Value = nil
210183
result = append(result, _buf...)
211184
break

0 commit comments

Comments
 (0)