Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Efa device level metrics #277

Open
wants to merge 6 commits into
base: aws-cwa-dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/aws/containerinsight/k8sconst.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
ContainerIDkey = "ContainerId"
GpuDevice = "GpuDevice"
EfaDevice = "EfaDevice"
EniId = "ElasticNetworkInterfaceId"

PodStatus = "pod_status"
ContainerStatus = "container_status"
Expand Down
10 changes: 10 additions & 0 deletions internal/metadataproviders/aws/ec2/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Provider interface {
GetHandlers() *request.Handlers
Hostname(ctx context.Context) (string, error)
InstanceID(ctx context.Context) (string, error)
NetworkInterfaceID(ctx context.Context, mac_address string) (string, error)
}

type metadataClient struct {
Expand Down Expand Up @@ -45,6 +46,15 @@ func (c *metadataClient) InstanceID(_ context.Context) (string, error) {
return c.metadataFallbackEnable.GetMetadata("instance-id")
}


func (c *metadataClient) NetworkInterfaceID(_ context.Context, mac_address string) (string, error) {
eniID, err := c.metadata.GetMetadata("network/interfaces/macs/" + mac_address + "/interface-id")
if err == nil {
return eniID, err
}
return c.metadataFallbackEnable.GetMetadata("network/interfaces/macs/" + mac_address + "/interface-id")
}

func (c *metadataClient) Hostname(_ context.Context) (string, error) {
hostname, err := c.metadata.GetMetadata("hostname")
if err == nil {
Expand Down
3 changes: 3 additions & 0 deletions receiver/awscontainerinsightreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics v0.115.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.115.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/kubelet v0.115.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/metadataproviders v0.103.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.0.0-00010101000000-000000000000
github.com/prometheus/common v0.60.1
github.com/prometheus/prometheus v0.54.1
Expand Down Expand Up @@ -293,3 +294,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden

replace github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware => ../../extension/awsmiddleware

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/metadataproviders => ../../internal/metadataproviders
4 changes: 2 additions & 2 deletions receiver/awscontainerinsightreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

100 changes: 93 additions & 7 deletions receiver/awscontainerinsightreceiver/internal/efa/efaSysfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"net"
"os"
"path/filepath"
"strconv"
Expand All @@ -17,6 +18,8 @@ import (
"go.uber.org/zap"

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/aws/aws-sdk-go/aws/session"
ec2provider "github.com/open-telemetry/opentelemetry-collector-contrib/internal/metadataproviders/aws/ec2"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"
)
Expand Down Expand Up @@ -57,13 +60,19 @@ type Scraper struct {
podResourcesStore podResourcesStore
store *efaStore
logger *zap.Logger
ec2Provider ec2MetadataProvider
}

type sysFsReader interface {
EfaDataExists() (bool, error)
ListDevices() ([]efaDeviceName, error)
ListPorts(deviceName efaDeviceName) ([]string, error)
ReadCounter(deviceName efaDeviceName, port string, counter string) (uint64, error)
GetMACAddressFromDeviceName(deviceName efaDeviceName, port int) (string, error)
}

type ec2MetadataProvider interface {
NetworkInterfaceID(ctx context.Context, macAddress string) (string, error)
}

type podResourcesStore interface {
Expand All @@ -78,10 +87,17 @@ type efaStore struct {

// efaDevices is a collection of every Amazon Elastic Fabric Adapter (EFA) device in
// /sys/class/infiniband.
type efaDevices map[efaDeviceName]*efaCounters
type efaDevices map[efaDevice]*efaCounters

type efaDevice struct {
Name efaDeviceName
MacAddress string
EniId string
}

type efaDeviceName string


// efaCounters contains counter values from files in
// /sys/class/infiniband/<Name>/ports/<Port>/hw_counters
// for a single port of one Amazon Elastic Fabric Adapter device.
Expand All @@ -106,6 +122,7 @@ func NewEfaSyfsScraper(logger *zap.Logger, decorator stores.Decorator, podResour
podResourcesStore: podResourcesStore,
store: new(efaStore),
logger: logger,
ec2Provider: ec2provider.NewProvider(session.Must(session.NewSession())),
}

go e.startScrape(ctx)
Expand Down Expand Up @@ -133,10 +150,13 @@ func (s *Scraper) GetMetrics() []pmetric.Metrics {
if store == nil || store.devices == nil {
return result
}
for deviceName, counters := range *store.devices {
for efaDevice, counters := range *store.devices {
if counters == nil {
continue
}
deviceName:= efaDevice.Name
eniId:= efaDevice.EniId

containerInfo := s.podResourcesStore.GetContainerInfo(string(deviceName), efaK8sResourceName)

nodeMetric := stores.NewCIMetric(ci.TypeNodeEFA, s.logger)
Expand Down Expand Up @@ -189,6 +209,7 @@ func (s *Scraper) GetMetrics() []pmetric.Metrics {

for _, m := range allMetrics {
m.AddTag(ci.EfaDevice, string(deviceName))
m.AddTag(ci.EniId, string(eniId))
m.AddTag(ci.Timestamp, strconv.FormatInt(store.timestamp.UnixNano(), 10))
}
for _, m := range podContainerMetrics {
Expand Down Expand Up @@ -222,7 +243,7 @@ func (s *Scraper) startScrape(ctx context.Context) {
for {
select {
case <-ticker.C:
err := s.scrape()
err := s.scrape(ctx)
if err != nil {
s.logger.Warn("Failed to scrape EFA metrics from filesystem", zap.Error(err))
}
Expand All @@ -232,7 +253,7 @@ func (s *Scraper) startScrape(ctx context.Context) {
}
}

func (s *Scraper) scrape() error {
func (s *Scraper) scrape(ctx context.Context) error {
exists, err := s.sysFsReader.EfaDataExists()
if err != nil {
return err
Expand All @@ -243,7 +264,7 @@ func (s *Scraper) scrape() error {

timestamp := time.Now()

devices, err := s.parseEfaDevices()
devices, err := s.parseEfaDevices(ctx)
if err != nil {
return err
}
Expand All @@ -256,7 +277,7 @@ func (s *Scraper) scrape() error {
return nil
}

func (s *Scraper) parseEfaDevices() (*efaDevices, error) {
func (s *Scraper) parseEfaDevices(ctx context.Context) (*efaDevices, error) {
deviceNames, err := s.sysFsReader.ListDevices()
if err != nil {
return nil, err
Expand All @@ -265,11 +286,22 @@ func (s *Scraper) parseEfaDevices() (*efaDevices, error) {
devices := make(efaDevices, len(deviceNames))
for _, name := range deviceNames {
counters, err := s.parseEfaDevice(name)

macAddress, err := s.sysFsReader.GetMACAddressFromDeviceName(name, 1)

eniId, err := s.ec2Provider.NetworkInterfaceID(ctx, macAddress)

if err != nil {
return nil, err
}

devices[name] = counters
device := efaDevice{
Name: name,
MacAddress: macAddress,
EniId: eniId,
}

devices[device] = counters
}

return &devices, nil
Expand Down Expand Up @@ -392,6 +424,60 @@ func (r *sysfsReaderImpl) ReadCounter(deviceName efaDeviceName, port string, cou
return readUint64ValueFromFile(path)
}

func (r *sysfsReaderImpl) GetMACAddressFromDeviceName(deviceName efaDeviceName, port int) (string, error) {

// Construct sysfs path for GID
gidPath := fmt.Sprintf("/sys/class/infiniband/%s/ports/%d/gids/0", string(deviceName), port)

// Read the GID file
gidBytes, err := os.ReadFile(gidPath)
if err != nil {
return "", fmt.Errorf("failed to read GID file: %v", err)
}

ipString := strings.TrimSpace(string(gidBytes))

// Parse the IPv6 address
ip := net.ParseIP(ipString)
if ip == nil || ip.To16() == nil {
return "", fmt.Errorf("invalid IPv6 address")
}

// Verify it's a link-local address (fe80::/10)
if !ip.IsLinkLocalUnicast() {
return "", fmt.Errorf("not a link-local address")
}

// Extract interface identifier (last 64 bits)
interfaceID := ip.To16()[8:]
if len(interfaceID) != 8 {
return "", fmt.Errorf("invalid interface identifier")
}

// Verify EUI-64 format (check for ff:fe in bytes 3-4)
if interfaceID[3] != 0xff || interfaceID[4] != 0xfe {
return "", fmt.Errorf("address does not use EUI-64 format")
}

// Reconstruct MAC address
mac := make(net.HardwareAddr, 6)

// First octet: invert Universal/Local bit (bit 1)
mac[0] = interfaceID[0] ^ 0x02 // XOR with 0b00000010

// Next two bytes remain unchanged
mac[1] = interfaceID[1]
mac[2] = interfaceID[2]

// Last three bytes from the end of the interface ID
mac[3] = interfaceID[5]
mac[4] = interfaceID[6]
mac[5] = interfaceID[7]

return mac.String(), nil
}


func readUint64ValueFromFile(path string) (uint64, error) {
bytes, err := os.ReadFile(path)
if err != nil {
Expand Down
Loading
Loading