Skip to content

Commit 66791ac

Browse files
committed
feat: basic support to reassign partitions
1 parent 624bdf9 commit 66791ac

File tree

4 files changed

+320
-2
lines changed

4 files changed

+320
-2
lines changed

src/client/controller.rs

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,14 @@ use crate::{
1212
protocol::{
1313
error::Error as ProtocolError,
1414
messages::{
15-
CreateTopicRequest, CreateTopicsRequest, ElectLeadersRequest, ElectLeadersTopicRequest,
15+
AlterPartitionReassignmentsPartitionRequest, AlterPartitionReassignmentsRequest,
16+
AlterPartitionReassignmentsTopicRequest, CreateTopicRequest, CreateTopicsRequest,
17+
ElectLeadersRequest, ElectLeadersTopicRequest,
18+
},
19+
primitives::{
20+
Array, CompactArray, CompactString, Int16, Int32, Int8, NullableString, String_,
21+
TaggedFields,
1622
},
17-
primitives::{Array, Int16, Int32, Int8, NullableString, String_},
1823
},
1924
validation::ExactlyOne,
2025
};
@@ -93,6 +98,62 @@ impl ControllerClient {
9398
.await
9499
}
95100

101+
/// Re-assign partitions.
102+
pub async fn reassign_partitions(
103+
&self,
104+
topic: impl Into<String> + Send,
105+
partition: i32,
106+
replicas: Vec<i32>,
107+
timeout_ms: i32,
108+
) -> Result<()> {
109+
let request = &AlterPartitionReassignmentsRequest {
110+
topics: vec![AlterPartitionReassignmentsTopicRequest {
111+
name: CompactString(topic.into()),
112+
partitions: vec![AlterPartitionReassignmentsPartitionRequest {
113+
partition_index: Int32(partition),
114+
replicas: CompactArray(Some(replicas.into_iter().map(Int32).collect())),
115+
tagged_fields: TaggedFields::default(),
116+
}],
117+
tagged_fields: TaggedFields::default(),
118+
}],
119+
timeout_ms: Int32(timeout_ms),
120+
tagged_fields: TaggedFields::default(),
121+
};
122+
123+
maybe_retry(
124+
&self.backoff_config,
125+
self,
126+
"reassign_partitions",
127+
|| async move {
128+
let broker = self.get().await?;
129+
let response = broker.request(request).await?;
130+
131+
if let Some(protocol_error) = response.error {
132+
return Err(Error::ServerError(protocol_error, Default::default()));
133+
}
134+
135+
let topic = response
136+
.responses
137+
.exactly_one()
138+
.map_err(Error::exactly_one_topic)?;
139+
140+
let partition = topic
141+
.partitions
142+
.exactly_one()
143+
.map_err(Error::exactly_one_partition)?;
144+
145+
match partition.error {
146+
None => Ok(()),
147+
Some(protocol_error) => Err(Error::ServerError(
148+
protocol_error,
149+
partition.error_message.0.unwrap_or_default(),
150+
)),
151+
}
152+
},
153+
)
154+
.await
155+
}
156+
96157
/// Elect leaders for given topic and partition.
97158
pub async fn elect_leaders(
98159
&self,
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
use std::io::{Read, Write};
2+
3+
use crate::protocol::{
4+
api_key::ApiKey,
5+
api_version::{ApiVersion, ApiVersionRange},
6+
error::Error,
7+
messages::{read_compact_versioned_array, write_compact_versioned_array},
8+
primitives::{CompactArray, CompactNullableString, CompactString, Int16, Int32, TaggedFields},
9+
traits::{ReadType, WriteType},
10+
};
11+
12+
use super::{
13+
ReadVersionedError, ReadVersionedType, RequestBody, WriteVersionedError, WriteVersionedType,
14+
};
15+
16+
#[derive(Debug)]
17+
pub struct AlterPartitionReassignmentsRequest {
18+
/// The time in ms to wait for the request to complete.
19+
pub timeout_ms: Int32,
20+
21+
/// The topics to reassign.
22+
pub topics: Vec<AlterPartitionReassignmentsTopicRequest>,
23+
24+
/// The tagged fields.
25+
pub tagged_fields: TaggedFields,
26+
}
27+
28+
impl<W> WriteVersionedType<W> for AlterPartitionReassignmentsRequest
29+
where
30+
W: Write,
31+
{
32+
fn write_versioned(
33+
&self,
34+
writer: &mut W,
35+
version: ApiVersion,
36+
) -> Result<(), WriteVersionedError> {
37+
let v = version.0 .0;
38+
assert!(v <= 0);
39+
40+
self.timeout_ms.write(writer)?;
41+
write_compact_versioned_array(writer, version, Some(&self.topics))?;
42+
self.tagged_fields.write(writer)?;
43+
44+
Ok(())
45+
}
46+
}
47+
48+
impl RequestBody for AlterPartitionReassignmentsRequest {
49+
type ResponseBody = AlterPartitionReassignmentsResponse;
50+
51+
const API_KEY: ApiKey = ApiKey::AlterPartitionReassignments;
52+
53+
/// All versions.
54+
const API_VERSION_RANGE: ApiVersionRange =
55+
ApiVersionRange::new(ApiVersion(Int16(0)), ApiVersion(Int16(0)));
56+
57+
const FIRST_TAGGED_FIELD_IN_REQUEST_VERSION: ApiVersion = ApiVersion(Int16(0));
58+
}
59+
60+
#[derive(Debug)]
61+
pub struct AlterPartitionReassignmentsTopicRequest {
62+
/// The topic name.
63+
pub name: CompactString,
64+
65+
/// The partitions to reassign.
66+
pub partitions: Vec<AlterPartitionReassignmentsPartitionRequest>,
67+
68+
/// The tagged fields.
69+
pub tagged_fields: TaggedFields,
70+
}
71+
72+
impl<W> WriteVersionedType<W> for AlterPartitionReassignmentsTopicRequest
73+
where
74+
W: Write,
75+
{
76+
fn write_versioned(
77+
&self,
78+
writer: &mut W,
79+
version: ApiVersion,
80+
) -> Result<(), WriteVersionedError> {
81+
let v = version.0 .0;
82+
assert!(v <= 0);
83+
84+
self.name.write(writer)?;
85+
write_compact_versioned_array(writer, version, Some(&self.partitions))?;
86+
self.tagged_fields.write(writer)?;
87+
88+
Ok(())
89+
}
90+
}
91+
92+
#[derive(Debug)]
93+
pub struct AlterPartitionReassignmentsPartitionRequest {
94+
/// The partition index.
95+
pub partition_index: Int32,
96+
97+
/// The replicas to place the partitions on, or null to cancel a pending reassignment for this partition.
98+
pub replicas: CompactArray<Int32>,
99+
100+
/// The tagged fields.
101+
pub tagged_fields: TaggedFields,
102+
}
103+
104+
impl<W> WriteVersionedType<W> for AlterPartitionReassignmentsPartitionRequest
105+
where
106+
W: Write,
107+
{
108+
fn write_versioned(
109+
&self,
110+
writer: &mut W,
111+
version: ApiVersion,
112+
) -> Result<(), WriteVersionedError> {
113+
let v = version.0 .0;
114+
assert!(v <= 0);
115+
116+
self.partition_index.write(writer)?;
117+
self.replicas.write(writer)?;
118+
self.tagged_fields.write(writer)?;
119+
120+
Ok(())
121+
}
122+
}
123+
124+
#[derive(Debug)]
125+
pub struct AlterPartitionReassignmentsResponse {
126+
/// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the
127+
/// request did not violate any quota.
128+
pub throttle_time_ms: Int32,
129+
130+
/// The top-level error code, or 0 if there was no error.
131+
pub error: Option<Error>,
132+
133+
/// The top-level error message, or null if there was no error.
134+
pub error_message: CompactNullableString,
135+
136+
/// The responses to topics to reassign.
137+
pub responses: Vec<AlterPartitionReassignmentsTopicResponse>,
138+
139+
/// The tagged fields.
140+
pub tagged_fields: TaggedFields,
141+
}
142+
143+
impl<R> ReadVersionedType<R> for AlterPartitionReassignmentsResponse
144+
where
145+
R: Read,
146+
{
147+
fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
148+
let v = version.0 .0;
149+
assert!(v <= 0);
150+
151+
let throttle_time_ms = Int32::read(reader)?;
152+
let error = Error::new(Int16::read(reader)?.0);
153+
let error_message = CompactNullableString::read(reader)?;
154+
let responses = read_compact_versioned_array(reader, version)?.unwrap_or_default();
155+
let tagged_fields = TaggedFields::read(reader)?;
156+
157+
Ok(Self {
158+
throttle_time_ms,
159+
error,
160+
error_message,
161+
responses,
162+
tagged_fields,
163+
})
164+
}
165+
}
166+
167+
#[derive(Debug)]
168+
pub struct AlterPartitionReassignmentsTopicResponse {
169+
/// The topic name
170+
pub name: CompactString,
171+
172+
/// The responses to partitions to reassign
173+
pub partitions: Vec<AlterPartitionReassignmentsPartitionResponse>,
174+
175+
/// The tagged fields.
176+
pub tagged_fields: TaggedFields,
177+
}
178+
179+
impl<R> ReadVersionedType<R> for AlterPartitionReassignmentsTopicResponse
180+
where
181+
R: Read,
182+
{
183+
fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
184+
let v = version.0 .0;
185+
assert!(v <= 0);
186+
187+
let name = CompactString::read(reader)?;
188+
let partitions = read_compact_versioned_array(reader, version)?.unwrap_or_default();
189+
let tagged_fields = TaggedFields::read(reader)?;
190+
191+
Ok(Self {
192+
name,
193+
partitions,
194+
tagged_fields,
195+
})
196+
}
197+
}
198+
199+
#[derive(Debug)]
200+
pub struct AlterPartitionReassignmentsPartitionResponse {
201+
/// The partition index.
202+
pub partition_index: Int32,
203+
204+
/// The error code for this partition, or 0 if there was no error.
205+
pub error: Option<Error>,
206+
207+
/// The error message for this partition, or null if there was no error.
208+
pub error_message: CompactNullableString,
209+
210+
/// The tagged fields.
211+
pub tagged_fields: TaggedFields,
212+
}
213+
214+
impl<R> ReadVersionedType<R> for AlterPartitionReassignmentsPartitionResponse
215+
where
216+
R: Read,
217+
{
218+
fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
219+
let v = version.0 .0;
220+
assert!(v <= 0);
221+
222+
let partition_index = Int32::read(reader)?;
223+
let error = Error::new(Int16::read(reader)?.0);
224+
let error_message = CompactNullableString::read(reader)?;
225+
let tagged_fields = TaggedFields::read(reader)?;
226+
227+
Ok(Self {
228+
partition_index,
229+
error,
230+
error_message,
231+
tagged_fields,
232+
})
233+
}
234+
}

src/protocol/messages/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ use super::{
1616
vec_builder::VecBuilder,
1717
};
1818

19+
mod alter_partition_reassignments;
20+
pub use alter_partition_reassignments::*;
1921
mod api_versions;
2022
pub use api_versions::*;
2123
mod constants;

tests/client.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,27 @@ async fn test_delete_records() {
541541
);
542542
}
543543

544+
#[tokio::test]
545+
async fn test_reassign_partitions() {
546+
maybe_start_logging();
547+
548+
let connection = maybe_skip_kafka_integration!();
549+
let topic_name = random_topic_name();
550+
551+
let client = ClientBuilder::new(connection).build().await.unwrap();
552+
553+
let controller_client = client.controller_client().unwrap();
554+
controller_client
555+
.create_topic(&topic_name, 1, 1, 5_000)
556+
.await
557+
.unwrap();
558+
559+
controller_client
560+
.reassign_partitions(&topic_name, 0, vec![0, 1], 5_000)
561+
.await
562+
.unwrap();
563+
}
564+
544565
#[tokio::test]
545566
async fn test_elect_leaders() {
546567
maybe_start_logging();

0 commit comments

Comments
 (0)