Skip to content

Commit

Permalink
Add service entity support for EMF
Browse files Browse the repository at this point in the history
  • Loading branch information
musa-asad committed Feb 6, 2025
1 parent 2d84c08 commit a80df3d
Show file tree
Hide file tree
Showing 6 changed files with 347 additions and 30 deletions.
8 changes: 0 additions & 8 deletions exporter/awsemfexporter/datapoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"math"
"strconv"
"strings"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
Expand All @@ -19,8 +18,6 @@ import (
)

const (
AWSEntityPrefix = "com.amazonaws.cloudwatch.entity.internal."

summaryCountSuffix = "_count"
summarySumSuffix = "_sum"
)
Expand Down Expand Up @@ -534,11 +531,6 @@ func (dps summaryDataPointSlice) IsStaleNaNInf(i int) (bool, pcommon.Map) {
func createLabels(attributes pcommon.Map) map[string]string {
labels := make(map[string]string, attributes.Len()+1)
attributes.Range(func(k string, v pcommon.Value) bool {
// we don't want to export entity related attributes as dimensions, so we skip these
if strings.HasPrefix(k, AWSEntityPrefix) {
return true
}

labels[k] = v.AsString()
return true
})
Expand Down
1 change: 0 additions & 1 deletion exporter/awsemfexporter/datapoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1940,7 +1940,6 @@ func TestCreateLabels(t *testing.T) {
"a": "A",
"b": "B",
"c": "C",
"com.amazonaws.cloudwatch.entity.internal.A": "A",
}))

labels := createLabels(labelsMap)
Expand Down
64 changes: 64 additions & 0 deletions exporter/awsemfexporter/internal/entity/entityattributes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package entity

const (
// Entity resource attributes in OTLP payload
AWSEntityPrefix = "com.amazonaws.cloudwatch.entity.internal."
AttributeEntityType = AWSEntityPrefix + "type"
AttributeEntityServiceName = AWSEntityPrefix + "service.name"
AttributeEntityDeploymentEnvironment = AWSEntityPrefix + "deployment.environment"
AttributeEntityK8sClusterName = AWSEntityPrefix + "k8s.cluster.name"
AttributeEntityK8sNamespaceName = AWSEntityPrefix + "k8s.namespace.name"
AttributeEntityK8sWorkloadName = AWSEntityPrefix + "k8s.workload.name"
AttributeEntityK8sNodeName = AWSEntityPrefix + "k8s.node.name"
AttributeEntityServiceNameSource = AWSEntityPrefix + "service.name.source"
AttributeEntityPlatformType = AWSEntityPrefix + "platform.type"
AttributeEntityInstanceID = AWSEntityPrefix + "instance.id"

// Entity fields in EMF log
Name = "Name"
Environment = "Environment"
EntityType = "Entity.Type"
EksCluster = "EKS.Cluster"
K8sCluster = "K8s.Cluster"
K8sNamespace = "K8s.Namespace"
K8sWorkload = "K8s.Workload"
K8sNode = "K8s.Node"
AWSServiceNameSource = "AWS.ServiceNameSource"
PlatformType = "PlatformType"
InstanceID = "EC2.InstanceId"

// Possible values for PlatformType
AttributeEntityEKSPlatform = "AWS::EKS"
AttributeEntityK8sPlatform = "K8s"
)

// attributeEntityToFieldMap maps attribute entity resource attributes to entity fields
var attributeEntityToFieldMap = map[string]string{
AttributeEntityType: EntityType,
AttributeEntityServiceName: Name,
AttributeEntityDeploymentEnvironment: Environment,
AttributeEntityK8sNamespaceName: K8sNamespace,
AttributeEntityK8sWorkloadName: K8sWorkload,
AttributeEntityK8sNodeName: K8sNode,
AttributeEntityPlatformType: PlatformType,
AttributeEntityInstanceID: InstanceID,
AttributeEntityServiceNameSource: AWSServiceNameSource,
}

