
fix: select cluster controller for certain actions #60

Merged · 5 commits · Jan 14, 2022
8 changes: 6 additions & 2 deletions — .circleci/config.yml

@@ -181,7 +181,9 @@ jobs:
       RUST_LOG: "trace"
       # Run integration tests
       TEST_INTEGRATION: 1
-      KAFKA_CONNECT: "redpanda-0:9092"
+      # Don't use the first node here since this is likely the controller and we want to ensure that we automatically
+      # pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
+      KAFKA_CONNECT: "redpanda-1:9092"
     steps:
       - checkout
       - rust_components
@@ -244,7 +246,9 @@ jobs:
       RUST_LOG: "trace"
       # Run integration tests
       TEST_INTEGRATION: 1
-      KAFKA_CONNECT: "kafka-0:9093"
+      # Don't use the first node here since this is likely the controller and we want to ensure that we automatically
+      # pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
+      KAFKA_CONNECT: "kafka-1:9093"
     steps:
       - checkout
       - rust_components
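For context, the integration tests are gated on these two variables. A minimal sketch of such a gate (illustrative only; the crate's actual test helpers may differ):

```rust
use std::env;

/// Returns the broker address for integration tests, or `None` to skip them.
/// `TEST_INTEGRATION` and `KAFKA_CONNECT` are the variables set in the CI
/// config above; this helper itself is hypothetical.
fn kafka_connect_or_skip() -> Option<String> {
    if env::var("TEST_INTEGRATION").is_err() {
        return None; // integration tests not enabled
    }
    // e.g. "redpanda-1:9092" — deliberately not the first (likely controller) node
    env::var("KAFKA_CONNECT").ok()
}
```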
7 changes: 4 additions & 3 deletions — README.md

@@ -37,7 +37,8 @@ let client = Client::new_plain(vec![connection]).await.unwrap();

 // create a topic
 let topic = "my_topic";
-client.create_topic(
+let controller_client = client.controller_client().await.unwrap();
+controller_client.create_topic(
     topic,
     2, // partitions
     1, // replication factor
@@ -90,7 +91,7 @@ $ docker-compose -f docker-compose-redpanda.yml up

 in one session, and then run:

 ```console
-$ TEST_INTEGRATION=1 KAFKA_CONNECT=0.0.0.0:9092 cargo test
+$ TEST_INTEGRATION=1 KAFKA_CONNECT=0.0.0.0:9093 cargo test
 ```

 in another session.
@@ -106,7 +107,7 @@ $ docker-compose -f docker-compose-kafka.yml up

 in one session, and then run:

 ```console
-$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9093 cargo test
+$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9094 cargo test
 ```

 in another session.
2 changes: 1 addition & 1 deletion — src/backoff.rs

@@ -14,7 +14,7 @@ pub struct BackoffConfig {
 impl Default for BackoffConfig {
     fn default() -> Self {
         Self {
-            init_backoff: Duration::from_secs(1),
+            init_backoff: Duration::from_millis(100),
            max_backoff: Duration::from_secs(500),
            base: 3.,
        }
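With these defaults, retry delays now start at 100 ms instead of 1 s, growing by a factor of 3 per attempt up to the 500 s cap. A standalone sketch of that schedule, assuming a plain exponential policy (the crate's `Backoff` may differ, e.g. by adding jitter):

```rust
use std::time::Duration;

/// First `attempts` delays of an exponential backoff: init * base^n, capped at max.
fn backoff_schedule(init: Duration, max: Duration, base: f64, attempts: u32) -> Vec<Duration> {
    (0..attempts)
        .map(|n| init.mul_f64(base.powi(n as i32)).min(max))
        .collect()
}

fn main() {
    // Prints 100ms, 300ms, 900ms, 2.7s, 8.1s, 24.3s
    for delay in backoff_schedule(Duration::from_millis(100), Duration::from_secs(500), 3.0, 6) {
        println!("{delay:?}");
    }
}
```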
170 changes: 170 additions & 0 deletions — src/client/controller.rs (new file)

@@ -0,0 +1,170 @@
use std::sync::Arc;

use tokio::sync::Mutex;
use tracing::{debug, error, info};

use crate::{
    backoff::{Backoff, BackoffConfig},
    client::{Error, Result},
    connection::{BrokerConnection, BrokerConnector},
    messenger::RequestError,
    protocol::{
        error::Error as ProtocolError,
        messages::{CreateTopicRequest, CreateTopicsRequest},
        primitives::{Int16, Int32, NullableString, String_},
    },
};

#[derive(Debug)]
pub struct ControllerClient {
    brokers: Arc<BrokerConnector>,

    backoff_config: BackoffConfig,

    /// Current broker connection if any
    current_broker: Mutex<Option<BrokerConnection>>,
}

impl ControllerClient {
    pub(super) fn new(brokers: Arc<BrokerConnector>) -> Self {
        Self {
            brokers,
            backoff_config: Default::default(),
            current_broker: Mutex::new(None),
        }
    }

    /// Create a topic
    pub async fn create_topic(
        &self,
        name: impl Into<String> + Send,
        num_partitions: i32,
        replication_factor: i16,
    ) -> Result<()> {
        let request = &CreateTopicsRequest {
            topics: vec![CreateTopicRequest {
                name: String_(name.into()),
                num_partitions: Int32(num_partitions),
                replication_factor: Int16(replication_factor),
                assignments: vec![],
                configs: vec![],
                tagged_fields: None,
            }],
            // TODO: Expose as configuration parameter
            timeout_ms: Int32(5_000),
            validate_only: None,
            tagged_fields: None,
        };

        self.maybe_retry("create_topic", || async move {
            let broker = self.get_cached_controller_broker().await?;
            let response = broker.request(request).await?;

            if response.topics.len() != 1 {
                return Err(Error::InvalidResponse(format!(
                    "Expected a single topic in response, got {}",
                    response.topics.len()
                )));
            }

            let topic = response.topics.into_iter().next().unwrap();

            match topic.error {
                None => Ok(()),
                Some(protocol_error) => match topic.error_message {
                    Some(NullableString(Some(msg))) => Err(Error::ServerError(protocol_error, msg)),
                    _ => Err(Error::ServerError(protocol_error, Default::default())),
                },
            }
        })
        .await
    }

    /// Takes a `request_name` and a function yielding a fallible future
    /// and handles certain classes of error
    async fn maybe_retry<R, F, T>(&self, request_name: &str, f: R) -> Result<T>
    where
        R: (Fn() -> F) + Send + Sync,
        F: std::future::Future<Output = Result<T>> + Send,
    {
        let mut backoff = Backoff::new(&self.backoff_config);

        loop {
            let error = match f().await {
                Ok(v) => return Ok(v),
                Err(e) => e,
            };

            match error {
                // broken connection
                Error::Request(RequestError::Poisoned(_) | RequestError::IO(_))
                | Error::Connection(_) => self.invalidate_cached_controller_broker().await,

                // our broker is actually not the controller
                Error::ServerError(ProtocolError::NotController, _) => {
                    self.invalidate_cached_controller_broker().await;
                }

                // fatal
                _ => {
                    error!(
                        e=%error,
                        request_name,
                        "request encountered fatal error",
                    );
                    return Err(error);
                }
            }

            let backoff = backoff.next();
            info!(
                e=%error,
                request_name,
                backoff_secs=backoff.as_secs(),
                "request encountered non-fatal error - backing off",
            );
            tokio::time::sleep(backoff).await;
        }
    }

    /// Gets a cached [`BrokerConnection`] to any cluster controller.
    async fn get_cached_controller_broker(&self) -> Result<BrokerConnection> {
        let mut current_broker = self.current_broker.lock().await;
        if let Some(broker) = &*current_broker {
            return Ok(Arc::clone(broker));
        }

        info!("Creating new controller broker connection");

        let controller_id = self.get_controller_id().await?;
        let broker = self.brokers.connect(controller_id).await?.ok_or_else(|| {
            Error::InvalidResponse(format!(
                "Controller {} not found in metadata response",
                controller_id
            ))
        })?;

        *current_broker = Some(Arc::clone(&broker));
> **Inline review (Contributor):** Does this have the same derp as partition leaders, where brokers can be the controller but not realise they are? I presume not??
>
> **@crepererum (Collaborator, author), Jan 14, 2022:** I think a desync could only lead to `NotController` (I hope Kafka checks this before checking the content of the operation). Let's see if we get some weird errors and then we can add another check.
        Ok(broker)
    }

    /// Invalidates the cached controller broker.
    ///
    /// The next call to [`ControllerClient::get_cached_controller_broker`] will get a new connection
    pub async fn invalidate_cached_controller_broker(&self) {
        debug!("Invalidating cached controller broker");
        self.current_broker.lock().await.take();
    }

    /// Retrieve the broker ID of the controller
    async fn get_controller_id(&self) -> Result<i32> {
        let metadata = self.brokers.request_metadata(None, Some(vec![])).await?;

        let controller_id = metadata
            .controller_id
            .ok_or_else(|| Error::InvalidResponse("Leader is NULL".to_owned()))?
            .0;

        Ok(controller_id)
    }
}
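Taken together with the `src/client/mod.rs` changes below, topic creation now goes through the controller explicitly. A minimal usage sketch under the new API, assuming a `client` built as in the README (`unwrap` for brevity):

```rust
// Hypothetical call site; `client` is a Client set up as in the README above.
let controller_client = client.controller_client().await.unwrap();

// create_topic now runs against the actual cluster controller and, via
// maybe_retry, transparently re-resolves it on NotController or broken
// connections instead of hoping an arbitrary broker can handle the request.
controller_client
    .create_topic("my_topic", 2 /* partitions */, 1 /* replication factor */)
    .await
    .unwrap();
```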
58 changes: 9 additions & 49 deletions — src/client/mod.rs

@@ -3,22 +3,20 @@ use std::sync::Arc;
 use thiserror::Error;

 use crate::{
-    client::partition::PartitionClient,
-    connection::BrokerConnector,
-    protocol::{
-        messages::{CreateTopicRequest, CreateTopicsRequest},
-        primitives::*,
-    },
+    client::partition::PartitionClient, connection::BrokerConnector, protocol::primitives::Boolean,
     topic::Topic,
 };

 pub mod consumer;
+pub mod controller;
 pub mod error;
 pub mod partition;
 pub mod producer;

 use error::{Error, Result};

+use self::controller::ControllerClient;
+
 #[derive(Debug, Error)]
 pub enum ProduceError {
     #[error(transparent)]
@@ -67,6 +65,11 @@ impl Client {
         Ok(Self { brokers })
     }

+    /// Returns a client for performing certain cluster-wide operations.
+    pub async fn controller_client(&self) -> Result<ControllerClient> {
+        Ok(ControllerClient::new(Arc::clone(&self.brokers)))
+    }
+
     /// Returns a client for performing operations on a specific partition
     pub async fn partition_client(
         &self,
@@ -98,47 +101,4 @@ impl Client {
             })
             .collect())
     }
-
-    /// Create a topic
-    pub async fn create_topic(
-        &self,
-        name: impl Into<String> + Send,
-        num_partitions: i32,
-        replication_factor: i16,
-    ) -> Result<()> {
-        let broker = self.brokers.get_arbitrary_cached_broker().await?;
-        let response = broker
-            .request(CreateTopicsRequest {
-                topics: vec![CreateTopicRequest {
-                    name: String_(name.into()),
-                    num_partitions: Int32(num_partitions),
-                    replication_factor: Int16(replication_factor),
-                    assignments: vec![],
-                    configs: vec![],
-                    tagged_fields: None,
-                }],
-                // TODO: Expose as configuration parameter
-                timeout_ms: Int32(5_000),
-                validate_only: None,
-                tagged_fields: None,
-            })
-            .await?;
-
-        if response.topics.len() != 1 {
-            return Err(Error::InvalidResponse(format!(
-                "Expected a single topic in response, got {}",
-                response.topics.len()
-            )));
-        }
-
-        let topic = response.topics.into_iter().next().unwrap();
-
-        match topic.error {
-            None => Ok(()),
-            Some(protocol_error) => match topic.error_message {
-                Some(NullableString(Some(msg))) => Err(Error::ServerError(protocol_error, msg)),
-                _ => Err(Error::ServerError(protocol_error, Default::default())),
-            },
-        }
-    }
 }
27 changes: 16 additions & 11 deletions — src/connection.rs

@@ -1,6 +1,6 @@
 use rand::prelude::*;
 use std::sync::Arc;
-use tracing::{info, warn};
+use tracing::{debug, info, warn};

 use thiserror::Error;
 use tokio::io::BufStream;
@@ -50,8 +50,10 @@ pub struct BrokerConnector {
     /// Discovered brokers in the cluster, including bootstrap brokers
     topology: BrokerTopology,

-    /// The current cached broker
-    current_broker: Mutex<Option<BrokerConnection>>,
+    /// The cached arbitrary broker.
+    ///
+    /// This one is used for metadata queries.
+    cached_arbitrary_broker: Mutex<Option<BrokerConnection>>,

     /// The backoff configuration on error
     backoff_config: BackoffConfig,
@@ -72,7 +74,7 @@ impl BrokerConnector {
         Self {
             bootstrap_brokers,
             topology: Default::default(),
-            current_broker: Mutex::new(None),
+            cached_arbitrary_broker: Mutex::new(None),
             backoff_config: Default::default(),
             tls_config,
             max_message_size,
@@ -113,7 +115,7 @@ impl BrokerConnector {
         // Retrieve the broker within the loop, in case it is invalidated
         let broker = match broker_override.as_ref() {
             Some(b) => Arc::clone(b),
-            None => self.get_arbitrary_cached_broker().await?,
+            None => self.get_cached_arbitrary_broker().await?,
         };

         let error = match broker.request(&request).await {
@@ -142,15 +144,18 @@ impl BrokerConnector {
             tokio::time::sleep(backoff).await;
         };

+        // Since the metadata request contains information about the cluster state, use it to update our view.
         self.topology.update(&response.brokers);

         Ok(response)
     }

-    /// Invalidates the current cached broker
-    /// The next call to `[BrokerConnector::get_arbitrary_cached_broker]` will get a new connection
+    /// Invalidates the cached arbitrary broker.
+    ///
+    /// The next call to [`BrokerConnector::get_cached_arbitrary_broker`] will get a new connection
     pub async fn invalidate_cached_arbitrary_broker(&self) {
-        self.current_broker.lock().await.take();
+        debug!("Invalidating cached arbitrary broker");
+        self.cached_arbitrary_broker.lock().await.take();
     }

     /// Returns a new connection to the broker with the provided id
@@ -179,8 +184,8 @@ impl BrokerConnector {
     }

     /// Gets a cached [`BrokerConnection`] to any broker
-    pub async fn get_arbitrary_cached_broker(&self) -> Result<BrokerConnection> {
-        let mut current_broker = self.current_broker.lock().await;
+    pub async fn get_cached_arbitrary_broker(&self) -> Result<BrokerConnection> {
+        let mut current_broker = self.cached_arbitrary_broker.lock().await;
         if let Some(broker) = &*current_broker {
             return Ok(Arc::clone(broker));
         }
@@ -230,7 +235,7 @@ impl std::fmt::Debug for BrokerConnector {
         f.debug_struct("BrokerConnector")
             .field("bootstrap_brokers", &self.bootstrap_brokers)
             .field("topology", &self.topology)
-            .field("current_broker", &self.current_broker)
+            .field("cached_arbitrary_broker", &self.cached_arbitrary_broker)
             .field("backoff_config", &self.backoff_config)
             .field("tls_config", &tls_config)
             .field("max_message_size", &self.max_message_size)
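Both `BrokerConnector` and the new `ControllerClient` now share the same cache-then-invalidate shape. A self-contained sketch of that pattern (names and the dial step are illustrative, not the crate's API):

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

/// Illustrative stand-in for a broker connection handle.
type Conn = Arc<String>;

struct CachedConn {
    slot: Mutex<Option<Conn>>,
}

impl CachedConn {
    /// Return the cached connection, or create one while holding the lock
    /// so concurrent callers cannot race to dial duplicates.
    async fn get(&self) -> Conn {
        let mut slot = self.slot.lock().await;
        if let Some(conn) = &*slot {
            return Arc::clone(conn);
        }
        let conn: Conn = Arc::new("broker-1:9092".to_owned()); // stand-in for dialing
        *slot = Some(Arc::clone(&conn));
        conn
    }

    /// Drop the cached connection; the next `get` dials anew.
    async fn invalidate(&self) {
        self.slot.lock().await.take();
    }
}
```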