Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement HTTP metrics for Connectors #6067

Merged
merged 12 commits into from
Oct 15, 2024

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions apollo-router/src/plugins/telemetry/config_new/attributes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ const NETWORK_LOCAL_PORT: Key = Key::from_static_str("network.local.port");
const NETWORK_PEER_ADDRESS: Key = Key::from_static_str("network.peer.address");
const NETWORK_PEER_PORT: Key = Key::from_static_str("network.peer.port");

pub(super) const HTTP_REQUEST_HEADERS: Key = Key::from_static_str("http.request.headers");
pub(super) const HTTP_REQUEST_URI: Key = Key::from_static_str("http.request.uri");
pub(super) const HTTP_REQUEST_VERSION: Key = Key::from_static_str("http.request.version");
pub(super) const HTTP_REQUEST_BODY: Key = Key::from_static_str("http.request.body");

pub(super) const HTTP_RESPONSE_HEADERS: Key = Key::from_static_str("http.response.headers");
pub(super) const HTTP_RESPONSE_STATUS: Key = Key::from_static_str("http.response.status");
pub(super) const HTTP_RESPONSE_VERSION: Key = Key::from_static_str("http.response.version");
pub(super) const HTTP_RESPONSE_BODY: Key = Key::from_static_str("http.response.body");

#[derive(Deserialize, JsonSchema, Clone, Debug, Default, Copy)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub(crate) enum DefaultAttributeRequirementLevel {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use opentelemetry_api::Key;
use opentelemetry_api::KeyValue;
use schemars::JsonSchema;
use serde::Deserialize;
use tower::BoxError;

use crate::plugins::telemetry::config_new::attributes::DefaultAttributeRequirementLevel;
use crate::plugins::telemetry::config_new::attributes::StandardAttribute;
use crate::plugins::telemetry::config_new::attributes::SUBGRAPH_NAME;
use crate::plugins::telemetry::config_new::connector::ConnectorRequest;
use crate::plugins::telemetry::config_new::connector::ConnectorResponse;
use crate::plugins::telemetry::config_new::DefaultForLevel;
use crate::plugins::telemetry::config_new::Selectors;
use crate::plugins::telemetry::otlp::TelemetryDataKind;
use crate::services::connector_service::ConnectorInfo;
use crate::services::connector_service::CONNECTOR_INFO_CONTEXT_KEY;
use crate::Context;

const CONNECTOR_HTTP_METHOD: Key = Key::from_static_str("connector.http.method");
const CONNECTOR_SOURCE_NAME: Key = Key::from_static_str("connector.source.name");
const CONNECTOR_URL_TEMPLATE: Key = Key::from_static_str("connector.url.template");

#[derive(Deserialize, JsonSchema, Clone, Default, Debug)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct ConnectorAttributes {
/// The name of the subgraph containing the connector
/// Examples:
///
/// * posts
///
/// Requirement level: Required
#[serde(rename = "subgraph.name")]
subgraph_name: Option<StandardAttribute>,

/// The name of the source for this connector, if defined
/// Examples:
///
/// * posts_api
///
/// Requirement level: Conditionally Required: If the connector has a source defined
#[serde(rename = "connector.source.name")]
connector_source_name: Option<StandardAttribute>,

/// The HTTP method for the connector
/// Examples:
///
/// * GET
/// * POST
///
/// Requirement level: Required
#[serde(rename = "connector.http.method")]
connector_http_method: Option<StandardAttribute>,

/// The connector URL template, relative to the source base URL if one is defined
/// Examples:
///
/// * /users/{$this.id!}/post
///
/// Requirement level: Required
#[serde(rename = "connector.url.template")]
connector_url_template: Option<StandardAttribute>,
}

impl DefaultForLevel for ConnectorAttributes {
fn defaults_for_level(
&mut self,
requirement_level: DefaultAttributeRequirementLevel,
_kind: TelemetryDataKind,
) {
match requirement_level {
DefaultAttributeRequirementLevel::Required => {
if self.subgraph_name.is_none() {
self.subgraph_name = Some(StandardAttribute::Bool(true));
}
}
DefaultAttributeRequirementLevel::Recommended => {
if self.subgraph_name.is_none() {
self.subgraph_name = Some(StandardAttribute::Bool(true));
}
if self.connector_source_name.is_none() {
self.connector_source_name = Some(StandardAttribute::Bool(true));
}
if self.connector_http_method.is_none() {
self.connector_http_method = Some(StandardAttribute::Bool(true));
}
if self.connector_url_template.is_none() {
self.connector_url_template = Some(StandardAttribute::Bool(true));
}
}
DefaultAttributeRequirementLevel::None => {}
}
}
}