// GetEntityField returns entity field for provided attribute
func GetEntityField(attribute string, value ...string) string {
if attribute == AttributeEntityK8sClusterName && len(value) == 1 {
switch value[0] {
case AttributeEntityEKSPlatform:
return EksCluster
case AttributeEntityK8sPlatform:
return K8sCluster
}
}

if field, ok := attributeEntityToFieldMap[attribute]; ok {
return field
}

return ""
}
110 changes: 110 additions & 0 deletions exporter/awsemfexporter/internal/entity/entityattributes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package entity

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestGetEntityField(t *testing.T) {
tests := []struct {
name string
attribute string
values []string
want string
}{
{
name: "AttributeEntityType from map",
attribute: AttributeEntityType,
values: nil,
want: EntityType,
},
{
name: "AttributeEntityServiceName from map",
attribute: AttributeEntityServiceName,
values: nil,
want: Name,
},
{
name: "AttributeEntityDeploymentEnvironment from map",
attribute: AttributeEntityDeploymentEnvironment,
values: nil,
want: Environment,
},
{
name: "AttributeEntityK8sNamespaceName from map",
attribute: AttributeEntityK8sNamespaceName,
values: nil,
want: K8sNamespace,
},
{
name: "AttributeEntityK8sWorkloadName from map",
attribute: AttributeEntityK8sWorkloadName,
values: nil,
want: K8sWorkload,
},
{
name: "AttributeEntityK8sNodeName from map",
attribute: AttributeEntityK8sNodeName,
values: nil,
want: K8sNode,
},
{
name: "AttributeEntityPlatformType from map",
attribute: AttributeEntityPlatformType,
values: nil,
want: PlatformType,
},
{
name: "AttributeEntityInstanceID from map",
attribute: AttributeEntityInstanceID,
values: nil,
want: InstanceID,
},
{
name: "AttributeEntityServiceNameSource from map",
attribute: AttributeEntityServiceNameSource,
values: nil,
want: AWSServiceNameSource,
},
{
name: "K8sClusterName with EKSPlatform",
attribute: AttributeEntityK8sClusterName,
values: []string{AttributeEntityEKSPlatform},
want: EksCluster,
},
{
name: "K8sClusterName with K8sPlatform",
attribute: AttributeEntityK8sClusterName,
values: []string{AttributeEntityK8sPlatform},
want: K8sCluster,
},
{
name: "K8sClusterName with unknown platform",
attribute: AttributeEntityK8sClusterName,
values: []string{"unknown"},
want: "",
},
{
name: "Unknown attribute",
attribute: "unknown",
values: nil,
want: "",
},
{
name: "K8sClusterName with no values provided",
attribute: AttributeEntityK8sClusterName,
values: nil,
want: "",
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
got := GetEntityField(tc.attribute, tc.values...)
assert.Equalf(t, tc.want, got,
"GetEntityField(%q, %v) = %q; want %q",
tc.attribute, tc.values, got, tc.want)
})
}
}
48 changes: 39 additions & 9 deletions exporter/awsemfexporter/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter/internal/entity"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
aws "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
)
Expand Down Expand Up @@ -185,7 +186,21 @@ func translateGroupedMetricToCWMetric(groupedMetric *groupedMetric, config *Conf

// Add labels to fields
for k, v := range labels {
fields[k] = v
if !strings.HasPrefix(k, entity.AWSEntityPrefix) {
fields[k] = v
continue
}

if k == entity.AttributeEntityK8sClusterName {
if entityField := entity.GetEntityField(k, labels[entity.AttributeEntityPlatformType]); entityField != "" {
fields[entityField] = v
}
continue
}

if entityField := entity.GetEntityField(k); entityField != "" {
fields[entityField] = v
}
}
// Add metrics to fields
for metricName, metricInfo := range groupedMetric.metrics {
Expand Down Expand Up @@ -219,23 +234,31 @@ func translateGroupedMetricToCWMetric(groupedMetric *groupedMetric, config *Conf
// groupedMetricToCWMeasurement creates a single CW Measurement from a grouped metric.
func groupedMetricToCWMeasurement(groupedMetric *groupedMetric, config *Config) cWMeasurement {
labels := groupedMetric.labels

filteredLabels := make(map[string]string)
for k, v := range labels {
if !strings.HasPrefix(k, entity.AWSEntityPrefix) {
filteredLabels[k] = v
}
}

dimensionRollupOption := config.DimensionRollupOption

// Create a dimension set containing list of label names
dimSet := make([]string, len(labels))
dimSet := make([]string, len(filteredLabels))
idx := 0
for labelName := range labels {
for labelName := range filteredLabels {
dimSet[idx] = labelName
idx++
}
dimensions := [][]string{dimSet}

// Apply single/zero dimension rollup to labels
rollupDimensionArray := dimensionRollup(dimensionRollupOption, labels)
rollupDimensionArray := dimensionRollup(dimensionRollupOption, filteredLabels)

if len(rollupDimensionArray) > 0 {
// Perform duplication check for edge case with a single label and single dimension roll-up
_, hasOTelLibKey := labels[oTellibDimensionKey]
_, hasOTelLibKey := filteredLabels[oTellibDimensionKey]
isSingleLabel := len(dimSet) <= 1 || (len(dimSet) == 2 && hasOTelLibKey)
singleDimRollup := dimensionRollupOption == singleDimensionRollupOnly ||
dimensionRollupOption == zeroAndSingleDimensionRollup
Expand Down Expand Up @@ -272,17 +295,24 @@ func groupedMetricToCWMeasurement(groupedMetric *groupedMetric, config *Config)
func groupedMetricToCWMeasurementsWithFilters(groupedMetric *groupedMetric, config *Config) (cWMeasurements []cWMeasurement) {
labels := groupedMetric.labels

filteredLabels := make(map[string]string)
for k, v := range labels {
if !strings.HasPrefix(k, entity.AWSEntityPrefix) {
filteredLabels[k] = v
}
}

// Filter metric declarations by labels
metricDeclarations := make([]*MetricDeclaration, 0, len(config.MetricDeclarations))
for _, metricDeclaration := range config.MetricDeclarations {
if metricDeclaration.MatchesLabels(labels) {
if metricDeclaration.MatchesLabels(filteredLabels) {
metricDeclarations = append(metricDeclarations, metricDeclaration)
}
}

// If the whole batch of metrics don't match any metric declarations, drop them
if len(metricDeclarations) == 0 {
labelsStr, _ := json.Marshal(labels)
labelsStr, _ := json.Marshal(filteredLabels)
var metricNames []string
for metricName := range groupedMetric.metrics {
metricNames = append(metricNames, metricName)
Expand Down Expand Up @@ -341,15 +371,15 @@ func groupedMetricToCWMeasurementsWithFilters(groupedMetric *groupedMetric, conf
}

// Apply single/zero dimension rollup to labels
rollupDimensionArray := dimensionRollup(config.DimensionRollupOption, labels)
rollupDimensionArray := dimensionRollup(config.DimensionRollupOption, filteredLabels)

// Translate each group into a CW Measurement
cWMeasurements = make([]cWMeasurement, 0, len(metricDeclGroups))
for _, group := range metricDeclGroups {
var dimensions [][]string
// Extract dimensions from matched metric declarations
for _, metricDeclIdx := range group.metricDeclIdxList {
dims := metricDeclarations[metricDeclIdx].ExtractDimensions(labels)
dims := metricDeclarations[metricDeclIdx].ExtractDimensions(filteredLabels)
dimensions = append(dimensions, dims...)
}
dimensions = append(dimensions, rollupDimensionArray...)
Expand Down
Loading

0 comments on commit a80df3d

Please sign in to comment.