diff --git a/exporter/awsemfexporter/datapoint.go b/exporter/awsemfexporter/datapoint.go index d0e1251b547c..8ba53e17480a 100644 --- a/exporter/awsemfexporter/datapoint.go +++ b/exporter/awsemfexporter/datapoint.go @@ -7,7 +7,6 @@ import ( "fmt" "math" "strconv" - "strings" "time" "go.opentelemetry.io/collector/pdata/pcommon" @@ -19,8 +18,6 @@ import ( ) const ( - AWSEntityPrefix = "com.amazonaws.cloudwatch.entity.internal." - summaryCountSuffix = "_count" summarySumSuffix = "_sum" ) @@ -534,11 +531,6 @@ func (dps summaryDataPointSlice) IsStaleNaNInf(i int) (bool, pcommon.Map) { func createLabels(attributes pcommon.Map) map[string]string { labels := make(map[string]string, attributes.Len()+1) attributes.Range(func(k string, v pcommon.Value) bool { - // we don't want to export entity related attributes as dimensions, so we skip these - if strings.HasPrefix(k, AWSEntityPrefix) { - return true - } - labels[k] = v.AsString() return true }) diff --git a/exporter/awsemfexporter/datapoint_test.go b/exporter/awsemfexporter/datapoint_test.go index 46f489ce7b4a..e71602b0916f 100644 --- a/exporter/awsemfexporter/datapoint_test.go +++ b/exporter/awsemfexporter/datapoint_test.go @@ -1940,7 +1940,6 @@ func TestCreateLabels(t *testing.T) { "a": "A", "b": "B", "c": "C", - "com.amazonaws.cloudwatch.entity.internal.A": "A", })) labels := createLabels(labelsMap) diff --git a/exporter/awsemfexporter/internal/entity/entityattributes.go b/exporter/awsemfexporter/internal/entity/entityattributes.go new file mode 100644 index 000000000000..f9735606b13a --- /dev/null +++ b/exporter/awsemfexporter/internal/entity/entityattributes.go @@ -0,0 +1,64 @@ +package entity + +const ( + // Entity resource attributes in OTLP payload + AWSEntityPrefix = "com.amazonaws.cloudwatch.entity.internal." + AttributeEntityType = AWSEntityPrefix + "type" + AttributeEntityServiceName = AWSEntityPrefix + "service.name" + AttributeEntityDeploymentEnvironment = AWSEntityPrefix + "deployment.environment" + AttributeEntityK8sClusterName = AWSEntityPrefix + "k8s.cluster.name" + AttributeEntityK8sNamespaceName = AWSEntityPrefix + "k8s.namespace.name" + AttributeEntityK8sWorkloadName = AWSEntityPrefix + "k8s.workload.name" + AttributeEntityK8sNodeName = AWSEntityPrefix + "k8s.node.name" + AttributeEntityServiceNameSource = AWSEntityPrefix + "service.name.source" + AttributeEntityPlatformType = AWSEntityPrefix + "platform.type" + AttributeEntityInstanceID = AWSEntityPrefix + "instance.id" + + // Entity fields in EMF log + Name = "Name" + Environment = "Environment" + EntityType = "Entity.Type" + EksCluster = "EKS.Cluster" + K8sCluster = "K8s.Cluster" + K8sNamespace = "K8s.Namespace" + K8sWorkload = "K8s.Workload" + K8sNode = "K8s.Node" + AWSServiceNameSource = "AWS.ServiceNameSource" + PlatformType = "PlatformType" + InstanceID = "EC2.InstanceId" + + // Possible values for PlatformType + AttributeEntityEKSPlatform = "AWS::EKS" + AttributeEntityK8sPlatform = "K8s" +) + +// attributeEntityToFieldMap maps attribute entity resource attributes to entity fields +var attributeEntityToFieldMap = map[string]string{ + AttributeEntityType: EntityType, + AttributeEntityServiceName: Name, + AttributeEntityDeploymentEnvironment: Environment, + AttributeEntityK8sNamespaceName: K8sNamespace, + AttributeEntityK8sWorkloadName: K8sWorkload, + AttributeEntityK8sNodeName: K8sNode, + AttributeEntityPlatformType: PlatformType, + AttributeEntityInstanceID: InstanceID, + AttributeEntityServiceNameSource: AWSServiceNameSource, +} + +// GetEntityField returns entity field for provided attribute +func GetEntityField(attribute string, value ...string) string { + if attribute == AttributeEntityK8sClusterName && len(value) == 1 { + switch value[0] { + case AttributeEntityEKSPlatform: + return EksCluster + case AttributeEntityK8sPlatform: + return K8sCluster + } + } + + if field, ok := attributeEntityToFieldMap[attribute]; ok { + return field + } + + return "" +} diff --git a/exporter/awsemfexporter/internal/entity/entityattributes_test.go b/exporter/awsemfexporter/internal/entity/entityattributes_test.go new file mode 100644 index 000000000000..0c60b605c979 --- /dev/null +++ b/exporter/awsemfexporter/internal/entity/entityattributes_test.go @@ -0,0 +1,110 @@ +package entity + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGetEntityField(t *testing.T) { + tests := []struct { + name string + attribute string + values []string + want string + }{ + { + name: "AttributeEntityType from map", + attribute: AttributeEntityType, + values: nil, + want: EntityType, + }, + { + name: "AttributeEntityServiceName from map", + attribute: AttributeEntityServiceName, + values: nil, + want: Name, + }, + { + name: "AttributeEntityDeploymentEnvironment from map", + attribute: AttributeEntityDeploymentEnvironment, + values: nil, + want: Environment, + }, + { + name: "AttributeEntityK8sNamespaceName from map", + attribute: AttributeEntityK8sNamespaceName, + values: nil, + want: K8sNamespace, + }, + { + name: "AttributeEntityK8sWorkloadName from map", + attribute: AttributeEntityK8sWorkloadName, + values: nil, + want: K8sWorkload, + }, + { + name: "AttributeEntityK8sNodeName from map", + attribute: AttributeEntityK8sNodeName, + values: nil, + want: K8sNode, + }, + { + name: "AttributeEntityPlatformType from map", + attribute: AttributeEntityPlatformType, + values: nil, + want: PlatformType, + }, + { + name: "AttributeEntityInstanceID from map", + attribute: AttributeEntityInstanceID, + values: nil, + want: InstanceID, + }, + { + name: "AttributeEntityServiceNameSource from map", + attribute: AttributeEntityServiceNameSource, + values: nil, + want: AWSServiceNameSource, + }, + { + name: "K8sClusterName with EKSPlatform", + attribute: AttributeEntityK8sClusterName, + values: []string{AttributeEntityEKSPlatform}, + want: EksCluster, + }, + { + name: "K8sClusterName with K8sPlatform", + attribute: AttributeEntityK8sClusterName, + values: []string{AttributeEntityK8sPlatform}, + want: K8sCluster, + }, + { + name: "K8sClusterName with unknown platform", + attribute: AttributeEntityK8sClusterName, + values: []string{"unknown"}, + want: "", + }, + { + name: "Unknown attribute", + attribute: "unknown", + values: nil, + want: "", + }, + { + name: "K8sClusterName with no values provided", + attribute: AttributeEntityK8sClusterName, + values: nil, + want: "", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := GetEntityField(tc.attribute, tc.values...) + assert.Equalf(t, tc.want, got, + "GetEntityField(%q, %v) = %q; want %q", + tc.attribute, tc.values, got, tc.want) + }) + } +} diff --git a/exporter/awsemfexporter/metric_translator.go b/exporter/awsemfexporter/metric_translator.go index bb12c10a5926..f8d00e25bd50 100644 --- a/exporter/awsemfexporter/metric_translator.go +++ b/exporter/awsemfexporter/metric_translator.go @@ -15,6 +15,7 @@ import ( "go.uber.org/multierr" "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter/internal/entity" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs" aws "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics" ) @@ -185,7 +186,21 @@ func translateGroupedMetricToCWMetric(groupedMetric *groupedMetric, config *Conf // Add labels to fields for k, v := range labels { - fields[k] = v + if !strings.HasPrefix(k, entity.AWSEntityPrefix) { + fields[k] = v + continue + } + + if k == entity.AttributeEntityK8sClusterName { + if entityField := entity.GetEntityField(k, labels[entity.AttributeEntityPlatformType]); entityField != "" { + fields[entityField] = v + } + continue + } + + if entityField := entity.GetEntityField(k); entityField != "" { + fields[entityField] = v + } } // Add metrics to fields for metricName, metricInfo := range groupedMetric.metrics { @@ -219,23 +234,31 @@ func translateGroupedMetricToCWMetric(groupedMetric *groupedMetric, config *Conf // groupedMetricToCWMeasurement creates a single CW Measurement from a grouped metric. func groupedMetricToCWMeasurement(groupedMetric *groupedMetric, config *Config) cWMeasurement { labels := groupedMetric.labels + + filteredLabels := make(map[string]string) + for k, v := range labels { + if !strings.HasPrefix(k, entity.AWSEntityPrefix) { + filteredLabels[k] = v + } + } + dimensionRollupOption := config.DimensionRollupOption // Create a dimension set containing list of label names - dimSet := make([]string, len(labels)) + dimSet := make([]string, len(filteredLabels)) idx := 0 - for labelName := range labels { + for labelName := range filteredLabels { dimSet[idx] = labelName idx++ } dimensions := [][]string{dimSet} // Apply single/zero dimension rollup to labels - rollupDimensionArray := dimensionRollup(dimensionRollupOption, labels) + rollupDimensionArray := dimensionRollup(dimensionRollupOption, filteredLabels) if len(rollupDimensionArray) > 0 { // Perform duplication check for edge case with a single label and single dimension roll-up - _, hasOTelLibKey := labels[oTellibDimensionKey] + _, hasOTelLibKey := filteredLabels[oTellibDimensionKey] isSingleLabel := len(dimSet) <= 1 || (len(dimSet) == 2 && hasOTelLibKey) singleDimRollup := dimensionRollupOption == singleDimensionRollupOnly || dimensionRollupOption == zeroAndSingleDimensionRollup @@ -272,17 +295,24 @@ func groupedMetricToCWMeasurement(groupedMetric *groupedMetric, config *Config) func groupedMetricToCWMeasurementsWithFilters(groupedMetric *groupedMetric, config *Config) (cWMeasurements []cWMeasurement) { labels := groupedMetric.labels + filteredLabels := make(map[string]string) + for k, v := range labels { + if !strings.HasPrefix(k, entity.AWSEntityPrefix) { + filteredLabels[k] = v + } + } + // Filter metric declarations by labels metricDeclarations := make([]*MetricDeclaration, 0, len(config.MetricDeclarations)) for _, metricDeclaration := range config.MetricDeclarations { - if metricDeclaration.MatchesLabels(labels) { + if metricDeclaration.MatchesLabels(filteredLabels) { metricDeclarations = append(metricDeclarations, metricDeclaration) } } // If the whole batch of metrics don't match any metric declarations, drop them if len(metricDeclarations) == 0 { - labelsStr, _ := json.Marshal(labels) + labelsStr, _ := json.Marshal(filteredLabels) var metricNames []string for metricName := range groupedMetric.metrics { metricNames = append(metricNames, metricName) @@ -341,7 +371,7 @@ func groupedMetricToCWMeasurementsWithFilters(groupedMetric *groupedMetric, conf } // Apply single/zero dimension rollup to labels - rollupDimensionArray := dimensionRollup(config.DimensionRollupOption, labels) + rollupDimensionArray := dimensionRollup(config.DimensionRollupOption, filteredLabels) // Translate each group into a CW Measurement cWMeasurements = make([]cWMeasurement, 0, len(metricDeclGroups)) @@ -349,7 +379,7 @@ func groupedMetricToCWMeasurementsWithFilters(groupedMetric *groupedMetric, conf var dimensions [][]string // Extract dimensions from matched metric declarations for _, metricDeclIdx := range group.metricDeclIdxList { - dims := metricDeclarations[metricDeclIdx].ExtractDimensions(labels) + dims := metricDeclarations[metricDeclIdx].ExtractDimensions(filteredLabels) dimensions = append(dimensions, dims...) } dimensions = append(dimensions, rollupDimensionArray...) diff --git a/exporter/awsemfexporter/metric_translator_test.go b/exporter/awsemfexporter/metric_translator_test.go index f8479fbe3f98..7078ae1ad58d 100644 --- a/exporter/awsemfexporter/metric_translator_test.go +++ b/exporter/awsemfexporter/metric_translator_test.go @@ -21,6 +21,7 @@ import ( "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter/internal/entity" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/occonventions" ) @@ -1552,22 +1553,22 @@ func TestGroupedMetricToCWMeasurementsWithFilters(t *testing.T) { MetricNameSelectors: []string{"metric(1|3)"}, }, }, []cWMeasurement{ - { - Namespace: namespace, - Dimensions: [][]string{{}}, - Metrics: []map[string]string{ - { - "Name": "metric1", - "Unit": "Count", - }, - { - "Name": "metric3", - "Unit": "Seconds", - }, + { + Namespace: namespace, + Dimensions: [][]string{{}}, + Metrics: []map[string]string{ + { + "Name": "metric1", + "Unit": "Count", + }, + { + "Name": "metric3", + "Unit": "Seconds", }, }, }, }, + }, { "label matchers", []*MetricDeclaration{ @@ -2589,6 +2590,127 @@ func TestTranslateOtToGroupedMetricForInitialDeltaValue(t *testing.T) { } } +func TestEntityAttributesToFields(t *testing.T) { + timestamp := int64(1596151098037) + namespace := "TestNamespace" + + labels := map[string]string{ + "normal_label": "normal_value", + entity.AttributeEntityPlatformType: entity.AttributeEntityEKSPlatform, + entity.AttributeEntityK8sClusterName: "myEksCluster", + entity.AttributeEntityK8sNamespaceName: "myNamespace", + entity.AttributeEntityInstanceID: "i-0123456789", + "com.amazonaws.cloudwatch.entity.internal.unknown_field": "should_not_appear", + } + + metrics := map[string]*metricInfo{ + "metric1": {value: 1, unit: "Count"}, + } + + groupedMetric := &groupedMetric{ + labels: labels, + metrics: metrics, + metadata: cWMetricMetadata{ + groupedMetricMetadata: groupedMetricMetadata{ + namespace: namespace, + timestampMs: timestamp, + }, + }, + } + + config := &Config{ + DimensionRollupOption: "", + logger: zap.NewNop(), + } + + cw := translateGroupedMetricToCWMetric(groupedMetric, config) + + require.Len(t, cw.measurements, 1) + dims := cw.measurements[0].Dimensions + + assert.Equal(t, [][]string{{"normal_label"}}, dims) + + assert.Equal(t, timestamp, cw.timestampMs) + assert.Equal(t, namespace, cw.measurements[0].Namespace) + + expectedFields := map[string]any{ + "normal_label": "normal_value", + "metric1": 1, + "EKS.Cluster": "myEksCluster", + "K8s.Namespace": "myNamespace", + "PlatformType": entity.AttributeEntityEKSPlatform, + "EC2.InstanceId": "i-0123456789", + } + assert.Equal(t, expectedFields, cw.fields) + + expectedMeasurement := []cWMeasurement{{ + Namespace: namespace, + Dimensions: [][]string{{"normal_label"}}, + Metrics: []map[string]string{{ + "Name": "metric1", + "Unit": "Count", + }}, + }} + assertCWMeasurementSliceEqual(t, expectedMeasurement, cw.measurements) +} + +func TestEntityK8sClusterWithMissingPlatformType(t *testing.T) { + labels := map[string]string{ + entity.AttributeEntityK8sClusterName: "myEksCluster", + } + + groupedMetric := &groupedMetric{ + labels: labels, + metrics: map[string]*metricInfo{ + "metric1": {value: 1, unit: "Count"}, + }, + metadata: cWMetricMetadata{ + groupedMetricMetadata: groupedMetricMetadata{ + namespace: "NoPlatformType", + timestampMs: 10000, + }, + }, + } + config := &Config{ + logger: zap.NewNop(), + } + cw := translateGroupedMetricToCWMetric(groupedMetric, config) + + require.Len(t, cw.measurements, 1) + assert.Empty(t, cw.measurements[0].Dimensions[0], "should have no dimension since no normal labels") + + assert.Empty(t, cw.fields["K8s.Cluster"]) + assert.Empty(t, cw.fields["EKS.Cluster"]) + + expectedFields := map[string]any{ + "metric1": 1, + } + assert.Equal(t, expectedFields, cw.fields) +} + +func TestUnknownEntityAttributeIsDropped(t *testing.T) { + labels := map[string]string{ + entity.AWSEntityPrefix + "unknown_field": "some_value", + } + + groupedMetric := &groupedMetric{ + labels: labels, + metrics: map[string]*metricInfo{ + "metric1": {value: 1, unit: "Count"}, + }, + metadata: cWMetricMetadata{ + groupedMetricMetadata: groupedMetricMetadata{ + namespace: "UnknownEntityAttribute", + timestampMs: 12345, + }, + }, + } + cw := translateGroupedMetricToCWMetric(groupedMetric, &Config{}) + + assert.NotContains(t, cw.fields, "unknown_field") + assert.Equal(t, 1, cw.fields["metric1"]) +} + func generateTestMetrics(tm testMetric) pmetric.Metrics { md := pmetric.NewMetrics() now := time.Now()