Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(telemetry): add ability to set span attributes on connector span #6125

Merged
merged 21 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apollo-router/src/plugins/connectors/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use wiremock::MockServer;
use wiremock::ResponseTemplate;

use crate::json_ext::ValueExt;
use crate::plugins::connectors::tracing::CONNECT_SPAN_NAME;
use crate::plugins::telemetry::consts::CONNECT_SPAN_NAME;
use crate::plugins::telemetry::consts::OTEL_STATUS_CODE;
use crate::router_factory::RouterSuperServiceFactory;
use crate::router_factory::YamlRouterFactory;
Expand Down
1 change: 0 additions & 1 deletion apollo-router/src/plugins/connectors/tracing.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
pub(crate) const CONNECT_SPAN_NAME: &str = "connect";
pub(crate) const CONNECTOR_TYPE_HTTP: &str = "http";
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub(crate) mod attributes;
pub(crate) mod events;
pub(crate) mod instruments;
pub(crate) mod selectors;
pub(crate) mod spans;

pub(crate) type ConnectorRequest = HttpRequest;
pub(crate) type ConnectorResponse = HttpResponse;
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use schemars::JsonSchema;
use serde::Deserialize;

use crate::plugins::telemetry::config_new::attributes::DefaultAttributeRequirementLevel;
use crate::plugins::telemetry::config_new::conditional::Conditional;
use crate::plugins::telemetry::config_new::connector::attributes::ConnectorAttributes;
use crate::plugins::telemetry::config_new::connector::selectors::ConnectorSelector;
use crate::plugins::telemetry::config_new::extendable::Extendable;
use crate::plugins::telemetry::config_new::DefaultForLevel;
use crate::plugins::telemetry::otlp::TelemetryDataKind;

#[derive(Deserialize, JsonSchema, Clone, Default, Debug)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct ConnectorSpans {
/// Custom attributes that are attached to the connector span.
pub(crate) attributes: Extendable<ConnectorAttributes, Conditional<ConnectorSelector>>,
}

impl DefaultForLevel for ConnectorSpans {
fn defaults_for_level(
&mut self,
requirement_level: DefaultAttributeRequirementLevel,
kind: TelemetryDataKind,
) {
self.attributes.defaults_for_level(requirement_level, kind);
}
}
5 changes: 5 additions & 0 deletions apollo-router/src/plugins/telemetry/config_new/spans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use schemars::JsonSchema;
use serde::Deserialize;

use super::conditional::Conditional;
use super::connector::spans::ConnectorSpans;
use crate::plugins::telemetry::config_new::attributes::DefaultAttributeRequirementLevel;
use crate::plugins::telemetry::config_new::attributes::RouterAttributes;
use crate::plugins::telemetry::config_new::attributes::SubgraphAttributes;
Expand Down Expand Up @@ -35,6 +36,10 @@ pub(crate) struct Spans {
/// Attributes to include on the subgraph span.
/// Subgraph spans contain information about the subgraph request and response and therefore contain subgraph specific attributes.
pub(crate) subgraph: SubgraphSpans,

/// Attributes to include on the connector span.
/// Connector spans contain information about the connector request and response and therefore contain connector specific attributes.
pub(crate) connector: ConnectorSpans,
}

impl Spans {
Expand Down
1 change: 1 addition & 0 deletions apollo-router/src/plugins/telemetry/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub(crate) const QUERY_PLANNING_SPAN_NAME: &str = "query_planning";
pub(crate) const HTTP_REQUEST_SPAN_NAME: &str = "http_request";
pub(crate) const SUBGRAPH_REQUEST_SPAN_NAME: &str = "subgraph_request";
pub(crate) const QUERY_PARSING_SPAN_NAME: &str = "parse_query";
pub(crate) const CONNECT_SPAN_NAME: &str = "connect";

pub(crate) const BUILT_IN_SPAN_NAMES: [&str; 9] = [
REQUEST_SPAN_NAME,
Expand Down
106 changes: 103 additions & 3 deletions apollo-router/src/plugins/telemetry/dynamic_attribute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use super::consts::OTEL_KIND;
use super::consts::OTEL_NAME;
use super::consts::OTEL_STATUS_CODE;
use super::consts::OTEL_STATUS_MESSAGE;
use super::formatters::APOLLO_CONNECTOR_PREFIX;
use super::formatters::APOLLO_PRIVATE_PREFIX;
use super::otel::layer::str_to_span_kind;
use super::otel::layer::str_to_status;
Expand Down Expand Up @@ -69,6 +70,11 @@ impl DynAttributeLayer {
pub(crate) trait SpanDynAttribute {
fn set_span_dyn_attribute(&self, key: Key, value: opentelemetry::Value);
fn set_span_dyn_attributes(&self, attributes: impl IntoIterator<Item = KeyValue>);
fn set_span_dyn_attributes_for_span_name(
&self,
span_name: &str,
attributes: impl IntoIterator<Item = KeyValue>,
);
}

impl SpanDynAttribute for ::tracing::Span {
Expand Down Expand Up @@ -101,7 +107,7 @@ impl SpanDynAttribute for ::tracing::Span {
}
}
} else {
if key.as_str().starts_with(APOLLO_PRIVATE_PREFIX) {
if key.as_str().starts_with(APOLLO_PRIVATE_PREFIX) || key.as_str().starts_with(APOLLO_CONNECTOR_PREFIX) {
return;
}
let mut extensions = s.extensions_mut();
Expand Down Expand Up @@ -170,7 +176,98 @@ impl SpanDynAttribute for ::tracing::Span {
}
} else {
let mut attributes = attributes
.filter(|kv| !kv.key.as_str().starts_with(APOLLO_PRIVATE_PREFIX))
.filter(|kv| {
!kv.key.as_str().starts_with(APOLLO_PRIVATE_PREFIX)
&& !kv.key.as_str().starts_with(APOLLO_CONNECTOR_PREFIX)
})
.peekable();
if attributes.peek().is_none() {
return;
}
let mut extensions = s.extensions_mut();
match extensions.get_mut::<LogAttributes>() {
Some(registered_attributes) => {
registered_attributes.extend(attributes);
}
None => {
// Can't use ::tracing::error! because it could create deadlock on extensions
eprintln!("no LogAttributes, this is a bug");
}
}
}
}
};
} else {
::tracing::error!("no Registry, this is a bug");
}
});
}

