Skip to content

Commit 32186c9

Browse files
committed
Added typedrest-reactive
1 parent 590430d commit 32186c9

20 files changed

+572
-2
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ Artifact group: [`net.typedrest`](https://mvnrepository.com/artifact/net.typedre
6363
[![typedrest](https://img.shields.io/maven-central/v/net.typedrest/typedrest.svg?label=typedrest)](https://mvnrepository.com/artifact/net.typedrest/typedrest)
6464
The main TypedRest library.
6565

66+
[![typedrest-reactive](https://img.shields.io/maven-central/v/net.typedrest/typedrest-reactive.svg?label=typedrest-reactive)](https://mvnrepository.com/artifact/net.typedrest/typedrest-reactive)
67+
Adds support for streaming with [ReactiveX (Rx)](http://reactivex.io/).
68+
Create endpoints using the types in the `net.typedrest.endpoints.reactive` package.
69+
6670
[![typedrest-serializers-jackson](https://img.shields.io/maven-central/v/net.typedrest/typedrest-serializers-jackson.svg?label=typedrest-serializers-jackson)](https://mvnrepository.com/artifact/net.typedrest/typedrest-serializers-jackson)
6771
Adds support for serializing using [Jackson](https://github.com/FasterXML/jackson) instead of [kotlinx.serialization](https://kotlinlang.org/docs/serialization.html).
6872
Pass `new JacksonJsonSerializer()` to the `EntryEndpoint` constructor.

gradle/libs.versions.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
[versions]
22
okhttp3 = "4.12.0"
33
uri-templates = "2.1.7"
4+
rxkotlin = "3.0.1"
45
kotlinx-serialization = "1.8.0"
56
jackson = "2.19.0"
67
moshi = "1.15.2"
@@ -9,6 +10,7 @@ moshi = "1.15.2"
910
okhttp3 = { module = "com.squareup.okhttp3:okhttp", version.ref = "okhttp3" }
1011
okhttp3-mockwebserver = { module = "com.squareup.okhttp3:mockwebserver", version.ref = "okhttp3" }
1112
uri-templates = { module = "com.damnhandy:handy-uri-templates", version.ref = "uri-templates" }
13+
rxkotlin = { module = "io.reactivex.rxjava3:rxkotlin", version.ref = "rxkotlin" }
1214
kotlinx-serialization-json = { module = "org.jetbrains.kotlinx:kotlinx-serialization-json", version.ref = "kotlinx-serialization" }
1315
jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson" }
1416
jackson-kotlin = { module = "com.fasterxml.jackson.module:jackson-module-kotlin", version.ref = "jackson" }

settings.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
rootProject.name = "typedrest"
2-
include("typedrest", "typedrest-serializers-jackson", "typedrest-serializers-moshi")
2+
include("typedrest", "typedrest-reactive", "typedrest-serializers-jackson", "typedrest-serializers-moshi")

typedrest-reactive/build.gradle.kts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
kotlin.jvmToolchain(21)
2+
tasks.test { useJUnitPlatform() }
3+
4+
dependencies {
5+
api(libs.okhttp3)
6+
api(libs.rxkotlin)
7+
api(project(":typedrest"))
8+
implementation(libs.kotlinx.serialization.json)
9+
10+
testImplementation(kotlin("test"))
11+
testRuntimeOnly("org.junit.platform:junit-platform-launcher")
12+
testImplementation(libs.okhttp3.mockwebserver)
13+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package net.typedrest.endpoints.reactive
2+
3+
import io.reactivex.rxjava3.core.ObservableEmitter
4+
import java.time.Duration
5+
import kotlin.math.min
6+
7+
/**
8+
* Sleep for the specified duration unless the emitter is disposed in the meantime.
9+
*/
10+
fun ObservableEmitter<*>.sleep(duration: Duration) {
11+
val waitMillis = duration.toMillis()
12+
var slept = 0L
13+
while (slept < waitMillis && !isDisposed) {
14+
val step = min(500L, waitMillis - slept)
15+
Thread.sleep(step)
16+
slept += step
17+
}
18+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package net.typedrest.endpoints.reactive
2+
3+
import io.reactivex.rxjava3.core.Observable
4+
import net.typedrest.endpoints.generic.ElementEndpoint
5+
import net.typedrest.endpoints.generic.GenericCollectionEndpoint
6+
import java.time.Duration
7+
8+
/**
9+
* Endpoint for a collection of [TEntity]s observable as an append-only stream.
10+
*
11+
* Use the more constrained [StreamingCollectionEndpoint] when possible.
12+
*
13+
* @param TEntity The type of individual elements in the collection.
14+
* @param TElementEndpoint The type of [ElementEndpoint] to provide for individual [TEntity]s.
15+
*/
16+
interface GenericStreamingCollectionEndpoint<TEntity : Any, TElementEndpoint : ElementEndpoint<TEntity>>
17+
: GenericCollectionEndpoint<TEntity, TElementEndpoint> {
18+
/**
19+
* Interval in which requests are sent to the server.
20+
* The server may update this interval via the `Retry-After` response header.
21+
*/
22+
var pollingInterval: Duration
23+
24+
/**
25+
* Provides an observable stream of elements.
26+
*
27+
* @param startIndex The index of the first element to return in the stream. Use negative values to start counting from the end of the stream.
28+
* @return A cold observable. HTTP communication only starts once [Observable.subscribe] is invoked
29+
*/
30+
fun getObservable(startIndex: Long = 0): Observable<TEntity>
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package net.typedrest.endpoints.reactive
2+
3+
import io.reactivex.rxjava3.core.Observable
4+
import io.reactivex.rxjava3.schedulers.Schedulers
5+
import net.typedrest.endpoints.Endpoint
6+
import net.typedrest.endpoints.generic.ElementEndpoint
7+
import net.typedrest.endpoints.generic.GenericCollectionEndpointImpl
8+
import net.typedrest.errors.ConflictException
9+
import net.typedrest.http.retryAfterDuration
10+
import okhttp3.Response
11+
import java.net.URI
12+
import java.time.Duration
13+
import java.time.Duration.ofSeconds
14+
15+
/**
16+
* Endpoint for a collection of [TEntity]s observable as an append-only stream.
17+
*
18+
* Use the more constrained [StreamingCollectionEndpointImpl] when possible.
19+
*
20+
* @param referrer The endpoint used to navigate to this one.
21+
* @param relativeUri The URI of this endpoint relative to the [referrer]'s.
22+
* @param entityType The type of individual elements in the collection.
23+
* @param elementEndpointFactory The factory for constructing [TElementEndpoint]s to provide for individual elements.
24+
* @param TEntity The type of individual elements in the collection.
25+
* @param TElementEndpoint The type of [ElementEndpoint] to provide for individual [TEntity]s.
26+
*/
27+
open class GenericStreamingCollectionEndpointImpl<TEntity : Any, TElementEndpoint : ElementEndpoint<TEntity>>(
28+
referrer: Endpoint,
29+
relativeUri: URI,
30+
entityType: Class<TEntity>,
31+
elementEndpointFactory: (referrer: Endpoint, relativeUri: URI) -> TElementEndpoint
32+
) : GenericCollectionEndpointImpl<TEntity, TElementEndpoint>(referrer, relativeUri, entityType, elementEndpointFactory),
33+
GenericStreamingCollectionEndpoint<TEntity, TElementEndpoint> {
34+
/**
35+
* Creates a new streaming collection endpoint.
36+
*
37+
* @param referrer The endpoint used to navigate to this one.
38+
* @param relativeUri The URI of this endpoint relative to the [referrer]'s. Add a `./` prefix here to imply a trailing slash on referrer's URI.
39+
* @param entityType The type of individual elements in the collection.
40+
* @param elementEndpointFactory The factory for constructing [TElementEndpoint]s to provide for individual elements.
41+
*/
42+
constructor(referrer: Endpoint, relativeUri: String, entityType: Class<TEntity>, elementEndpointFactory: (referrer: Endpoint, relativeUri: URI) -> TElementEndpoint) :
43+
this(referrer, URI(relativeUri), entityType, elementEndpointFactory)
44+
45+
override fun handle(response: Response): Response {
46+
val response = super.handle(response)
47+
response.retryAfterDuration()?.let { pollingInterval = it }
48+
return response
49+
}
50+
51+
override var pollingInterval: Duration = ofSeconds(3)
52+
53+
override fun getObservable(startIndex: Long): Observable<TEntity> =
54+
Observable.create<TEntity> { emitter ->
55+
var currentStartIndex: Long = startIndex
56+
57+
while (!emitter.isDisposed) {
58+
val response = try {
59+
if (currentStartIndex >= 0) readRange(from = currentStartIndex, to = null) // Offset
60+
else readRange(from = null, to = -currentStartIndex) // Tail
61+
} catch (_: ConflictException) {
62+
// No new data available yet, keep polling
63+
continue
64+
}
65+
66+
response.elements.forEach(emitter::onNext)
67+
68+
if (response.endReached) {
69+
emitter.onComplete()
70+
return@create
71+
}
72+
73+
// Continue polling for more data
74+
response.range?.to?.let { to -> currentStartIndex = to + 1 }
75+
?: return@create
76+
77+
emitter.sleep(pollingInterval)
78+
}
79+
}
80+
.subscribeOn(Schedulers.io())
81+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package net.typedrest.endpoints.reactive
2+
3+
import io.reactivex.rxjava3.core.Observable
4+
import net.typedrest.endpoints.generic.ElementEndpoint
5+
import java.time.Duration
6+
7+
/**
8+
* Endpoint for a resource that can be polled for state changes.
9+
*
10+
* @param TEntity The type of entity the endpoint represents.
11+
*/
12+
interface PollingEndpoint<TEntity : Any> : ElementEndpoint<TEntity> {
13+
/**
14+
* Interval in which requests are sent to the server.
15+
* The server may update this interval via the `Retry-After` response header.
16+
*/
17+
var pollingInterval: Duration
18+
19+
/**
20+
* Returns an [Observable] stream of entity states.
21+
* Consecutive items are emitted only when the server-supplied entity
22+
* differs from the previously emitted one according to [Any.equals].
23+
*/
24+
fun getObservable(): Observable<TEntity>
25+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package net.typedrest.endpoints.reactive
2+
3+
import io.reactivex.rxjava3.core.Observable
4+
import io.reactivex.rxjava3.schedulers.Schedulers
5+
import net.typedrest.endpoints.Endpoint
6+
import net.typedrest.endpoints.generic.ElementEndpointImpl
7+
import net.typedrest.http.retryAfterDuration
8+
import okhttp3.Response
9+
import java.net.URI
10+
import java.time.Duration
11+
import java.time.Duration.ofSeconds
12+
13+
/**
14+
* Endpoint for a resource that can be polled for state changes.
15+
*
16+
* @param referrer The endpoint used to navigate to this one.
17+
* @param relativeUri The URI of this endpoint relative to the [referrer]'s.
18+
* @param entityType The type of entity the endpoint represents.
19+
* @param endCondition A check to determine whether the entity has reached its final state and no further polling is required.
20+
* @param TEntity The type of entity the endpoint represents.
21+
*/
22+
class PollingEndpointImpl<TEntity : Any>(
23+
referrer: Endpoint,
24+
relativeUri: URI,
25+
entityType: Class<TEntity>,
26+
private val endCondition: ((TEntity) -> Boolean)? = null
27+
) : ElementEndpointImpl<TEntity>(referrer, relativeUri, entityType), PollingEndpoint<TEntity> {
28+
/**
29+
* Creates a new polling endpoint.
30+
*
31+
* @param referrer The endpoint used to navigate to this one.
32+
* @param relativeUri The URI of this endpoint relative to the [referrer]'s. Add a `./` prefix here to imply a trailing slash on referrer's URI.
33+
* @param entityType The type of entity the endpoint represents.
34+
* @param endCondition A check to determine whether the entity has reached its final state and no further polling is required.
35+
*/
36+
constructor(
37+
referrer: Endpoint,
38+
relativeUri: String,
39+
entityType: Class<TEntity>,
40+
endCondition: ((TEntity) -> Boolean)? = null
41+
) : this(referrer, URI(relativeUri), entityType, endCondition)
42+
43+
override fun handle(response: Response): Response {
44+
val response = super.handle(response)
45+
response.retryAfterDuration()?.let { pollingInterval = it }
46+
return response
47+
}
48+
49+
override var pollingInterval: Duration = ofSeconds(3)
50+
51+
override fun getObservable(): Observable<TEntity> =
52+
Observable.create<TEntity> { emitter ->
53+
while (!emitter.isDisposed) {
54+
val entity = read()
55+
emitter.onNext(entity)
56+
if (endCondition?.invoke(entity) == true) break
57+
58+
emitter.sleep(pollingInterval)
59+
}
60+
61+
emitter.onComplete()
62+
}
63+
.distinctUntilChanged()
64+
.subscribeOn(Schedulers.io())
65+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package net.typedrest.endpoints.reactive
2+
3+
import net.typedrest.endpoints.generic.ElementEndpoint
4+
5+
/**
6+
* Endpoint for a collection of [TEntity]s observable as an append-only stream.
7+
*
8+
* Use [GenericStreamingCollectionEndpoint] instead if you wish to customize the element endpoint type.
9+
*
10+
* @param TEntity The type of entity the endpoint represents.
11+
*/
12+
interface StreamingCollectionEndpoint<TEntity : Any>
13+
: GenericStreamingCollectionEndpoint<TEntity, ElementEndpoint<TEntity>>

0 commit comments

Comments
 (0)