Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: implement DD_TRACE_INTEGRATION_TAGS + contrib/net/http support #3156

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions contrib/net/http/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ const (
)

type Config struct {
SpanOpts []ddtrace.StartSpanOption
FinishOpts []ddtrace.FinishOption
IgnoreRequest func(*http.Request) bool
ResourceNamer func(*http.Request) string
IsStatusError func(int) bool
HeaderTags *internal.LockMap
ServiceName string
AnalyticsRate float64
SpanOpts []ddtrace.StartSpanOption
FinishOpts []ddtrace.FinishOption
IgnoreRequest func(*http.Request) bool
ResourceNamer func(*http.Request) string
IsStatusError func(int) bool
HeaderTags *internal.LockMap
ServiceName string
AnalyticsRate float64
IntegrationTags *internal.IntegrationTags
}

// Option represents an option that can be passed to NewServeMux or WrapHandler.
Expand All @@ -52,6 +53,7 @@ func Default() *Config {
}
cfg.IgnoreRequest = func(_ *http.Request) bool { return false }
cfg.ResourceNamer = func(_ *http.Request) string { return "" }
cfg.IntegrationTags = globalconfig.IntegrationTags()

return cfg
}
Expand Down
32 changes: 32 additions & 0 deletions contrib/net/http/internal/wrap/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package wrap

