diff --git a/apollo-router/src/plugins/connectors/plugin.rs b/apollo-router/src/plugins/connectors/plugin.rs index cc22f28e30b..ae45bdc4e73 100644 --- a/apollo-router/src/plugins/connectors/plugin.rs +++ b/apollo-router/src/plugins/connectors/plugin.rs @@ -15,20 +15,25 @@ use serde::Deserialize; use serde::Serialize; use serde_json_bytes::json; use tower::BoxError; +use tower::ServiceBuilder; use tower::ServiceExt as TowerServiceExt; +use super::query_plans::get_connectors; use crate::layers::ServiceExt; use crate::plugin::Plugin; use crate::plugin::PluginInit; use crate::plugins::connectors::configuration::ConnectorsConfig; use crate::plugins::connectors::request_limit::RequestLimits; use crate::register_plugin; +use crate::services::execution; use crate::services::router::body::RouterBody; use crate::services::supergraph; const CONNECTORS_DEBUG_HEADER_NAME: &str = "Apollo-Connectors-Debugging"; const CONNECTORS_DEBUG_ENV: &str = "APOLLO_CONNECTORS_DEBUGGING"; +const CONNECTORS_DEBUG_KEY: &str = "apolloConnectorsDebugging"; const CONNECTORS_MAX_REQUESTS_ENV: &str = "APOLLO_CONNECTORS_MAX_REQUESTS_PER_OPERATION"; +const CONNECTOR_SOURCES_IN_QUERY_PLAN: &str = "apollo_connectors::sources_in_query_plan"; static LAST_DEBUG_ENABLED_VALUE: AtomicBool = AtomicBool::new(false); @@ -121,7 +126,7 @@ impl Plugin for Connectors { if let Some(first) = &mut first { if let Some(inner) = Arc::into_inner(debug) { first.extensions.insert( - "apolloConnectorsDebugging", + CONNECTORS_DEBUG_KEY, json!({"version": "1", "data": inner.into_inner().serialize() }), ); } @@ -143,6 +148,49 @@ impl Plugin for Connectors { ) .boxed() } + + fn execution_service(&self, service: execution::BoxService) -> execution::BoxService { + ServiceBuilder::new() + .map_request(|req: execution::Request| { + let Some(connectors) = get_connectors(&req.context) else { + return req; + }; + + // add [{"subgraph_name": "", "source_name": ""}] to the context + // for connectors with sources in the query plan. + let list = req + .query_plan + .root + .service_usage() + .unique() + .flat_map(|service_name| { + let Some(connector) = connectors.get(service_name) else { + return None; + }; + + let Some(ref source_name) = connector.id.source_name else { + return None; + }; + + Some((connector.id.subgraph_name.clone(), source_name.clone())) + }) + .unique() + .map(|(subgraph_name, source_name)| { + json!({ + "subgraph_name": subgraph_name, + "source_name": source_name, + }) + }) + .collect_vec(); + + req.context + .insert(CONNECTOR_SOURCES_IN_QUERY_PLAN, list) + .unwrap(); + req + }) + .service(service) + .boxed() + } } pub(crate) const PLUGIN_NAME: &str = "preview_connectors"; diff --git a/apollo-router/src/plugins/connectors/query_plans.rs b/apollo-router/src/plugins/connectors/query_plans.rs index 8af0a2dad25..0a9698a5921 100644 --- a/apollo-router/src/plugins/connectors/query_plans.rs +++ b/apollo-router/src/plugins/connectors/query_plans.rs @@ -1,19 +1,37 @@ use std::sync::Arc; +use apollo_federation::sources::connect::Connector; use indexmap::IndexMap; use crate::query_planner::PlanNode; use crate::Context; -type ConnectorsContext = Arc, String>>; +type ConnectorsByServiceName = Arc, Connector>>; -pub(crate) fn store_connectors_context( +pub(crate) fn store_connectors( + context: &Context, + connectors_by_service_name: Arc, Connector>>, +) { + context + .extensions() + .with_lock(|mut lock| lock.insert::(connectors_by_service_name)); +} + +pub(crate) fn get_connectors(context: &Context) -> Option { + context + .extensions() + .with_lock(|lock| lock.get::().cloned()) +} + +type ConnectorLabels = Arc, String>>; + +pub(crate) fn store_connectors_labels( context: &Context, labels_by_service_name: Arc, String>>, ) { context .extensions() - .with_lock(|mut lock| lock.insert::(labels_by_service_name)); + .with_lock(|mut lock| lock.insert::(labels_by_service_name)); } pub(crate) fn replace_connector_service_names_text( @@ -22,7 +40,7 @@ pub(crate) fn replace_connector_service_names_text( ) -> Option> { let replacements = context .extensions() - .with_lock(|lock| lock.get::().cloned()); + .with_lock(|lock| lock.get::().cloned()); if let Some(replacements) = replacements { text.as_ref().map(|text| { let mut text = text.to_string(); @@ -42,7 +60,7 @@ pub(crate) fn replace_connector_service_names( ) -> Arc { let replacements = context .extensions() - .with_lock(|lock| lock.get::().cloned()); + .with_lock(|lock| lock.get::().cloned()); return if let Some(replacements) = replacements { let mut plan = plan.clone(); diff --git a/apollo-router/src/plugins/connectors/tests.rs b/apollo-router/src/plugins/connectors/tests.rs index c6f3230e5ee..9e3c640e0a4 100644 --- a/apollo-router/src/plugins/connectors/tests.rs +++ b/apollo-router/src/plugins/connectors/tests.rs @@ -1360,6 +1360,78 @@ async fn test_interface_object() { ); } +#[tokio::test] +async fn test_sources_in_context() { + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/coprocessor")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "control": "continue", + "version": 1, + "stage": "ExecutionRequest" + }))) + .mount(&mock_server) + .await; + Mock::given(method("GET")) + .and(path("/posts")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([ + { "userId": 1, "id": 1, "title": "title", "body": "body" }, + { "userId": 1, "id": 2, "title": "title", "body": "body" }] + ))) + .mount(&mock_server) + .await; + Mock::given(method("GET")) + .and(path("/users/1")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "id": 1, + "name": "Leanne Graham", + "username": "Bret" + }))) + .mount(&mock_server) + .await; + let uri = mock_server.uri(); + + let _ = execute( + &QUICKSTART_SCHEMA.replace("https://jsonplaceholder.typicode.com", &mock_server.uri()), + &uri, + "query Posts { posts { id body title author { name username } } }", + Default::default(), + Some(json!({ + "coprocessor": { + "url": format!("{}/coprocessor", mock_server.uri()), + "execution": { + "request": { + "context": true + } + } + } + })), + |_| {}, + ) + .await; + + let requests = &mock_server.received_requests().await.unwrap(); + let coprocessor_request = requests.first().unwrap(); + let body = coprocessor_request + .body_json::() + .unwrap(); + pretty_assertions::assert_eq!( + body.get("context") + .unwrap() + .as_object() + .unwrap() + .get("entries") + .unwrap() + .as_object() + .unwrap() + .get("apollo_connectors::sources_in_query_plan") + .unwrap(), + &serde_json_bytes::json!([ + { "subgraph_name": "connectors", "source_name": "jsonPlaceholder" } + ]) + ); +} + mod quickstart_tests { use super::*; diff --git a/apollo-router/src/query_planner/execution.rs b/apollo-router/src/query_planner/execution.rs index 1b3e8e0ca04..74643d38e3e 100644 --- a/apollo-router/src/query_planner/execution.rs +++ b/apollo-router/src/query_planner/execution.rs @@ -101,6 +101,10 @@ impl QueryPlan { pub fn subgraph_fetches(&self) -> usize { self.root.subgraph_fetches() } + + pub fn service_usage<'a>(&'a self) -> Box + 'a> { + self.root.service_usage() + } } // holds the query plan executon arguments that do not change between calls diff --git a/apollo-router/src/query_planner/fetch.rs b/apollo-router/src/query_planner/fetch.rs index 80d05cf4491..84f4c89504b 100644 --- a/apollo-router/src/query_planner/fetch.rs +++ b/apollo-router/src/query_planner/fetch.rs @@ -556,7 +556,6 @@ impl FetchNode { } } - #[cfg(test)] pub(crate) fn service_name(&self) -> &str { &self.service_name } diff --git a/apollo-router/src/query_planner/plan.rs b/apollo-router/src/query_planner/plan.rs index 447adb7ba7a..f19b7fffc39 100644 --- a/apollo-router/src/query_planner/plan.rs +++ b/apollo-router/src/query_planner/plan.rs @@ -468,7 +468,6 @@ impl PlanNode { Ok(()) } - #[cfg(test)] /// Retrieves all the services used across all plan nodes. /// /// Note that duplicates are not filtered. diff --git a/apollo-router/src/router_factory.rs b/apollo-router/src/router_factory.rs index ca5870988c9..5967706ef15 100644 --- a/apollo-router/src/router_factory.rs +++ b/apollo-router/src/router_factory.rs @@ -704,9 +704,9 @@ pub(crate) async fn create_plugins( add_optional_apollo_plugin!("demand_control"); // This relative ordering is documented in `docs/source/customizations/native.mdx`: + add_optional_apollo_plugin!("preview_connectors"); add_optional_apollo_plugin!("rhai"); add_optional_apollo_plugin!("coprocessor"); - add_optional_apollo_plugin!("preview_connectors"); add_user_plugins!(); // Macros above remove from `apollo_plugin_factories`, so anything left at the end diff --git a/apollo-router/src/services/supergraph/service.rs b/apollo-router/src/services/supergraph/service.rs index 8438e6a741f..a5e4a10f5a7 100644 --- a/apollo-router/src/services/supergraph/service.rs +++ b/apollo-router/src/services/supergraph/service.rs @@ -35,7 +35,8 @@ use crate::graphql; use crate::graphql::IntoGraphQLErrors; use crate::graphql::Response; use crate::plugin::DynPlugin; -use crate::plugins::connectors::query_plans::store_connectors_context; +use crate::plugins::connectors::query_plans::store_connectors; +use crate::plugins::connectors::query_plans::store_connectors_labels; use crate::plugins::subscription::Subscription; use crate::plugins::subscription::SubscriptionConfig; use crate::plugins::subscription::APOLLO_SUBSCRIPTION_PLUGIN; @@ -130,7 +131,8 @@ impl Service for SupergraphService { fn call(&mut self, req: SupergraphRequest) -> Self::Future { if let Some(connectors) = &self.schema.connectors { - store_connectors_context(&req.context, connectors.labels_by_service_name.clone()); + store_connectors_labels(&req.context, connectors.labels_by_service_name.clone()); + store_connectors(&req.context, connectors.by_service_name.clone()); } // Consume our cloned services and allow ownership to be transferred to the async block.