Skip to content

Commit

Permalink
add list of subgraphs/sources to context (#6215)
Browse files Browse the repository at this point in the history
  • Loading branch information
lennyburdette authored Nov 4, 2024
1 parent e12af36 commit 7ae593d
Show file tree
Hide file tree
Showing 9 changed files with 225 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2175,6 +2175,11 @@ expression: "&schema"
"description": "Enables connector debugging information on response extensions if the feature is enabled",
"type": "boolean"
},
"expose_sources_in_context": {
"default": false,
"description": "When enabled, adds an entry to the context for use in coprocessors ```json { \"context\": { \"entries\": { \"apollo_connectors::sources_in_query_plan\": [ { \"subgraph_name\": \"subgraph\", \"source_name\": \"source\" } ] } } } ```",
"type": "boolean"
},
"max_requests_per_operation_per_source": {
"default": null,
"description": "The maximum number of requests for a connector source",
Expand Down
15 changes: 15 additions & 0 deletions apollo-router/src/plugins/connectors/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,21 @@ pub(crate) struct ConnectorsConfig {
/// The maximum number of requests for a connector source
#[serde(default)]
pub(crate) max_requests_per_operation_per_source: Option<usize>,

/// When enabled, adds an entry to the context for use in coprocessors
/// ```json
/// {
/// "context": {
/// "entries": {
/// "apollo_connectors::sources_in_query_plan": [
/// { "subgraph_name": "subgraph", "source_name": "source" }
/// ]
/// }
/// }
/// }
/// ```
#[serde(default)]
pub(crate) expose_sources_in_context: bool,
}

/// Configuration for a connector subgraph
Expand Down
45 changes: 44 additions & 1 deletion apollo-router/src/plugins/connectors/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,34 @@ use serde::Deserialize;
use serde::Serialize;
use serde_json_bytes::json;
use tower::BoxError;
use tower::ServiceBuilder;
use tower::ServiceExt as TowerServiceExt;

use super::query_plans::get_connectors;
use crate::layers::ServiceExt;
use crate::plugin::Plugin;
use crate::plugin::PluginInit;
use crate::plugins::connectors::configuration::ConnectorsConfig;
use crate::plugins::connectors::request_limit::RequestLimits;
use crate::register_plugin;
use crate::services::connector_service::ConnectorSourceRef;
use crate::services::execution;
use crate::services::router::body::RouterBody;
use crate::services::supergraph;

const CONNECTORS_DEBUG_HEADER_NAME: &str = "Apollo-Connectors-Debugging";
const CONNECTORS_DEBUG_ENV: &str = "APOLLO_CONNECTORS_DEBUGGING";
const CONNECTORS_DEBUG_KEY: &str = "apolloConnectorsDebugging";
const CONNECTORS_MAX_REQUESTS_ENV: &str = "APOLLO_CONNECTORS_MAX_REQUESTS_PER_OPERATION";
const CONNECTOR_SOURCES_IN_QUERY_PLAN: &str = "apollo_connectors::sources_in_query_plan";

static LAST_DEBUG_ENABLED_VALUE: AtomicBool = AtomicBool::new(false);

#[derive(Debug, Clone)]
struct Connectors {
debug_extensions: bool,
max_requests: Option<usize>,
expose_sources_in_context: bool,
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -70,6 +77,7 @@ impl Plugin for Connectors {
Ok(Connectors {
debug_extensions,
max_requests,
expose_sources_in_context: init.config.expose_sources_in_context,
})
}

Expand Down Expand Up @@ -121,7 +129,7 @@ impl Plugin for Connectors {
if let Some(first) = &mut first {
if let Some(inner) = Arc::into_inner(debug) {
first.extensions.insert(
"apolloConnectorsDebugging",
CONNECTORS_DEBUG_KEY,
json!({"version": "1", "data": inner.into_inner().serialize() }),
);
}
Expand All @@ -143,6 +151,41 @@ impl Plugin for Connectors {
)
.boxed()
}

fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
if !self.expose_sources_in_context {
return service;
}

ServiceBuilder::new()
.map_request(|req: execution::Request| {
let Some(connectors) = get_connectors(&req.context) else {
return req;
};

// add [{"subgraph_name": "", "source_name": ""}] to the context
// for connectors with sources in the query plan.
let list = req
.query_plan
.root
.service_usage_set()
.into_iter()
.flat_map(|service_name| {
connectors
.get(service_name)
.map(|connector| ConnectorSourceRef::try_from(connector).ok())
})
.unique()
.collect_vec();

req.context
.insert(CONNECTOR_SOURCES_IN_QUERY_PLAN, list)
.unwrap();
req
})
.service(service)
.boxed()
}
}

pub(crate) const PLUGIN_NAME: &str = "preview_connectors";
Expand Down
28 changes: 23 additions & 5 deletions apollo-router/src/plugins/connectors/query_plans.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,37 @@
use std::sync::Arc;

use apollo_federation::sources::connect::Connector;
use indexmap::IndexMap;

use crate::query_planner::PlanNode;
use crate::Context;

type ConnectorsContext = Arc<IndexMap<Arc<str>, String>>;
type ConnectorsByServiceName = Arc<IndexMap<Arc<str>, Connector>>;

pub(crate) fn store_connectors_context(
pub(crate) fn store_connectors(
context: &Context,
connectors_by_service_name: Arc<IndexMap<Arc<str>, Connector>>,
) {
context
.extensions()
.with_lock(|mut lock| lock.insert::<ConnectorsByServiceName>(connectors_by_service_name));
}

pub(crate) fn get_connectors(context: &Context) -> Option<ConnectorsByServiceName> {
context
.extensions()
.with_lock(|lock| lock.get::<ConnectorsByServiceName>().cloned())
}

type ConnectorLabels = Arc<IndexMap<Arc<str>, String>>;

pub(crate) fn store_connectors_labels(
context: &Context,
labels_by_service_name: Arc<IndexMap<Arc<str>, String>>,
) {
context
.extensions()
.with_lock(|mut lock| lock.insert::<ConnectorsContext>(labels_by_service_name));
.with_lock(|mut lock| lock.insert::<ConnectorLabels>(labels_by_service_name));
}

pub(crate) fn replace_connector_service_names_text(
Expand All @@ -22,7 +40,7 @@ pub(crate) fn replace_connector_service_names_text(
) -> Option<Arc<String>> {
let replacements = context
.extensions()
.with_lock(|lock| lock.get::<ConnectorsContext>().cloned());
.with_lock(|lock| lock.get::<ConnectorLabels>().cloned());
if let Some(replacements) = replacements {
text.as_ref().map(|text| {
let mut text = text.to_string();
Expand All @@ -42,7 +60,7 @@ pub(crate) fn replace_connector_service_names(
) -> Arc<PlanNode> {
let replacements = context
.extensions()
.with_lock(|lock| lock.get::<ConnectorsContext>().cloned());
.with_lock(|lock| lock.get::<ConnectorLabels>().cloned());

return if let Some(replacements) = replacements {
let mut plan = plan.clone();
Expand Down
75 changes: 75 additions & 0 deletions apollo-router/src/plugins/connectors/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,81 @@ async fn test_interface_object() {
);
}

#[tokio::test]
async fn test_sources_in_context() {
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/coprocessor"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"control": "continue",
"version": 1,
"stage": "ExecutionRequest"
})))
.mount(&mock_server)
.await;
Mock::given(method("GET"))
.and(path("/posts"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([
{ "userId": 1, "id": 1, "title": "title", "body": "body" },
{ "userId": 1, "id": 2, "title": "title", "body": "body" }]
)))
.mount(&mock_server)
.await;
Mock::given(method("GET"))
.and(path("/users/1"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"id": 1,
"name": "Leanne Graham",
"username": "Bret"
})))
.mount(&mock_server)
.await;
let uri = mock_server.uri();

let _ = execute(
&QUICKSTART_SCHEMA.replace("https://jsonplaceholder.typicode.com", &mock_server.uri()),
&uri,
"query Posts { posts { id body title author { name username } } }",
Default::default(),
Some(json!({
"preview_connectors": {
"expose_sources_in_context": true
},
"coprocessor": {
"url": format!("{}/coprocessor", mock_server.uri()),
"execution": {
"request": {
"context": true
}
}
}
})),
|_| {},
)
.await;

let requests = &mock_server.received_requests().await.unwrap();
let coprocessor_request = requests.first().unwrap();
let body = coprocessor_request
.body_json::<serde_json_bytes::Value>()
.unwrap();
pretty_assertions::assert_eq!(
body.get("context")
.unwrap()
.as_object()
.unwrap()
.get("entries")
.unwrap()
.as_object()
.unwrap()
.get("apollo_connectors::sources_in_query_plan")
.unwrap(),
&serde_json_bytes::json!([
{ "subgraph_name": "connectors", "source_name": "jsonPlaceholder" }
])
);
}

mod quickstart_tests {
use super::*;

Expand Down
46 changes: 46 additions & 0 deletions apollo-router/src/query_planner/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use apollo_compiler::collections::HashSet;
use apollo_compiler::validation::Valid;
use router_bridge::planner::PlanOptions;
use router_bridge::planner::UsageReporting;
Expand Down Expand Up @@ -503,6 +504,51 @@ impl PlanNode {
}
}

/// A version of `service_usage` that doesn't use recursion
/// and returns a `HashSet` instead of an `Iterator`.
pub(crate) fn service_usage_set(&self) -> HashSet<&str> {
let mut services = HashSet::default();
let mut stack = vec![self];
while let Some(node) = stack.pop() {
match node {
Self::Sequence { nodes } | Self::Parallel { nodes } => {
stack.extend(nodes.iter());
}
Self::Fetch(fetch) => {
services.insert(fetch.service_name.as_ref());
}
Self::Subscription { primary, rest } => {
services.insert(primary.service_name.as_ref());
if let Some(rest) = rest {
stack.push(rest);
}
}
Self::Flatten(flatten) => {
stack.push(&flatten.node);
}
Self::Defer { primary, deferred } => {
if let Some(primary) = primary.node.as_ref() {
stack.push(primary);
}
stack.extend(deferred.iter().flat_map(|d| d.node.as_deref()));
}
Self::Condition {
if_clause,
else_clause,
..
} => {
if let Some(if_clause) = if_clause {
stack.push(if_clause);
}
if let Some(else_clause) = else_clause {
stack.push(else_clause);
}
}
}
}
services
}

pub(crate) fn extract_authorization_metadata(
&mut self,
schema: &Valid<apollo_compiler::Schema>,
Expand Down
2 changes: 1 addition & 1 deletion apollo-router/src/router_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,9 +709,9 @@ pub(crate) async fn create_plugins(
add_optional_apollo_plugin!("demand_control");

// This relative ordering is documented in `docs/source/customizations/native.mdx`:
add_optional_apollo_plugin!("preview_connectors");
add_optional_apollo_plugin!("rhai");
add_optional_apollo_plugin!("coprocessor");
add_optional_apollo_plugin!("preview_connectors");
add_user_plugins!();

// Macros above remove from `apollo_plugin_factories`, so anything left at the end
Expand Down
13 changes: 12 additions & 1 deletion apollo-router/src/services/connector_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl From<&Connector> for ConnectorInfo {
}

/// A reference to a unique Connector source.
#[derive(Hash, Eq, PartialEq, Clone)]
#[derive(Hash, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub(crate) struct ConnectorSourceRef {
pub(crate) subgraph_name: String,
pub(crate) source_name: String,
Expand Down Expand Up @@ -117,6 +117,17 @@ impl FromStr for ConnectorSourceRef {
}
}

impl TryFrom<&Connector> for ConnectorSourceRef {
type Error = ();

fn try_from(value: &Connector) -> Result<Self, Self::Error> {
Ok(Self {
subgraph_name: value.id.subgraph_name.to_string(),
source_name: value.id.source_name.clone().ok_or(())?,
})
}
}

impl tower::Service<ConnectRequest> for ConnectorService {
type Response = ConnectResponse;
type Error = BoxError;
Expand Down
6 changes: 4 additions & 2 deletions apollo-router/src/services/supergraph/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ use crate::graphql;
use crate::graphql::IntoGraphQLErrors;
use crate::graphql::Response;
use crate::plugin::DynPlugin;
use crate::plugins::connectors::query_plans::store_connectors_context;
use crate::plugins::connectors::query_plans::store_connectors;
use crate::plugins::connectors::query_plans::store_connectors_labels;
use crate::plugins::subscription::Subscription;
use crate::plugins::subscription::SubscriptionConfig;
use crate::plugins::subscription::APOLLO_SUBSCRIPTION_PLUGIN;
Expand Down Expand Up @@ -130,7 +131,8 @@ impl Service<SupergraphRequest> for SupergraphService {

fn call(&mut self, req: SupergraphRequest) -> Self::Future {
if let Some(connectors) = &self.schema.connectors {
store_connectors_context(&req.context, connectors.labels_by_service_name.clone());
store_connectors_labels(&req.context, connectors.labels_by_service_name.clone());
store_connectors(&req.context, connectors.by_service_name.clone());
}

// Consume our cloned services and allow ownership to be transferred to the async block.
Expand Down

0 comments on commit 7ae593d

Please sign in to comment.