impl Selectors for ConnectorAttributes {
type Request = ConnectorRequest;
type Response = ConnectorResponse;
type EventResponse = ();

fn on_request(&self, request: &Self::Request) -> Vec<KeyValue> {
let mut attrs = Vec::new();

if let Ok(Some(connector_info)) = request
.context
.get::<&str, ConnectorInfo>(CONNECTOR_INFO_CONTEXT_KEY)
{
if let Some(key) = self
.subgraph_name
.as_ref()
.and_then(|a| a.key(SUBGRAPH_NAME))
{
attrs.push(KeyValue::new(key, connector_info.subgraph_name.to_string()));
}
if let Some(key) = self
.connector_source_name
.as_ref()
.and_then(|a| a.key(CONNECTOR_SOURCE_NAME))
{
if let Some(source_name) = connector_info.source_name {
attrs.push(KeyValue::new(key, source_name.to_string()));
}
}
if let Some(key) = self
.connector_http_method
.as_ref()
.and_then(|a| a.key(CONNECTOR_HTTP_METHOD))
{
attrs.push(KeyValue::new(key, connector_info.http_method));
}
if let Some(key) = self
.connector_url_template
.as_ref()
.and_then(|a| a.key(CONNECTOR_URL_TEMPLATE))
{
attrs.push(KeyValue::new(key, connector_info.url_template.to_string()));
}
}
attrs
}

fn on_response(&self, _response: &Self::Response) -> Vec<KeyValue> {
Vec::default()
}

fn on_error(&self, _error: &BoxError, _ctx: &Context) -> Vec<KeyValue> {
Vec::default()
}
}
211 changes: 211 additions & 0 deletions apollo-router/src/plugins/telemetry/config_new/connector/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
use opentelemetry_api::Key;
use opentelemetry_api::KeyValue;
use opentelemetry_semantic_conventions::trace::HTTP_REQUEST_METHOD;
use parking_lot::Mutex;
use schemars::JsonSchema;
use serde::Deserialize;
use tower::BoxError;

use crate::plugins::telemetry::config_new::attributes::HTTP_REQUEST_BODY;
use crate::plugins::telemetry::config_new::attributes::HTTP_REQUEST_HEADERS;
use crate::plugins::telemetry::config_new::attributes::HTTP_REQUEST_URI;
use crate::plugins::telemetry::config_new::attributes::HTTP_REQUEST_VERSION;
use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_BODY;
use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_HEADERS;
use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_STATUS;
use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_VERSION;
use crate::plugins::telemetry::config_new::connector::attributes::ConnectorAttributes;
use crate::plugins::telemetry::config_new::connector::selectors::ConnectorSelector;
use crate::plugins::telemetry::config_new::connector::ConnectorRequest;
use crate::plugins::telemetry::config_new::connector::ConnectorResponse;
use crate::plugins::telemetry::config_new::events::log_event;
use crate::plugins::telemetry::config_new::events::CustomEvent;
use crate::plugins::telemetry::config_new::events::CustomEventInner;
use crate::plugins::telemetry::config_new::events::CustomEvents;
use crate::plugins::telemetry::config_new::events::Event;
use crate::plugins::telemetry::config_new::events::EventLevel;
use crate::plugins::telemetry::config_new::events::StandardEvent;
use crate::plugins::telemetry::config_new::events::StandardEventConfig;
use crate::plugins::telemetry::config_new::extendable::Extendable;
use crate::plugins::telemetry::config_new::instruments::Instrumented;
use crate::Context;

#[derive(Clone, Deserialize, JsonSchema, Debug, Default)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct ConnectorEventsConfig {
/// Log the connector HTTP request
pub(crate) request: StandardEventConfig<ConnectorSelector>,
/// Log the connector HTTP response
pub(crate) response: StandardEventConfig<ConnectorSelector>,
/// Log the connector HTTP error
pub(crate) error: StandardEventConfig<ConnectorSelector>,
}

#[derive(Clone)]
pub(crate) struct ConnectorEventRequest(pub(crate) StandardEvent<ConnectorSelector>);

#[derive(Clone)]
pub(crate) struct ConnectorEventResponse(pub(crate) StandardEvent<ConnectorSelector>);

pub(crate) type ConnectorEvents =
CustomEvents<ConnectorRequest, ConnectorResponse, ConnectorAttributes, ConnectorSelector>;

