Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Proto bytes as attribute #216

Merged
merged 18 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .chloggen/dinesh.gurumurthy_OTEL-1305.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement
# The name of the component (e.g. pkg/quantile)
component: pkg/otlp/metrics
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: StatsPayload can now be sent on out channel provided.
# The PR related to this change
issues: [216]
# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
1 change: 1 addition & 0 deletions .golangci.yml
mx-psi marked this conversation as resolved.
Show resolved Hide resolved
mx-psi marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ linters-settings:
- github.com/lightstep/go-expohisto
- github.com/patrickmn/go-cache
- github.com/stretchr/testify
- google.golang.org/protobuf/proto
- golang.org/x/exp
- gopkg.in/yaml.v3
- go.uber.org
Expand Down
11 changes: 10 additions & 1 deletion pkg/otlp/metrics/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type translatorConfig struct {
deltaTTL int64

fallbackSourceProvider source.Provider
// statsOut is the channel where the translator will send its APM statsPayload bytes
statsOut chan<- []byte
}

// TranslatorOption is a translator creation option.
Expand Down Expand Up @@ -123,7 +125,6 @@ const (
// The default mode is HistogramModeOff.
func WithHistogramMode(mode HistogramMode) TranslatorOption {
return func(t *translatorConfig) error {

switch mode {
case HistogramModeNoBuckets, HistogramModeCounters, HistogramModeDistributions:
t.HistMode = mode
Expand Down Expand Up @@ -171,6 +172,14 @@ func WithNumberMode(mode NumberMode) TranslatorOption {
}
}

// WithStatsOut sets the channel where the translator will send its APM statsPayload bytes
func WithStatsOut(statsOut chan<- []byte) TranslatorOption {
return func(t *translatorConfig) error {
t.statsOut = statsOut
return nil
}
}

// InitialCumulMonoValueMode defines what the exporter should do with the initial value
// of a cumulative monotonic sum when under the 'cumulative_to_delta' mode.
// It is not used when the mode is 'raw_value'.
Expand Down
2 changes: 1 addition & 1 deletion pkg/otlp/metrics/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.21.0
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
google.golang.org/protobuf v1.31.0
)

require (
Expand Down Expand Up @@ -52,7 +53,6 @@ require (
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/grpc v1.59.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

Expand Down
16 changes: 13 additions & 3 deletions pkg/otlp/metrics/metrics_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@ import (
"strings"
"time"

"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
"github.com/DataDog/opentelemetry-mapping-go/pkg/quantile"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
otelmetric "go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
"golang.org/x/exp/slices"

"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics/internal/instrumentationlibrary"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics/internal/instrumentationscope"
"github.com/DataDog/opentelemetry-mapping-go/pkg/quantile"
)

const (
Expand Down Expand Up @@ -716,6 +716,16 @@ func (t *Translator) MapMetrics(ctx context.Context, md pmetric.Metrics, consume
newMetrics := pmetric.NewMetricSlice()
for k := 0; k < metricsArray.Len(); k++ {
md := metricsArray.At(k)
if md.Name() == keyStatsPayload && md.Type() == pmetric.MetricTypeSum {

// these metrics are an APM Stats payload; consume it as such
for l := 0; l < md.Sum().DataPoints().Len(); l++ {
if payload, ok := md.Sum().DataPoints().At(l).Attributes().Get(keyStatsPayload); ok && t.cfg.statsOut != nil && payload.Type() == pcommon.ValueTypeBytes {
t.cfg.statsOut <- payload.Bytes().AsRaw()
mx-psi marked this conversation as resolved.
Show resolved Hide resolved
mx-psi marked this conversation as resolved.
Show resolved Hide resolved
}
}
continue
}
if v, ok := runtimeMetricsMappings[md.Name()]; ok {
metadata.Languages = extractLanguageTag(md.Name(), metadata.Languages)
for _, mp := range v {
Expand Down
39 changes: 33 additions & 6 deletions pkg/otlp/metrics/metrics_translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ import (
"testing"
"time"

pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
"github.com/DataDog/opentelemetry-mapping-go/pkg/quantile"
"github.com/DataDog/opentelemetry-mapping-go/pkg/quantile/summary"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"

pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
"github.com/DataDog/opentelemetry-mapping-go/pkg/quantile"
"github.com/DataDog/opentelemetry-mapping-go/pkg/quantile/summary"
"google.golang.org/protobuf/proto"
)

func TestIsCumulativeMonotonic(t *testing.T) {
Expand Down Expand Up @@ -98,12 +98,13 @@ func (t testProvider) Source(context.Context) (source.Source, error) {
}, nil
}

func newTranslator(t *testing.T, logger *zap.Logger) *Translator {
func newTranslatorWithStatsChannel(t *testing.T, logger *zap.Logger, ch chan []byte) *Translator {
options := []TranslatorOption{
WithFallbackSourceProvider(testProvider(fallbackHostname)),
WithHistogramMode(HistogramModeDistributions),
WithNumberMode(NumberModeCumulativeToDelta),
WithHistogramAggregations(),
WithStatsOut(ch),
}

set := componenttest.NewNopTelemetrySettings()
Expand All @@ -117,6 +118,10 @@ func newTranslator(t *testing.T, logger *zap.Logger) *Translator {
return tr
}

func newTranslator(t *testing.T, logger *zap.Logger) *Translator {
return newTranslatorWithStatsChannel(t, logger, nil)
}

type metric struct {
name string
typ DataType
Expand Down Expand Up @@ -921,6 +926,28 @@ func TestMapAPMStats(t *testing.T) {
require.Equal(t, consumer.apmstats, statsPayloads)
}

func TestMapAPMStatsWithBytes(t *testing.T) {
consumer := &mockFullConsumer{}
logger, err := zap.NewDevelopment()
require.NoError(t, err)
ch := make(chan []byte, 10)
tr := newTranslatorWithStatsChannel(t, logger, ch)
want := &pb.StatsPayload{
Stats: []*pb.ClientStatsPayload{statsPayloads[0], statsPayloads[1]},
}
md, err := tr.StatsToMetrics(want)
assert.NoError(t, err)

ctx := context.Background()
tr.MapMetrics(ctx, md, consumer)
got := &pb.StatsPayload{}

payload := <-ch
err = proto.Unmarshal(payload, got)
assert.NoError(t, err)
assert.True(t, proto.Equal(want, got))
}

func TestMapDoubleMonotonicReportDiffForFirstValue(t *testing.T) {
ctx := context.Background()
tr := newTranslator(t, zap.NewNop())
Expand Down
30 changes: 27 additions & 3 deletions pkg/otlp/metrics/statspayload.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"fmt"
"strings"

pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
"github.com/DataDog/sketches-go/ddsketch"
"github.com/DataDog/sketches-go/ddsketch/mapping"
"github.com/DataDog/sketches-go/ddsketch/pb/sketchpb"
Expand All @@ -27,16 +29,17 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"

pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
)

// keyAPMStats specifies the key name of the resource attribute which identifies resource metrics
// as being an APM Stats Payload. The presence of the key results in them being treated and consumed
// differently by the Translator.
const keyAPMStats = "_dd.apm_stats"

// keyStatsPayload is the key for the stats payload in the attributes map.
// This is used as Metric name and Attribute key.
const keyStatsPayload = "dd.internal.stats.payload"

// This group of constants specifies the metric attribute keys used for APM Stats aggregation keys.
const (
statsKeyHostname = "dd.hostname"
Expand Down Expand Up @@ -473,3 +476,24 @@ func getInt(m pcommon.Map, k string) uint64 {
}
return uint64(v.Int())
}

// StatsToMetrics converts a StatsPayload to a pdata.Metrics
func (t *Translator) StatsToMetrics(sp *pb.StatsPayload) (pmetric.Metrics, error) {
bytes, err := proto.Marshal(sp)
if err != nil {
t.logger.Error("Failed to marshal stats payload", zap.Error(err))
return pmetric.NewMetrics(), err
}
mmx := pmetric.NewMetrics()
rmx := mmx.ResourceMetrics().AppendEmpty()
smx := rmx.ScopeMetrics().AppendEmpty()
mslice := smx.Metrics()
mx := mslice.AppendEmpty()
mx.SetName(keyStatsPayload)
sum := mx.SetEmptySum()
sum.SetIsMonotonic(false)
dp := sum.DataPoints().AppendEmpty()
byteSlice := dp.Attributes().PutEmptyBytes(keyStatsPayload)
byteSlice.Append(bytes...)
return mmx, nil
}
26 changes: 26 additions & 0 deletions pkg/otlp/metrics/statspayload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,32 @@ func TestConversion(t *testing.T) {
if found != len(want.Stats) {
t.Fatalf("Found %d/%d", found, len(want.Stats))
}
mx, err := trans.StatsToMetrics(want)
assert.NoError(t, err)
var results []*pb.StatsPayload
for i := 0; i < mx.ResourceMetrics().Len(); i++ {
rm := mx.ResourceMetrics().At(i)
for j := 0; j < rm.ScopeMetrics().Len(); j++ {
sm := rm.ScopeMetrics().At(i)
dineshg13 marked this conversation as resolved.
Show resolved Hide resolved
for k := 0; k < sm.Metrics().Len(); k++ {
md := sm.Metrics().At(k)
// these metrics are an APM Stats payload; consume it as such
for l := 0; l < md.Sum().DataPoints().Len(); l++ {
if payload, ok := md.Sum().DataPoints().At(l).Attributes().Get(keyStatsPayload); ok {

stats := &pb.StatsPayload{}
err = proto.Unmarshal(payload.Bytes().AsRaw(), stats)
assert.NoError(t, err)
results = append(results, stats)
}
}
assert.NoError(t, err)
}
}
}

assert.Len(t, results, 1)
assert.True(t, proto.Equal(want, results[0]))
})
}

Expand Down
Loading