Skip to content

Commit

Permalink
improve comments
Browse files Browse the repository at this point in the history
Signed-off-by: Shoham Elias <[email protected]>
  • Loading branch information
shohamazon committed Feb 9, 2025
1 parent cf273f3 commit 87740fd
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 17 deletions.
21 changes: 8 additions & 13 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@
//! ```
mod connections_container;
mod pipeline_routing;

mod connections_logic;
mod pipeline_routing;
/// Exposed only for testing.
pub mod testing {
pub use super::connections_container::ConnectionDetails;
Expand Down Expand Up @@ -2133,7 +2132,6 @@ where
Self::handle_pipeline_request(&pipeline, core).await
}
}

CmdArg::ClusterScan {
cluster_scan_args, ..
} => {
Expand Down Expand Up @@ -2162,18 +2160,11 @@ where
///
/// This function distributes the commands in the pipeline across the cluster nodes based on routing information, collects the responses,
/// and aggregates them if necessary according to the specified response policies.
///
/// # Arguments
/// * `pipeline` - A reference to the pipeline to be executed.
/// * `core` - A reference to the core cluster connection state.
///
/// # Returns
/// * `OperationResult` - Returns a result containing the aggregated responses from the sub-pipelines, or an error if the operation fails.
async fn handle_pipeline_request(pipeline: &crate::Pipeline, core: Core<C>) -> OperationResult {
// Distribute pipeline commands across cluster nodes based on routing information.
// Returns:
// - pipelines_by_node: Map of node addresses to their pipeline contexts
// - response_policies: List of response aggregation policies for multi-node operations
// - response_policies: List of response aggregation policies for multi-node commands
let (pipelines_by_node, response_policies) =
map_pipeline_to_nodes(pipeline, core.clone()).await?;

Expand All @@ -2184,14 +2175,17 @@ where
// a vector of tuples where each tuple holds a response to the command and the address of the node that provided it.
let mut pipeline_responses: PipelineResponses = vec![Vec::new(); pipeline.len()];

// Send the requests to each node and collect thw responses
// Send the requests to each node and collect the responses
// Returns a tuple containing:
// - A vector of results for each sub-pipeline execution.
// - A vector of (address, indices) pairs indicating where each response should be placed.
let (responses, addresses_and_indices) =
collect_and_send_pending_requests(pipelines_by_node, core.clone()).await;

// Process the responses and update the pipeline_responses
process_pipeline_responses(&mut pipeline_responses, responses, addresses_and_indices)?;

// Process response policies after all tasks are complete
// Process response policies after all tasks are complete and aggregate the relevant commands.
Self::aggregate_pipeline_multi_node_commands(&mut pipeline_responses, response_policies)
.await?;

Expand Down Expand Up @@ -2272,6 +2266,7 @@ where
pipeline_responses: &mut PipelineResponses,
response_policies: Vec<(usize, MultipleNodeRoutingInfo, Option<ResponsePolicy>)>,
) -> Result<(), (OperationTarget, RedisError)> {
// Go over the multi-node commands
for (index, routing_info, response_policy) in response_policies {
let response_receivers = pipeline_responses[index]
.iter()
Expand Down
8 changes: 4 additions & 4 deletions glide-core/tests/test_socket_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1270,12 +1270,12 @@ mod socket_listener {
CommandComponents {
args: vec!["FLUSHALL".to_string().into()],
args_pointer: false,
request_type: RequestType::CustomCommand.into(),
request_type: RequestType::CustomCommand.into(), // AllPrimaries command
},
CommandComponents {
args: vec![],
args_pointer: false,
request_type: RequestType::DBSize.into(),
request_type: RequestType::DBSize.into(), // Aggregation of sum
},
CommandComponents {
args: vec![key.clone().into()],
Expand Down Expand Up @@ -1305,12 +1305,12 @@ mod socket_listener {
CommandComponents {
args: vec!["appendonly".to_string().into(), "no".to_string().into()],
args_pointer: false,
request_type: RequestType::ConfigSet.into(),
request_type: RequestType::ConfigSet.into(), // AllNodes command
},
CommandComponents {
args: vec!["appendonly".to_string().into()],
args_pointer: false,
request_type: RequestType::ConfigGet.into(),
request_type: RequestType::ConfigGet.into(), // RandomNode command
},
];
let mut buffer = Vec::with_capacity(200);
Expand Down

0 comments on commit 87740fd

Please sign in to comment.