import (
"net"
"net/http"

"gopkg.in/DataDog/dd-trace-go.v1/contrib/internal/httptrace"
Expand Down Expand Up @@ -55,6 +56,11 @@ func (mux *ServeMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
so := make([]ddtrace.StartSpanOption, len(mux.cfg.SpanOpts), len(mux.cfg.SpanOpts)+1)
copy(so, mux.cfg.SpanOpts)
so = append(so, httptrace.HeaderTagsFromRequest(r, mux.cfg.HeaderTags))

for k, v := range serverIntegrationTags(mux.cfg, r) {
so = append(so, tracer.Tag(k, v))
}

TraceAndServe(mux.ServeMux, w, r, &httptrace.ServeConfig{
Service: mux.cfg.ServiceName,
Resource: resource,
Expand All @@ -64,3 +70,29 @@ func (mux *ServeMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
RouteParams: patternValues(pattern, r),
})
}

func serverIntegrationTags(cfg *config.Config, req *http.Request) map[string]string {
host, port := serverHostPort(req)

q := map[string]string{
"span.kind": ext.SpanKindServer,
"server.address": host,
"server.port": port,
"url.path": req.URL.Path,
"http.request.method": req.Method,
}
return cfg.IntegrationTags.Get(config.ComponentName, q)
}

func serverHostPort(req *http.Request) (string, string) {
ctxLocalAddr := req.Context().Value(http.LocalAddrContextKey)
if ctxLocalAddr == nil {
return "", ""
}
addr, ok := ctxLocalAddr.(net.Addr)
if !ok {
return "", ""
}
host, port, _ := net.SplitHostPort(addr.String())
return host, port
}
42 changes: 22 additions & 20 deletions contrib/net/http/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,18 +114,19 @@ type RoundTripperBeforeFunc func(*http.Request, ddtrace.Span)
// RoundTrip is made. It is possible for the http Response to be nil.
type RoundTripperAfterFunc func(*http.Response, ddtrace.Span)
type roundTripperConfig struct {
before RoundTripperBeforeFunc
after RoundTripperAfterFunc
analyticsRate float64
serviceName string
resourceNamer func(req *http.Request) string
spanNamer func(req *http.Request) string
ignoreRequest func(*http.Request) bool
spanOpts []ddtrace.StartSpanOption
propagation bool
errCheck func(err error) bool
queryString bool // reports whether the query string is included in the URL tag for http client spans
isStatusError func(statusCode int) bool
before RoundTripperBeforeFunc
after RoundTripperAfterFunc
analyticsRate float64
serviceName string
resourceNamer func(req *http.Request) string
spanNamer func(req *http.Request) string
ignoreRequest func(*http.Request) bool
spanOpts []ddtrace.StartSpanOption
propagation bool
errCheck func(err error) bool
queryString bool // reports whether the query string is included in the URL tag for http client spans
isStatusError func(statusCode int) bool
integrationTags *internal.IntegrationTags
}

func newRoundTripperConfig() *roundTripperConfig {
Expand All @@ -138,14 +139,15 @@ func newRoundTripperConfig() *roundTripperConfig {
}

c := &roundTripperConfig{
serviceName: namingschema.ServiceNameOverrideV0("", ""),
analyticsRate: globalconfig.AnalyticsRate(),
resourceNamer: defaultResourceNamer,
propagation: true,
spanNamer: defaultSpanNamer,
ignoreRequest: func(_ *http.Request) bool { return false },
queryString: internal.BoolEnv(envClientQueryStringEnabled, true),
isStatusError: isClientError,
serviceName: namingschema.ServiceNameOverrideV0("", ""),
analyticsRate: globalconfig.AnalyticsRate(),
resourceNamer: defaultResourceNamer,
propagation: true,
spanNamer: defaultSpanNamer,
ignoreRequest: func(_ *http.Request) bool { return false },
queryString: internal.BoolEnv(envClientQueryStringEnabled, true),
isStatusError: isClientError,
integrationTags: globalconfig.IntegrationTags(),
}
v := os.Getenv(envClientErrorStatuses)
if fn := httptrace.GetErrorCodesFromInput(v); fn != nil {
Expand Down
16 changes: 16 additions & 0 deletions contrib/net/http/roundtripper.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ func (rt *roundTripper) RoundTrip(req *http.Request) (res *http.Response, err er
if len(rt.cfg.spanOpts) > 0 {
opts = append(opts, rt.cfg.spanOpts...)
}
if tags := rt.clientIntegrationTags(req); tags != nil {
for k, v := range tags {
opts = append(opts, tracer.Tag(k, v))
}
}
span, ctx := tracer.StartSpanFromContext(req.Context(), spanName, opts...)
defer func() {
if rt.cfg.after != nil {
Expand Down Expand Up @@ -108,6 +113,17 @@ func (rt *roundTripper) Unwrap() http.RoundTripper {
return rt.base
}

func (rt *roundTripper) clientIntegrationTags(req *http.Request) map[string]string {
q := map[string]string{
"span.kind": "client",
"server.address": req.URL.Hostname(),
"server.port": req.URL.Port(),
"url.path": req.URL.Path,
"http.request.method": req.Method,
}
return rt.cfg.integrationTags.Get(config.ComponentName, q)
}

// WrapRoundTripper returns a new RoundTripper which traces all requests sent
// over the transport.
func WrapRoundTripper(rt http.RoundTripper, opts ...RoundTripperOption) http.RoundTripper {
Expand Down
93 changes: 93 additions & 0 deletions contrib/net/http/roundtripper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"gopkg.in/DataDog/dd-trace-go.v1/internal/appsec"
"gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/emitter/waf/addresses"
"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
internallog "gopkg.in/DataDog/dd-trace-go.v1/internal/log"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -830,3 +831,95 @@ func TestAppsec(t *testing.T) {
})
}
}

func TestIntegrationTags(t *testing.T) {
srv1 := httptest.NewServer(nil)
defer srv1.Close()
srv1URL, err := url.Parse(srv1.URL)
require.NoError(t, err)

srv2 := httptest.NewServer(nil)
defer srv2.Close()
srv2URL, err := url.Parse(srv2.URL)
require.NoError(t, err)

integrationTagsEnv := `[
{
"component": "net/http",
"query": [
{"span.kind": "server", "server.port": "%s"},
{"span.kind": "client", "server.port": "%s", "http.request.method": "GET"}
],
"tags": {
"tag1": "val1"
}
},
{
"component": "net/http",
"query": [
{"span.kind": "server", "server.port": "%s"}
],
"tags": {
"tag2": "val2"
}
}
]`
t.Setenv("DD_TRACE_INTEGRATION_TAGS", fmt.Sprintf(integrationTagsEnv, srv1URL.Port(), srv2URL.Port(), srv2URL.Port()))

// force start-stop the real tracer to load the config
tracer.Start(tracer.WithLogger(internallog.DiscardLogger{}))
tracer.Stop()

mt := mocktracer.Start()
defer mt.Stop()

mux := NewServeMux()
mux.HandleFunc("/200", handler200)
mux.HandleFunc("/500", handler500)

srv1.Config.Handler = mux
srv2.Config.Handler = mux

client := WrapClient(&http.Client{})

req, err := http.NewRequest(http.MethodGet, srv1.URL+"/200", nil)
require.NoError(t, err)
resp, err := client.Do(req)
require.NoError(t, err)
defer resp.Body.Close()

req, err = http.NewRequest(http.MethodGet, srv2.URL+"/200", nil)
require.NoError(t, err)
resp, err = client.Do(req)
require.NoError(t, err)
defer resp.Body.Close()

spans := mt.FinishedSpans()
require.Len(t, spans, 4)

spanSrv1 := spans[0]
spanClient1 := spans[1]
spanSrv2 := spans[2]
spanClient2 := spans[3]

assert.Equal(t, "server", spanSrv1.Tag("span.kind"))
assert.Equal(t, "server", spanSrv2.Tag("span.kind"))
assert.Equal(t, "client", spanClient1.Tag("span.kind"))
assert.Equal(t, "client", spanClient2.Tag("span.kind"))

// server1
assert.Equal(t, "val1", spanSrv1.Tag("tag1"), "server1 missing tag1")
assert.Nil(t, spanSrv1.Tag("tag2"), "server1 should not have tag2")

// client1
assert.Nil(t, spanClient1.Tag("tag1"), "client should not have tag1")
assert.Nil(t, spanClient1.Tag("tag2"), "client should not have tag2")

// server2
assert.Nil(t, spanSrv2.Tag("tag1"), "server2 should not have tag1")
assert.Equal(t, "val2", spanSrv2.Tag("tag2"), "server2 missing tag2")

// client2
assert.Equal(t, "val1", spanClient2.Tag("tag1"), "client2 missing tag1")
assert.Nil(t, spanClient2.Tag("tag2"), "client2 should not have tag2")
}
13 changes: 13 additions & 0 deletions ddtrace/tracer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,9 @@ type config struct {

// traceRateLimitPerSecond specifies the rate limit for traces.
traceRateLimitPerSecond float64

// integrationTags allows to set custom tags on integrations.
integrationTags []internal.IntegrationTagsRule
}

// orchestrionConfig contains Orchestrion configuration.
Expand Down Expand Up @@ -605,6 +608,16 @@ func newConfig(opts ...StartOption) *config {
c.runtimeMetricsV2 = false
}

if iTagsStr := os.Getenv("DD_TRACE_INTEGRATION_TAGS"); iTagsStr != "" {
var iTagsRules []internal.IntegrationTagsRule
if err := json.Unmarshal([]byte(iTagsStr), &iTagsRules); err != nil {
log.Warn("failed to parse DD_TRACE_INTEGRATION_TAGS: %v", err)
} else {
c.integrationTags = iTagsRules
globalconfig.SetIntegrationTagsRules(iTagsRules)
}
}

return c
}

Expand Down
31 changes: 24 additions & 7 deletions internal/globalconfig/globalconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,21 @@ var cfg = &config{
analyticsRate: math.NaN(),
runtimeID: uuid.New().String(),
headersAsTags: internal.NewLockMap(map[string]string{}),
integrationTags: &internal.IntegrationTags{
Rules: nil,
Cache: make(map[string]map[string]string),
},
}

type config struct {
mu sync.RWMutex
analyticsRate float64
serviceName string
runtimeID string
headersAsTags *internal.LockMap
dogstatsdAddr string
statsTags []string
mu sync.RWMutex
analyticsRate float64
serviceName string
runtimeID string
headersAsTags *internal.LockMap
dogstatsdAddr string
statsTags []string
integrationTags *internal.IntegrationTags
}

// AnalyticsRate returns the sampling rate at which events should be marked. It uses
Expand Down Expand Up @@ -130,3 +135,15 @@ func HeaderTagsLen() int {
func ClearHeaderTags() {
cfg.headersAsTags.Clear()
}

func SetIntegrationTagsRules(rules []internal.IntegrationTagsRule) {
cfg.mu.Lock()
defer cfg.mu.Unlock()
cfg.integrationTags.Rules = rules
}

func IntegrationTags() *internal.IntegrationTags {
cfg.mu.RLock()
defer cfg.mu.RUnlock()
return cfg.integrationTags
}
Loading
Loading