From 87740fdc616ebe4f13ab144559fa9fd431267a39 Mon Sep 17 00:00:00 2001 From: Shoham Elias Date: Sun, 9 Feb 2025 14:57:11 +0000 Subject: [PATCH] improve comments Signed-off-by: Shoham Elias --- .../redis-rs/redis/src/cluster_async/mod.rs | 21 +++++++------------ glide-core/tests/test_socket_listener.rs | 8 +++---- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 4bcc6adb91..a2743efc3c 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -23,9 +23,8 @@ //! ``` mod connections_container; -mod pipeline_routing; - mod connections_logic; +mod pipeline_routing; /// Exposed only for testing. pub mod testing { pub use super::connections_container::ConnectionDetails; @@ -2133,7 +2132,6 @@ where Self::handle_pipeline_request(&pipeline, core).await } } - CmdArg::ClusterScan { cluster_scan_args, .. } => { @@ -2162,18 +2160,11 @@ where /// /// This function distributes the commands in the pipeline across the cluster nodes based on routing information, collects the responses, /// and aggregates them if necessary according to the specified response policies. - /// - /// # Arguments - /// * `pipeline` - A reference to the pipeline to be executed. - /// * `core` - A reference to the core cluster connection state. - /// - /// # Returns - /// * `OperationResult` - Returns a result containing the aggregated responses from the sub-pipelines, or an error if the operation fails. async fn handle_pipeline_request(pipeline: &crate::Pipeline, core: Core) -> OperationResult { // Distribute pipeline commands across cluster nodes based on routing information. // Returns: // - pipelines_by_node: Map of node addresses to their pipeline contexts - // - response_policies: List of response aggregation policies for multi-node operations + // - response_policies: List of response aggregation policies for multi-node commands let (pipelines_by_node, response_policies) = map_pipeline_to_nodes(pipeline, core.clone()).await?; @@ -2184,14 +2175,17 @@ where // a vector of tuples where each tuple holds a response to the command and the address of the node that provided it. let mut pipeline_responses: PipelineResponses = vec![Vec::new(); pipeline.len()]; - // Send the requests to each node and collect thw responses + // Send the requests to each node and collect the responses + // Returns a tuple containing: + // - A vector of results for each sub-pipeline execution. + // - A vector of (address, indices) pairs indicating where each response should be placed. let (responses, addresses_and_indices) = collect_and_send_pending_requests(pipelines_by_node, core.clone()).await; // Process the responses and update the pipeline_responses process_pipeline_responses(&mut pipeline_responses, responses, addresses_and_indices)?; - // Process response policies after all tasks are complete + // Process response policies after all tasks are complete and aggregate the relevant commands. Self::aggregate_pipeline_multi_node_commands(&mut pipeline_responses, response_policies) .await?; @@ -2272,6 +2266,7 @@ where pipeline_responses: &mut PipelineResponses, response_policies: Vec<(usize, MultipleNodeRoutingInfo, Option)>, ) -> Result<(), (OperationTarget, RedisError)> { + // Go over the multi-node commands for (index, routing_info, response_policy) in response_policies { let response_receivers = pipeline_responses[index] .iter() diff --git a/glide-core/tests/test_socket_listener.rs b/glide-core/tests/test_socket_listener.rs index b9054e3bea..870f0869dd 100644 --- a/glide-core/tests/test_socket_listener.rs +++ b/glide-core/tests/test_socket_listener.rs @@ -1270,12 +1270,12 @@ mod socket_listener { CommandComponents { args: vec!["FLUSHALL".to_string().into()], args_pointer: false, - request_type: RequestType::CustomCommand.into(), + request_type: RequestType::CustomCommand.into(), // AllPrimaries command }, CommandComponents { args: vec![], args_pointer: false, - request_type: RequestType::DBSize.into(), + request_type: RequestType::DBSize.into(), // Aggregation of sum }, CommandComponents { args: vec![key.clone().into()], @@ -1305,12 +1305,12 @@ mod socket_listener { CommandComponents { args: vec!["appendonly".to_string().into(), "no".to_string().into()], args_pointer: false, - request_type: RequestType::ConfigSet.into(), + request_type: RequestType::ConfigSet.into(), // AllNodes command }, CommandComponents { args: vec!["appendonly".to_string().into()], args_pointer: false, - request_type: RequestType::ConfigGet.into(), + request_type: RequestType::ConfigGet.into(), // RandomNode command }, ]; let mut buffer = Vec::with_capacity(200);