Skip to content

Commit

Permalink
[chore] Remove obssender factory from BaseExporter, merge obs reporte…
Browse files Browse the repository at this point in the history
…r/sender (open-telemetry#12338)

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Feb 11, 2025
1 parent 22171a1 commit 4eeb468
Show file tree
Hide file tree
Showing 10 changed files with 148 additions and 235 deletions.
17 changes: 7 additions & 10 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"go.opentelemetry.io/collector/pipeline"
)

type ObsrepSenderFactory = func(obsrep *ObsReport, next Sender[internal.Request]) Sender[internal.Request]

// Option apply changes to BaseExporter.
type Option func(*BaseExporter) error

Expand Down Expand Up @@ -59,20 +57,15 @@ type BaseExporter struct {
batcherCfg exporterbatcher.Config
}

func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSenderFactory, options ...Option) (*BaseExporter, error) {
obsReport, err := NewObsReport(ObsReportSettings{ExporterSettings: set, Signal: signal})
if err != nil {
return nil, err
}

func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, options ...Option) (*BaseExporter, error) {
be := &BaseExporter{
Set: set,
timeoutCfg: NewDefaultTimeoutConfig(),
queueFactory: exporterqueue.NewMemoryQueueFactory[internal.Request](),
}

for _, op := range options {
if err = op(be); err != nil {
if err := op(be); err != nil {
return nil, err
}
}
Expand All @@ -84,7 +77,11 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
be.firstSender = be.RetrySender
}

be.ObsrepSender = osf(obsReport, be.firstSender)
var err error
be.ObsrepSender, err = newObsReportSender(set, signal, be.firstSender)
if err != nil {
return nil, err
}
be.firstSender = be.ObsrepSender

if be.batcherCfg.Enabled {
Expand Down
30 changes: 11 additions & 19 deletions exporter/exporterhelper/internal/base_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,11 @@ func newNoopExportSender() Sender[internal.Request] {
}}
}

func newNoopObsrepSender(_ *ObsReport, next Sender[internal.Request]) Sender[internal.Request] {
return &noopSender{SendFunc: next.Send}
}

