Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: revise kafka sink conf/metrics #3494

Merged
merged 7 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions extensions/impl/kafka/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ import (
)

const (
LblRequest = "req"
LblWriteMsgs = "write"
LblMessage = "message"
LblException = "exception"
)

var (
Expand All @@ -32,12 +31,18 @@ var (
Subsystem: "io",
Name: "kafka_count",
Help: "counter of Kafka IO",
}, []string{metrics.LblType, metrics.LblIOType, metrics.LblRuleIDType, metrics.LblOpIDType})
}, []string{metrics.LblType, metrics.LblIOType, metrics.LblStatusType, metrics.LblRuleIDType, metrics.LblOpIDType})
Yisaer marked this conversation as resolved.
Show resolved Hide resolved

KafkaHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{
KafkaDurationHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "kuiper",
Subsystem: "io",
Name: "kafka_duration",
Help: "Histogram of Kafka IO",
Buckets: prometheus.ExponentialBuckets(10, 2, 20), // 10us ~ 5s
}, []string{metrics.LblType, metrics.LblIOType, metrics.LblRuleIDType, metrics.LblOpIDType})
)

func init() {
prometheus.MustRegister(KafkaCounter)
prometheus.MustRegister(KafkaDurationHist)
}
65 changes: 44 additions & 21 deletions extensions/impl/kafka/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
kafkago "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl"

"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/internal/pkg/util"
"github.com/lf-edge/ekuiper/v2/metrics"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
Expand All @@ -40,9 +41,11 @@
headerTemplate string
saslConf *saslConf
mechanism sasl.Mechanism
LastStats kafkago.WriterStats
}

type kafkaConf struct {
kafkaWriterConf
Brokers string `json:"brokers"`
Topic string `json:"topic"`
MaxAttempts int `json:"maxAttempts"`
Expand All @@ -54,6 +57,12 @@
Compression string `json:"compression"`
}

type kafkaWriterConf struct {
BatchSize int `json:"batchSize"`
BatchTimeout time.Duration `json:"-"`
BatchBytes int64 `json:"batchBytes"`
}

