Skip to content

Commit e02aeb9

Browse files
authored
support for subscribes (#52)
1 parent 4db043c commit e02aeb9

File tree

3 files changed

+86
-16
lines changed

3 files changed

+86
-16
lines changed

observable.go

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package y3
22

33
import (
44
"io"
5+
"sync"
56

67
"github.com/yomorun/y3-codec-golang/pkg/encoding"
78
)
@@ -21,11 +22,52 @@ type observableImpl struct {
2122
}
2223

2324
type iterableImpl struct {
24-
channel chan interface{}
25+
next chan interface{}
26+
subscribers []chan interface{}
27+
mutex sync.RWMutex
28+
producerAlreadyCreated bool
2529
}
2630

2731
func (i *iterableImpl) Observe() <-chan interface{} {
28-
return i.channel
32+
i.connect()
33+
ch := make(chan interface{})
34+
i.mutex.Lock()
35+
i.subscribers = append(i.subscribers, ch)
36+
i.mutex.Unlock()
37+
return ch
38+
}
39+
40+
func (i *iterableImpl) connect() {
41+
i.mutex.Lock()
42+
if !i.producerAlreadyCreated {
43+
go i.produce()
44+
i.producerAlreadyCreated = true
45+
}
46+
i.mutex.Unlock()
47+
}
48+
49+
func (i *iterableImpl) produce() {
50+
defer func() {
51+
i.mutex.RLock()
52+
for _, subscriber := range i.subscribers {
53+
close(subscriber)
54+
}
55+
i.mutex.RUnlock()
56+
}()
57+
58+
for {
59+
select {
60+
case item, ok := <-i.next:
61+
if !ok {
62+
return
63+
}
64+
i.mutex.RLock()
65+
for _, subscriber := range i.subscribers {
66+
subscriber <- item
67+
}
68+
i.mutex.RUnlock()
69+
}
70+
}
2971
}
3072

3173
func (o *observableImpl) Observe() <-chan interface{} {
@@ -36,6 +78,7 @@ func (o *observableImpl) Observe() <-chan interface{} {
3678
func FromStream(reader io.Reader) Observable {
3779

3880
f := func(next chan interface{}) {
81+
defer close(next)
3982
for {
4083
buf := make([]byte, 3*1024)
4184
n, err := reader.Read(buf)
@@ -173,8 +216,10 @@ func decodeLength(buf []byte) (length int32, err error) {
173216

174217
func createObservable(f func(next chan interface{})) Observable {
175218
next := make(chan interface{})
219+
subscribers := make([]chan interface{}, 0)
220+
176221
go f(next)
177-
return &observableImpl{iterable: &iterableImpl{channel: next}}
222+
return &observableImpl{iterable: &iterableImpl{next: next, subscribers: subscribers}}
178223
}
179224

180225
//filter root data from the stream

observable_test.go

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,28 @@ import (
1111

1212
func TestObservable(t *testing.T) {
1313
buf := []byte{0x81, 0x16, 0xb0, 0x14, 0x10, 0x4, 0x79, 0x6f, 0x6d, 0x6f, 0x11, 0x2, 0x43, 0xe4, 0x92, 0x8, 0x13, 0x2, 0x41, 0xf0, 0x14, 0x2, 0x42, 0x20, 0x81, 0x16, 0xb0, 0x14, 0x10, 0x4, 0x79, 0x6f, 0x6d, 0x6f, 0x11, 0x2, 0x43, 0xe4, 0x92, 0x8, 0x13, 0x2, 0x41, 0xf0, 0x14, 0x2, 0x42, 0x20, 0x81, 0x16, 0xb0, 0x14, 0x10, 0x4, 0x79, 0x6f, 0x6d, 0x6f, 0x11, 0x2, 0x43, 0xe4, 0x92, 0x8, 0x13, 0x2, 0x41, 0xf0, 0x14, 0x2, 0x42, 0x20}
14-
var err error = nil
15-
var count int = 0
14+
var err1 error = nil
15+
var err2 error = nil
16+
var count1 int = 0
17+
var count2 int = 0
1618

17-
callback := func(v []byte) (interface{}, error) {
19+
callback1 := func(v []byte) (interface{}, error) {
1820
if (v[0] == 17) && (v[1] == 2) && (v[2] == 67) && (v[3] == 228) {
19-
count++
20-
return "ok", nil
21+
count1++
22+
return "ok1", nil
2123
} else {
22-
err = errors.New("fail")
24+
err1 = errors.New("fail")
25+
return nil, errors.New("fail")
26+
}
27+
28+
}
29+
30+
callback2 := func(v []byte) (interface{}, error) {
31+
if (v[0] == 19) && (v[1] == 2) && (v[2] == 65) && (v[3] == 240) {
32+
count2++
33+
return "ok2", nil
34+
} else {
35+
err2 = errors.New("fail")
2336
return nil, errors.New("fail")
2437
}
2538

@@ -29,15 +42,26 @@ func TestObservable(t *testing.T) {
2942

3043
source := FromStream(reader)
3144

32-
consumer := source.Subscribe(0x11).OnObserve(callback)
45+
consumer1 := source.Subscribe(0x11).OnObserve(callback1)
3346

34-
for range consumer {
35-
if count == 3 || err != nil {
47+
consumer2 := source.Subscribe(0x13).OnObserve(callback2)
48+
49+
for range consumer1 {
50+
if count1 == 3 || err1 != nil {
51+
break
52+
}
53+
}
54+
55+
for range consumer2 {
56+
if count2 == 3 || err2 != nil {
3657
break
3758
}
3859
}
39-
assert.NoError(t, err, fmt.Sprintf("subscribe error:%v", err))
40-
assert.Equal(t, 3, count, fmt.Sprintf("testing observable %v: %v", 3, count))
41-
testPrintf("count=%v, observable_result=%v, err=%v\n", 3, count, err)
60+
61+
assert.NoError(t, err1, fmt.Sprintf("subscribe2 error:%v", err1))
62+
assert.Equal(t, 3, count1, fmt.Sprintf("testing observable1 %v: %v", 3, count1))
63+
assert.NoError(t, err2, fmt.Sprintf("subscribe2 error:%v", err2))
64+
assert.Equal(t, 3, count2, fmt.Sprintf("testing observable2 %v: %v", 3, count2))
65+
testPrintf("count1=%v,count2=%v, observable_result=%v, err1=%v,err2=%v\n", 3, 3, "ok", err1, err2)
4266

4367
}

tester.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@ func testDecoder(observe byte, buf []byte, callback func(v []byte) (interface{},
3333
// Init create a channel for testing
3434
func (t *observableTester) Init(callback func(v []byte) (interface{}, error)) *observableTester {
3535
t.sourceChannel = make(chan interface{})
36+
subscribers := make([]chan interface{}, 0)
3637

37-
t.source = &observableImpl{iterable: &iterableImpl{channel: t.sourceChannel}}
38+
t.source = &observableImpl{iterable: &iterableImpl{next: t.sourceChannel, subscribers: subscribers}}
3839

3940
consumer := t.source.Subscribe(t.observe).OnObserve(callback)
4041

0 commit comments

Comments
 (0)