-
Notifications
You must be signed in to change notification settings - Fork 36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: select cluster controller for certain actions #60
Merged
Merged
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
135148a
ci: do not connect to the cluster controller by default
crepererum 1458878
fix: introduce `ControllerClient` for cluster-wide operations
crepererum 200df6a
fix: lower initial backoff to 100ms (from 1s)
crepererum 65161d6
fix: do not handle irrelevant errors
crepererum 48ced12
test: increate timeouts
crepererum File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,170 @@ | ||
use std::sync::Arc; | ||
|
||
use tokio::sync::Mutex; | ||
use tracing::{debug, error, info}; | ||
|
||
use crate::{ | ||
backoff::{Backoff, BackoffConfig}, | ||
client::{Error, Result}, | ||
connection::{BrokerConnection, BrokerConnector}, | ||
messenger::RequestError, | ||
protocol::{ | ||
error::Error as ProtocolError, | ||
messages::{CreateTopicRequest, CreateTopicsRequest}, | ||
primitives::{Int16, Int32, NullableString, String_}, | ||
}, | ||
}; | ||
|
||
#[derive(Debug)] | ||
pub struct ControllerClient { | ||
brokers: Arc<BrokerConnector>, | ||
|
||
backoff_config: BackoffConfig, | ||
|
||
/// Current broker connection if any | ||
current_broker: Mutex<Option<BrokerConnection>>, | ||
} | ||
|
||
impl ControllerClient { | ||
pub(super) fn new(brokers: Arc<BrokerConnector>) -> Self { | ||
Self { | ||
brokers, | ||
backoff_config: Default::default(), | ||
current_broker: Mutex::new(None), | ||
} | ||
} | ||
|
||
/// Create a topic | ||
pub async fn create_topic( | ||
&self, | ||
name: impl Into<String> + Send, | ||
num_partitions: i32, | ||
replication_factor: i16, | ||
) -> Result<()> { | ||
let request = &CreateTopicsRequest { | ||
topics: vec![CreateTopicRequest { | ||
name: String_(name.into()), | ||
num_partitions: Int32(num_partitions), | ||
replication_factor: Int16(replication_factor), | ||
assignments: vec![], | ||
configs: vec![], | ||
tagged_fields: None, | ||
}], | ||
// TODO: Expose as configuration parameter | ||
timeout_ms: Int32(5_000), | ||
validate_only: None, | ||
tagged_fields: None, | ||
}; | ||
|
||
self.maybe_retry("create_topic", || async move { | ||
let broker = self.get_cached_controller_broker().await?; | ||
let response = broker.request(request).await?; | ||
|
||
if response.topics.len() != 1 { | ||
return Err(Error::InvalidResponse(format!( | ||
"Expected a single topic in response, got {}", | ||
response.topics.len() | ||
))); | ||
} | ||
|
||
let topic = response.topics.into_iter().next().unwrap(); | ||
|
||
match topic.error { | ||
None => Ok(()), | ||
Some(protocol_error) => match topic.error_message { | ||
Some(NullableString(Some(msg))) => Err(Error::ServerError(protocol_error, msg)), | ||
_ => Err(Error::ServerError(protocol_error, Default::default())), | ||
}, | ||
} | ||
}) | ||
.await | ||
} | ||
|
||
/// Takes a `request_name` and a function yielding a fallible future | ||
/// and handles certain classes of error | ||
async fn maybe_retry<R, F, T>(&self, request_name: &str, f: R) -> Result<T> | ||
where | ||
R: (Fn() -> F) + Send + Sync, | ||
F: std::future::Future<Output = Result<T>> + Send, | ||
{ | ||
let mut backoff = Backoff::new(&self.backoff_config); | ||
|
||
loop { | ||
let error = match f().await { | ||
Ok(v) => return Ok(v), | ||
Err(e) => e, | ||
}; | ||
|
||
match error { | ||
// broken connection | ||
Error::Request(RequestError::Poisoned(_) | RequestError::IO(_)) | ||
| Error::Connection(_) => self.invalidate_cached_controller_broker().await, | ||
|
||
// our broker is actually not the controller | ||
Error::ServerError(ProtocolError::NotController, _) => { | ||
self.invalidate_cached_controller_broker().await; | ||
} | ||
|
||
// fatal | ||
_ => { | ||
error!( | ||
e=%error, | ||
request_name, | ||
"request encountered fatal error", | ||
); | ||
return Err(error); | ||
} | ||
} | ||
|
||
let backoff = backoff.next(); | ||
info!( | ||
e=%error, | ||
request_name, | ||
backoff_secs=backoff.as_secs(), | ||
"request encountered non-fatal error - backing off", | ||
); | ||
tokio::time::sleep(backoff).await; | ||
} | ||
} | ||
|
||
/// Gets a cached [`BrokerConnection`] to any cluster controller. | ||
async fn get_cached_controller_broker(&self) -> Result<BrokerConnection> { | ||
let mut current_broker = self.current_broker.lock().await; | ||
if let Some(broker) = &*current_broker { | ||
return Ok(Arc::clone(broker)); | ||
} | ||
|
||
info!("Creating new controller broker connection",); | ||
|
||
let controller_id = self.get_controller_id().await?; | ||
let broker = self.brokers.connect(controller_id).await?.ok_or_else(|| { | ||
Error::InvalidResponse(format!( | ||
"Controller {} not found in metadata response", | ||
controller_id | ||
)) | ||
})?; | ||
|
||
*current_broker = Some(Arc::clone(&broker)); | ||
Ok(broker) | ||
} | ||
|
||
/// Invalidates the cached controller broker. | ||
/// | ||
/// The next call to `[ContollerClient::get_cached_controller_broker]` will get a new connection | ||
pub async fn invalidate_cached_controller_broker(&self) { | ||
debug!("Invalidating cached controller broker"); | ||
self.current_broker.lock().await.take(); | ||
} | ||
|
||
/// Retrieve the broker ID of the controller | ||
async fn get_controller_id(&self) -> Result<i32> { | ||
let metadata = self.brokers.request_metadata(None, Some(vec![])).await?; | ||
|
||
let controller_id = metadata | ||
.controller_id | ||
.ok_or_else(|| Error::InvalidResponse("Leader is NULL".to_owned()))? | ||
.0; | ||
|
||
Ok(controller_id) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this have the same derp as partition leaders where brokers can be controller but not realise they are? I presume not??
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a desync could only lead
NotController
(I hope Kafka is checking this before checking the content of the operation). Let's see if we can some weird errors and then we can add another check.