func (c *kafkaConf) validate() error {
if c.Topic == "" {
return fmt.Errorf("topic can not be empty")
Expand All @@ -65,11 +74,8 @@
}

func (k *KafkaSink) Provision(ctx api.StreamContext, configs map[string]any) error {
c := &kafkaConf{
RequiredACKs: -1,
MaxAttempts: 1,
}
err := cast.MapToStruct(configs, c)
c := getDefaultKafkaConf()
err := c.configure(configs)
failpoint.Inject("kafkaErr", func(val failpoint.Value) {
err = mockKakfaSourceErr(val.(int), castConfErr)
})
Expand Down Expand Up @@ -149,7 +155,9 @@
AllowAutoTopicCreation: true,
MaxAttempts: k.kc.MaxAttempts,
RequiredAcks: kafkago.RequiredAcks(k.kc.RequiredACKs),
BatchSize: 1,
BatchSize: k.kc.BatchSize,
BatchBytes: k.kc.BatchBytes,
BatchTimeout: k.kc.BatchTimeout,
Transport: &kafkago.Transport{
SASL: k.mechanism,
TLS: k.tlsConfig,
Expand All @@ -176,29 +184,20 @@

func (k *KafkaSink) Collect(ctx api.StreamContext, item api.MessageTuple) (err error) {
defer func() {
if err != nil {
KafkaCounter.WithLabelValues(LblException, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
}
KafkaCounter.WithLabelValues(LblMessage, metrics.LblSinkIO, metrics.GetStatusValue(err), ctx.GetRuleId(), ctx.GetOpId()).Inc()

Check warning on line 187 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L187

Added line #L187 was not covered by tests
}()
msgs, err := k.collect(ctx, item)
if err != nil {
return err
}
KafkaCounter.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
KafkaCounter.WithLabelValues(LblMessage, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Add(float64(len(msgs)))
start := time.Now()
defer func() {
KafkaHist.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
KafkaDurationHist.WithLabelValues(LblWriteMsgs, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))

Check warning on line 195 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L195

Added line #L195 was not covered by tests
}()
return k.writer.WriteMessages(ctx, msgs...)
}

func (k *KafkaSink) CollectList(ctx api.StreamContext, items api.MessageTupleList) (err error) {
defer func() {
if err != nil {
KafkaCounter.WithLabelValues(LblException, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
}
}()
allMsgs := make([]kafkago.Message, 0)
items.RangeOfTuples(func(index int, tuple api.MessageTuple) bool {
msgs, err := k.collect(ctx, tuple)
Expand All @@ -208,13 +207,14 @@
allMsgs = append(allMsgs, msgs...)
return true
})
KafkaCounter.WithLabelValues(LblMessage, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Add(float64(len(allMsgs)))
KafkaCounter.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
start := time.Now()
defer func() {
KafkaHist.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
conf.Log.Infof("send kafka cost %v", time.Since(start).String())
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
KafkaDurationHist.WithLabelValues(LblWriteMsgs, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))

Check warning on line 213 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L212-L213

Added lines #L212 - L213 were not covered by tests
}()
return k.writer.WriteMessages(ctx, allMsgs...)
err = k.writer.WriteMessages(ctx, allMsgs...)
KafkaCounter.WithLabelValues(LblMessage, metrics.LblSinkIO, metrics.GetStatusValue(err), ctx.GetRuleId(), ctx.GetOpId()).Add(float64(len(allMsgs)))
return err

Check warning on line 217 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L215-L217

Added lines #L215 - L217 were not covered by tests
}

func (k *KafkaSink) collect(ctx api.StreamContext, item api.MessageTuple) ([]kafkago.Message, error) {
Expand Down Expand Up @@ -338,3 +338,26 @@
_ api.TupleCollector = &KafkaSink{}
_ util.PingableConn = &KafkaSink{}
)

func getDefaultKafkaConf() *kafkaConf {
c := &kafkaConf{
RequiredACKs: -1,
MaxAttempts: 1,
}
c.kafkaWriterConf = kafkaWriterConf{
BatchSize: 100,
BatchTimeout: time.Microsecond,
BatchBytes: 10485760, // 10MB
}
return c
}

func (kc *kafkaConf) configure(props map[string]interface{}) error {
if err := cast.MapToStruct(props, kc); err != nil {
return err
}

Check warning on line 358 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L357-L358

Added lines #L357 - L358 were not covered by tests
if err := cast.MapToStruct(props, &kc.kafkaWriterConf); err != nil {
return err
}

Check warning on line 361 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L360-L361

Added lines #L360 - L361 were not covered by tests
return nil
}
2 changes: 1 addition & 1 deletion extensions/impl/kafka/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@
ingestError(ctx, err)
continue
}
KafkaCounter.WithLabelValues(LblMessage, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
KafkaCounter.WithLabelValues(LblMessage, metrics.LblSourceIO, metrics.LblSuccess, ctx.GetRuleId(), ctx.GetOpId()).Inc()

Check warning on line 197 in extensions/impl/kafka/source.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/source.go#L197

Added line #L197 was not covered by tests
ingest(ctx, msg.Value, nil, timex.GetNow())
}
}
Expand Down
7 changes: 6 additions & 1 deletion extensions/impl/sql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,15 @@ var (
Help: "counter of SQL IO",
}, []string{metrics.LblType, metrics.LblIOType, metrics.LblRuleIDType, metrics.LblOpIDType})

SQLHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{
SQLDurationHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "kuiper",
Subsystem: "io",
Name: "sql_duration",
Help: "Histogram of SQL IO",
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
}, []string{metrics.LblType, metrics.LblIOType, metrics.LblRuleIDType, metrics.LblOpIDType})
)

func init() {
prometheus.MustRegister(SQLCounter)
prometheus.MustRegister(SQLDurationHist)
}
2 changes: 1 addition & 1 deletion extensions/impl/sql/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func (s *SQLSinkConnector) writeToDB(ctx api.StreamContext, sqlStr string) error
s.needReconnect = true
return errorx.NewIOErr(err.Error())
}
SQLHist.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
SQLDurationHist.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
s.needReconnect = false
d, err := r.RowsAffected()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion extensions/impl/sql/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (s *SQLSourceConnector) queryData(ctx api.StreamContext, rcvTime time.Time,
failpoint.Inject("QueryErr", func() {
err = errors.New("QueryErr")
})
SQLHist.WithLabelValues(LblRequest, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
SQLDurationHist.WithLabelValues(LblRequest, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
if err != nil {
logger.Errorf("query sql error %v", err)
s.needReconnect = true
Expand Down
1 change: 0 additions & 1 deletion internal/topo/planner/planner_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ func findTemplateProps(props map[string]any) []string {
func splitSink(tp *topo.Topo, s api.Sink, sinkName string, options *def.RuleOption, sc *node.SinkConf, templates []string) ([]node.TopNode, error) {
index := 0
result := make([]node.TopNode, 0)
// Batch enabled
if sc.BatchSize > 0 || sc.LingerInterval > 0 {
batchOp, err := node.NewBatchOp(fmt.Sprintf("%s_%d_batch", sinkName, index), options, sc.BatchSize, time.Duration(sc.LingerInterval))
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,17 @@
LblRuleStop = "stop"
LblSourceIO = "source"
LblSinkIO = "sink"
LblException = "err"
LblSuccess = "success"
)

func GetStatusValue(err error) string {
if err == nil {
return LblSuccess
}
return LblException

Check warning on line 38 in metrics/metrics.go

View check run for this annotation

Codecov / codecov/patch

metrics/metrics.go#L34-L38

Added lines #L34 - L38 were not covered by tests
}

var (
RuleStatusCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "kuiper",
Expand Down
Loading
Loading