Skip to content

Commit d30bdc9

Browse files
committed
add createPartitions test
1 parent 8cd004d commit d30bdc9

File tree

1 file changed

+104
-5
lines changed

1 file changed

+104
-5
lines changed

app/src/test/java/io/hstream/kafka/testing/TopicTest.java

Lines changed: 104 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
package io.hstream.kafka.testing;
22

3-
import static org.assertj.core.api.Assertions.assertThatNoException;
4-
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3+
import static org.assertj.core.api.Assertions.*;
54

65
import java.util.Collections;
76
import java.util.List;
87
import java.util.Properties;
98
import java.util.concurrent.ExecutionException;
10-
import org.apache.kafka.clients.admin.AdminClient;
11-
import org.apache.kafka.clients.admin.AdminClientConfig;
12-
import org.apache.kafka.clients.admin.NewTopic;
9+
import org.apache.kafka.clients.admin.*;
10+
import org.apache.kafka.common.errors.InvalidPartitionsException;
11+
import org.apache.kafka.common.errors.InvalidRequestException;
1312
import org.junit.jupiter.api.AfterEach;
1413
import org.junit.jupiter.api.BeforeEach;
1514
import org.junit.jupiter.api.Test;
@@ -68,4 +67,104 @@ void testCreateTopic() {
6867
.get());
6968
}
7069
}
70+
71+
@Test
72+
void testCreatePartition() throws ExecutionException, InterruptedException {
73+
NewTopic topic1 = new NewTopic("test_create_partition_topic1", 1, (short) 1);
74+
NewTopic topic2 = new NewTopic("test_create_partition_topic2", 3, (short) 3);
75+
try {
76+
assertThatNoException()
77+
.as("create topics should success")
78+
.isThrownBy(() -> client.createTopics(List.of(topic1, topic2)).all().get());
79+
80+
assertThatThrownBy(
81+
() ->
82+
client
83+
.createPartitions(
84+
Collections.singletonMap(
85+
topic1.name(),
86+
NewPartitions.increaseTo(2, Collections.singletonList(List.of(0)))))
87+
.all()
88+
.get())
89+
.as("create partitions with assignment should fail")
90+
.cause()
91+
.isInstanceOf(InvalidRequestException.class);
92+
assertThatThrownBy(
93+
() ->
94+
client
95+
.createPartitions(
96+
Collections.singletonMap(topic1.name(), NewPartitions.increaseTo(1)))
97+
.all()
98+
.get())
99+
.as("new partition count should be greater than old partition count")
100+
.cause()
101+
.isInstanceOf(InvalidPartitionsException.class);
102+
assertThatThrownBy(
103+
() ->
104+
client
105+
.createPartitions(
106+
Collections.singletonMap(topic2.name(), NewPartitions.increaseTo(2)))
107+
.all()
108+
.get())
109+
.as("new partition count should be greater than old partition count")
110+
.cause()
111+
.isInstanceOf(InvalidPartitionsException.class);
112+
assertThatNoException()
113+
.as("create partitions should success")
114+
.isThrownBy(
115+
() ->
116+
client
117+
.createPartitions(
118+
Collections.singletonMap(topic2.name(), NewPartitions.increaseTo(6)))
119+
.all()
120+
.get());
121+
assertThatNoException()
122+
.as("create partitions with validateOnly should success")
123+
.isThrownBy(
124+
() ->
125+
client
126+
.createPartitions(
127+
Collections.singletonMap(topic1.name(), NewPartitions.increaseTo(6)),
128+
new CreatePartitionsOptions().validateOnly(true))
129+
.all()
130+
.get());
131+
132+
assertThat(
133+
client
134+
.describeTopics(List.of(topic1.name()))
135+
.allTopicNames()
136+
.get()
137+
.get(topic1.name())
138+
.partitions()
139+
.size())
140+
.as("partition count for topic1 should be 1")
141+
.isEqualTo(1);
142+
143+
var partitions =
144+
client
145+
.describeTopics(List.of(topic2.name()))
146+
.allTopicNames()
147+
.get()
148+
.get(topic2.name())
149+
.partitions();
150+
assertThat(partitions.size()).as("partition count should be 6").isEqualTo(6);
151+
// check failed because https://emqx.atlassian.net/browse/HS-4482
152+
// for (var partition : partitions) {
153+
// logger.info("partition: {}, replicas: {}", partition.partition(),
154+
// partition.replicas().size());
155+
// assertThat(partition.replicas().size()).as("replica count should be
156+
// 3").isEqualTo(3);
157+
// }
158+
159+
} finally {
160+
assertThatNoException()
161+
.isThrownBy(
162+
() ->
163+
client
164+
.deleteTopics(
165+
List.of("test_create_partition_topic1", "test_create_partition_topic2"))
166+
.all()
167+
.get());
168+
}
169+
}
71170
}

0 commit comments

Comments
 (0)