Skip to content

Commit 3cc897e

Browse files
authored
Optimize lock protection to prevent data synchronization problems under high concurrency (#46)
close #45
1 parent 09fb119 commit 3cc897e

File tree

2 files changed

+18
-8
lines changed

2 files changed

+18
-8
lines changed

pkg/codes/yomo_streaming.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ type streamingCodec struct {
1919
SourceBuf []byte
2020
Status decoderStatus
2121

22-
Matching [][]byte
23-
matchingMutex sync.Mutex
24-
OriginResult [][]byte
22+
Matching [][]byte
23+
matchingMutex sync.Mutex
24+
OriginResult [][]byte
25+
OriginResultMutex sync.Mutex
2526

2627
CollectedStatus collectedStatus
2728
CollectedTag *ycodec.Tag
@@ -186,17 +187,21 @@ func (d *streamingCodec) decode(buf []byte) {
186187
if tag != nil && key == tag.SeqID() && int32(len(curBuf)) == 1+size+length {
187188
d.matchingMutex.Lock()
188189
d.Matching = append(d.Matching, curBuf)
189-
d.inform <- true
190190
d.matchingMutex.Unlock()
191+
d.inform <- true
191192
d.Status = decoderMatching
192193
}
193194

194195
if int32(len(d.SourceBuf)) == 1+d.Size+d.Length {
195196
if d.Status == decoderMatching {
196197
//d.Result = append(d.Result, d.SourceBuf)
198+
d.OriginResultMutex.Lock()
197199
d.OriginResult = append(d.OriginResult, placeholder)
200+
d.OriginResultMutex.Unlock()
198201
} else {
202+
d.OriginResultMutex.Lock()
199203
d.OriginResult = append(d.OriginResult, d.SourceBuf)
204+
d.OriginResultMutex.Unlock()
200205
}
201206

202207
d.Status = decoderFinished
@@ -224,8 +229,10 @@ func (d *streamingCodec) Read(mold interface{}) (interface{}, error) {
224229
if len(d.Matching) == 0 {
225230
return nil, nil
226231
}
232+
d.matchingMutex.Lock()
227233
matching := d.Matching[0]
228234
d.Matching = d.Matching[1:]
235+
d.matchingMutex.Unlock()
229236

230237
proto := NewProtoCodec(d.Observe)
231238
if proto.IsStruct(mold) {
@@ -251,7 +258,7 @@ func (d *streamingCodec) Write(w io.Writer, T interface{}, mold interface{}) (in
251258
return w.Write(result)
252259
}
253260

254-
func (d *streamingCodec) isDecoder(buf []byte) bool {
261+
func (d *streamingCodec) isPlaceholder(buf []byte) bool {
255262
if len(buf) != len(placeholder) {
256263
return false
257264
}
@@ -268,9 +275,13 @@ func (d *streamingCodec) Refresh(w io.Writer) (int, error) {
268275
if len(d.OriginResult) == 0 {
269276
return 0, nil
270277
}
278+
279+
d.OriginResultMutex.Lock()
271280
originResult := d.OriginResult[0]
272-
if !d.isDecoder(originResult) {
273-
d.OriginResult = d.OriginResult[1:]
281+
d.OriginResult = d.OriginResult[1:]
282+
d.OriginResultMutex.Unlock()
283+
284+
if !d.isPlaceholder(originResult) {
274285
return w.Write(originResult)
275286
}
276287
return 0, nil

pkg/codes/yomo_streaming_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,6 @@ func testInformBy(t *testing.T, mold interface{}, input []byte, handle func(v in
129129
l := testLoopTimes
130130
for i := 0; i < l; i++ {
131131
codec.Decoder(input)
132-
time.Sleep(100 * time.Millisecond)
133132
}
134133
}()
135134

0 commit comments

Comments
 (0)