diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/cpu_extractor_test.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/cpu_extractor_test.go index 60a910f90d45..39a45b1ef4b9 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/cpu_extractor_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/cpu_extractor_test.go @@ -26,8 +26,12 @@ func TestCPUStats(t *testing.T) { extractor := NewCPUMetricExtractor(&zap.Logger{}) var cMetrics []*cExtractor.CAdvisorMetric - cMetrics = extractor.GetValue(podRawMetric, MockCPUMemInfo, containerType) - cMetrics = extractor.GetValue(podRawMetric2, MockCPUMemInfo, containerType) + if extractor.HasValue(podRawMetric) { + cMetrics = extractor.GetValue(podRawMetric, MockCPUMemInfo, containerType) + } + if extractor.HasValue(podRawMetric2) { + cMetrics = extractor.GetValue(podRawMetric2, MockCPUMemInfo, containerType) + } cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "pod_cpu_usage_total", 3.125000, 0) cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "pod_cpu_utilization", 0.156250, 0) @@ -39,8 +43,12 @@ func TestCPUStats(t *testing.T) { nodeRawMetric := ConvertNodeToRaw(&result.Node) nodeRawMetric2 := ConvertNodeToRaw(&result2.Node) - cMetrics = extractor.GetValue(nodeRawMetric, MockCPUMemInfo, containerType) - cMetrics = extractor.GetValue(nodeRawMetric2, MockCPUMemInfo, containerType) + if extractor.HasValue(nodeRawMetric) { + cMetrics = extractor.GetValue(nodeRawMetric, MockCPUMemInfo, containerType) + } + if extractor.HasValue(nodeRawMetric2) { + cMetrics = extractor.GetValue(nodeRawMetric2, MockCPUMemInfo, containerType) + } cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "node_cpu_usage_total", 51.5, 0.5) cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "node_cpu_utilization", 2.5, 0.5) diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/mem_extractor.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/mem_extractor.go new file mode 100644 index 000000000000..04ee20ab298c --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/mem_extractor.go @@ -0,0 +1,66 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package extractors // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" + +import ( + "time" + + ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" + awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics" + cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" + + "go.uber.org/zap" +) + +type MemMetricExtractor struct { + logger *zap.Logger + rateCalculator awsmetrics.MetricCalculator +} + +func (m *MemMetricExtractor) HasValue(rawMetric *RawMetric) bool { + if rawMetric.MemoryStats != nil { + return true + } + return false +} + +func (m *MemMetricExtractor) GetValue(rawMetric *RawMetric, mInfo cExtractor.CPUMemInfoProvider, containerType string) []*cExtractor.CAdvisorMetric { + var metrics []*cExtractor.CAdvisorMetric + + metric := cExtractor.NewCadvisorMetric(containerType, m.logger) + identifier := rawMetric.Id + + metric.AddField(ci.MetricName(containerType, ci.MemUsage), *rawMetric.MemoryStats.UsageBytes) + metric.AddField(ci.MetricName(containerType, ci.MemRss), *rawMetric.MemoryStats.RSSBytes) + metric.AddField(ci.MetricName(containerType, ci.MemWorkingset), *rawMetric.MemoryStats.WorkingSetBytes) + + multiplier := float64(time.Second) + cExtractor.AssignRateValueToField(&m.rateCalculator, metric.GetFields(), ci.MetricName(containerType, ci.MemPgfault), identifier, + float64(*rawMetric.MemoryStats.PageFaults), rawMetric.Time, multiplier) + cExtractor.AssignRateValueToField(&m.rateCalculator, metric.GetFields(), ci.MetricName(containerType, ci.MemPgmajfault), identifier, + float64(*rawMetric.MemoryStats.MajorPageFaults), rawMetric.Time, multiplier) + + memoryCapacity := mInfo.GetMemoryCapacity() + if metric.GetField(ci.MetricName(containerType, ci.MemWorkingset)) != nil && memoryCapacity != 0 { + metric.AddField(ci.MetricName(containerType, ci.MemUtilization), float64(metric.GetField(ci.MetricName(containerType, ci.MemWorkingset)).(uint64))/float64(memoryCapacity)*100) + } + + if containerType == ci.TypeNode { + metric.AddField(ci.MetricName(containerType, ci.MemLimit), memoryCapacity) + } + + metrics = append(metrics, metric) + return metrics +} + +func (m *MemMetricExtractor) Shutdown() error { + return m.rateCalculator.Shutdown() +} + +func NewMemMetricExtractor(logger *zap.Logger) *MemMetricExtractor { + return &MemMetricExtractor{ + logger: logger, + rateCalculator: cExtractor.NewFloat64RateCalculator(), + } +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/mem_extractor_test.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/mem_extractor_test.go new file mode 100644 index 000000000000..8b8acff855e7 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/mem_extractor_test.go @@ -0,0 +1,69 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package extractors + +import ( + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" + cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" + cTestUtils "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/testutils" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows/testutils" + + "github.com/stretchr/testify/require" +) + +func TestMemStats(t *testing.T) { + MockCPUMemInfo := cTestUtils.MockCPUMemInfo{} + result := testutils.LoadKubeletSummary(t, "./testdata/PreSingleKubeletSummary.json") + result2 := testutils.LoadKubeletSummary(t, "./testdata/CurSingleKubeletSummary.json") + + podRawMetric := ConvertPodToRaw(&result.Pods[0]) + podRawMetric2 := ConvertPodToRaw(&result2.Pods[0]) + + containerType := containerinsight.TypePod + extractor := NewMemMetricExtractor(nil) + + var cMetrics []*cExtractor.CAdvisorMetric + if extractor.HasValue(podRawMetric) { + cMetrics = extractor.GetValue(podRawMetric, MockCPUMemInfo, containerType) + } + if extractor.HasValue(podRawMetric2) { + cMetrics = extractor.GetValue(podRawMetric2, MockCPUMemInfo, containerType) + } + + cExtractor.AssertContainsTaggedUint(t, cMetrics[0], "pod_memory_rss", 0) + cExtractor.AssertContainsTaggedUint(t, cMetrics[0], "pod_memory_usage", 0) + cExtractor.AssertContainsTaggedUint(t, cMetrics[0], "pod_memory_working_set", 209088512) + + cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "pod_memory_pgfault", 0, 0) + cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "pod_memory_pgmajfault", 0, 0) + require.NoError(t, extractor.Shutdown()) + + // for node type + containerType = containerinsight.TypeNode + extractor = NewMemMetricExtractor(nil) + + nodeRawMetric := ConvertNodeToRaw(&result.Node) + nodeRawMetric2 := ConvertNodeToRaw(&result2.Node) + + if extractor.HasValue(nodeRawMetric) { + cMetrics = extractor.GetValue(nodeRawMetric, MockCPUMemInfo, containerType) + } + + if extractor.HasValue(nodeRawMetric2) { + cMetrics = extractor.GetValue(nodeRawMetric2, MockCPUMemInfo, containerType) + } + + cExtractor.AssertContainsTaggedUint(t, cMetrics[0], "node_memory_rss", 0) + cExtractor.AssertContainsTaggedUint(t, cMetrics[0], "node_memory_usage", 3572293632) + cExtractor.AssertContainsTaggedUint(t, cMetrics[0], "node_memory_working_set", 1026678784) + cExtractor.AssertContainsTaggedInt(t, cMetrics[0], "node_memory_limit", 1073741824) + + cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "node_memory_pgfault", 0, 0) + cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "node_memory_pgmajfault", 0, 0) + cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "node_memory_utilization", 95, 0.7) + + require.NoError(t, extractor.Shutdown()) +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go index c4622c226098..8306937ceabd 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go @@ -12,8 +12,9 @@ import ( "os" ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" + cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores" "go.opentelemetry.io/collector/pdata/pmetric" @@ -29,6 +30,8 @@ type K8sWindows struct { hostInfo host.Info } +var metricsExtractors = []extractors.MetricExtractor{} + func New(logger *zap.Logger, decorator *stores.K8sDecorator, hostInfo host.Info) (*K8sWindows, error) { nodeName := os.Getenv("HOST_NAME") if nodeName == "" { @@ -39,6 +42,10 @@ func New(logger *zap.Logger, decorator *stores.K8sDecorator, hostInfo host.Info) logger.Error("failed to initialize kubelet summary provider, ", zap.Error(err)) return nil, err } + + metricsExtractors = []extractors.MetricExtractor{} + metricsExtractors = append(metricsExtractors, extractors.NewCPUMetricExtractor(logger)) + metricsExtractors = append(metricsExtractors, extractors.NewMemMetricExtractor(logger)) return &K8sWindows{ logger: logger, nodeName: nodeName, @@ -66,9 +73,9 @@ func (k *K8sWindows) GetMetrics() []pmetric.Metrics { return result } -func (c *K8sWindows) decorateMetrics(cadvisormetrics []*extractors.CAdvisorMetric) []*extractors.CAdvisorMetric { +func (c *K8sWindows) decorateMetrics(cadvisormetrics []*cExtractor.CAdvisorMetric) []*cExtractor.CAdvisorMetric { //ebsVolumeIdsUsedAsPV := c.hostInfo.ExtractEbsIDsUsedByKubernetes() - var result []*extractors.CAdvisorMetric + var result []*cExtractor.CAdvisorMetric for _, m := range cadvisormetrics { tags := m.GetTags() //c.addEbsVolumeInfo(tags, ebsVolumeIdsUsedAsPV) @@ -109,3 +116,7 @@ func (k *K8sWindows) Shutdown() error { k.logger.Debug("D! called K8sWindows Shutdown") return nil } + +func GetMetricsExtractors() []extractors.MetricExtractor { + return metricsExtractors +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet.go index 685360ec00ba..8ebd12cf3b18 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet.go +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet.go @@ -14,8 +14,8 @@ import ( ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil" - "go.uber.org/zap" stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1" ) @@ -34,6 +34,7 @@ func new(logger *zap.Logger, info host.Info) (*kubeletSummaryProvider, error) { if err != nil { return nil, fmt.Errorf("failed to initialize kubelet client: %w", err) } + return &kubeletSummaryProvider{ logger: logger, client: kclient, @@ -63,10 +64,9 @@ func (k *kubeletSummaryProvider) getPodMetrics(summary *stats.Summary) ([]*cExtr var metrics []*cExtractor.CAdvisorMetric - nodeCPUCores := k.hostInfo.GetNumCores() for _, pod := range summary.Pods { k.logger.Info(fmt.Sprintf("pod summary %v", pod.PodRef.Name)) - metric := cExtractor.NewCadvisorMetric(ci.TypePod, k.logger) + tags := map[string]string{} tags[ci.PodIDKey] = pod.PodRef.UID @@ -74,19 +74,15 @@ func (k *kubeletSummaryProvider) getPodMetrics(summary *stats.Summary) ([]*cExtr tags[ci.K8sNamespace] = pod.PodRef.Namespace tags[ci.Timestamp] = strconv.FormatInt(pod.CPU.Time.UnixNano(), 10) - // CPU metric - metric.AddField(ci.MetricName(ci.TypePod, ci.CPUTotal), float64(*pod.CPU.UsageCoreNanoSeconds)) - metric.AddField(ci.MetricName(ci.TypePod, ci.CPUUtilization), float64(*pod.CPU.UsageCoreNanoSeconds)/float64(nodeCPUCores)) - - // Memory metrics - metric.AddField(ci.MetricName(ci.TypePod, ci.MemUsage), *pod.Memory.UsageBytes) - metric.AddField(ci.MetricName(ci.TypePod, ci.MemRss), *pod.Memory.RSSBytes) - metric.AddField(ci.MetricName(ci.TypePod, ci.MemWorkingset), *pod.Memory.WorkingSetBytes) - metric.AddField(ci.MetricName(ci.TypePod, ci.MemReservedCapacity), k.hostInfo.GetMemoryCapacity()) - metric.AddField(ci.MetricName(ci.TypePod, ci.MemUtilization), float64(*pod.Memory.WorkingSetBytes)/float64(k.hostInfo.GetMemoryCapacity())*100) - - metric.AddTags(tags) - metrics = append(metrics, metric) + rawMetric := extractors.ConvertPodToRaw(&pod) + for _, extractor := range GetMetricsExtractors() { + if extractor.HasValue(rawMetric) { + metrics = append(metrics, extractor.GetValue(rawMetric, &k.hostInfo, ci.TypePod)...) + } + } + for _, metric := range metrics { + metric.AddTags(tags) + } } return metrics, nil }