Skip to content

Commit 7f873c2

Browse files
author
Adrián García
committed
(#4) Fix flowable not desubscribing correctly from the store when disposed. Add optional backpressure strategy to Store.flowable() (#5)
* (#4) Fix flowable not desubscribing correctly from the store when disposed. Add test cases to verify this behaviour * (#4) Change flowable() default backpressure strategy to Buffer
1 parent 6c884fe commit 7f873c2

File tree

7 files changed

+58
-23
lines changed

7 files changed

+58
-23
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ Add the following dependencies to your app's `build.gradle`:
178178

179179
```groovy
180180
dependencies {
181-
def mini_version = "1.0.8"
181+
def mini_version = "1.0.9"
182182
// Minimum working dependencies
183183
implementation "com.github.bq.mini-kotlin:mini-android:$mini_version"
184184
kapt "com.github.bq.mini-kotlin:mini-processor:$mini_version"

mini-common/src/main/java/mini/Store.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ abstract class Store<S> : Closeable {
2121
}
2222
}
2323

24+
internal val listeners = Vector<(S) -> Unit>()
2425
private var _state: Any? = NO_STATE
25-
private val listeners = Vector<(S) -> Unit>()
2626

2727
/** Set new state, equivalent to [asNewState]*/
2828
protected fun setState(state: S) {

mini-common/src/test/kotlin/mini/SampleStore.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,6 @@ class SampleStore : Store<String>() {
66
fun updateState(s: String) {
77
newState = s
88
}
9+
10+
val storeSubscriptions = listeners
911
}

mini-rx2/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ dependencies {
99

1010
implementation "io.reactivex.rxjava2:rxjava:$rx_version"
1111

12+
testApi files(project(":mini-common").sourceSets.test.runtimeClasspath) // Add mini-common test classes as dependency
1213
testCompileOnly 'junit:junit:4.12'
1314
testImplementation "org.amshove.kluent:kluent:1.44"
1415
}

mini-rx2/src/main/java/mini/rx/RxUtils.kt

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
package mini.rx
22

3+
import io.reactivex.BackpressureStrategy
4+
import io.reactivex.BackpressureStrategy.BUFFER
35
import io.reactivex.Flowable
46
import io.reactivex.Observable
57
import io.reactivex.disposables.CompositeDisposable
68
import io.reactivex.disposables.Disposable
7-
import io.reactivex.processors.PublishProcessor
89
import io.reactivex.subjects.PublishSubject
910
import mini.Store
1011

@@ -57,20 +58,17 @@ class DefaultSubscriptionTracker : SubscriptionTracker {
5758
}
5859
}
5960

60-
fun <S> Store<S>.flowable(hotStart: Boolean = true): Flowable<S> {
61-
val processor = PublishProcessor.create<S>()
62-
val subscription = subscribe(hotStart = false) {
63-
processor.offer(it)
64-
}
65-
return processor.doOnTerminate { subscription.close() }
66-
.let { if (hotStart) it.startWith(state) else it }
67-
}
68-
6961
fun <S> Store<S>.observable(hotStart: Boolean = true): Observable<S> {
7062
val subject = PublishSubject.create<S>()
7163
val subscription = subscribe(hotStart = false) {
7264
subject.onNext(it)
7365
}
74-
return subject.doOnTerminate { subscription.close() }
66+
return subject
67+
.doOnDispose { subscription.close() }
68+
.doOnTerminate { subscription.close() }
7569
.let { if (hotStart) it.startWith(state) else it }
76-
}
70+
}
71+
72+
fun <S> Store<S>.flowable(hotStart: Boolean = true,
73+
backpressureStrategy: BackpressureStrategy = BUFFER): Flowable<S> =
74+
observable(hotStart).toFlowable(backpressureStrategy)

mini-rx2/src/test/kotlin/mini/rx/RxUtilsKtTest.kt

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package mini.rx
22

33
import mini.SampleStore
4+
import org.amshove.kluent.`should be empty`
45
import org.amshove.kluent.`should be equal to`
56
import org.junit.Test
67

@@ -27,6 +28,27 @@ class RxUtilsKtTest {
2728
sentState `should be equal to` "abc"
2829
}
2930

31+
@Test
32+
fun `flowable completes`() {
33+
val store = SampleStore()
34+
var sentState = ""
35+
val disposable = store.flowable(hotStart = false).subscribe {
36+
sentState = it
37+
}
38+
disposable.dispose() //Clear it
39+
store.updateState("abc")
40+
sentState `should be equal to` "" //No change should be made
41+
}
42+
43+
@Test
44+
fun `flowable disposes correctly`() {
45+
val store = SampleStore()
46+
val disposable = store.flowable(hotStart = false).subscribe()
47+
disposable.dispose() //Clear it
48+
49+
store.storeSubscriptions.`should be empty`()
50+
}
51+
3052
@Test
3153
fun `observable sends initial state`() {
3254
val store = SampleStore()
@@ -48,4 +70,25 @@ class RxUtilsKtTest {
4870
store.updateState("abc") //Set before subscribe
4971
sentState `should be equal to` "abc"
5072
}
73+
74+
@Test
75+
fun `observable completes`() {
76+
val store = SampleStore()
77+
var sentState = ""
78+
val disposable = store.observable(hotStart = false).subscribe {
79+
sentState = it
80+
}
81+
disposable.dispose() //Clear it
82+
store.updateState("abc")
83+
sentState `should be equal to` "" //No change should be made
84+
}
85+
86+
@Test
87+
fun `observable disposes correctly`() {
88+
val store = SampleStore()
89+
val disposable = store.observable(hotStart = false).subscribe()
90+
disposable.dispose() //Clear it
91+
92+
store.storeSubscriptions.`should be empty`()
93+
}
5194
}

mini-rx2/src/test/kotlin/mini/rx/SampleStore.kt

Lines changed: 0 additions & 9 deletions
This file was deleted.

0 commit comments

Comments
 (0)