1
1
/*
2
- * Copyright 2016-2021 the original author or authors.
2
+ * Copyright 2025 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
6
6
* You may obtain a copy of the License at
7
7
*
8
- * https ://www.apache.org/licenses/LICENSE-2.0
8
+ * http ://www.apache.org/licenses/LICENSE-2.0
9
9
*
10
10
* Unless required by applicable law or agreed to in writing, software
11
11
* distributed under the License is distributed on an "AS IS" BASIS,
15
15
*/
16
16
package example .springdata .redis .commands ;
17
17
18
+ import example .springdata .redis .RedisTestConfiguration ;
19
+ import reactor .core .publisher .Flux ;
20
+ import reactor .core .scheduler .Schedulers ;
21
+ import reactor .test .StepVerifier ;
22
+
18
23
import java .nio .ByteBuffer ;
19
24
import java .time .Duration ;
20
25
import java .util .Collections ;
21
26
import java .util .UUID ;
22
- import java .util .concurrent .ExecutorService ;
23
- import java .util .concurrent .Executors ;
24
27
25
- import example .springdata .redis .RedisTestConfiguration ;
26
28
import org .junit .jupiter .api .BeforeEach ;
27
29
import org .junit .jupiter .api .Test ;
30
+
28
31
import org .springframework .beans .factory .annotation .Autowired ;
29
32
import org .springframework .boot .test .context .SpringBootTest ;
30
33
import org .springframework .data .redis .connection .ReactiveRedisConnection ;
33
36
import org .springframework .data .redis .serializer .RedisSerializer ;
34
37
import org .springframework .data .redis .serializer .StringRedisSerializer ;
35
38
import org .springframework .data .redis .util .ByteUtils ;
36
- import reactor .core .publisher .Flux ;
37
- import reactor .test .StepVerifier ;
38
39
39
40
/**
40
41
* Show usage of reactive operations on Redis keys using low level API provided by
@@ -48,7 +49,6 @@ class KeyCommandsTests {
48
49
49
50
private static final String PREFIX = KeyCommandsTests .class .getSimpleName ();
50
51
private static final String KEY_PATTERN = PREFIX + "*" ;
51
- private final ExecutorService executor = Executors .newSingleThreadExecutor ();
52
52
53
53
@ Autowired ReactiveRedisConnectionFactory connectionFactory ;
54
54
@@ -75,7 +75,7 @@ void iterateOverKeysMatchingPrefixUsingKeysCommand() {
75
75
.flatMapMany (Flux ::fromIterable ) //
76
76
.doOnNext (byteBuffer -> System .out .println (toString (byteBuffer ))) //
77
77
.count () //
78
- .doOnSuccess (count -> System .out .println ( String . format ( "Total No. found: %s" , count ) ));
78
+ .doOnSuccess (count -> System .out .printf ( "Total No. found: %s%n " , count ));
79
79
80
80
keyCount .as (StepVerifier ::create ).expectNext (50L ).verifyComplete ();
81
81
}
@@ -86,8 +86,8 @@ void iterateOverKeysMatchingPrefixUsingKeysCommand() {
86
86
@ Test
87
87
void storeToListAndPop () {
88
88
89
- var popResult = connection .listCommands ()
90
- . brPop ( Collections . singletonList ( ByteBuffer . wrap ( "list" . getBytes ())), Duration .ofSeconds (5 ));
89
+ var popResult = connection .listCommands (). brPop ( Collections . singletonList ( ByteBuffer . wrap ( "list" . getBytes ())),
90
+ Duration .ofSeconds (5 ));
91
91
92
92
var llen = connection .listCommands ().lLen (ByteBuffer .wrap ("list" .getBytes ()));
93
93
@@ -96,23 +96,21 @@ void storeToListAndPop() {
96
96
.flatMap (l -> popResult ) //
97
97
.doOnNext (result -> System .out .println (toString (result .getValue ()))) //
98
98
.flatMap (result -> llen ) //
99
- .doOnNext (count -> System .out .println ( String . format ( "Total items in list left: %s" , count ) ));//
99
+ .doOnNext (count -> System .out .printf ( "Total items in list left: %s%n " , count ));//
100
100
101
101
popAndLlen .as (StepVerifier ::create ).expectNext (0L ).verifyComplete ();
102
102
}
103
103
104
104
private void generateRandomKeys (int nrKeys ) {
105
105
106
- executor .execute (() -> {
107
- var keyFlux = Flux .range (0 , nrKeys ).map (i -> (PREFIX + "-" + i ));
108
-
109
- var generator = keyFlux .map (String ::getBytes ).map (ByteBuffer ::wrap ) //
110
- .map (key -> SetCommand .set (key ) //
111
- .value (ByteBuffer .wrap (UUID .randomUUID ().toString ().getBytes ())));
106
+ var keyFlux = Flux .range (0 , nrKeys ).map (i -> (PREFIX + "-" + i )) //
107
+ .publishOn (Schedulers .single ()) //
108
+ .map (it -> SetCommand .set (ByteBuffer .wrap (it .getBytes ())) //
109
+ .value (ByteBuffer .wrap (UUID .randomUUID ().toString ().getBytes ())));
112
110
113
- connection .stringCommands ().set (generator ).as (StepVerifier ::create ) //
111
+ connection .stringCommands ().set (keyFlux ).as (StepVerifier ::create ) //
114
112
.expectNextCount (nrKeys ) //
115
- .verifyComplete ();});
113
+ .verifyComplete ();
116
114
117
115
}
118
116
0 commit comments