func TestBaseExporter(t *testing.T) {
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender)
be, err := NewBaseExporter(defaultSettings, defaultSignal)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
Expand All @@ -77,7 +73,7 @@ func TestBaseExporterWithOptions(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
want := errors.New("my error")
be, err := NewBaseExporter(
defaultSettings, defaultSignal, newNoopObsrepSender,
defaultSettings, defaultSignal,
WithStart(func(context.Context, component.Host) error { return want }),
WithShutdown(func(context.Context) error { return want }),
WithTimeout(NewDefaultTimeoutConfig()),
Expand All @@ -95,16 +91,16 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) {
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
bs, err := NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender,
bs, err := NewBaseExporter(exportertest.NewNopSettings(), defaultSignal,
WithRetry(configretry.NewDefaultBackOffConfig()))
require.NoError(t, err)
require.Nil(t, bs.Marshaler)
require.Nil(t, bs.Unmarshaler)
_, err = NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender,
_, err = NewBaseExporter(exportertest.NewNopSettings(), defaultSignal,
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig()))
require.Error(t, err)

_, err = NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender,
_, err = NewBaseExporter(exportertest.NewNopSettings(), defaultSignal,
WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&requesttest.FakeRequest{Items: 1})),
WithRetry(configretry.NewDefaultBackOffConfig()),
WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]()))
Expand All @@ -126,7 +122,7 @@ func TestBaseExporterLogging(t *testing.T) {
rCfg.Enabled = false
qCfg := exporterqueue.NewDefaultConfig()
qCfg.Enabled = false
bs, err := NewBaseExporter(set, defaultSignal, newNoopObsrepSender,
bs, err := NewBaseExporter(set, defaultSignal,
WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[internal.Request]()),
WithBatcher(exporterbatcher.NewDefaultConfig()),
WithRetry(rCfg))
Expand Down Expand Up @@ -197,20 +193,16 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
set := exportertest.NewNopSettings()
logger, observed := observer.New(zap.ErrorLevel)
set.Logger = zap.New(logger)
be, err := NewBaseExporter(set, pipeline.SignalLogs, newObservabilityConsumerSender, tt.queueOptions...)
be, err := NewBaseExporter(set, pipeline.SignalLogs, tt.queueOptions...)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
ocs := be.ObsrepSender.(*observabilityConsumerSender)
mockR := &requesttest.FakeRequest{Items: 2, ExportErr: errors.New("some error")}
ocs.run(func() {
require.Error(t, be.Send(context.Background(), mockR))
})
sink := requesttest.NewSink()
mockR := &requesttest.FakeRequest{Items: 2, Sink: sink, ExportErr: errors.New("some error")}
require.Error(t, be.Send(context.Background(), mockR))
assert.Len(t, observed.All(), 1)
assert.Equal(t, "Exporting failed. Rejecting data. Try enabling sending_queue to survive temporary failures.", observed.All()[0].Message)
ocs.awaitAsyncProcessing()
ocs.checkSendItemsCount(t, 0)
ocs.checkDroppedItemsCount(t, 2)
require.NoError(t, be.Shutdown(context.Background()))
assert.Empty(t, 0, sink.RequestsCount())
})
}
for _, tt := range tests {
Expand Down
124 changes: 0 additions & 124 deletions exporter/exporterhelper/internal/obs_report.go

This file was deleted.

113 changes: 107 additions & 6 deletions exporter/exporterhelper/internal/obs_report_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,129 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe
import (
"context"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
"go.opentelemetry.io/collector/exporter/internal"
"go.opentelemetry.io/collector/pipeline"
)

const (
// spanNameSep is duplicate between receiver and exporter.
spanNameSep = "/"

// ExporterKey used to identify exporters in metrics and traces.
ExporterKey = "exporter"

// DataTypeKey used to identify the data type in the queue size metric.
DataTypeKey = "data_type"

// ItemsSent used to track number of items sent by exporters.
ItemsSent = "items.sent"
// ItemsFailed used to track number of items that failed to be sent by exporters.
ItemsFailed = "items.failed"
)

type obsReportSender[K internal.Request] struct {
component.StartFunc
component.ShutdownFunc
obsrep *ObsReport
next Sender[K]

spanName string
tracer trace.Tracer
spanAttrs trace.SpanStartEventOption
metricAttr metric.MeasurementOption
itemsSentInst metric.Int64Counter
itemsFailedInst metric.Int64Counter
next Sender[K]
}

func NewObsReportSender[K internal.Request](obsrep *ObsReport, next Sender[K]) Sender[K] {
return &obsReportSender[K]{obsrep: obsrep, next: next}
func newObsReportSender[K internal.Request](set exporter.Settings, signal pipeline.Signal, next Sender[K]) (Sender[K], error) {
telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
if err != nil {
return nil, err
}

idStr := set.ID.String()
expAttr := attribute.String(ExporterKey, idStr)

or := &obsReportSender[K]{
spanName: ExporterKey + spanNameSep + idStr + spanNameSep + signal.String(),
tracer: metadata.Tracer(set.TelemetrySettings),
spanAttrs: trace.WithAttributes(expAttr, attribute.String(DataTypeKey, signal.String())),
metricAttr: metric.WithAttributeSet(attribute.NewSet(expAttr)),
next: next,
}

switch signal {
case pipeline.SignalTraces:
or.itemsSentInst = telemetryBuilder.ExporterSentSpans
or.itemsFailedInst = telemetryBuilder.ExporterSendFailedSpans

case pipeline.SignalMetrics:
or.itemsSentInst = telemetryBuilder.ExporterSentMetricPoints
or.itemsFailedInst = telemetryBuilder.ExporterSendFailedMetricPoints

case pipeline.SignalLogs:
or.itemsSentInst = telemetryBuilder.ExporterSentLogRecords
or.itemsFailedInst = telemetryBuilder.ExporterSendFailedLogRecords
}

return or, nil
}

func (ors *obsReportSender[K]) Send(ctx context.Context, req K) error {
// Have to read the number of items before sending the request since the request can
// be modified by the downstream components like the batcher.
c := ors.obsrep.StartOp(ctx)
c := ors.startOp(ctx)
items := req.ItemsCount()
// Forward the data to the next consumer (this pusher is the next).
err := ors.next.Send(c, req)
ors.obsrep.EndOp(c, items, err)
ors.endOp(c, items, err)
return err
}

// StartOp creates the span used to trace the operation. Returning
// the updated context and the created span.
func (ors *obsReportSender[K]) startOp(ctx context.Context) context.Context {
ctx, _ = ors.tracer.Start(ctx, ors.spanName, ors.spanAttrs)
return ctx
}

// EndOp completes the export operation that was started with StartOp.
func (ors *obsReportSender[K]) endOp(ctx context.Context, numLogRecords int, err error) {
numSent, numFailedToSend := toNumItems(numLogRecords, err)

// No metrics recorded for profiles.
if ors.itemsSentInst != nil {
ors.itemsSentInst.Add(ctx, numSent, ors.metricAttr)
}
// No metrics recorded for profiles.
if ors.itemsFailedInst != nil {
ors.itemsFailedInst.Add(ctx, numFailedToSend, ors.metricAttr)
}

span := trace.SpanFromContext(ctx)
defer span.End()
// End the span according to errors.
if span.IsRecording() {
span.SetAttributes(
attribute.Int64(ItemsSent, numSent),
attribute.Int64(ItemsFailed, numFailedToSend),
)
if err != nil {
span.SetStatus(codes.Error, err.Error())
}
}
}

func toNumItems(numExportedItems int, err error) (int64, int64) {
if err != nil {
return 0, int64(numExportedItems)
}
return int64(numExportedItems), 0
}
Loading

0 comments on commit 4eeb468

Please sign in to comment.