Skip to content

Commit

Permalink
refactor handle_pipeline_request to accept Arc<Pipeline> instead of &…
Browse files Browse the repository at this point in the history
…Pipeline

Signed-off-by: Shoham Elias <[email protected]>
  • Loading branch information
shohamazon committed Feb 9, 2025
1 parent 87740fd commit 3b72aba
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2129,7 +2129,7 @@ where
.await
} else {
// The pipeline is not atomic and not already splitted, we need to split it into sub-pipelines and send them separately.
Self::handle_pipeline_request(&pipeline, core).await
Self::handle_pipeline_request(pipeline, core).await
}
}
CmdArg::ClusterScan {
Expand Down Expand Up @@ -2160,13 +2160,16 @@ where
///
/// This function distributes the commands in the pipeline across the cluster nodes based on routing information, collects the responses,
/// and aggregates them if necessary according to the specified response policies.
async fn handle_pipeline_request(pipeline: &crate::Pipeline, core: Core<C>) -> OperationResult {
async fn handle_pipeline_request(
pipeline: Arc<crate::Pipeline>,
core: Core<C>,
) -> OperationResult {
// Distribute pipeline commands across cluster nodes based on routing information.
// Returns:
// - pipelines_by_node: Map of node addresses to their pipeline contexts
// - response_policies: List of response aggregation policies for multi-node commands
let (pipelines_by_node, response_policies) =
map_pipeline_to_nodes(pipeline, core.clone()).await?;
map_pipeline_to_nodes(&pipeline, core.clone()).await?;

// Initialize `PipelineResponses` to store responses for each pipeline command.
// This will be used to store the responses from the different sub-pipelines to the pipeline commands.
Expand Down

0 comments on commit 3b72aba

Please sign in to comment.