From 11eacb4a2698a5e2a79ed3dc3a9a69601301b0c6 Mon Sep 17 00:00:00 2001 From: Coenen Benjamin Date: Wed, 30 Oct 2024 16:27:47 +0100 Subject: [PATCH] feat(telemetry): add ability to set span attributes on connector span (#6125) Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com> Co-authored-by: Matthew Hawkins --- ...nfiguration__tests__schema_generation.snap | 59 ++++++++++ apollo-router/src/plugins/connectors/tests.rs | 2 +- .../src/plugins/connectors/tracing.rs | 1 - .../telemetry/config_new/connector/mod.rs | 1 + .../telemetry/config_new/connector/spans.rs | 27 +++++ .../src/plugins/telemetry/config_new/spans.rs | 5 + apollo-router/src/plugins/telemetry/consts.rs | 1 + .../plugins/telemetry/dynamic_attribute.rs | 106 +++++++++++++++++- .../src/plugins/telemetry/formatters/json.rs | 11 +- .../src/plugins/telemetry/formatters/mod.rs | 3 + apollo-router/src/plugins/telemetry/mod.rs | 63 ++++++++--- .../src/plugins/telemetry/tracing/mod.rs | 15 ++- .../src/services/connector_service.rs | 6 +- apollo-router/tests/common.rs | 6 +- .../tests/fixtures/supergraph_connect.graphql | 73 ++++++++++++ .../fixtures/jaeger.connectors.router.yaml | 55 +++++++++ .../tests/integration/telemetry/jaeger.rs | 102 ++++++++++++++++- .../telemetry/instrumentation/selectors.mdx | 3 - .../telemetry/instrumentation/spans.mdx | 8 +- 19 files changed, 506 insertions(+), 41 deletions(-) create mode 100644 apollo-router/src/plugins/telemetry/config_new/connector/spans.rs create mode 100644 apollo-router/tests/fixtures/supergraph_connect.graphql create mode 100644 apollo-router/tests/integration/telemetry/fixtures/jaeger.connectors.router.yaml diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap index dd302b42a2..0cc1c7cf4c 100644 --- 
a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap @@ -2140,6 +2140,16 @@ expression: "&schema" } ] }, + "ConnectorSpans": { + "additionalProperties": false, + "properties": { + "attributes": { + "$ref": "#/definitions/extendable_attribute_apollo_router::plugins::telemetry::config_new::connector::attributes::ConnectorAttributes_apollo_router::plugins::telemetry::config_new::conditional::Conditional", + "description": "#/definitions/extendable_attribute_apollo_router::plugins::telemetry::config_new::connector::attributes::ConnectorAttributes_apollo_router::plugins::telemetry::config_new::conditional::Conditional" + } + }, + "type": "object" + }, "ConnectorValue": { "anyOf": [ { @@ -5956,6 +5966,10 @@ expression: "&schema" "Spans": { "additionalProperties": false, "properties": { + "connector": { + "$ref": "#/definitions/ConnectorSpans", + "description": "#/definitions/ConnectorSpans" + }, "default_attribute_requirement_level": { "$ref": "#/definitions/DefaultAttributeRequirementLevel", "description": "#/definitions/DefaultAttributeRequirementLevel" @@ -7959,6 +7973,22 @@ expression: "&schema" ], "type": "string" }, + "conditional_attribute_apollo_router::plugins::telemetry::config_new::connector::selectors::ConnectorSelector": { + "anyOf": [ + { + "$ref": "#/definitions/ConnectorSelector", + "description": "#/definitions/ConnectorSelector" + }, + { + "properties": { + "condition": { + "$ref": "#/definitions/Condition_for_ConnectorSelector", + "description": "#/definitions/Condition_for_ConnectorSelector" + } + } + } + ] + }, "conditional_attribute_apollo_router::plugins::telemetry::config_new::selectors::RouterSelector": { "anyOf": [ { @@ -8434,6 +8464,35 @@ expression: "&schema" }, "type": "object" }, + 
"extendable_attribute_apollo_router::plugins::telemetry::config_new::connector::attributes::ConnectorAttributes_apollo_router::plugins::telemetry::config_new::conditional::Conditional": { + "additionalProperties": { + "$ref": "#/definitions/conditional_attribute_apollo_router::plugins::telemetry::config_new::connector::selectors::ConnectorSelector", + "description": "#/definitions/conditional_attribute_apollo_router::plugins::telemetry::config_new::connector::selectors::ConnectorSelector" + }, + "properties": { + "connector.http.method": { + "$ref": "#/definitions/StandardAttribute", + "description": "#/definitions/StandardAttribute", + "nullable": true + }, + "connector.source.name": { + "$ref": "#/definitions/StandardAttribute", + "description": "#/definitions/StandardAttribute", + "nullable": true + }, + "connector.url.template": { + "$ref": "#/definitions/StandardAttribute", + "description": "#/definitions/StandardAttribute", + "nullable": true + }, + "subgraph.name": { + "$ref": "#/definitions/StandardAttribute", + "description": "#/definitions/StandardAttribute", + "nullable": true + } + }, + "type": "object" + }, "extendable_attribute_apollo_router::plugins::telemetry::config_new::connector::attributes::ConnectorAttributes_apollo_router::plugins::telemetry::config_new::connector::selectors::ConnectorSelector": { "additionalProperties": { "$ref": "#/definitions/ConnectorSelector", diff --git a/apollo-router/src/plugins/connectors/tests.rs b/apollo-router/src/plugins/connectors/tests.rs index c582566cbc..c6f3230e5e 100644 --- a/apollo-router/src/plugins/connectors/tests.rs +++ b/apollo-router/src/plugins/connectors/tests.rs @@ -26,7 +26,7 @@ use wiremock::MockServer; use wiremock::ResponseTemplate; use crate::json_ext::ValueExt; -use crate::plugins::connectors::tracing::CONNECT_SPAN_NAME; +use crate::plugins::telemetry::consts::CONNECT_SPAN_NAME; use crate::plugins::telemetry::consts::OTEL_STATUS_CODE; use crate::router_factory::RouterSuperServiceFactory; use 
crate::router_factory::YamlRouterFactory; diff --git a/apollo-router/src/plugins/connectors/tracing.rs b/apollo-router/src/plugins/connectors/tracing.rs index 6e2f448a77..c215116fe2 100644 --- a/apollo-router/src/plugins/connectors/tracing.rs +++ b/apollo-router/src/plugins/connectors/tracing.rs @@ -1,2 +1 @@ -pub(crate) const CONNECT_SPAN_NAME: &str = "connect"; pub(crate) const CONNECTOR_TYPE_HTTP: &str = "http"; diff --git a/apollo-router/src/plugins/telemetry/config_new/connector/mod.rs b/apollo-router/src/plugins/telemetry/config_new/connector/mod.rs index 32d6be43de..6242ccc39d 100644 --- a/apollo-router/src/plugins/telemetry/config_new/connector/mod.rs +++ b/apollo-router/src/plugins/telemetry/config_new/connector/mod.rs @@ -7,6 +7,7 @@ pub(crate) mod attributes; pub(crate) mod events; pub(crate) mod instruments; pub(crate) mod selectors; +pub(crate) mod spans; pub(crate) type ConnectorRequest = HttpRequest; pub(crate) type ConnectorResponse = HttpResponse; diff --git a/apollo-router/src/plugins/telemetry/config_new/connector/spans.rs b/apollo-router/src/plugins/telemetry/config_new/connector/spans.rs new file mode 100644 index 0000000000..db58f9f4f9 --- /dev/null +++ b/apollo-router/src/plugins/telemetry/config_new/connector/spans.rs @@ -0,0 +1,27 @@ +use schemars::JsonSchema; +use serde::Deserialize; + +use crate::plugins::telemetry::config_new::attributes::DefaultAttributeRequirementLevel; +use crate::plugins::telemetry::config_new::conditional::Conditional; +use crate::plugins::telemetry::config_new::connector::attributes::ConnectorAttributes; +use crate::plugins::telemetry::config_new::connector::selectors::ConnectorSelector; +use crate::plugins::telemetry::config_new::extendable::Extendable; +use crate::plugins::telemetry::config_new::DefaultForLevel; +use crate::plugins::telemetry::otlp::TelemetryDataKind; + +#[derive(Deserialize, JsonSchema, Clone, Default, Debug)] +#[serde(deny_unknown_fields, default)] +pub(crate) struct ConnectorSpans { + /// 
Custom attributes that are attached to the connector span. + pub(crate) attributes: Extendable>, +} + +impl DefaultForLevel for ConnectorSpans { + fn defaults_for_level( + &mut self, + requirement_level: DefaultAttributeRequirementLevel, + kind: TelemetryDataKind, + ) { + self.attributes.defaults_for_level(requirement_level, kind); + } +} diff --git a/apollo-router/src/plugins/telemetry/config_new/spans.rs b/apollo-router/src/plugins/telemetry/config_new/spans.rs index 0abf12b797..20436480d9 100644 --- a/apollo-router/src/plugins/telemetry/config_new/spans.rs +++ b/apollo-router/src/plugins/telemetry/config_new/spans.rs @@ -2,6 +2,7 @@ use schemars::JsonSchema; use serde::Deserialize; use super::conditional::Conditional; +use super::connector::spans::ConnectorSpans; use crate::plugins::telemetry::config_new::attributes::DefaultAttributeRequirementLevel; use crate::plugins::telemetry::config_new::attributes::RouterAttributes; use crate::plugins::telemetry::config_new::attributes::SubgraphAttributes; @@ -35,6 +36,10 @@ pub(crate) struct Spans { /// Attributes to include on the subgraph span. /// Subgraph spans contain information about the subgraph request and response and therefore contain subgraph specific attributes. pub(crate) subgraph: SubgraphSpans, + + /// Attributes to include on the connector span. + /// Connector spans contain information about the connector request and response and therefore contain connector specific attributes. 
+ pub(crate) connector: ConnectorSpans, } impl Spans { diff --git a/apollo-router/src/plugins/telemetry/consts.rs b/apollo-router/src/plugins/telemetry/consts.rs index c82d7b202b..dbf34f4532 100644 --- a/apollo-router/src/plugins/telemetry/consts.rs +++ b/apollo-router/src/plugins/telemetry/consts.rs @@ -19,6 +19,7 @@ pub(crate) const QUERY_PLANNING_SPAN_NAME: &str = "query_planning"; pub(crate) const HTTP_REQUEST_SPAN_NAME: &str = "http_request"; pub(crate) const SUBGRAPH_REQUEST_SPAN_NAME: &str = "subgraph_request"; pub(crate) const QUERY_PARSING_SPAN_NAME: &str = "parse_query"; +pub(crate) const CONNECT_SPAN_NAME: &str = "connect"; pub(crate) const BUILT_IN_SPAN_NAMES: [&str; 9] = [ REQUEST_SPAN_NAME, diff --git a/apollo-router/src/plugins/telemetry/dynamic_attribute.rs b/apollo-router/src/plugins/telemetry/dynamic_attribute.rs index d9cde3ca21..8b893f1add 100644 --- a/apollo-router/src/plugins/telemetry/dynamic_attribute.rs +++ b/apollo-router/src/plugins/telemetry/dynamic_attribute.rs @@ -10,6 +10,7 @@ use super::consts::OTEL_KIND; use super::consts::OTEL_NAME; use super::consts::OTEL_STATUS_CODE; use super::consts::OTEL_STATUS_MESSAGE; +use super::formatters::APOLLO_CONNECTOR_PREFIX; use super::formatters::APOLLO_PRIVATE_PREFIX; use super::otel::layer::str_to_span_kind; use super::otel::layer::str_to_status; @@ -69,6 +70,11 @@ impl DynAttributeLayer { pub(crate) trait SpanDynAttribute { fn set_span_dyn_attribute(&self, key: Key, value: opentelemetry::Value); fn set_span_dyn_attributes(&self, attributes: impl IntoIterator); + fn set_span_dyn_attributes_for_span_name( + &self, + span_name: &str, + attributes: impl IntoIterator, + ); } impl SpanDynAttribute for ::tracing::Span { @@ -101,7 +107,7 @@ impl SpanDynAttribute for ::tracing::Span { } } } else { - if key.as_str().starts_with(APOLLO_PRIVATE_PREFIX) { + if key.as_str().starts_with(APOLLO_PRIVATE_PREFIX) || key.as_str().starts_with(APOLLO_CONNECTOR_PREFIX) { return; } let mut extensions = 
s.extensions_mut(); @@ -170,7 +176,98 @@ impl SpanDynAttribute for ::tracing::Span { } } else { let mut attributes = attributes - .filter(|kv| !kv.key.as_str().starts_with(APOLLO_PRIVATE_PREFIX)) + .filter(|kv| { + !kv.key.as_str().starts_with(APOLLO_PRIVATE_PREFIX) + && !kv.key.as_str().starts_with(APOLLO_CONNECTOR_PREFIX) + }) + .peekable(); + if attributes.peek().is_none() { + return; + } + let mut extensions = s.extensions_mut(); + match extensions.get_mut::() { + Some(registered_attributes) => { + registered_attributes.extend(attributes); + } + None => { + // Can't use ::tracing::error! because it could create deadlock on extensions + eprintln!("no LogAttributes, this is a bug"); + } + } + } + } + }; + } else { + ::tracing::error!("no Registry, this is a bug"); + } + }); + } + + fn set_span_dyn_attributes_for_span_name( + &self, + span_name: &str, + attributes: impl IntoIterator, + ) { + let mut attributes = attributes.into_iter().peekable(); + if attributes.peek().is_none() { + return; + } + self.with_subscriber(move |(id, dispatch)| { + if let Some(reg) = dispatch.downcast_ref::() { + match reg.span(id) { + None => eprintln!("no spanref, this is a bug"), + Some(mut s) => { + if s.name() != span_name { + while let Some(parent_span) = s.parent() { + if parent_span.name() == span_name { + s = parent_span; + break; + } + s = parent_span; + } + } + + if s.is_sampled() { + let mut extensions = s.extensions_mut(); + match extensions.get_mut::() { + Some(otel_data) => { + if otel_data.builder.attributes.is_none() { + otel_data.builder.attributes = Some( + attributes + .inspect(|attr| { + update_otel_data( + otel_data, + &attr.key, + &attr.value, + ) + }) + .collect(), + ); + } else { + let attributes: Vec = attributes + .inspect(|attr| { + update_otel_data(otel_data, &attr.key, &attr.value) + }) + .collect(); + otel_data + .builder + .attributes + .as_mut() + .unwrap() + .extend(attributes); + } + } + None => { + // Can't use ::tracing::error! 
because it could create deadlock on extensions + eprintln!("no OtelData, this is a bug"); + } + } + } else { + let mut attributes = attributes + .filter(|kv| { + !kv.key.as_str().starts_with(APOLLO_PRIVATE_PREFIX) + && !kv.key.as_str().starts_with(APOLLO_CONNECTOR_PREFIX) + }) .peekable(); if attributes.peek().is_none() { return; @@ -265,7 +362,10 @@ impl EventDynAttribute for ::tracing::Span { } } else { let mut attributes = attributes - .filter(|kv| !kv.key.as_str().starts_with(APOLLO_PRIVATE_PREFIX)) + .filter(|kv| { + !kv.key.as_str().starts_with(APOLLO_PRIVATE_PREFIX) + && !kv.key.as_str().starts_with(APOLLO_CONNECTOR_PREFIX) + }) .peekable(); if attributes.peek().is_none() { return; diff --git a/apollo-router/src/plugins/telemetry/formatters/json.rs b/apollo-router/src/plugins/telemetry/formatters/json.rs index 8f6551ce14..fc53ac1499 100644 --- a/apollo-router/src/plugins/telemetry/formatters/json.rs +++ b/apollo-router/src/plugins/telemetry/formatters/json.rs @@ -20,6 +20,7 @@ use tracing_subscriber::registry::SpanRef; use super::get_trace_and_span_id; use super::EventFormatter; +use super::APOLLO_CONNECTOR_PREFIX; use super::APOLLO_PRIVATE_PREFIX; use super::EXCLUDED_ATTRIBUTES; use crate::plugins::telemetry::config::AttributeValue; @@ -128,7 +129,9 @@ where if let Some(otel_attributes) = otel_attributes { for (key, value) in otel_attributes.iter().filter(|(key, _)| { let key_name = key.as_str(); - !key_name.starts_with(APOLLO_PRIVATE_PREFIX) && !self.1.contains(&key_name) + !key_name.starts_with(APOLLO_PRIVATE_PREFIX) + && !key_name.starts_with(APOLLO_CONNECTOR_PREFIX) + && !self.1.contains(&key_name) }) { serializer.serialize_entry(key.as_str(), &value.as_str())?; } @@ -147,7 +150,9 @@ where }; for kv in custom_attributes.iter().filter(|kv| { let key_name = kv.key.as_str(); - !key_name.starts_with(APOLLO_PRIVATE_PREFIX) && !self.1.contains(&key_name) + !key_name.starts_with(APOLLO_PRIVATE_PREFIX) + && !key_name.starts_with(APOLLO_CONNECTOR_PREFIX) + && 
!self.1.contains(&key_name) }) { match &kv.value { Value::Bool(value) => { @@ -403,6 +408,7 @@ where .filter(|(key, _)| { let key_name = key.as_str(); !key_name.starts_with(APOLLO_PRIVATE_PREFIX) + && !key_name.starts_with(APOLLO_CONNECTOR_PREFIX) && include_attributes.contains(key_name) }) .map(|(key, val)| (key.clone(), val.clone())), @@ -427,6 +433,7 @@ where .filter(|kv| { let key_name = kv.key.as_str(); !key_name.starts_with(APOLLO_PRIVATE_PREFIX) + && !key_name.starts_with(APOLLO_CONNECTOR_PREFIX) && include_attributes.contains(key_name) }) .map(|kv| (kv.key.clone(), kv.value.clone())), diff --git a/apollo-router/src/plugins/telemetry/formatters/mod.rs b/apollo-router/src/plugins/telemetry/formatters/mod.rs index f99bc6a91c..17a4abb5c2 100644 --- a/apollo-router/src/plugins/telemetry/formatters/mod.rs +++ b/apollo-router/src/plugins/telemetry/formatters/mod.rs @@ -32,6 +32,9 @@ use crate::metrics::layer::METRIC_PREFIX_VALUE; use crate::plugins::telemetry::otel::OtelData; pub(crate) const APOLLO_PRIVATE_PREFIX: &str = "apollo_private."; +// FIXME: this is a temporary solution to avoid exposing hardcoded attributes in connector spans instead of using the custom telemetry features. +// The reason this is introduced right now is to directly avoid people relying on these attributes and then creating a breaking change in the future. 
+pub(crate) const APOLLO_CONNECTOR_PREFIX: &str = "apollo.connector."; // This list comes from Otel https://opentelemetry.io/docs/specs/semconv/attributes-registry/code/ and pub(crate) const EXCLUDED_ATTRIBUTES: [&str; 5] = [ "code.filepath", diff --git a/apollo-router/src/plugins/telemetry/mod.rs b/apollo-router/src/plugins/telemetry/mod.rs index 25f8a5d134..e44762b412 100644 --- a/apollo-router/src/plugins/telemetry/mod.rs +++ b/apollo-router/src/plugins/telemetry/mod.rs @@ -108,6 +108,7 @@ use crate::plugins::telemetry::config_new::graphql::GraphQLInstruments; use crate::plugins::telemetry::config_new::instruments::SupergraphInstruments; use crate::plugins::telemetry::config_new::trace_id; use crate::plugins::telemetry::config_new::DatadogId; +use crate::plugins::telemetry::consts::CONNECT_SPAN_NAME; use crate::plugins::telemetry::consts::EXECUTION_SPAN_NAME; use crate::plugins::telemetry::consts::OTEL_NAME; use crate::plugins::telemetry::consts::OTEL_STATUS_CODE; @@ -865,6 +866,7 @@ impl PluginPrivate for Telemetry { service: crate::services::http::BoxService, ) -> crate::services::http::BoxService { let req_fn_config = self.config.clone(); + let res_fn_config = self.config.clone(); let static_connector_instruments = self.connector_custom_instruments.read().clone(); ServiceBuilder::new() .map_future_with_request_data( @@ -881,9 +883,17 @@ impl PluginPrivate for Telemetry { let custom_events = req_fn_config.instrumentation.events.new_connector_events(); custom_events.on_request(http_request); + + let custom_span_attributes = req_fn_config + .instrumentation + .spans + .connector + .attributes + .on_request(http_request); + ( http_request.context.clone(), - Some((custom_instruments, custom_events)), + Some((custom_instruments, custom_events, custom_span_attributes)), ) } else { (http_request.context.clone(), None) @@ -891,28 +901,51 @@ impl PluginPrivate for Telemetry { }, move |(context, custom_telemetry): ( Context, - Option<(ConnectorInstruments, 
ConnectorEvents)>, + Option<(ConnectorInstruments, ConnectorEvents, Vec)>, ), f: BoxFuture< 'static, Result, >| { + let conf = res_fn_config.clone(); async move { - if let Some((custom_instruments, custom_events)) = custom_telemetry { - let result = f.await; - match &result { - Ok(http_response) => { - custom_instruments.on_response(http_response); - custom_events.on_response(http_response); - } - Err(err) => { - custom_instruments.on_error(err, &context); - custom_events.on_error(err, &context); + match custom_telemetry { + Some((custom_instruments, custom_events, custom_span_attributes)) => { + let span = Span::current(); + span.set_span_dyn_attributes_for_span_name( + CONNECT_SPAN_NAME, + custom_span_attributes, + ); + let result = f.await; + match &result { + Ok(http_response) => { + span.set_span_dyn_attributes_for_span_name( + CONNECT_SPAN_NAME, + conf.instrumentation + .spans + .connector + .attributes + .on_response(http_response), + ); + custom_instruments.on_response(http_response); + custom_events.on_response(http_response); + } + Err(err) => { + span.set_span_dyn_attributes_for_span_name( + CONNECT_SPAN_NAME, + conf.instrumentation + .spans + .connector + .attributes + .on_error(err, &context), + ); + custom_instruments.on_error(err, &context); + custom_events.on_error(err, &context); + } } + result } - result - } else { - f.await + _ => f.await, } } }, diff --git a/apollo-router/src/plugins/telemetry/tracing/mod.rs b/apollo-router/src/plugins/telemetry/tracing/mod.rs index 0172f3e094..ce83b1b29d 100644 --- a/apollo-router/src/plugins/telemetry/tracing/mod.rs +++ b/apollo-router/src/plugins/telemetry/tracing/mod.rs @@ -16,6 +16,7 @@ use serde::Deserialize; use tower::BoxError; use super::config_new::spans::Spans; +use super::formatters::APOLLO_CONNECTOR_PREFIX; use super::formatters::APOLLO_PRIVATE_PREFIX; use crate::plugins::telemetry::config::TracingCommon; @@ -50,17 +51,19 @@ impl SpanProcessor for ApolloFilterSpanProcessor { } fn on_end(&self, span: 
SpanData) { - if span - .attributes - .iter() - .any(|(key, _)| key.as_str().starts_with(APOLLO_PRIVATE_PREFIX)) - { + if span.attributes.iter().any(|(key, _)| { + key.as_str().starts_with(APOLLO_PRIVATE_PREFIX) + || key.as_str().starts_with(APOLLO_CONNECTOR_PREFIX) + }) { let attributes_len = span.attributes.len(); let span = SpanData { attributes: span .attributes .into_iter() - .filter(|(k, _)| !k.as_str().starts_with(APOLLO_PRIVATE_PREFIX)) + .filter(|(k, _)| { + !k.as_str().starts_with(APOLLO_PRIVATE_PREFIX) + && !k.as_str().starts_with(APOLLO_CONNECTOR_PREFIX) + }) .fold( EvictedHashMap::new(attributes_len as u32, attributes_len), |mut m, (k, v)| { diff --git a/apollo-router/src/services/connector_service.rs b/apollo-router/src/services/connector_service.rs index e043afd070..a8b29f9601 100644 --- a/apollo-router/src/services/connector_service.rs +++ b/apollo-router/src/services/connector_service.rs @@ -32,8 +32,8 @@ use crate::plugins::connectors::make_requests::make_requests; use crate::plugins::connectors::plugin::ConnectorContext; use crate::plugins::connectors::request_limit::RequestLimits; use crate::plugins::connectors::tracing::CONNECTOR_TYPE_HTTP; -use crate::plugins::connectors::tracing::CONNECT_SPAN_NAME; use crate::plugins::subscription::SubscriptionConfig; +use crate::plugins::telemetry::consts::CONNECT_SPAN_NAME; use crate::services::ConnectRequest; use crate::services::ConnectResponse; use crate::spec::Schema; @@ -145,7 +145,6 @@ impl tower::Service for ConnectorService { let Some(http_client_factory) = http_client_factory else { return Err("no http client found".into()); }; - let fetch_time_offset = request.context.created_at.elapsed().as_nanos() as i64; let span = tracing::info_span!( CONNECT_SPAN_NAME, @@ -159,6 +158,9 @@ impl tower::Service for ConnectorService { "apollo_private.sent_time_offset" = fetch_time_offset, "otel.status_code" = tracing::field::Empty, ); + // TODO: I think we should get rid of these attributes by default and only 
add it from custom telemetry. We just need to double check it's not required for Studio. + + // These additional attributes will be added to the custom telemetry feature // TODO: apollo.connector.field.alias // TODO: apollo.connector.field.return_type // TODO: apollo.connector.field.selection_set diff --git a/apollo-router/tests/common.rs b/apollo-router/tests/common.rs index 4a9506a658..4e349d3bb8 100644 --- a/apollo-router/tests/common.rs +++ b/apollo-router/tests/common.rs @@ -297,7 +297,9 @@ impl IntegrationTest { let url = format!("http://{address}/"); // Add a default override for products, if not specified - subgraph_overrides.entry("products".into()).or_insert(url); + subgraph_overrides + .entry("products".into()) + .or_insert(url.clone()); // Insert the overrides into the config let config_str = merge_overrides(&config, &subgraph_overrides, None, &redis_namespace); @@ -387,7 +389,6 @@ impl IntegrationTest { .env("APOLLO_KEY", apollo_key) .env("APOLLO_GRAPH_REF", apollo_graph_ref); } - router .args(dbg!([ "--hr", @@ -413,7 +414,6 @@ impl IntegrationTest { let mut lines = reader.lines(); while let Ok(Some(line)) = lines.next_line().await { println!("{line}"); - // Extract the bind address from a log line that looks like this: GraphQL endpoint exposed at http://127.0.0.1:51087/ if let Some(captures) = bind_address_regex.captures(&line) { let address = captures.name("address").unwrap().as_str(); diff --git a/apollo-router/tests/fixtures/supergraph_connect.graphql b/apollo-router/tests/fixtures/supergraph_connect.graphql new file mode 100644 index 0000000000..a1ed2a27a1 --- /dev/null +++ b/apollo-router/tests/fixtures/supergraph_connect.graphql @@ -0,0 +1,73 @@ +schema + @link(url: "https://specs.apollo.dev/link/v1.0") + @link(url: "https://specs.apollo.dev/join/v0.5", for: EXECUTION) + @link(url: "https://specs.apollo.dev/connect/v0.1", for: EXECUTION) + @join__directive(graphs: [POSTS], name: "link", args: {url: "https://specs.apollo.dev/connect/v0.1", import: 
["@connect", "@source"]}) + @join__directive(graphs: [POSTS], name: "source", args: {name: "jsonPlaceholder", http: {baseURL: "https://jsonplaceholder.typicode.com/"}}) + @join__directive(graphs: [POSTS], name: "source", args: {name: "routerHealth", http: {baseURL: "http://localhost:4000/"}}) +{ + query: Query +} + +directive @join__directive(graphs: [join__Graph!], name: String!, args: join__DirectiveArguments) repeatable on SCHEMA | OBJECT | INTERFACE | FIELD_DEFINITION + +directive @join__enumValue(graph: join__Graph!) repeatable on ENUM_VALUE + +directive @join__field(graph: join__Graph, requires: join__FieldSet, provides: join__FieldSet, type: String, external: Boolean, override: String, usedOverridden: Boolean, overrideLabel: String, contextArguments: [join__ContextArgument!]) repeatable on FIELD_DEFINITION | INPUT_FIELD_DEFINITION + +directive @join__graph(name: String!, url: String!) on ENUM_VALUE + +directive @join__implements(graph: join__Graph!, interface: String!) repeatable on OBJECT | INTERFACE + +directive @join__type(graph: join__Graph!, key: join__FieldSet, extension: Boolean! = false, resolvable: Boolean! = true, isInterfaceObject: Boolean! = false) repeatable on OBJECT | INTERFACE | UNION | ENUM | INPUT_OBJECT | SCALAR + +directive @join__unionMember(graph: join__Graph!, member: String!) repeatable on UNION + +directive @link(url: String, as: String, for: link__Purpose, import: [link__Import]) repeatable on SCHEMA + +input join__ContextArgument { + name: String! + type: String! + context: String! + selection: join__FieldValue! +} + +scalar join__DirectiveArguments + +scalar join__FieldSet + +scalar join__FieldValue + +enum join__Graph { + POSTS @join__graph(name: "posts", url: "http://localhost") +} + +scalar link__Import + +enum link__Purpose { + """ + `SECURITY` features provide metadata necessary to securely resolve fields. + """ + SECURITY + + """ + `EXECUTION` features provide metadata necessary for operation execution. 
+ """ + EXECUTION +} + +type Post + @join__type(graph: POSTS) +{ + id: ID! + body: String + title: String + status: String +} + +type Query + @join__type(graph: POSTS) +{ + posts: [Post] @join__directive(graphs: [POSTS], name: "connect", args: {source: "jsonPlaceholder", http: {GET: "/posts"}, selection: "id\ntitle\nbody"}) + post(id: ID!): Post @join__directive(graphs: [POSTS], name: "connect", args: {source: "jsonPlaceholder", http: {GET: "/posts/{$args.id}"}, selection: "id\ntitle\nbody"}) @join__directive(graphs: [POSTS], name: "connect", args: {source: "routerHealth", http: {GET: "/health?_={$args.id}"}, selection: "id: $args.id\nstatus", entity: true}) +} diff --git a/apollo-router/tests/integration/telemetry/fixtures/jaeger.connectors.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/jaeger.connectors.router.yaml new file mode 100644 index 0000000000..6c0e4b4b84 --- /dev/null +++ b/apollo-router/tests/integration/telemetry/fixtures/jaeger.connectors.router.yaml @@ -0,0 +1,55 @@ +telemetry: + instrumentation: + spans: + connector: + attributes: + connector.source.name: true + connector.http.method: true + connector.http.response.status_code: + connector_http_response_status: code + connector.url.template: true + connector.http.response.header.content-type: + connector_http_response_header: "content-type" + exporters: + tracing: + experimental_response_trace_id: + enabled: true + header_name: apollo-custom-trace-id + propagation: + jaeger: true + common: + service_name: router + sampler: always_on + jaeger: + enabled: true + batch_processor: + scheduled_delay: 100ms + collector: + endpoint: http://127.0.0.1:14268/api/traces + logging: + experimental_when_header: + - name: apollo-router-log-request + value: test + headers: true # default: false + body: true # default: false + # log request for all requests coming from Iphones + - name: custom-header + match: ^foo.* + headers: true + +override_subgraph_url: + products: http://localhost:4005 
+include_subgraph_errors: + all: true + +supergraph: + listen: "127.0.0.1:50642" +health_check: + enabled: true + listen: "127.0.0.1:50642" +preview_connectors: + subgraphs: + posts: # The name of the subgraph + sources: + routerHealth: # Refers to @source(name: "routerHealth") + override_url: http://127.0.0.1:50642 diff --git a/apollo-router/tests/integration/telemetry/jaeger.rs b/apollo-router/tests/integration/telemetry/jaeger.rs index fcf59e4ef5..eb3839b036 100644 --- a/apollo-router/tests/integration/telemetry/jaeger.rs +++ b/apollo-router/tests/integration/telemetry/jaeger.rs @@ -1,6 +1,7 @@ extern crate core; use std::collections::HashSet; +use std::path::PathBuf; use std::time::Duration; use anyhow::anyhow; @@ -38,6 +39,7 @@ async fn test_reload() -> Result<(), BoxError> { Some("ExampleQuery"), &["client", "router", "subgraph"], false, + false, ) .await?; router.touch_config().await; @@ -71,6 +73,7 @@ async fn test_remote_root() -> Result<(), BoxError> { Some("ExampleQuery"), &["client", "router", "subgraph"], false, + false, ) .await?; @@ -102,6 +105,7 @@ async fn test_local_root() -> Result<(), BoxError> { Some("ExampleQuery"), &["router", "subgraph"], false, + false, ) .await?; @@ -150,6 +154,7 @@ async fn test_local_root_50_percent_sample() -> Result<(), BoxError> { Some("ExampleQuery"), &["router", "subgraph"], false, + false, ) .await .is_ok() @@ -208,12 +213,48 @@ async fn test_default_operation() -> Result<(), BoxError> { Some("ExampleQuery1"), &["client", "router", "subgraph"], false, + false, ) .await?; router.graceful_shutdown().await; Ok(()) } +#[tokio::test(flavor = "multi_thread")] +async fn test_rest_connectors() -> Result<(), BoxError> { + if std::env::var("TEST_APOLLO_KEY").is_ok() && std::env::var("TEST_APOLLO_GRAPH_REF").is_ok() { + let mut router = IntegrationTest::builder() + .telemetry(Telemetry::Jaeger) + .supergraph(PathBuf::from("tests/fixtures/supergraph_connect.graphql")) + 
.config(include_str!("fixtures/jaeger.connectors.router.yaml")) + .build() + .await; + + router.start().await; + router.assert_started().await; + let query = json!({"query":"query ExampleQuery { posts { id } }","variables":{}, "operationName": "ExampleQuery"}); + + let (id, result) = router.execute_query(&query).await; + assert!(!result + .headers() + .get("apollo-custom-trace-id") + .unwrap() + .is_empty()); + + validate_trace( + id, + &query, + Some("ExampleQuery"), + &["client", "router"], + false, + true, + ) + .await?; + router.graceful_shutdown().await; + } + Ok(()) +} + #[tokio::test(flavor = "multi_thread")] async fn test_anonymous_operation() -> Result<(), BoxError> { let mut router = IntegrationTest::builder() @@ -233,7 +274,15 @@ async fn test_anonymous_operation() -> Result<(), BoxError> { .get("apollo-custom-trace-id") .unwrap() .is_empty()); - validate_trace(id, &query, None, &["client", "router", "subgraph"], false).await?; + validate_trace( + id, + &query, + None, + &["client", "router", "subgraph"], + false, + false, + ) + .await?; router.graceful_shutdown().await; Ok(()) } @@ -262,6 +311,7 @@ async fn test_selected_operation() -> Result<(), BoxError> { Some("ExampleQuery2"), &["client", "router", "subgraph"], false, + false, ) .await?; router.graceful_shutdown().await; @@ -288,6 +338,7 @@ async fn test_span_customization() -> Result<(), BoxError> { Some("ExampleQuery"), &["client", "router", "subgraph"], true, + false, ) .await?; router.graceful_shutdown().await; @@ -324,6 +375,7 @@ async fn test_decimal_trace_id() -> Result<(), BoxError> { Some("ExampleQuery1"), &["client", "router", "subgraph"], false, + false, ) .await?; router.graceful_shutdown().await; @@ -336,6 +388,7 @@ async fn validate_trace( operation_name: Option<&str>, services: &[&'static str], custom_span_instrumentation: bool, + check_connect_span: bool, ) -> Result<(), BoxError> { let params = url::form_urlencoded::Serializer::new(String::new()) .append_pair("service", 
services.first().expect("expected root service")) @@ -350,6 +403,7 @@ async fn validate_trace( operation_name, services, custom_span_instrumentation, + check_connect_span, ) .await .is_ok() @@ -364,6 +418,7 @@ async fn validate_trace( operation_name, services, custom_span_instrumentation, + check_connect_span, ) .await?; Ok(()) @@ -375,6 +430,7 @@ async fn find_valid_trace( operation_name: Option<&str>, services: &[&'static str], custom_span_instrumentation: bool, + check_connect_span: bool, ) -> Result<(), BoxError> { // A valid trace has: // * All three services @@ -392,7 +448,7 @@ async fn find_valid_trace( verify_trace_participants(&trace, services)?; // Verify that we got the expected span operation names - verify_spans_present(&trace, operation_name, services)?; + verify_spans_present(&trace, operation_name, services, check_connect_span)?; // Verify that all spans have a path to the root 'client_request' span verify_span_parenting(&trace, services)?; @@ -406,6 +462,10 @@ async fn find_valid_trace( // Verify that router span fields are present verify_router_span_fields(&trace, custom_span_instrumentation)?; + if check_connect_span { + verify_connect_span_fields(&trace)?; + } + Ok(()) } @@ -550,6 +610,37 @@ fn verify_supergraph_span_fields( Ok(()) } +fn verify_connect_span_fields(trace: &Value) -> Result<(), BoxError> { + // Check the exact connector attribute values on the 'connect' span; a missing span is returned as an error (not a panic) so the caller's retry loop can poll again. + let connect_span = trace.select_path("$..spans[?(@.operationName == 'connect')]")?.first().copied().ok_or("trace does not contain a 'connect' span yet")?; + assert_eq!( + connect_span + .select_path("$.tags[?(@.key == 'connector.http.method')].value")? + .first(), + Some(&&Value::String("GET".to_string())) + ); + assert_eq!( + connect_span + .select_path("$.tags[?(@.key == 'connector.source.name')].value")? + .first(), + Some(&&Value::String("jsonPlaceholder".to_string())) + ); + assert_eq!( + connect_span + .select_path("$.tags[?(@.key == 'connector.url.template')].value")?
+ .first(), + Some(&&Value::String("/posts".to_string())) + ); + assert_eq!( + connect_span + .select_path("$.tags[?(@.key == 'subgraph.name')].value")? + .first(), + None + ); + + Ok(()) +} + fn verify_trace_participants(trace: &Value, services: &[&'static str]) -> Result<(), BoxError> { let actual_services: HashSet = trace .select_path("$..serviceName")? @@ -574,6 +665,7 @@ fn verify_spans_present( trace: &Value, operation_name: Option<&str>, services: &[&'static str], + check_connect_span: bool, ) -> Result<(), BoxError> { let operation_names: HashSet = trace .select_path("$..operationName")? @@ -583,7 +675,6 @@ fn verify_spans_present( let mut expected_operation_names: HashSet = HashSet::from( [ "execution", - "subgraph server", operation_name .map(|name| format!("query {name}")) .unwrap_or("query".to_string()) @@ -592,10 +683,13 @@ fn verify_spans_present( "fetch", //"parse_query", Parse query will only happen once //"query_planning", query planning will only happen once - "subgraph", ] .map(|s| s.into()), ); + if !check_connect_span { + expected_operation_names.insert("subgraph server".into()); + expected_operation_names.insert("subgraph".into()); + } if services.contains(&"client") { expected_operation_names.insert("client_request".into()); } diff --git a/docs/source/configuration/telemetry/instrumentation/selectors.mdx b/docs/source/configuration/telemetry/instrumentation/selectors.mdx index 6002dbc271..d63e9003a3 100644 --- a/docs/source/configuration/telemetry/instrumentation/selectors.mdx +++ b/docs/source/configuration/telemetry/instrumentation/selectors.mdx @@ -114,9 +114,6 @@ Apollo Connectors for REST APIs make HTTP calls to the upstream HTTP API. These | `static` | No | | A static string value | | `error` | No | `reason` | A string value containing error reason when it's a critical error | - -The above Apollo Connectors selectors are not currently supported for Spans, only for Instruments, Events, and Conditions. 
- ### GraphQL diff --git a/docs/source/configuration/telemetry/instrumentation/spans.mdx b/docs/source/configuration/telemetry/instrumentation/spans.mdx index 73dddd58e0..696aecf626 100644 --- a/docs/source/configuration/telemetry/instrumentation/spans.mdx +++ b/docs/source/configuration/telemetry/instrumentation/spans.mdx @@ -14,7 +14,7 @@ A **span** captures contextual information about requests and responses as they' -The `router`, `supergraph` and `subgraph` sections are used to define custom span configuration for each service: +The `router`, `supergraph`, `subgraph` and `connector` sections are used to define custom span configuration for each service: ```yaml title="router.yaml" telemetry: @@ -29,6 +29,9 @@ telemetry: subgraph: # highlight-line attributes: {} # ... + connector: # highlight-line + attributes: {} + # ... ``` ### `attributes` @@ -238,6 +241,9 @@ telemetry: subgraph: attributes: {} # ... + connector: + attributes: {} + # ... ``` ## Spans configuration reference