Skip to content

Commit cd2de1b

Browse files
committed
Added typedrest-reactive
1 parent c53f2d1 commit cd2de1b

File tree

14 files changed

+337
-2
lines changed

14 files changed

+337
-2
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ Artifact group: [`net.typedrest`](https://mvnrepository.com/artifact/net.typedre
6363
[![typedrest](https://img.shields.io/maven-central/v/net.typedrest/typedrest.svg?label=typedrest)](https://mvnrepository.com/artifact/net.typedrest/typedrest)
6464
The main TypedRest library.
6565

66+
[![typedrest-reactive](https://img.shields.io/maven-central/v/net.typedrest/typedrest-reactive.svg?label=typedrest-reactive)](https://mvnrepository.com/artifact/net.typedrest/typedrest-reactive)
67+
Adds support for streaming with [ReactiveX (Rx)](http://reactivex.io/).
68+
Create endpoints using the types in the `net.typedrest.endpoints.reactive` package.
69+
6670
[![typedrest-serializers-jackson](https://img.shields.io/maven-central/v/net.typedrest/typedrest-serializers-jackson.svg?label=typedrest-serializers-jackson)](https://mvnrepository.com/artifact/net.typedrest/typedrest-serializers-jackson)
6771
Adds support for serializing using [Jackson](https://github.com/FasterXML/jackson) instead of [kotlinx.serialization](https://kotlinlang.org/docs/serialization.html).
6872
Pass `new JacksonJsonSerializer()` to the `EntryEndpoint` constructor.

gradle/libs.versions.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
[versions]
22
okhttp3 = "4.12.0"
33
uri-templates = "2.1.7"
4+
rxkotlin = "3.0.1"
45
kotlinx-serialization = "1.8.0"
56
jackson = "2.19.0"
67
moshi = "1.15.2"
@@ -9,6 +10,7 @@ moshi = "1.15.2"
910
okhttp3 = { module = "com.squareup.okhttp3:okhttp", version.ref = "okhttp3" }
1011
okhttp3-mockwebserver = { module = "com.squareup.okhttp3:mockwebserver", version.ref = "okhttp3" }
1112
uri-templates = { module = "com.damnhandy:handy-uri-templates", version.ref = "uri-templates" }
13+
rxkotlin = { module = "io.reactivex.rxjava3:rxkotlin", version.ref = "rxkotlin" }
1214
kotlinx-serialization-json = { module = "org.jetbrains.kotlinx:kotlinx-serialization-json", version.ref = "kotlinx-serialization" }
1315
jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson" }
1416
jackson-kotlin = { module = "com.fasterxml.jackson.module:jackson-module-kotlin", version.ref = "jackson" }

settings.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
rootProject.name = "typedrest"
2-
include("typedrest", "typedrest-serializers-jackson", "typedrest-serializers-moshi")
2+
include("typedrest", "typedrest-reactive", "typedrest-serializers-jackson", "typedrest-serializers-moshi")

typedrest-reactive/build.gradle.kts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
kotlin.jvmToolchain(21)
2+
tasks.test { useJUnitPlatform() }
3+
4+
dependencies {
5+
api(libs.okhttp3)
6+
api(libs.rxkotlin)
7+
api(project(":typedrest"))
8+
9+
testImplementation(kotlin("test"))
10+
testRuntimeOnly("org.junit.platform:junit-platform-launcher")
11+
testImplementation(libs.okhttp3.mockwebserver)
12+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package net.typedrest.endpoints.reactive
2+
3+
import io.reactivex.rxjava3.core.Observable
4+
import net.typedrest.endpoints.generic.ElementEndpoint
5+
import java.time.Duration
6+
7+
/**
8+
* Endpoint for a resource that can be polled for state changes.
9+
*
10+
* @param TEntity The type of entity the endpoint represents.
11+
*/
12+
interface PollingEndpoint<TEntity : Any> : ElementEndpoint<TEntity> {
13+
/**
14+
* Interval in which requests are sent to the server.
15+
* The server may update this interval via the `Retry-After` response header.
16+
*/
17+
var pollingInterval: Duration
18+
19+
/**
20+
* Returns an [Observable] stream of entity states.
21+
* Consecutive items are emitted only when the server-supplied entity
22+
* differs from the previously emitted one according to [Any.equals].
23+
*/
24+
fun getObservable(): Observable<TEntity>
25+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package net.typedrest.endpoints.reactive
2+
3+
import io.reactivex.rxjava3.core.Observable
4+
import io.reactivex.rxjava3.schedulers.Schedulers
5+
import net.typedrest.endpoints.Endpoint
6+
import net.typedrest.endpoints.generic.ElementEndpointImpl
7+
import okhttp3.Response
8+
import java.net.URI
9+
import java.time.Duration
10+
import java.time.Duration.ofSeconds
11+
import java.time.ZonedDateTime
12+
import java.time.format.DateTimeFormatter
13+
import kotlin.math.min
14+
import java.time.Instant
15+
16+
/**
17+
* Endpoint for a resource that can be polled for state changes.
18+
*
19+
* @param referrer The endpoint used to navigate to this one.
20+
* @param relativeUri The URI of this endpoint relative to the [referrer]'s.
21+
* @param entityType The type of entity the endpoint represents.
22+
* @param endCondition A check to determine whether the entity has reached its final state and no further polling is required.
23+
* @param TEntity The type of entity the endpoint represents.
24+
*/
25+
class PollingEndpointImpl<TEntity : Any>(
26+
referrer: Endpoint,
27+
relativeUri: URI,
28+
entityType: Class<TEntity>,
29+
private val endCondition: ((TEntity) -> Boolean)? = null
30+
) : ElementEndpointImpl<TEntity>(referrer, relativeUri, entityType), PollingEndpoint<TEntity> {
31+
/**
32+
* Creates a new polling endpoint.
33+
*
34+
* @param referrer The endpoint used to navigate to this one.
35+
* @param relativeUri The URI of this endpoint relative to the [referrer]'s. Add a `./` prefix here to imply a trailing slash on referrer's URI.
36+
* @param entityType The type of entity the endpoint represents.
37+
* @param endCondition A check to determine whether the entity has reached its final state and no further polling is required.
38+
*/
39+
constructor(
40+
referrer: Endpoint,
41+
relativeUri: String,
42+
entityType: Class<TEntity>,
43+
endCondition: ((TEntity) -> Boolean)? = null
44+
) : this(referrer, URI(relativeUri), entityType, endCondition)
45+
46+
override fun handle(response: Response): Response {
47+
val response = super.handle(response)
48+
response.header("Retry-After")?.let { value ->
49+
val newInterval = value.toLongOrNull()
50+
?.takeIf { it > 0 }
51+
?.let { ofSeconds(it) }
52+
?: runCatching {
53+
val date = DateTimeFormatter.RFC_1123_DATE_TIME.parse(value, ZonedDateTime::from)
54+
Duration.between(Instant.now(), date.toInstant())
55+
}.getOrNull()
56+
57+
if (newInterval != null && !newInterval.isNegative)
58+
pollingInterval = newInterval
59+
}
60+
61+
return response
62+
}
63+
64+
override var pollingInterval: Duration = ofSeconds(3)
65+
66+
override fun getObservable(): Observable<TEntity> =
67+
Observable.create<TEntity> { emitter ->
68+
while (!emitter.isDisposed) {
69+
val entity = read()
70+
emitter.onNext(entity)
71+
if (endCondition?.invoke(entity) == true) break
72+
73+
// TODO: Make non-blocking
74+
val waitMillis = pollingInterval.toMillis()
75+
var slept = 0L
76+
while (slept < waitMillis && !emitter.isDisposed) {
77+
val step = min(500L, waitMillis - slept)
78+
Thread.sleep(step)
79+
slept += step
80+
}
81+
}
82+
83+
emitter.onComplete()
84+
}
85+
.distinctUntilChanged()
86+
.subscribeOn(Schedulers.io())
87+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package net.typedrest.endpoints.reactive
2+
3+
import io.reactivex.rxjava3.core.Observable
4+
import net.typedrest.endpoints.Endpoint
5+
6+
/**
7+
* Endpoint for a stream of [TEntity]s.
8+
*
9+
* @param TEntity The type of individual elements in the stream.
10+
*/
11+
interface StreamingEndpoint<TEntity : Any> : Endpoint {
12+
/**
13+
* Provides an [Observable] stream of entities.
14+
*
15+
* @return A cold observable. HTTP communication only starts once [Observable.subscribe] is invoked.
16+
*/
17+
fun getObservable(): Observable<TEntity>
18+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package net.typedrest.endpoints.reactive
2+
3+
import io.reactivex.rxjava3.core.Observable
4+
import io.reactivex.rxjava3.schedulers.Schedulers
5+
import net.typedrest.endpoints.Endpoint
6+
import net.typedrest.endpoints.AbstractEndpoint
7+
import net.typedrest.http.entitySequence
8+
import net.typedrest.http.uri
9+
import okhttp3.Request
10+
import okio.ByteString.Companion.encodeUtf8
11+
import java.net.URI
12+
13+
/**
14+
* Endpoint for a stream of [TEntity]s using a persistent HTTP connection.
15+
*
16+
* @param referrer The endpoint used to navigate to this one.
17+
* @param relativeUri The URI of this endpoint relative to the [referrer]'s.
18+
* @param entityType The type of individual elements in the stream.
19+
* @param separator The character sequence used to detect that a new element starts in an HTTP stream.
20+
* @param TEntity The type of individual elements in the stream.
21+
*/
22+
class StreamingEndpointImpl<TEntity : Any>(
23+
referrer: Endpoint,
24+
relativeUri: URI,
25+
private val entityType: Class<TEntity>,
26+
private val separator: String = "\n"
27+
) : AbstractEndpoint(referrer, relativeUri), StreamingEndpoint<TEntity> {
28+
/**
29+
* Creates a new streaming endpoint.
30+
*
31+
* @param referrer The endpoint used to navigate to this one.
32+
* @param relativeUri The URI of this endpoint relative to the [referrer]'s. Add a `./` prefix here to imply a trailing slash on referrer's URI.
33+
* @param entityType The type of individual elements in the stream.
34+
* @param separator The character sequence used to detect that a new element starts in an HTTP stream.
35+
*/
36+
constructor(
37+
referrer: Endpoint,
38+
relativeUri: String,
39+
entityType: Class<TEntity>,
40+
separator: String = "\n"
41+
) : this(referrer, URI(relativeUri), entityType, separator)
42+
43+
/**
44+
* The size of the buffer used to collect data for deserialization in bytes.
45+
*/
46+
var bufferSize: Int = DEFAULT_BUFFER_SIZE
47+
48+
override fun getObservable(): Observable<TEntity> = Observable.create<TEntity> { emitter ->
49+
val call = httpClient.newCall(Request.Builder().get().uri(uri).build())
50+
emitter.setCancellable(call::cancel)
51+
52+
call.execute()
53+
.use { response ->
54+
response
55+
.entitySequence(serializers.first(), separator.encodeUtf8(), bufferSize, entityType)
56+
.takeWhile { !emitter.isDisposed }
57+
.forEach(emitter::onNext)
58+
}
59+
60+
emitter.onComplete()
61+
}.subscribeOn(Schedulers.io())
62+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package net.typedrest.http
2+
3+
import net.typedrest.serializers.Serializer
4+
import okhttp3.MediaType
5+
import okhttp3.Response
6+
import okhttp3.ResponseBody.Companion.toResponseBody
7+
import okio.Buffer
8+
import okio.ByteString
9+
10+
/**
11+
* Streams deserialized entities from an HTTP response body.
12+
*
13+
* @param serializer Used to deserialize entities in the body.
14+
* @param separator The character sequence used to detect that a new element starts in an HTTP stream.
15+
* @param bufferSize The size of the buffer used to collect data for deserialization in bytes.
16+
* @param entityType The entity type this stream provides.
17+
* @param TEntity The entity type this stream provides.
18+
*/
19+
fun <TEntity : Any> Response.entitySequence(
20+
serializer: Serializer,
21+
separator: ByteString,
22+
bufferSize: Int,
23+
entityType: Class<TEntity>
24+
): Sequence<TEntity> = sequence {
25+
val src = body!!.source()
26+
val buf = Buffer()
27+
28+
while (src.read(buf, bufferSize.toLong()) != -1L || !buf.exhausted()) {
29+
var separatorIndex = buf.indexOf(separator)
30+
while (separatorIndex != -1L) {
31+
val bytes = buf.readByteArray(separatorIndex)
32+
buf.skip(separator.size.toLong())
33+
34+
deserialize(serializer, bytes, entityType)?.let { yield(it) }
35+
separatorIndex = buf.indexOf(separator)
36+
}
37+
}
38+
39+
if (!buf.exhausted()) {
40+
deserialize(serializer, buf.readByteArray(), entityType)?.let { yield(it) }
41+
}
42+
}
43+
44+
private fun <T : Any> deserialize(
45+
serializer: Serializer,
46+
bytes: ByteArray,
47+
type: Class<T>
48+
): T? {
49+
if (bytes.isEmpty()) return null
50+
val mediaType: MediaType? = serializer.supportedMediaTypes.firstOrNull()
51+
return serializer.deserialize(bytes.toResponseBody(mediaType), type)
52+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package net.typedrest
2+
3+
data class MockEntity(val id: Long, val name: String)

0 commit comments

Comments
 (0)