fn set_span_dyn_attributes_for_span_name(
&self,
span_name: &str,
attributes: impl IntoIterator<Item = KeyValue>,
) {
let mut attributes = attributes.into_iter().peekable();
if attributes.peek().is_none() {
return;
}
self.with_subscriber(move |(id, dispatch)| {
if let Some(reg) = dispatch.downcast_ref::<Registry>() {
match reg.span(id) {
None => eprintln!("no spanref, this is a bug"),
Some(mut s) => {
if s.name() != span_name {
while let Some(parent_span) = s.parent() {
if parent_span.name() == span_name {
s = parent_span;
break;
}
s = parent_span;
}
}

if s.is_sampled() {
let mut extensions = s.extensions_mut();
match extensions.get_mut::<OtelData>() {
Some(otel_data) => {
if otel_data.builder.attributes.is_none() {
otel_data.builder.attributes = Some(
attributes
.inspect(|attr| {
update_otel_data(
otel_data,
&attr.key,
&attr.value,
)
})
.collect(),
);
} else {
let attributes: Vec<KeyValue> = attributes
.inspect(|attr| {
update_otel_data(otel_data, &attr.key, &attr.value)
})
.collect();
otel_data
.builder
.attributes
.as_mut()
.unwrap()
.extend(attributes);
}
}
None => {
// Can't use ::tracing::error! because it could create deadlock on extensions
eprintln!("no OtelData, this is a bug");
}
}
} else {
let mut attributes = attributes
.filter(|kv| {
!kv.key.as_str().starts_with(APOLLO_PRIVATE_PREFIX)
&& !kv.key.as_str().starts_with(APOLLO_CONNECTOR_PREFIX)
})
.peekable();
if attributes.peek().is_none() {
return;
Expand Down Expand Up @@ -265,7 +362,10 @@ impl EventDynAttribute for ::tracing::Span {
}
} else {
let mut attributes = attributes
.filter(|kv| !kv.key.as_str().starts_with(APOLLO_PRIVATE_PREFIX))
.filter(|kv| {
!kv.key.as_str().starts_with(APOLLO_PRIVATE_PREFIX)
&& !kv.key.as_str().starts_with(APOLLO_CONNECTOR_PREFIX)
})
.peekable();
if attributes.peek().is_none() {
return;
Expand Down
11 changes: 9 additions & 2 deletions apollo-router/src/plugins/telemetry/formatters/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use tracing_subscriber::registry::SpanRef;

use super::get_trace_and_span_id;
use super::EventFormatter;
use super::APOLLO_CONNECTOR_PREFIX;
use super::APOLLO_PRIVATE_PREFIX;
use super::EXCLUDED_ATTRIBUTES;
use crate::plugins::telemetry::config::AttributeValue;
Expand Down Expand Up @@ -128,7 +129,9 @@ where
if let Some(otel_attributes) = otel_attributes {
for (key, value) in otel_attributes.iter().filter(|(key, _)| {
let key_name = key.as_str();
!key_name.starts_with(APOLLO_PRIVATE_PREFIX) && !self.1.contains(&key_name)
!key_name.starts_with(APOLLO_PRIVATE_PREFIX)
&& !key_name.starts_with(APOLLO_CONNECTOR_PREFIX)
&& !self.1.contains(&key_name)
}) {
serializer.serialize_entry(key.as_str(), &value.as_str())?;
}
Expand All @@ -147,7 +150,9 @@ where
};
for kv in custom_attributes.iter().filter(|kv| {
let key_name = kv.key.as_str();
!key_name.starts_with(APOLLO_PRIVATE_PREFIX) && !self.1.contains(&key_name)
!key_name.starts_with(APOLLO_PRIVATE_PREFIX)
&& !key_name.starts_with(APOLLO_CONNECTOR_PREFIX)
&& !self.1.contains(&key_name)
}) {
match &kv.value {
Value::Bool(value) => {
Expand Down Expand Up @@ -403,6 +408,7 @@ where
.filter(|(key, _)| {
let key_name = key.as_str();
!key_name.starts_with(APOLLO_PRIVATE_PREFIX)
&& !key_name.starts_with(APOLLO_CONNECTOR_PREFIX)
&& include_attributes.contains(key_name)
})
.map(|(key, val)| (key.clone(), val.clone())),
Expand All @@ -427,6 +433,7 @@ where
.filter(|kv| {
let key_name = kv.key.as_str();
!key_name.starts_with(APOLLO_PRIVATE_PREFIX)
&& !key_name.starts_with(APOLLO_CONNECTOR_PREFIX)
&& include_attributes.contains(key_name)
})
.map(|kv| (kv.key.clone(), kv.value.clone())),
Expand Down
3 changes: 3 additions & 0 deletions apollo-router/src/plugins/telemetry/formatters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ use crate::metrics::layer::METRIC_PREFIX_VALUE;
use crate::plugins::telemetry::otel::OtelData;

pub(crate) const APOLLO_PRIVATE_PREFIX: &str = "apollo_private.";
// FIXME: this is a temporary solution to avoid exposing hardcoded attributes in connector spans instead of using the custom telemetry features.
// The reason this is introduced right now is to directly avoid people relying on these attributes and then creating a breaking change in the future.
pub(crate) const APOLLO_CONNECTOR_PREFIX: &str = "apollo.connector.";
// This list comes from Otel https://opentelemetry.io/docs/specs/semconv/attributes-registry/code/ and
pub(crate) const EXCLUDED_ATTRIBUTES: [&str; 5] = [
"code.filepath",
Expand Down
63 changes: 48 additions & 15 deletions apollo-router/src/plugins/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ use crate::plugins::telemetry::config_new::graphql::GraphQLInstruments;
use crate::plugins::telemetry::config_new::instruments::SupergraphInstruments;
use crate::plugins::telemetry::config_new::trace_id;
use crate::plugins::telemetry::config_new::DatadogId;
use crate::plugins::telemetry::consts::CONNECT_SPAN_NAME;
use crate::plugins::telemetry::consts::EXECUTION_SPAN_NAME;
use crate::plugins::telemetry::consts::OTEL_NAME;
use crate::plugins::telemetry::consts::OTEL_STATUS_CODE;
Expand Down Expand Up @@ -865,6 +866,7 @@ impl PluginPrivate for Telemetry {
service: crate::services::http::BoxService,
) -> crate::services::http::BoxService {
let req_fn_config = self.config.clone();
let res_fn_config = self.config.clone();
let static_connector_instruments = self.connector_custom_instruments.read().clone();
ServiceBuilder::new()
.map_future_with_request_data(
Expand All @@ -881,38 +883,69 @@ impl PluginPrivate for Telemetry {
let custom_events =
req_fn_config.instrumentation.events.new_connector_events();
custom_events.on_request(http_request);

let custom_span_attributes = req_fn_config
.instrumentation
.spans
.connector
.attributes
.on_request(http_request);
pubmodmatt marked this conversation as resolved.
Show resolved Hide resolved

(
http_request.context.clone(),
Some((custom_instruments, custom_events)),
Some((custom_instruments, custom_events, custom_span_attributes)),
)
} else {
(http_request.context.clone(), None)
}
},
move |(context, custom_telemetry): (
Context,
Option<(ConnectorInstruments, ConnectorEvents)>,
Option<(ConnectorInstruments, ConnectorEvents, Vec<KeyValue>)>,
),
f: BoxFuture<
'static,
Result<crate::services::http::HttpResponse, BoxError>,
>| {
let conf = res_fn_config.clone();
async move {
if let Some((custom_instruments, custom_events)) = custom_telemetry {
let result = f.await;
match &result {
Ok(http_response) => {
custom_instruments.on_response(http_response);
custom_events.on_response(http_response);
}
Err(err) => {
custom_instruments.on_error(err, &context);
custom_events.on_error(err, &context);
match custom_telemetry {
Some((custom_instruments, custom_events, custom_span_attributes)) => {
let span = Span::current();
bnjjj marked this conversation as resolved.
Show resolved Hide resolved
span.set_span_dyn_attributes_for_span_name(
CONNECT_SPAN_NAME,
custom_span_attributes,
);
let result = f.await;
match &result {
Ok(http_response) => {
span.set_span_dyn_attributes_for_span_name(
CONNECT_SPAN_NAME,
conf.instrumentation
.spans
.connector
.attributes
.on_response(http_response),
);
custom_instruments.on_response(http_response);
custom_events.on_response(http_response);
}
Err(err) => {
span.set_span_dyn_attributes_for_span_name(
CONNECT_SPAN_NAME,
conf.instrumentation
.spans
.connector
.attributes
.on_error(err, &context),
);
custom_instruments.on_error(err, &context);
custom_events.on_error(err, &context);
}
}
result
}
result
} else {
f.await
_ => f.await,
}
}
},
Expand Down
Loading