feat: cluster admin operations #157

Open · wants to merge 3 commits into base: main
4 changes: 4 additions & 0 deletions .circleci/config.yml
@@ -267,6 +267,10 @@ jobs:
TEST_INTEGRATION: 1
# Kafka supports DeleteRecords
TEST_DELETE_RECORDS: 1
# Kafka supports ElectLeaders
TEST_ELECT_LEADERS: 1
# Kafka supports ReassignPartitions
TEST_REASSIGN_PARTITIONS: 1
TEST_JAVA_INTEROPT: 1
# Don't use the first node here since this is likely the controller and we want to ensure that we automatically
# pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
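
The two new flags mirror the existing `TEST_DELETE_RECORDS` gate: the integration tests only insist on these APIs when the corresponding variable is set, and otherwise treat a version-negotiation failure as a skip. A minimal sketch of that gating pattern (hypothetical helper, not part of this diff):

```rust
use std::env;

/// Returns true when the test environment advertises support for the given
/// broker capability (hypothetical helper mirroring the checks in tests/client.rs).
fn broker_supports(flag: &str) -> bool {
    env::var(flag).is_ok()
}

fn main() {
    if !broker_supports("TEST_REASSIGN_PARTITIONS") {
        println!("broker may not support ReassignPartitions; tolerating NoVersionMatch");
    }
}
```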
131 changes: 129 additions & 2 deletions src/client/controller.rs
@@ -11,12 +11,32 @@ use crate::{
messenger::RequestError,
protocol::{
error::Error as ProtocolError,
messages::{CreateTopicRequest, CreateTopicsRequest},
primitives::{Int16, Int32, NullableString, String_},
messages::{
AlterPartitionReassignmentsPartitionRequest, AlterPartitionReassignmentsRequest,
AlterPartitionReassignmentsTopicRequest, CreateTopicRequest, CreateTopicsRequest,
ElectLeadersRequest, ElectLeadersTopicRequest,
},
primitives::{
Array, CompactArray, CompactString, Int16, Int32, Int8, NullableString, String_,
TaggedFields,
},
},
validation::ExactlyOne,
};

/// Election type of [`ControllerClient::elect_leaders`].
///
/// The names in this enum are borrowed from the
/// [Kafka source code](https://github.com/a0x8o/kafka/blob/5383311a5cfbdaf147411004106449e3ad8081fb/core/src/main/scala/kafka/controller/KafkaController.scala#L2186-L2194).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ElectionType {
/// Elects the preferred replica.
Preferred,

/// Elects the first live replica if there are no in-sync replicas.
Unclean,
}

#[derive(Debug)]
pub struct ControllerClient {
brokers: Arc<BrokerConnector>,
@@ -78,6 +98,113 @@ impl ControllerClient {
.await
}

/// Re-assign partitions.
pub async fn reassign_partitions(
&self,
topic: impl Into<String> + Send,
partition: i32,
replicas: Vec<i32>,
timeout_ms: i32,
) -> Result<()> {
let request = &AlterPartitionReassignmentsRequest {
topics: vec![AlterPartitionReassignmentsTopicRequest {
name: CompactString(topic.into()),
partitions: vec![AlterPartitionReassignmentsPartitionRequest {
partition_index: Int32(partition),
replicas: CompactArray(Some(replicas.into_iter().map(Int32).collect())),
tagged_fields: TaggedFields::default(),
}],
tagged_fields: TaggedFields::default(),
}],
timeout_ms: Int32(timeout_ms),
tagged_fields: TaggedFields::default(),
};

maybe_retry(
&self.backoff_config,
self,
"reassign_partitions",
|| async move {
let broker = self.get().await?;
let response = broker.request(request).await?;

if let Some(protocol_error) = response.error {
return Err(Error::ServerError(protocol_error, Default::default()));
}

let topic = response
.responses
.exactly_one()
.map_err(Error::exactly_one_topic)?;

let partition = topic
.partitions
.exactly_one()
.map_err(Error::exactly_one_partition)?;

match partition.error {
None => Ok(()),
Some(protocol_error) => Err(Error::ServerError(
protocol_error,
partition.error_message.0.unwrap_or_default(),
)),
}
},
)
.await
}

/// Elect leaders for the given topic and partition.
pub async fn elect_leaders(
&self,
topic: impl Into<String> + Send,
partition: i32,
election_type: ElectionType,
timeout_ms: i32,
) -> Result<()> {
let request = &ElectLeadersRequest {
election_type: Int8(match election_type {
ElectionType::Preferred => 0,
ElectionType::Unclean => 1,
}),
topic_partitions: vec![ElectLeadersTopicRequest {
topic: String_(topic.into()),
partitions: Array(Some(vec![Int32(partition)])),
tagged_fields: None,
}],
timeout_ms: Int32(timeout_ms),
tagged_fields: None,
};

maybe_retry(&self.backoff_config, self, "elect_leaders", || async move {
let broker = self.get().await?;
let response = broker.request(request).await?;

if let Some(protocol_error) = response.error {
return Err(Error::ServerError(protocol_error, Default::default()));
}

let topic = response
.replica_election_results
.exactly_one()
.map_err(Error::exactly_one_topic)?;

let partition = topic
.partition_results
.exactly_one()
.map_err(Error::exactly_one_partition)?;

match partition.error {
None => Ok(()),
Some(protocol_error) => Err(Error::ServerError(
protocol_error,
partition.error_message.0.unwrap_or_default(),
)),
}
})
.await
}

/// Retrieve the broker ID of the controller
async fn get_controller_id(&self) -> Result<i32> {
let metadata = self.brokers.request_metadata(None, Some(vec![])).await?;
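
For reference, a usage sketch of the two new admin operations; the bootstrap address, topic name, and broker IDs below are placeholders and assume a reachable test cluster:

```rust
use rskafka::client::{controller::ElectionType, ClientBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder bootstrap address; adjust to your cluster.
    let client = ClientBuilder::new(vec!["localhost:9093".to_owned()])
        .build()
        .await?;
    let controller = client.controller_client()?;

    // Move partition 0 of "my-topic" onto brokers 0 and 1 (placeholder IDs),
    // waiting up to 5 seconds for the broker to accept the reassignment.
    controller
        .reassign_partitions("my-topic", 0, vec![0, 1], 5_000)
        .await?;

    // Then trigger a preferred-leader election for the same partition.
    controller
        .elect_leaders("my-topic", 0, ElectionType::Preferred, 5_000)
        .await?;

    Ok(())
}
```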
50 changes: 50 additions & 0 deletions src/client/mod.rs
@@ -5,6 +5,7 @@ use thiserror::Error;
use crate::{
client::partition::PartitionClient,
connection::{BrokerConnector, TlsConfig},
metadata::{Metadata, MetadataBroker, MetadataPartition, MetadataTopic},
protocol::primitives::Boolean,
topic::Topic,
};
@@ -145,4 +146,53 @@ impl Client {
})
.collect())
}

/// Return cluster-wide metadata.
pub async fn metadata(&self) -> Result<Metadata> {
let response = self.brokers.request_metadata(None, None).await?;

Ok(Metadata {
brokers: response
.brokers
.into_iter()
.map(|response| MetadataBroker {
node_id: response.node_id.0,
host: response.host.0,
port: response.port.0,
rack: response.rack.and_then(|s| s.0),
})
.collect(),
controller_id: response.controller_id.map(|id| id.0),
topics: response
.topics
.into_iter()
.map(|response| MetadataTopic {
name: response.name.0,
is_internal: response.is_internal.map(|b| b.0),
partitions: response
.partitions
.into_iter()
.map(|response| MetadataPartition {
partition_index: response.partition_index.0,
leader_id: response.leader_id.0,
replica_nodes: response
.replica_nodes
.0
.unwrap_or_default()
.into_iter()
.map(|i| i.0)
.collect(),
isr_nodes: response
.isr_nodes
.0
.unwrap_or_default()
.into_iter()
.map(|i| i.0)
.collect(),
})
.collect(),
})
.collect(),
})
}
}
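
A usage sketch for the new `Client::metadata` accessor, again assuming a reachable cluster at a placeholder address:

```rust
use rskafka::client::ClientBuilder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = ClientBuilder::new(vec!["localhost:9093".to_owned()])
        .build()
        .await?;

    // Walk the cluster-wide view: brokers, controller, topics, partitions.
    let md = client.metadata().await?;
    println!("controller: {:?}", md.controller_id);
    for topic in &md.topics {
        for p in &topic.partitions {
            println!(
                "{}/{}: leader={} replicas={:?} isr={:?}",
                topic.name, p.partition_index, p.leader_id, p.replica_nodes, p.isr_nodes,
            );
        }
    }
    Ok(())
}
```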
3 changes: 3 additions & 0 deletions src/lib.rs
@@ -22,11 +22,14 @@ mod backoff;
pub mod client;

mod connection;

#[cfg(feature = "unstable-fuzzing")]
pub mod messenger;
#[cfg(not(feature = "unstable-fuzzing"))]
mod messenger;

pub mod metadata;

#[cfg(feature = "unstable-fuzzing")]
pub mod protocol;
#[cfg(not(feature = "unstable-fuzzing"))]
59 changes: 59 additions & 0 deletions src/metadata.rs
@@ -0,0 +1,59 @@
//! Cluster-wide Kafka metadata.

/// Metadata container for the entire cluster.
#[derive(Debug, PartialEq)]
pub struct Metadata {
/// Brokers.
pub brokers: Vec<MetadataBroker>,

/// The ID of the controller broker.
pub controller_id: Option<i32>,

/// Topics.
pub topics: Vec<MetadataTopic>,
}

/// Metadata for a certain broker.
#[derive(Debug, PartialEq)]
pub struct MetadataBroker {
/// The broker ID
pub node_id: i32,

/// The broker hostname
pub host: String,

/// The broker port
pub port: i32,

/// Rack.
pub rack: Option<String>,
}

/// Metadata for a certain topic.
#[derive(Debug, PartialEq)]
pub struct MetadataTopic {
/// The topic name
pub name: String,

/// True if the topic is internal
pub is_internal: Option<bool>,

/// Each partition in the topic
pub partitions: Vec<MetadataPartition>,
}

/// Metadata for a certain partition.
#[derive(Debug, PartialEq)]
pub struct MetadataPartition {
/// The partition index
pub partition_index: i32,

/// The ID of the leader broker
pub leader_id: i32,

/// The set of all nodes that host this partition
pub replica_nodes: Vec<i32>,

/// The set of all nodes that are in sync with the leader for this partition
pub isr_nodes: Vec<i32>,
}
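
Since `metadata` is now a public module (see the `src/lib.rs` change above), callers can build helpers over these plain structs. A small sketch, using a hypothetical helper name:

```rust
use rskafka::metadata::{Metadata, MetadataBroker};

/// Hypothetical helper: resolve the controller's advertised address, if any.
fn controller_addr(md: &Metadata) -> Option<String> {
    let id = md.controller_id?;
    md.brokers
        .iter()
        .find(|b| b.node_id == id)
        .map(|b| format!("{}:{}", b.host, b.port))
}

fn main() {
    let md = Metadata {
        brokers: vec![MetadataBroker {
            node_id: 0,
            host: "localhost".to_owned(),
            port: 9093,
            rack: None,
        }],
        controller_id: Some(0),
        topics: vec![],
    };
    assert_eq!(controller_addr(&md), Some("localhost:9093".to_owned()));
}
```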
234 changes: 234 additions & 0 deletions src/protocol/messages/alter_partition_reassignments.rs
@@ -0,0 +1,234 @@
use std::io::{Read, Write};

use crate::protocol::{
api_key::ApiKey,
api_version::{ApiVersion, ApiVersionRange},
error::Error,
messages::{read_compact_versioned_array, write_compact_versioned_array},
primitives::{CompactArray, CompactNullableString, CompactString, Int16, Int32, TaggedFields},
traits::{ReadType, WriteType},
};

use super::{
ReadVersionedError, ReadVersionedType, RequestBody, WriteVersionedError, WriteVersionedType,
};

#[derive(Debug)]
pub struct AlterPartitionReassignmentsRequest {
/// The time in ms to wait for the request to complete.
pub timeout_ms: Int32,

/// The topics to reassign.
pub topics: Vec<AlterPartitionReassignmentsTopicRequest>,

/// The tagged fields.
pub tagged_fields: TaggedFields,
}

impl<W> WriteVersionedType<W> for AlterPartitionReassignmentsRequest
where
W: Write,
{
fn write_versioned(
&self,
writer: &mut W,
version: ApiVersion,
) -> Result<(), WriteVersionedError> {
let v = version.0 .0;
assert!(v <= 0);

self.timeout_ms.write(writer)?;
write_compact_versioned_array(writer, version, Some(&self.topics))?;
self.tagged_fields.write(writer)?;

Ok(())
}
}

impl RequestBody for AlterPartitionReassignmentsRequest {
type ResponseBody = AlterPartitionReassignmentsResponse;

const API_KEY: ApiKey = ApiKey::AlterPartitionReassignments;

/// All versions.
const API_VERSION_RANGE: ApiVersionRange =
ApiVersionRange::new(ApiVersion(Int16(0)), ApiVersion(Int16(0)));

const FIRST_TAGGED_FIELD_IN_REQUEST_VERSION: ApiVersion = ApiVersion(Int16(0));
}

#[derive(Debug)]
pub struct AlterPartitionReassignmentsTopicRequest {
/// The topic name.
pub name: CompactString,

/// The partitions to reassign.
pub partitions: Vec<AlterPartitionReassignmentsPartitionRequest>,

/// The tagged fields.
pub tagged_fields: TaggedFields,
}

impl<W> WriteVersionedType<W> for AlterPartitionReassignmentsTopicRequest
where
W: Write,
{
fn write_versioned(
&self,
writer: &mut W,
version: ApiVersion,
) -> Result<(), WriteVersionedError> {
let v = version.0 .0;
assert!(v <= 0);

self.name.write(writer)?;
write_compact_versioned_array(writer, version, Some(&self.partitions))?;
self.tagged_fields.write(writer)?;

Ok(())
}
}

#[derive(Debug)]
pub struct AlterPartitionReassignmentsPartitionRequest {
/// The partition index.
pub partition_index: Int32,

/// The replicas to place the partitions on, or null to cancel a pending reassignment for this partition.
pub replicas: CompactArray<Int32>,

/// The tagged fields.
pub tagged_fields: TaggedFields,
}

impl<W> WriteVersionedType<W> for AlterPartitionReassignmentsPartitionRequest
where
W: Write,
{
fn write_versioned(
&self,
writer: &mut W,
version: ApiVersion,
) -> Result<(), WriteVersionedError> {
let v = version.0 .0;
assert!(v <= 0);

self.partition_index.write(writer)?;
self.replicas.write(writer)?;
self.tagged_fields.write(writer)?;

Ok(())
}
}

#[derive(Debug)]
pub struct AlterPartitionReassignmentsResponse {
/// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the
/// request did not violate any quota.
pub throttle_time_ms: Int32,

/// The top-level error code, or 0 if there was no error.
pub error: Option<Error>,

/// The top-level error message, or null if there was no error.
pub error_message: CompactNullableString,

/// The responses to topics to reassign.
pub responses: Vec<AlterPartitionReassignmentsTopicResponse>,

/// The tagged fields.
pub tagged_fields: TaggedFields,
}

impl<R> ReadVersionedType<R> for AlterPartitionReassignmentsResponse
where
R: Read,
{
fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
let v = version.0 .0;
assert!(v <= 0);

let throttle_time_ms = Int32::read(reader)?;
let error = Error::new(Int16::read(reader)?.0);
let error_message = CompactNullableString::read(reader)?;
let responses = read_compact_versioned_array(reader, version)?.unwrap_or_default();
let tagged_fields = TaggedFields::read(reader)?;

Ok(Self {
throttle_time_ms,
error,
error_message,
responses,
tagged_fields,
})
}
}

#[derive(Debug)]
pub struct AlterPartitionReassignmentsTopicResponse {
/// The topic name
pub name: CompactString,

/// The responses to partitions to reassign
pub partitions: Vec<AlterPartitionReassignmentsPartitionResponse>,

/// The tagged fields.
pub tagged_fields: TaggedFields,
}

impl<R> ReadVersionedType<R> for AlterPartitionReassignmentsTopicResponse
where
R: Read,
{
fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
let v = version.0 .0;
assert!(v <= 0);

let name = CompactString::read(reader)?;
let partitions = read_compact_versioned_array(reader, version)?.unwrap_or_default();
let tagged_fields = TaggedFields::read(reader)?;

Ok(Self {
name,
partitions,
tagged_fields,
})
}
}

#[derive(Debug)]
pub struct AlterPartitionReassignmentsPartitionResponse {
/// The partition index.
pub partition_index: Int32,

/// The error code for this partition, or 0 if there was no error.
pub error: Option<Error>,

/// The error message for this partition, or null if there was no error.
pub error_message: CompactNullableString,

/// The tagged fields.
pub tagged_fields: TaggedFields,
}

impl<R> ReadVersionedType<R> for AlterPartitionReassignmentsPartitionResponse
where
R: Read,
{
fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
let v = version.0 .0;
assert!(v <= 0);

let partition_index = Int32::read(reader)?;
let error = Error::new(Int16::read(reader)?.0);
let error_message = CompactNullableString::read(reader)?;
let tagged_fields = TaggedFields::read(reader)?;

Ok(Self {
partition_index,
error,
error_message,
tagged_fields,
})
}
}
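
AlterPartitionReassignments starts life as a flexible-versions API (v0+), which is why this file uses the compact primitives throughout: strings and arrays carry their length as an unsigned varint of `len + 1`, and every struct is followed by tagged fields. A standalone sketch of the compact-string wire rule (the crate's `CompactString` implements the real thing):

```rust
/// LEB128-style unsigned varint, as used by Kafka's flexible versions.
fn write_unsigned_varint(out: &mut Vec<u8>, mut v: u64) {
    while v >= 0x80 {
        out.push((v as u8 & 0x7f) | 0x80);
        v >>= 7;
    }
    out.push(v as u8);
}

/// COMPACT_STRING: UNSIGNED_VARINT(len + 1), then the UTF-8 bytes.
fn write_compact_string(out: &mut Vec<u8>, s: &str) {
    write_unsigned_varint(out, (s.len() + 1) as u64);
    out.extend_from_slice(s.as_bytes());
}

fn main() {
    let mut buf = Vec::new();
    write_compact_string(&mut buf, "my-topic");
    assert_eq!(buf[0], 9); // length 8, encoded as 8 + 1
    assert_eq!(&buf[1..], b"my-topic");
}
```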
281 changes: 281 additions & 0 deletions src/protocol/messages/elect_leaders.rs
@@ -0,0 +1,281 @@
use std::io::{Read, Write};

use crate::protocol::{
api_key::ApiKey,
api_version::{ApiVersion, ApiVersionRange},
error::Error,
messages::{
read_compact_versioned_array, read_versioned_array, write_compact_versioned_array,
write_versioned_array,
},
primitives::{
Array, CompactArrayRef, CompactNullableString, CompactString, CompactStringRef, Int16,
Int32, Int8, NullableString, String_, TaggedFields,
},
traits::{ReadType, WriteType},
};

use super::{
ReadVersionedError, ReadVersionedType, RequestBody, WriteVersionedError, WriteVersionedType,
};

#[derive(Debug)]
pub struct ElectLeadersRequest {
/// Type of elections to conduct for the partition.
///
/// A value of `0` elects the preferred replica. A value of `1` elects the first live replica if there are no
in-sync replicas.
///
/// Added in version 1.
pub election_type: Int8,

/// The topic partitions to elect leaders.
pub topic_partitions: Vec<ElectLeadersTopicRequest>,

/// The time in ms to wait for the election to complete.
pub timeout_ms: Int32,

/// The tagged fields.
///
/// Added in version 2.
pub tagged_fields: Option<TaggedFields>,
}

impl<W> WriteVersionedType<W> for ElectLeadersRequest
where
W: Write,
{
fn write_versioned(
&self,
writer: &mut W,
version: ApiVersion,
) -> Result<(), WriteVersionedError> {
let v = version.0 .0;
assert!(v <= 2);

if v >= 1 {
self.election_type.write(writer)?;
}

if v >= 2 {
write_compact_versioned_array(writer, version, Some(&self.topic_partitions))?;
} else {
write_versioned_array(writer, version, Some(&self.topic_partitions))?;
}

self.timeout_ms.write(writer)?;

if v >= 2 {
match self.tagged_fields.as_ref() {
Some(tagged_fields) => {
tagged_fields.write(writer)?;
}
None => {
TaggedFields::default().write(writer)?;
}
}
}

Ok(())
}
}

impl RequestBody for ElectLeadersRequest {
type ResponseBody = ElectLeadersResponse;

const API_KEY: ApiKey = ApiKey::ElectLeaders;

/// All versions.
const API_VERSION_RANGE: ApiVersionRange =
ApiVersionRange::new(ApiVersion(Int16(0)), ApiVersion(Int16(2)));

const FIRST_TAGGED_FIELD_IN_REQUEST_VERSION: ApiVersion = ApiVersion(Int16(2));
}

#[derive(Debug)]
pub struct ElectLeadersTopicRequest {
/// The name of a topic.
pub topic: String_,

/// The partitions of this topic whose leader should be elected.
pub partitions: Array<Int32>,

/// The tagged fields.
///
/// Added in version 2.
pub tagged_fields: Option<TaggedFields>,
}

impl<W> WriteVersionedType<W> for ElectLeadersTopicRequest
where
W: Write,
{
fn write_versioned(
&self,
writer: &mut W,
version: ApiVersion,
) -> Result<(), WriteVersionedError> {
let v = version.0 .0;
assert!(v <= 2);

if v >= 2 {
CompactStringRef(&self.topic.0).write(writer)?;
} else {
self.topic.write(writer)?;
}

if v >= 2 {
CompactArrayRef(self.partitions.0.as_deref()).write(writer)?;
} else {
self.partitions.write(writer)?;
}

if v >= 2 {
match self.tagged_fields.as_ref() {
Some(tagged_fields) => {
tagged_fields.write(writer)?;
}
None => {
TaggedFields::default().write(writer)?;
}
}
}

Ok(())
}
}

#[derive(Debug)]
pub struct ElectLeadersResponse {
/// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the
/// request did not violate any quota.
pub throttle_time_ms: Int32,

/// The top level response error code.
///
/// Added in version 1.
pub error: Option<Error>,

/// The election results, or an empty array if the requester did not have permission and the request asks for all
/// partitions.
pub replica_election_results: Vec<ElectLeadersTopicResponse>,

/// The tagged fields.
///
/// Added in version 2.
pub tagged_fields: Option<TaggedFields>,
}

impl<R> ReadVersionedType<R> for ElectLeadersResponse
where
R: Read,
{
fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
let v = version.0 .0;
assert!(v <= 2);

let throttle_time_ms = Int32::read(reader)?;
let error = (v >= 1)
.then(|| Int16::read(reader))
.transpose()?
.and_then(|e| Error::new(e.0));
let replica_election_results = if v >= 2 {
read_compact_versioned_array(reader, version)?.unwrap_or_default()
} else {
read_versioned_array(reader, version)?.unwrap_or_default()
};
let tagged_fields = (v >= 2).then(|| TaggedFields::read(reader)).transpose()?;

Ok(Self {
throttle_time_ms,
error,
replica_election_results,
tagged_fields,
})
}
}

#[derive(Debug)]
pub struct ElectLeadersTopicResponse {
/// The topic name.
pub topic: String_,

/// The results for each partition.
pub partition_results: Vec<ElectLeadersPartitionResponse>,

/// The tagged fields.
///
/// Added in version 2.
pub tagged_fields: Option<TaggedFields>,
}

impl<R> ReadVersionedType<R> for ElectLeadersTopicResponse
where
R: Read,
{
fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
let v = version.0 .0;
assert!(v <= 2);

let topic = if v >= 2 {
String_(CompactString::read(reader)?.0)
} else {
String_::read(reader)?
};
let partition_results = if v >= 2 {
read_compact_versioned_array(reader, version)?.unwrap_or_default()
} else {
read_versioned_array(reader, version)?.unwrap_or_default()
};
let tagged_fields = (v >= 2).then(|| TaggedFields::read(reader)).transpose()?;

Ok(Self {
topic,
partition_results,
tagged_fields,
})
}
}

#[derive(Debug)]
pub struct ElectLeadersPartitionResponse {
/// The partition id.
pub partition_id: Int32,

/// The result error, or zero if there was no error.
pub error: Option<Error>,

/// The result message, or null if there was no error.
pub error_message: NullableString,

/// The tagged fields.
///
/// Added in version 2.
pub tagged_fields: Option<TaggedFields>,
}

impl<R> ReadVersionedType<R> for ElectLeadersPartitionResponse
where
R: Read,
{
fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
let v = version.0 .0;
assert!(v <= 2);

let partition_id = Int32::read(reader)?;
let error = Error::new(Int16::read(reader)?.0);
let error_message = if v >= 2 {
NullableString(CompactNullableString::read(reader)?.0)
} else {
NullableString::read(reader)?
};
let tagged_fields = (v >= 2).then(|| TaggedFields::read(reader)).transpose()?;

Ok(Self {
partition_id,
error,
error_message,
tagged_fields,
})
}
}
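
ElectLeaders spans three wire layouts (v0 without the election type, v1 with it, v2 flexible), and the messenger picks a concrete version by intersecting the client's `API_VERSION_RANGE` with what the broker advertises; when the ranges don't overlap, requests fail with the `NoVersionMatch` error the tests below tolerate. An illustrative sketch of that intersection:

```rust
/// Pick the highest API version supported by both sides, as Kafka version
/// negotiation does (illustrative sketch; rskafka's messenger holds the real logic).
fn negotiate(client: (i16, i16), broker: (i16, i16)) -> Option<i16> {
    let min = client.0.max(broker.0);
    let max = client.1.min(broker.1);
    (min <= max).then_some(max)
}

fn main() {
    // This client supports ElectLeaders v0..=v2.
    assert_eq!(negotiate((0, 2), (0, 7)), Some(2)); // modern broker: use v2
    assert_eq!(negotiate((0, 2), (0, 0)), Some(0)); // old broker: fall back to v0
    assert_eq!(negotiate((3, 4), (0, 2)), None); // no overlap -> NoVersionMatch
}
```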
4 changes: 4 additions & 0 deletions src/protocol/messages/mod.rs
@@ -16,6 +16,8 @@ use super::{
vec_builder::VecBuilder,
};

mod alter_partition_reassignments;
pub use alter_partition_reassignments::*;
mod api_versions;
pub use api_versions::*;
mod constants;
@@ -24,6 +26,8 @@ mod create_topics;
pub use create_topics::*;
mod delete_records;
pub use delete_records::*;
mod elect_leaders;
pub use elect_leaders::*;
mod fetch;
pub use fetch::*;
mod header;
96 changes: 95 additions & 1 deletion tests/client.rs
@@ -1,7 +1,8 @@
use assert_matches::assert_matches;
use rskafka::{
client::{
error::{Error as ClientError, ProtocolError},
controller::ElectionType,
error::{Error as ClientError, ProtocolError, RequestError},
partition::{Compression, OffsetAt},
ClientBuilder,
},
@@ -540,6 +541,99 @@ async fn test_delete_records() {
);
}

#[tokio::test]
async fn test_metadata() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();

let client = ClientBuilder::new(connection).build().await.unwrap();

let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, 1, 1, 5_000)
.await
.unwrap();

let md = client.metadata().await.unwrap();
assert!(!md.brokers.is_empty());

// topic metadata might take a while to converge
tokio::time::timeout(Duration::from_millis(1_000), async {
loop {
let md = client.metadata().await.unwrap();
let topic = md.topics.into_iter().find(|topic| topic.name == topic_name);
if topic.is_some() {
return;
}

tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.unwrap();
}

#[tokio::test]
async fn test_reassign_partitions() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();

let client = ClientBuilder::new(connection).build().await.unwrap();

let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, 1, 1, 5_000)
.await
.unwrap();

let res = controller_client
.reassign_partitions(&topic_name, 0, vec![0, 1], 5_000)
.await;
match res {
Ok(()) => {}
Err(ClientError::Request(RequestError::NoVersionMatch { .. }))
if std::env::var("TEST_REASSIGN_PARTITIONS").is_err() =>
{
println!("Skip test_elect_leaders");
}
Err(e) => panic!("Unexpected error: {e}"),
}
}

#[tokio::test]
async fn test_elect_leaders() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();

let client = ClientBuilder::new(connection).build().await.unwrap();

let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, 1, 3, 5_000)
.await
.unwrap();

let res = controller_client
.elect_leaders(&topic_name, 0, ElectionType::Preferred, 5_000)
.await;
match res {
Ok(()) => {}
Err(ClientError::ServerError(ProtocolError::ElectionNotNeeded, _)) => {}
Err(ClientError::Request(RequestError::NoVersionMatch { .. }))
if std::env::var("TEST_ELECT_LEADERS").is_err() =>
{
println!("Skip test_elect_leaders");
}
Err(e) => panic!("Unexpected error: {e}"),
}
}

pub fn large_record() -> Record {
Record {
key: Some(b"".to_vec()),