diff --git a/contrib/net/http/internal/config/config.go b/contrib/net/http/internal/config/config.go index cfa06a42a9..d9f043d954 100644 --- a/contrib/net/http/internal/config/config.go +++ b/contrib/net/http/internal/config/config.go @@ -23,14 +23,15 @@ const ( ) type Config struct { - SpanOpts []ddtrace.StartSpanOption - FinishOpts []ddtrace.FinishOption - IgnoreRequest func(*http.Request) bool - ResourceNamer func(*http.Request) string - IsStatusError func(int) bool - HeaderTags *internal.LockMap - ServiceName string - AnalyticsRate float64 + SpanOpts []ddtrace.StartSpanOption + FinishOpts []ddtrace.FinishOption + IgnoreRequest func(*http.Request) bool + ResourceNamer func(*http.Request) string + IsStatusError func(int) bool + HeaderTags *internal.LockMap + ServiceName string + AnalyticsRate float64 + IntegrationTags *internal.IntegrationTags } // Option represents an option that can be passed to NewServeMux or WrapHandler. @@ -52,6 +53,7 @@ func Default() *Config { } cfg.IgnoreRequest = func(_ *http.Request) bool { return false } cfg.ResourceNamer = func(_ *http.Request) string { return "" } + cfg.IntegrationTags = globalconfig.IntegrationTags() return cfg } diff --git a/contrib/net/http/internal/wrap/mux.go b/contrib/net/http/internal/wrap/mux.go index aebfd1d802..6e5701d04d 100644 --- a/contrib/net/http/internal/wrap/mux.go +++ b/contrib/net/http/internal/wrap/mux.go @@ -6,6 +6,7 @@ package wrap import ( + "net" "net/http" "gopkg.in/DataDog/dd-trace-go.v1/contrib/internal/httptrace" @@ -55,6 +56,11 @@ func (mux *ServeMux) ServeHTTP(w http.ResponseWriter, r *http.Request) { so := make([]ddtrace.StartSpanOption, len(mux.cfg.SpanOpts), len(mux.cfg.SpanOpts)+1) copy(so, mux.cfg.SpanOpts) so = append(so, httptrace.HeaderTagsFromRequest(r, mux.cfg.HeaderTags)) + + for k, v := range serverIntegrationTags(mux.cfg, r) { + so = append(so, tracer.Tag(k, v)) + } + TraceAndServe(mux.ServeMux, w, r, &httptrace.ServeConfig{ Service: mux.cfg.ServiceName, Resource: resource, @@ -64,3 +70,29 @@ func (mux *ServeMux) ServeHTTP(w http.ResponseWriter, r *http.Request) { RouteParams: patternValues(pattern, r), }) } + +func serverIntegrationTags(cfg *config.Config, req *http.Request) map[string]string { + host, port := serverHostPort(req) + + q := map[string]string{ + "span.kind": ext.SpanKindServer, + "server.address": host, + "server.port": port, + "url.path": req.URL.Path, + "http.request.method": req.Method, + } + return cfg.IntegrationTags.Get(config.ComponentName, q) +} + +func serverHostPort(req *http.Request) (string, string) { + ctxLocalAddr := req.Context().Value(http.LocalAddrContextKey) + if ctxLocalAddr == nil { + return "", "" + } + addr, ok := ctxLocalAddr.(net.Addr) + if !ok { + return "", "" + } + host, port, _ := net.SplitHostPort(addr.String()) + return host, port +} diff --git a/contrib/net/http/option.go b/contrib/net/http/option.go index fc292ea332..740ea736a3 100644 --- a/contrib/net/http/option.go +++ b/contrib/net/http/option.go @@ -114,18 +114,19 @@ type RoundTripperBeforeFunc func(*http.Request, ddtrace.Span) // RoundTrip is made. It is possible for the http Response to be nil. type RoundTripperAfterFunc func(*http.Response, ddtrace.Span) type roundTripperConfig struct { - before RoundTripperBeforeFunc - after RoundTripperAfterFunc - analyticsRate float64 - serviceName string - resourceNamer func(req *http.Request) string - spanNamer func(req *http.Request) string - ignoreRequest func(*http.Request) bool - spanOpts []ddtrace.StartSpanOption - propagation bool - errCheck func(err error) bool - queryString bool // reports whether the query string is included in the URL tag for http client spans - isStatusError func(statusCode int) bool + before RoundTripperBeforeFunc + after RoundTripperAfterFunc + analyticsRate float64 + serviceName string + resourceNamer func(req *http.Request) string + spanNamer func(req *http.Request) string + ignoreRequest func(*http.Request) bool + spanOpts []ddtrace.StartSpanOption + propagation bool + errCheck func(err error) bool + queryString bool // reports whether the query string is included in the URL tag for http client spans + isStatusError func(statusCode int) bool + integrationTags *internal.IntegrationTags } func newRoundTripperConfig() *roundTripperConfig { @@ -138,14 +139,15 @@ func newRoundTripperConfig() *roundTripperConfig { } c := &roundTripperConfig{ - serviceName: namingschema.ServiceNameOverrideV0("", ""), - analyticsRate: globalconfig.AnalyticsRate(), - resourceNamer: defaultResourceNamer, - propagation: true, - spanNamer: defaultSpanNamer, - ignoreRequest: func(_ *http.Request) bool { return false }, - queryString: internal.BoolEnv(envClientQueryStringEnabled, true), - isStatusError: isClientError, + serviceName: namingschema.ServiceNameOverrideV0("", ""), + analyticsRate: globalconfig.AnalyticsRate(), + resourceNamer: defaultResourceNamer, + propagation: true, + spanNamer: defaultSpanNamer, + ignoreRequest: func(_ *http.Request) bool { return false }, + queryString: internal.BoolEnv(envClientQueryStringEnabled, true), + isStatusError: isClientError, + integrationTags: globalconfig.IntegrationTags(), } v := os.Getenv(envClientErrorStatuses) if fn := httptrace.GetErrorCodesFromInput(v); fn != nil { diff --git a/contrib/net/http/roundtripper.go b/contrib/net/http/roundtripper.go index 79a46828ca..609080391a 100644 --- a/contrib/net/http/roundtripper.go +++ b/contrib/net/http/roundtripper.go @@ -57,6 +57,11 @@ func (rt *roundTripper) RoundTrip(req *http.Request) (res *http.Response, err er if len(rt.cfg.spanOpts) > 0 { opts = append(opts, rt.cfg.spanOpts...) } + if tags := rt.clientIntegrationTags(req); tags != nil { + for k, v := range tags { + opts = append(opts, tracer.Tag(k, v)) + } + } span, ctx := tracer.StartSpanFromContext(req.Context(), spanName, opts...) defer func() { if rt.cfg.after != nil { @@ -108,6 +113,17 @@ func (rt *roundTripper) Unwrap() http.RoundTripper { return rt.base } +func (rt *roundTripper) clientIntegrationTags(req *http.Request) map[string]string { + q := map[string]string{ + "span.kind": "client", + "server.address": req.URL.Hostname(), + "server.port": req.URL.Port(), + "url.path": req.URL.Path, + "http.request.method": req.Method, + } + return rt.cfg.integrationTags.Get(config.ComponentName, q) +} + // WrapRoundTripper returns a new RoundTripper which traces all requests sent // over the transport. func WrapRoundTripper(rt http.RoundTripper, opts ...RoundTripperOption) http.RoundTripper { diff --git a/contrib/net/http/roundtripper_test.go b/contrib/net/http/roundtripper_test.go index f532371593..3b29d3afe3 100644 --- a/contrib/net/http/roundtripper_test.go +++ b/contrib/net/http/roundtripper_test.go @@ -28,6 +28,7 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec" "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/emitter/waf/addresses" "gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig" + internallog "gopkg.in/DataDog/dd-trace-go.v1/internal/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -830,3 +831,95 @@ func TestAppsec(t *testing.T) { }) } } + +func TestIntegrationTags(t *testing.T) { + srv1 := httptest.NewServer(nil) + defer srv1.Close() + srv1URL, err := url.Parse(srv1.URL) + require.NoError(t, err) + + srv2 := httptest.NewServer(nil) + defer srv2.Close() + srv2URL, err := url.Parse(srv2.URL) + require.NoError(t, err) + + integrationTagsEnv := `[ + { + "component": "net/http", + "query": [ + {"span.kind": "server", "server.port": "%s"}, + {"span.kind": "client", "server.port": "%s", "http.request.method": "GET"} + ], + "tags": { + "tag1": "val1" + } + }, + { + "component": "net/http", + "query": [ + {"span.kind": "server", "server.port": "%s"} + ], + "tags": { + "tag2": "val2" + } + } +]` + t.Setenv("DD_TRACE_INTEGRATION_TAGS", fmt.Sprintf(integrationTagsEnv, srv1URL.Port(), srv2URL.Port(), srv2URL.Port())) + + // force start-stop the real tracer to load the config + tracer.Start(tracer.WithLogger(internallog.DiscardLogger{})) + tracer.Stop() + + mt := mocktracer.Start() + defer mt.Stop() + + mux := NewServeMux() + mux.HandleFunc("/200", handler200) + mux.HandleFunc("/500", handler500) + + srv1.Config.Handler = mux + srv2.Config.Handler = mux + + client := WrapClient(&http.Client{}) + + req, err := http.NewRequest(http.MethodGet, srv1.URL+"/200", nil) + require.NoError(t, err) + resp, err := client.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + req, err = http.NewRequest(http.MethodGet, srv2.URL+"/200", nil) + require.NoError(t, err) + resp, err = client.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + spans := mt.FinishedSpans() + require.Len(t, spans, 4) + + spanSrv1 := spans[0] + spanClient1 := spans[1] + spanSrv2 := spans[2] + spanClient2 := spans[3] + + assert.Equal(t, "server", spanSrv1.Tag("span.kind")) + assert.Equal(t, "server", spanSrv2.Tag("span.kind")) + assert.Equal(t, "client", spanClient1.Tag("span.kind")) + assert.Equal(t, "client", spanClient2.Tag("span.kind")) + + // server1 + assert.Equal(t, "val1", spanSrv1.Tag("tag1"), "server1 missing tag1") + assert.Nil(t, spanSrv1.Tag("tag2"), "server1 should not have tag2") + + // client1 + assert.Nil(t, spanClient1.Tag("tag1"), "client should not have tag1") + assert.Nil(t, spanClient1.Tag("tag2"), "client should not have tag2") + + // server2 + assert.Nil(t, spanSrv2.Tag("tag1"), "server2 should not have tag1") + assert.Equal(t, "val2", spanSrv2.Tag("tag2"), "server2 missing tag2") + + // client2 + assert.Equal(t, "val1", spanClient2.Tag("tag1"), "client2 missing tag1") + assert.Nil(t, spanClient2.Tag("tag2"), "client2 should not have tag2") +} diff --git a/ddtrace/tracer/option.go b/ddtrace/tracer/option.go index a9fedd5044..4ccb970bdc 100644 --- a/ddtrace/tracer/option.go +++ b/ddtrace/tracer/option.go @@ -308,6 +308,9 @@ type config struct { // traceRateLimitPerSecond specifies the rate limit for traces. traceRateLimitPerSecond float64 + + // integrationTags allows to set custom tags on integrations. + integrationTags []internal.IntegrationTagsRule } // orchestrionConfig contains Orchestrion configuration. @@ -605,6 +608,16 @@ func newConfig(opts ...StartOption) *config { c.runtimeMetricsV2 = false } + if iTagsStr := os.Getenv("DD_TRACE_INTEGRATION_TAGS"); iTagsStr != "" { + var iTagsRules []internal.IntegrationTagsRule + if err := json.Unmarshal([]byte(iTagsStr), &iTagsRules); err != nil { + log.Warn("failed to parse DD_TRACE_INTEGRATION_TAGS: %v", err) + } else { + c.integrationTags = iTagsRules + globalconfig.SetIntegrationTagsRules(iTagsRules) + } + } + return c } diff --git a/internal/globalconfig/globalconfig.go b/internal/globalconfig/globalconfig.go index a36f50035f..3b6b446362 100644 --- a/internal/globalconfig/globalconfig.go +++ b/internal/globalconfig/globalconfig.go @@ -20,16 +20,21 @@ var cfg = &config{ analyticsRate: math.NaN(), runtimeID: uuid.New().String(), headersAsTags: internal.NewLockMap(map[string]string{}), + integrationTags: &internal.IntegrationTags{ + Rules: nil, + Cache: make(map[string]map[string]string), + }, } type config struct { - mu sync.RWMutex - analyticsRate float64 - serviceName string - runtimeID string - headersAsTags *internal.LockMap - dogstatsdAddr string - statsTags []string + mu sync.RWMutex + analyticsRate float64 + serviceName string + runtimeID string + headersAsTags *internal.LockMap + dogstatsdAddr string + statsTags []string + integrationTags *internal.IntegrationTags } // AnalyticsRate returns the sampling rate at which events should be marked. It uses @@ -130,3 +135,15 @@ func HeaderTagsLen() int { func ClearHeaderTags() { cfg.headersAsTags.Clear() } + +func SetIntegrationTagsRules(rules []internal.IntegrationTagsRule) { + cfg.mu.Lock() + defer cfg.mu.Unlock() + cfg.integrationTags.Rules = rules +} + +func IntegrationTags() *internal.IntegrationTags { + cfg.mu.RLock() + defer cfg.mu.RUnlock() + return cfg.integrationTags +} diff --git a/internal/utils.go b/internal/utils.go index bc9a12b4af..b68b9b2819 100644 --- a/internal/utils.go +++ b/internal/utils.go @@ -6,6 +6,9 @@ package internal import ( + "fmt" + "sort" + "strings" "sync" "sync/atomic" ) @@ -68,3 +71,72 @@ func (l *LockMap) Get(k string) string { defer l.RUnlock() return l.m[k] } + +type IntegrationTags struct { + Rules []IntegrationTagsRule + Cache map[string]map[string]string +} + +func (i *IntegrationTags) Get(component string, instanceKeys map[string]string) map[string]string { + instanceKeys["component"] = component + cacheKey := mapToKey(instanceKeys) + + fmt.Println(cacheKey) + if res, ok := i.Cache[cacheKey]; ok { + fmt.Printf("got from cache: %s\n", cacheKey) + return res + } + + tags := getRuleTags(i.Rules, component, instanceKeys) + i.Cache[cacheKey] = tags + return tags +} + +func mapToKey(m map[string]string) string { + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + sort.Strings(keys) + + var sb strings.Builder + for i, k := range keys { + sb.WriteString(k) + sb.WriteString("=") + sb.WriteString(m[k]) + if i < len(keys)-1 { + sb.WriteString(";") + } + } + return sb.String() +} + +func getRuleTags(rules []IntegrationTagsRule, component string, instanceKeys map[string]string) map[string]string { + for _, rule := range rules { + if rule.Component != component { + continue + } + // the list of rules are joined by an OR logic + for _, q := range rule.Query { + // each rule's key/value pairs are joined by an AND logic (all must match) + match := true + for k, v := range q { + if val, ok := instanceKeys[k]; !ok || val != v { + // no match, check next rule + match = false + break + } + } + if match { + return rule.Tags + } + } + } + return nil +} + +type IntegrationTagsRule struct { + Component string `json:"component"` + Query []map[string]string `json:"query"` + Tags map[string]string `json:"tags"` +}