From fe59e6d14442bd3742caa4ce2e88da62dda8f685 Mon Sep 17 00:00:00 2001 From: keisku Date: Thu, 16 Jan 2025 02:58:00 +0000 Subject: [PATCH] as per DD_APM_PEER_TAGS_AGGREGATION in agent --- contrib/valkey-go/option.go | 17 ---- contrib/valkey-go/valkey.go | 44 +++++++--- contrib/valkey-go/valkey_test.go | 135 ++++++++++++++++++++++++------- 3 files changed, 137 insertions(+), 59 deletions(-) diff --git a/contrib/valkey-go/option.go b/contrib/valkey-go/option.go index 21b16498ae..406ca5a753 100644 --- a/contrib/valkey-go/option.go +++ b/contrib/valkey-go/option.go @@ -9,16 +9,12 @@ package valkey import ( "math" - "os" "gopkg.in/DataDog/dd-trace-go.v1/internal" "gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema" ) -const defaultServiceName = "valkey.client" - type clientConfig struct { - serviceName string spanName string analyticsRate float64 skipRaw bool @@ -28,18 +24,12 @@ type clientConfig struct { type ClientOption func(*clientConfig) func defaults(cfg *clientConfig) { - cfg.serviceName = namingschema.ServiceNameOverrideV0(defaultServiceName, defaultServiceName) cfg.spanName = namingschema.OpName(namingschema.ValkeyOutbound) if internal.BoolEnv("DD_TRACE_VALKEY_ANALYTICS_ENABLED", false) { cfg.analyticsRate = 1.0 } else { cfg.analyticsRate = math.NaN() } - if v := os.Getenv("DD_TRACE_VALKEY_SERVICE_NAME"); v == "" { - cfg.serviceName = defaultServiceName - } else { - cfg.serviceName = v - } cfg.skipRaw = internal.BoolEnv("DD_TRACE_VALKEY_SKIP_RAW_COMMAND", false) } @@ -52,13 +42,6 @@ func WithSkipRawCommand(skip bool) ClientOption { } } -// WithServiceName sets the given service name for the client. -func WithServiceName(name string) ClientOption { - return func(cfg *clientConfig) { - cfg.serviceName = name - } -} - // WithAnalytics enables Trace Analytics for all started spans. func WithAnalytics(on bool) ClientOption { return func(cfg *clientConfig) { diff --git a/contrib/valkey-go/valkey.go b/contrib/valkey-go/valkey.go index f470c01a32..1bdf183463 100644 --- a/contrib/valkey-go/valkey.go +++ b/contrib/valkey-go/valkey.go @@ -62,14 +62,10 @@ func NewClient(option valkey.ClientOption, opts ...ClientOption) (valkey.Client, for _, fn := range opts { fn(&cfg) } - var host, portStr string + var host string var port int if len(option.InitAddress) == 1 { - host, portStr, err = net.SplitHostPort(option.InitAddress[0]) - if err != nil { - log.Error("valkey.ClientOption.InitAddress contains invalid address: %s", err) - } - port, _ = strconv.Atoi(portStr) + host, port = splitHostPort(option.InitAddress[0]) } core := coreClient{ Client: valkeyClient, @@ -83,6 +79,16 @@ func NewClient(option valkey.ClientOption, opts ...ClientOption) (valkey.Client, }, nil } +func splitHostPort(addr string) (string, int) { + host, portStr, err := net.SplitHostPort(addr) + if err != nil { + log.Error("%q cannot be split: %s", addr, err) + return "", 0 + } + port, _ := strconv.Atoi(portStr) + return host, port +} + type commander interface { Commands() []string } @@ -154,10 +160,26 @@ type buildStartSpanOptionsInput struct { skipRawCommand bool } +func (c *coreClient) peerTags() []tracer.StartSpanOption { + ipAddr := net.ParseIP(c.host) + var peerHostKey string + if ipAddr == nil { + peerHostKey = ext.PeerHostname + } else if ipAddr.To4() != nil { + peerHostKey = ext.PeerHostIPV4 + } else { + peerHostKey = ext.PeerHostIPV6 + } + return []tracer.StartSpanOption{ + tracer.Tag(ext.PeerService, ext.DBSystemValkey), + tracer.Tag(peerHostKey, c.host), + tracer.Tag(ext.PeerPort, c.port), + } +} + func (c *coreClient) buildStartSpanOptions(input buildStartSpanOptionsInput) []tracer.StartSpanOption { opts := []tracer.StartSpanOption{ tracer.SpanType(ext.SpanTypeValkey), - tracer.ServiceName(c.clientConfig.serviceName), tracer.Tag(ext.TargetHost, c.host), tracer.Tag(ext.TargetPort, c.port), tracer.Tag(ext.ValkeyClientVersion, valkey.LibVer), @@ -166,6 +188,7 @@ func (c *coreClient) buildStartSpanOptions(input buildStartSpanOptionsInput) []t tracer.Tag(ext.SpanKind, ext.SpanKindClient), tracer.Tag(ext.DBType, ext.DBSystemValkey), tracer.Tag(ext.DBSystem, ext.DBSystemValkey), + tracer.Tag(ext.DBInstance, ext.DBSystemValkey), tracer.Tag(ext.ValkeyDatabaseIndex, c.option.SelectDB), tracer.Tag(ext.ValkeyClientCommandWrite, input.isWrite), tracer.Tag(ext.ValkeyClientCommandBlock, input.isBlock), @@ -173,6 +196,7 @@ func (c *coreClient) buildStartSpanOptions(input buildStartSpanOptionsInput) []t tracer.Tag(ext.ValkeyClientCommandStream, input.isStream), tracer.Tag(ext.ValkeyClientCommandWithPassword, c.option.Password != ""), } + opts = append(opts, c.peerTags()...) if input.command != "" { opts = append(opts, []tracer.StartSpanOption{ // valkeyotel tags @@ -324,11 +348,7 @@ func (c *client) Dedicate() (client valkey.DedicatedClient, cancel func()) { func (c *client) Nodes() map[string]valkey.Client { nodes := c.Client.Nodes() for addr, valkeyClient := range nodes { - host, portStr, err := net.SplitHostPort(addr) - if err != nil { - log.Error("invalid address is set to valkey client: %s", err) - } - port, _ := strconv.Atoi(portStr) + host, port := splitHostPort(addr) nodes[addr] = &client{ coreClient: coreClient{ Client: valkeyClient, diff --git a/contrib/valkey-go/valkey_test.go b/contrib/valkey-go/valkey_test.go index 10329a4e92..8badb15f96 100644 --- a/contrib/valkey-go/valkey_test.go +++ b/contrib/valkey-go/valkey_test.go @@ -2,7 +2,7 @@ // under the Apache License Version 2.0. // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2016 Datadog, Inc. -package valkey_test +package valkey import ( "context" @@ -14,9 +14,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/valkey-io/valkey-go" - valkeytrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/valkey-go" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" ) var ( @@ -35,11 +36,89 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } +func TestPeerTags(t *testing.T) { + tests := []struct { + initAddress string + expectedTags map[string]interface{} + }{ + { + initAddress: "127.0.0.1:6379", + expectedTags: map[string]interface{}{ + ext.PeerService: "valkey", + ext.PeerHostIPV4: "127.0.0.1", + ext.PeerPort: 6379, + }, + }, + { + initAddress: "[::1]:6379", + expectedTags: map[string]interface{}{ + ext.PeerService: "valkey", + ext.PeerHostIPV6: "::1", + ext.PeerPort: 6379, + }, + }, + { + initAddress: "[2001:db8::2]:6379", + expectedTags: map[string]interface{}{ + ext.PeerService: "valkey", + ext.PeerHostIPV6: "2001:db8::2", + ext.PeerPort: 6379, + }, + }, + { + initAddress: "[2001:db8::2%lo]:6379", + expectedTags: map[string]interface{}{ + ext.PeerService: "valkey", + ext.PeerHostname: "2001:db8::2%lo", + ext.PeerPort: 6379, + }, + }, + { + initAddress: "::1:7777", + expectedTags: map[string]interface{}{ + ext.PeerService: "valkey", + ext.PeerHostname: "", + ext.PeerPort: 0, + }, + }, + { + initAddress: ":::7777", + expectedTags: map[string]interface{}{ + ext.PeerService: "valkey", + ext.PeerHostname: "", + ext.PeerPort: 0, + }, + }, + { + initAddress: "localhost:7777", + expectedTags: map[string]interface{}{ + ext.PeerService: "valkey", + ext.PeerHostname: "localhost", + ext.PeerPort: 7777, + }, + }, + } + for _, tt := range tests { + t.Run(tt.initAddress, func(t *testing.T) { + host, port := splitHostPort(tt.initAddress) + client := coreClient{ + host: host, + port: port, + } + var startSpanConfig ddtrace.StartSpanConfig + for _, tag := range client.peerTags() { + tag(&startSpanConfig) + } + require.Equal(t, tt.expectedTags, startSpanConfig.Tags) + }) + } +} + func TestNewClient(t *testing.T) { tests := []struct { name string valkeyClientOptions valkey.ClientOption - valkeytraceClientOptions []valkeytrace.ClientOption + valkeytraceClientOptions []ClientOption valkeytraceClientEnvVars map[string]string createSpans func(*testing.T, context.Context, valkey.Client) assertNewClientError func(*testing.T, error) @@ -74,19 +153,15 @@ func TestNewClient(t *testing.T) { Username: valkeyUsername, Password: valkeyPassword, }, - valkeytraceClientOptions: []valkeytrace.ClientOption{ - valkeytrace.WithServiceName("my-valkey-client"), - valkeytrace.WithAnalytics(true), - valkeytrace.WithSkipRawCommand(true), + valkeytraceClientOptions: []ClientOption{ + WithAnalytics(true), + WithSkipRawCommand(true), }, createSpans: func(t *testing.T, ctx context.Context, client valkey.Client) { assert.NoError(t, client.Do(ctx, client.B().Set().Key("test_key").Value("test_value").Build()).Error()) }, assertSpans: []func(t *testing.T, span mocktracer.Span){ func(t *testing.T, span mocktracer.Span) { - assert.Equal(t, "my-valkey-client", span.Tag(ext.ServiceName)) - assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost)) - assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort)) assert.Equal(t, "SET", span.Tag(ext.DBStatement)) assert.Equal(t, "SET", span.Tag(ext.ResourceName)) assert.Greater(t, span.Tag("db.stmt_size"), 0) @@ -119,9 +194,6 @@ func TestNewClient(t *testing.T) { }, assertSpans: []func(t *testing.T, span mocktracer.Span){ func(t *testing.T, span mocktracer.Span) { - assert.Equal(t, "valkey.client", span.Tag(ext.ServiceName)) - assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost)) - assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort)) assert.Equal(t, "SET\ntest_key\ntest_value\nGET\ntest_key", span.Tag(ext.DBStatement)) assert.Equal(t, "SET\ntest_key\ntest_value\nGET\ntest_key", span.Tag(ext.ResourceName)) assert.Greater(t, span.Tag("db.stmt_size"), 0) @@ -155,9 +227,6 @@ func TestNewClient(t *testing.T) { }, assertSpans: []func(t *testing.T, span mocktracer.Span){ func(t *testing.T, span mocktracer.Span) { - assert.Equal(t, "valkey.client", span.Tag(ext.ServiceName)) - assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost)) - assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort)) assert.Greater(t, span.Tag("db.stmt_size"), 0) assert.Equal(t, "HMGET\nmk\n1\n2", span.Tag(ext.DBStatement)) assert.Equal(t, "HMGET\nmk\n1\n2", span.Tag(ext.ResourceName)) @@ -175,9 +244,6 @@ func TestNewClient(t *testing.T) { assert.Nil(t, span.Tag(ext.Error)) }, func(t *testing.T, span mocktracer.Span) { - assert.Equal(t, "valkey.client", span.Tag(ext.ServiceName)) - assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost)) - assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort)) assert.Greater(t, span.Tag("db.stmt_size"), 0) assert.Equal(t, "HMGET\nmk\n1\n2", span.Tag(ext.DBStatement)) assert.Equal(t, "HMGET\nmk\n1\n2", span.Tag(ext.ResourceName)) @@ -204,7 +270,6 @@ func TestNewClient(t *testing.T) { Password: valkeyPassword, }, valkeytraceClientEnvVars: map[string]string{ - "DD_TRACE_VALKEY_SERVICE_NAME": "my-valkey-client", "DD_TRACE_VALKEY_ANALYTICS_ENABLED": "true", "DD_TRACE_VALKEY_SKIP_RAW_COMMAND": "true", }, @@ -214,9 +279,6 @@ func TestNewClient(t *testing.T) { }, assertSpans: []func(t *testing.T, span mocktracer.Span){ func(t *testing.T, span mocktracer.Span) { - assert.Equal(t, "my-valkey-client", span.Tag(ext.ServiceName)) - assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost)) - assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort)) assert.Equal(t, "GET", span.Tag(ext.DBStatement)) assert.Equal(t, "GET", span.Tag(ext.ResourceName)) assert.Greater(t, span.Tag("db.stmt_size"), 0) @@ -252,9 +314,6 @@ func TestNewClient(t *testing.T) { }, assertSpans: []func(t *testing.T, span mocktracer.Span){ func(t *testing.T, span mocktracer.Span) { - assert.Equal(t, "valkey.client", span.Tag(ext.ServiceName)) - assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost)) - assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort)) assert.Greater(t, span.Tag("db.stmt_size"), 0) assert.Equal(t, "SUBSCRIBE\ntest_channel", span.Tag(ext.DBStatement)) assert.Equal(t, "SUBSCRIBE\ntest_channel", span.Tag(ext.ResourceName)) @@ -276,25 +335,40 @@ func TestNewClient(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - ctx := context.Background() mt := mocktracer.Start() defer mt.Stop() for k, v := range tt.valkeytraceClientEnvVars { t.Setenv(k, v) } - client, err := valkeytrace.NewClient(tt.valkeyClientOptions, tt.valkeytraceClientOptions...) + span, ctx := tracer.StartSpanFromContext(context.Background(), "test.root", tracer.ServiceName("test-service")) + client, err := NewClient(tt.valkeyClientOptions, tt.valkeytraceClientOptions...) if tt.assertNewClientError == nil { require.NoErrorf(t, err, tt.name) } else { tt.assertNewClientError(t, err) + span.Finish() return } tt.createSpans(t, ctx, client) + span.Finish() // test.root exists in the last span. spans := mt.FinishedSpans() - require.Len(t, spans, len(tt.assertSpans)) + require.Len(t, spans, len(tt.assertSpans)+1) // +1 for test.root for i, span := range spans { + if span.OperationName() == "test.root" { + continue + } tt.assertSpans[i](t, span) - // Following assertions are common to all spans + t.Log("Following assertions are common to all spans") + assert.Equalf(t, + "test-service", + span.Tag(ext.ServiceName), + "service name should not be overwritten as per DD_APM_PEER_TAGS_AGGREGATION in trace-agent", + ) + assert.Equal(t, "valkey", span.Tag(ext.PeerService)) + assert.Equal(t, "127.0.0.1", span.Tag(ext.PeerHostIPV4)) + assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost)) + assert.Equal(t, valkeyPort, span.Tag(ext.PeerPort)) + assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort)) assert.NotNil(t, span) assert.True(t, span.Tag(ext.ValkeyClientCommandWithPassword).(bool)) assert.Equal(t, tt.valkeyClientOptions.Username, span.Tag(ext.DBUser)) @@ -304,6 +378,7 @@ func TestNewClient(t *testing.T) { assert.Equal(t, "valkey-go/valkey", span.Tag(ext.Component)) assert.Equal(t, "valkey", span.Tag(ext.DBType)) assert.Equal(t, "valkey", span.Tag(ext.DBSystem)) + assert.Equal(t, "valkey", span.Tag(ext.DBInstance)) } }) }