Skip to content

Commit

Permalink
as per DD_APM_PEER_TAGS_AGGREGATION in agent
Browse files Browse the repository at this point in the history
  • Loading branch information
keisku committed Jan 16, 2025
1 parent 6aa8d69 commit fe59e6d
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 59 deletions.
17 changes: 0 additions & 17 deletions contrib/valkey-go/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,12 @@ package valkey

import (
"math"
"os"

"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema"
)

const defaultServiceName = "valkey.client"

type clientConfig struct {
serviceName string
spanName string
analyticsRate float64
skipRaw bool
Expand All @@ -28,18 +24,12 @@ type clientConfig struct {
type ClientOption func(*clientConfig)

func defaults(cfg *clientConfig) {
cfg.serviceName = namingschema.ServiceNameOverrideV0(defaultServiceName, defaultServiceName)
cfg.spanName = namingschema.OpName(namingschema.ValkeyOutbound)
if internal.BoolEnv("DD_TRACE_VALKEY_ANALYTICS_ENABLED", false) {
cfg.analyticsRate = 1.0
} else {
cfg.analyticsRate = math.NaN()
}
if v := os.Getenv("DD_TRACE_VALKEY_SERVICE_NAME"); v == "" {
cfg.serviceName = defaultServiceName
} else {
cfg.serviceName = v
}
cfg.skipRaw = internal.BoolEnv("DD_TRACE_VALKEY_SKIP_RAW_COMMAND", false)
}

Expand All @@ -52,13 +42,6 @@ func WithSkipRawCommand(skip bool) ClientOption {
}
}

// WithServiceName sets the given service name for the client.
func WithServiceName(name string) ClientOption {
return func(cfg *clientConfig) {
cfg.serviceName = name
}
}

// WithAnalytics enables Trace Analytics for all started spans.
func WithAnalytics(on bool) ClientOption {
return func(cfg *clientConfig) {
Expand Down
44 changes: 32 additions & 12 deletions contrib/valkey-go/valkey.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,10 @@ func NewClient(option valkey.ClientOption, opts ...ClientOption) (valkey.Client,
for _, fn := range opts {
fn(&cfg)
}
var host, portStr string
var host string
var port int
if len(option.InitAddress) == 1 {
host, portStr, err = net.SplitHostPort(option.InitAddress[0])
if err != nil {
log.Error("valkey.ClientOption.InitAddress contains invalid address: %s", err)
}
port, _ = strconv.Atoi(portStr)
host, port = splitHostPort(option.InitAddress[0])
}
core := coreClient{
Client: valkeyClient,
Expand All @@ -83,6 +79,16 @@ func NewClient(option valkey.ClientOption, opts ...ClientOption) (valkey.Client,
}, nil
}

func splitHostPort(addr string) (string, int) {
host, portStr, err := net.SplitHostPort(addr)
if err != nil {
log.Error("%q cannot be split: %s", addr, err)
return "", 0
}
port, _ := strconv.Atoi(portStr)
return host, port
}

type commander interface {
Commands() []string
}
Expand Down Expand Up @@ -154,10 +160,26 @@ type buildStartSpanOptionsInput struct {
skipRawCommand bool
}

func (c *coreClient) peerTags() []tracer.StartSpanOption {
ipAddr := net.ParseIP(c.host)
var peerHostKey string
if ipAddr == nil {
peerHostKey = ext.PeerHostname
} else if ipAddr.To4() != nil {
peerHostKey = ext.PeerHostIPV4
} else {
peerHostKey = ext.PeerHostIPV6
}
return []tracer.StartSpanOption{
tracer.Tag(ext.PeerService, ext.DBSystemValkey),
tracer.Tag(peerHostKey, c.host),
tracer.Tag(ext.PeerPort, c.port),
}
}

func (c *coreClient) buildStartSpanOptions(input buildStartSpanOptionsInput) []tracer.StartSpanOption {
opts := []tracer.StartSpanOption{
tracer.SpanType(ext.SpanTypeValkey),
tracer.ServiceName(c.clientConfig.serviceName),
tracer.Tag(ext.TargetHost, c.host),
tracer.Tag(ext.TargetPort, c.port),
tracer.Tag(ext.ValkeyClientVersion, valkey.LibVer),
Expand All @@ -166,13 +188,15 @@ func (c *coreClient) buildStartSpanOptions(input buildStartSpanOptionsInput) []t
tracer.Tag(ext.SpanKind, ext.SpanKindClient),
tracer.Tag(ext.DBType, ext.DBSystemValkey),
tracer.Tag(ext.DBSystem, ext.DBSystemValkey),
tracer.Tag(ext.DBInstance, ext.DBSystemValkey),
tracer.Tag(ext.ValkeyDatabaseIndex, c.option.SelectDB),
tracer.Tag(ext.ValkeyClientCommandWrite, input.isWrite),
tracer.Tag(ext.ValkeyClientCommandBlock, input.isBlock),
tracer.Tag(ext.ValkeyClientCommandMulti, input.isMulti),
tracer.Tag(ext.ValkeyClientCommandStream, input.isStream),
tracer.Tag(ext.ValkeyClientCommandWithPassword, c.option.Password != ""),
}
opts = append(opts, c.peerTags()...)
if input.command != "" {
opts = append(opts, []tracer.StartSpanOption{
// valkeyotel tags
Expand Down Expand Up @@ -324,11 +348,7 @@ func (c *client) Dedicate() (client valkey.DedicatedClient, cancel func()) {
func (c *client) Nodes() map[string]valkey.Client {
nodes := c.Client.Nodes()
for addr, valkeyClient := range nodes {
host, portStr, err := net.SplitHostPort(addr)
if err != nil {
log.Error("invalid address is set to valkey client: %s", err)
}
port, _ := strconv.Atoi(portStr)
host, port := splitHostPort(addr)
nodes[addr] = &client{
coreClient: coreClient{
Client: valkeyClient,
Expand Down
135 changes: 105 additions & 30 deletions contrib/valkey-go/valkey_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.
package valkey_test
package valkey

import (
"context"
Expand All @@ -14,9 +14,10 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/valkey-io/valkey-go"
valkeytrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/valkey-go"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

var (
Expand All @@ -35,11 +36,89 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}

func TestPeerTags(t *testing.T) {
tests := []struct {
initAddress string
expectedTags map[string]interface{}
}{
{
initAddress: "127.0.0.1:6379",
expectedTags: map[string]interface{}{
ext.PeerService: "valkey",
ext.PeerHostIPV4: "127.0.0.1",
ext.PeerPort: 6379,
},
},
{
initAddress: "[::1]:6379",
expectedTags: map[string]interface{}{
ext.PeerService: "valkey",
ext.PeerHostIPV6: "::1",
ext.PeerPort: 6379,
},
},
{
initAddress: "[2001:db8::2]:6379",
expectedTags: map[string]interface{}{
ext.PeerService: "valkey",
ext.PeerHostIPV6: "2001:db8::2",
ext.PeerPort: 6379,
},
},
{
initAddress: "[2001:db8::2%lo]:6379",
expectedTags: map[string]interface{}{
ext.PeerService: "valkey",
ext.PeerHostname: "2001:db8::2%lo",
ext.PeerPort: 6379,
},
},
{
initAddress: "::1:7777",
expectedTags: map[string]interface{}{
ext.PeerService: "valkey",
ext.PeerHostname: "",
ext.PeerPort: 0,
},
},
{
initAddress: ":::7777",
expectedTags: map[string]interface{}{
ext.PeerService: "valkey",
ext.PeerHostname: "",
ext.PeerPort: 0,
},
},
{
initAddress: "localhost:7777",
expectedTags: map[string]interface{}{
ext.PeerService: "valkey",
ext.PeerHostname: "localhost",
ext.PeerPort: 7777,
},
},
}
for _, tt := range tests {
t.Run(tt.initAddress, func(t *testing.T) {
host, port := splitHostPort(tt.initAddress)
client := coreClient{
host: host,
port: port,
}
var startSpanConfig ddtrace.StartSpanConfig
for _, tag := range client.peerTags() {
tag(&startSpanConfig)
}
require.Equal(t, tt.expectedTags, startSpanConfig.Tags)
})
}
}

func TestNewClient(t *testing.T) {
tests := []struct {
name string
valkeyClientOptions valkey.ClientOption
valkeytraceClientOptions []valkeytrace.ClientOption
valkeytraceClientOptions []ClientOption
valkeytraceClientEnvVars map[string]string
createSpans func(*testing.T, context.Context, valkey.Client)
assertNewClientError func(*testing.T, error)
Expand Down Expand Up @@ -74,19 +153,15 @@ func TestNewClient(t *testing.T) {
Username: valkeyUsername,
Password: valkeyPassword,
},
valkeytraceClientOptions: []valkeytrace.ClientOption{
valkeytrace.WithServiceName("my-valkey-client"),
valkeytrace.WithAnalytics(true),
valkeytrace.WithSkipRawCommand(true),
valkeytraceClientOptions: []ClientOption{
WithAnalytics(true),
WithSkipRawCommand(true),
},
createSpans: func(t *testing.T, ctx context.Context, client valkey.Client) {
assert.NoError(t, client.Do(ctx, client.B().Set().Key("test_key").Value("test_value").Build()).Error())
},
assertSpans: []func(t *testing.T, span mocktracer.Span){
func(t *testing.T, span mocktracer.Span) {
assert.Equal(t, "my-valkey-client", span.Tag(ext.ServiceName))
assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost))
assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort))
assert.Equal(t, "SET", span.Tag(ext.DBStatement))
assert.Equal(t, "SET", span.Tag(ext.ResourceName))
assert.Greater(t, span.Tag("db.stmt_size"), 0)
Expand Down Expand Up @@ -119,9 +194,6 @@ func TestNewClient(t *testing.T) {
},
assertSpans: []func(t *testing.T, span mocktracer.Span){
func(t *testing.T, span mocktracer.Span) {
assert.Equal(t, "valkey.client", span.Tag(ext.ServiceName))
assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost))
assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort))
assert.Equal(t, "SET\ntest_key\ntest_value\nGET\ntest_key", span.Tag(ext.DBStatement))
assert.Equal(t, "SET\ntest_key\ntest_value\nGET\ntest_key", span.Tag(ext.ResourceName))
assert.Greater(t, span.Tag("db.stmt_size"), 0)
Expand Down Expand Up @@ -155,9 +227,6 @@ func TestNewClient(t *testing.T) {
},
assertSpans: []func(t *testing.T, span mocktracer.Span){
func(t *testing.T, span mocktracer.Span) {
assert.Equal(t, "valkey.client", span.Tag(ext.ServiceName))
assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost))
assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort))
assert.Greater(t, span.Tag("db.stmt_size"), 0)
assert.Equal(t, "HMGET\nmk\n1\n2", span.Tag(ext.DBStatement))
assert.Equal(t, "HMGET\nmk\n1\n2", span.Tag(ext.ResourceName))
Expand All @@ -175,9 +244,6 @@ func TestNewClient(t *testing.T) {
assert.Nil(t, span.Tag(ext.Error))
},
func(t *testing.T, span mocktracer.Span) {
assert.Equal(t, "valkey.client", span.Tag(ext.ServiceName))
assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost))
assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort))
assert.Greater(t, span.Tag("db.stmt_size"), 0)
assert.Equal(t, "HMGET\nmk\n1\n2", span.Tag(ext.DBStatement))
assert.Equal(t, "HMGET\nmk\n1\n2", span.Tag(ext.ResourceName))
Expand All @@ -204,7 +270,6 @@ func TestNewClient(t *testing.T) {
Password: valkeyPassword,
},
valkeytraceClientEnvVars: map[string]string{
"DD_TRACE_VALKEY_SERVICE_NAME": "my-valkey-client",
"DD_TRACE_VALKEY_ANALYTICS_ENABLED": "true",
"DD_TRACE_VALKEY_SKIP_RAW_COMMAND": "true",
},
Expand All @@ -214,9 +279,6 @@ func TestNewClient(t *testing.T) {
},
assertSpans: []func(t *testing.T, span mocktracer.Span){
func(t *testing.T, span mocktracer.Span) {
assert.Equal(t, "my-valkey-client", span.Tag(ext.ServiceName))
assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost))
assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort))
assert.Equal(t, "GET", span.Tag(ext.DBStatement))
assert.Equal(t, "GET", span.Tag(ext.ResourceName))
assert.Greater(t, span.Tag("db.stmt_size"), 0)
Expand Down Expand Up @@ -252,9 +314,6 @@ func TestNewClient(t *testing.T) {
},
assertSpans: []func(t *testing.T, span mocktracer.Span){
func(t *testing.T, span mocktracer.Span) {
assert.Equal(t, "valkey.client", span.Tag(ext.ServiceName))
assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost))
assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort))
assert.Greater(t, span.Tag("db.stmt_size"), 0)
assert.Equal(t, "SUBSCRIBE\ntest_channel", span.Tag(ext.DBStatement))
assert.Equal(t, "SUBSCRIBE\ntest_channel", span.Tag(ext.ResourceName))
Expand All @@ -276,25 +335,40 @@ func TestNewClient(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
mt := mocktracer.Start()
defer mt.Stop()
for k, v := range tt.valkeytraceClientEnvVars {
t.Setenv(k, v)
}
client, err := valkeytrace.NewClient(tt.valkeyClientOptions, tt.valkeytraceClientOptions...)
span, ctx := tracer.StartSpanFromContext(context.Background(), "test.root", tracer.ServiceName("test-service"))
client, err := NewClient(tt.valkeyClientOptions, tt.valkeytraceClientOptions...)
if tt.assertNewClientError == nil {
require.NoErrorf(t, err, tt.name)
} else {
tt.assertNewClientError(t, err)
span.Finish()
return
}
tt.createSpans(t, ctx, client)
span.Finish() // test.root exists in the last span.
spans := mt.FinishedSpans()
require.Len(t, spans, len(tt.assertSpans))
require.Len(t, spans, len(tt.assertSpans)+1) // +1 for test.root
for i, span := range spans {
if span.OperationName() == "test.root" {
continue
}
tt.assertSpans[i](t, span)
// Following assertions are common to all spans
t.Log("Following assertions are common to all spans")
assert.Equalf(t,
"test-service",
span.Tag(ext.ServiceName),
"service name should not be overwritten as per DD_APM_PEER_TAGS_AGGREGATION in trace-agent",
)
assert.Equal(t, "valkey", span.Tag(ext.PeerService))
assert.Equal(t, "127.0.0.1", span.Tag(ext.PeerHostIPV4))
assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost))
assert.Equal(t, valkeyPort, span.Tag(ext.PeerPort))
assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort))
assert.NotNil(t, span)
assert.True(t, span.Tag(ext.ValkeyClientCommandWithPassword).(bool))
assert.Equal(t, tt.valkeyClientOptions.Username, span.Tag(ext.DBUser))
Expand All @@ -304,6 +378,7 @@ func TestNewClient(t *testing.T) {
assert.Equal(t, "valkey-go/valkey", span.Tag(ext.Component))
assert.Equal(t, "valkey", span.Tag(ext.DBType))
assert.Equal(t, "valkey", span.Tag(ext.DBSystem))
assert.Equal(t, "valkey", span.Tag(ext.DBInstance))
}
})
}
Expand Down

0 comments on commit fe59e6d

Please sign in to comment.