pub(crate) fn new_connector_events(
config: &Extendable<ConnectorEventsConfig, Event<ConnectorAttributes, ConnectorSelector>>,
) -> ConnectorEvents {
let custom_events = config
.custom
.iter()
.filter_map(|(event_name, event_cfg)| match &event_cfg.level {
EventLevel::Off => None,
_ => Some(CustomEvent {
inner: Mutex::new(CustomEventInner {
name: event_name.clone(),
level: event_cfg.level,
event_on: event_cfg.on,
message: event_cfg.message.clone(),
selectors: event_cfg.attributes.clone().into(),
condition: event_cfg.condition.clone(),
attributes: Vec::new(),
}),
}),
})
.collect();

ConnectorEvents {
request: config.attributes.request.clone().into(),
response: config.attributes.response.clone().into(),
error: config.attributes.error.clone().into(),
custom: custom_events,
}
}

impl Instrumented
for CustomEvents<ConnectorRequest, ConnectorResponse, ConnectorAttributes, ConnectorSelector>
{
type Request = ConnectorRequest;
type Response = ConnectorResponse;
type EventResponse = ();

fn on_request(&self, request: &Self::Request) {
if self.request.level() != EventLevel::Off {
if let Some(condition) = self.request.condition() {
if condition.lock().evaluate_request(request) != Some(true) {
return;
}
}
let mut attrs = Vec::with_capacity(5);
#[cfg(test)]
let headers = {
let mut headers: indexmap::IndexMap<String, http::HeaderValue> = request
.http_request
.headers()
.clone()
.into_iter()
.filter_map(|(name, val)| Some((name?.to_string(), val)))
.collect();
headers.sort_keys();
headers
};
#[cfg(not(test))]
let headers = request.http_request.headers();

attrs.push(KeyValue::new(
HTTP_REQUEST_HEADERS,
opentelemetry::Value::String(format!("{:?}", headers).into()),
));
attrs.push(KeyValue::new(
HTTP_REQUEST_METHOD,
opentelemetry::Value::String(format!("{}", request.http_request.method()).into()),
));
attrs.push(KeyValue::new(
HTTP_REQUEST_URI,
opentelemetry::Value::String(format!("{}", request.http_request.uri()).into()),
));
attrs.push(KeyValue::new(
HTTP_REQUEST_VERSION,
opentelemetry::Value::String(
format!("{:?}", request.http_request.version()).into(),
),
));
attrs.push(KeyValue::new(
HTTP_REQUEST_BODY,
opentelemetry::Value::String(format!("{:?}", request.http_request.body()).into()),
));
log_event(self.request.level(), "connector.request", attrs, "");
}
for custom_event in &self.custom {
custom_event.on_request(request);
}
}

fn on_response(&self, response: &Self::Response) {
if self.response.level() != EventLevel::Off {
if let Some(condition) = self.response.condition() {
if !condition.lock().evaluate_response(response) {
return;
}
}
let mut attrs = Vec::with_capacity(4);
#[cfg(test)]
let headers = {
let mut headers: indexmap::IndexMap<String, http::HeaderValue> = response
.http_response
.headers()
.clone()
.into_iter()
.filter_map(|(name, val)| Some((name?.to_string(), val)))
.collect();
headers.sort_keys();
headers
};
#[cfg(not(test))]
let headers = response.http_response.headers();

attrs.push(KeyValue::new(
HTTP_RESPONSE_HEADERS,
opentelemetry::Value::String(format!("{:?}", headers).into()),
));
attrs.push(KeyValue::new(
HTTP_RESPONSE_STATUS,
opentelemetry::Value::String(format!("{}", response.http_response.status()).into()),
));
attrs.push(KeyValue::new(
HTTP_RESPONSE_VERSION,
opentelemetry::Value::String(
format!("{:?}", response.http_response.version()).into(),
),
));
attrs.push(KeyValue::new(
HTTP_RESPONSE_BODY,
opentelemetry::Value::String(format!("{:?}", response.http_response.body()).into()),
));
log_event(self.response.level(), "connector.response", attrs, "");
}
for custom_event in &self.custom {
custom_event.on_response(response);
}
}

fn on_error(&self, error: &BoxError, ctx: &Context) {
if self.error.level() != EventLevel::Off {
if let Some(condition) = self.error.condition() {
if !condition.lock().evaluate_error(error, ctx) {
return;
}
}
log_event(
self.error.level(),
"connector.http.error",
vec![KeyValue::new(
Key::from_static_str("error"),
opentelemetry::Value::String(error.to_string().into()),
)],
"",
);
}
for custom_event in &self.custom {
custom_event.on_error(error, ctx);
}
}
}
Loading