diff --git a/extensions/impl/kafka/metrics.go b/extensions/impl/kafka/metrics.go
deleted file mode 100644
index 58a3a506cf..0000000000
--- a/extensions/impl/kafka/metrics.go
+++ /dev/null
@@ -1,43 +0,0 @@
-// Copyright 2024 EMQ Technologies Co., Ltd.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package kafka
-
-import (
-	"github.com/prometheus/client_golang/prometheus"
-
-	"github.com/lf-edge/ekuiper/v2/metrics"
-)
-
-const (
-	LblRequest   = "req"
-	LblMessage   = "message"
-	LblException = "exception"
-)
-
-var (
-	KafkaCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
-		Namespace: "kuiper",
-		Subsystem: "io",
-		Name:      "kafka_count",
-		Help:      "counter of Kafka IO",
-	}, []string{metrics.LblType, metrics.LblIOType, metrics.LblRuleIDType, metrics.LblOpIDType})
-
-	KafkaHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{
-		Namespace: "kuiper",
-		Subsystem: "io",
-		Name:      "kafka_duration",
-		Help:      "Historgram of Kafka IO",
-	}, []string{metrics.LblType, metrics.LblIOType, metrics.LblRuleIDType, metrics.LblOpIDType})
-)
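Note for reviewers: the per-plugin vectors deleted above (and the SQL ones further down) are subsumed by the shared `kuiper_io_*` metrics added in `metrics/io_metrics.go` at the end of this patch. A minimal standalone sketch of the consolidation idea; the label names here are illustrative stand-ins, not the exact eKuiper constants:

```go
package main

import "github.com/prometheus/client_golang/prometheus"

// One shared vector with a "type" label replaces per-plugin families such as
// the deleted KafkaCounter/KafkaHist (kuiper_io_kafka_count, ...).
var ioCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
	Namespace: "kuiper",
	Subsystem: "io",
	Name:      "counter",
	Help:      "counter of IO",
}, []string{"type", "io_type", "status", "rule_id", "op_id"})

func main() {
	prometheus.MustRegister(ioCounter)
	// Each connector now contributes label values instead of its own metric family.
	ioCounter.WithLabelValues("kafka", "sink", "success", "rule1", "op1").Inc()
	ioCounter.WithLabelValues("sql", "source", "err", "rule2", "op1").Inc()
}
```

The practical benefit is a single metric family to scrape and dashboard against, with the plugin name carried as a label value.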
diff --git a/extensions/impl/kafka/sink.go b/extensions/impl/kafka/sink.go
index 0e1a9495da..c1785884fa 100644
--- a/extensions/impl/kafka/sink.go
+++ b/extensions/impl/kafka/sink.go
@@ -32,6 +32,11 @@ import (
 	"github.com/lf-edge/ekuiper/v2/pkg/cert"
 )
 
+const (
+	LblKafka = "kafka"
+	LblMsg   = "msg"
+)
+
 type KafkaSink struct {
 	writer *kafkago.Writer
 	kc     *kafkaConf
@@ -40,9 +45,11 @@ type KafkaSink struct {
 	headerTemplate string
 	saslConf       *saslConf
 	mechanism      sasl.Mechanism
+	LastStats      kafkago.WriterStats
 }
 
 type kafkaConf struct {
+	kafkaWriterConf
 	Brokers      string `json:"brokers"`
 	Topic        string `json:"topic"`
 	MaxAttempts  int    `json:"maxAttempts"`
@@ -54,6 +61,12 @@ type kafkaConf struct {
 	Compression string `json:"compression"`
 }
 
+type kafkaWriterConf struct {
+	BatchSize    int           `json:"batchSize"`
+	BatchTimeout time.Duration `json:"-"`
+	BatchBytes   int64         `json:"batchBytes"`
+}
+
 func (c *kafkaConf) validate() error {
 	if c.Topic == "" {
 		return fmt.Errorf("topic can not be empty")
@@ -65,11 +78,8 @@ func (c *kafkaConf) validate() error {
 }
 
 func (k *KafkaSink) Provision(ctx api.StreamContext, configs map[string]any) error {
-	c := &kafkaConf{
-		RequiredACKs: -1,
-		MaxAttempts:  1,
-	}
-	err := cast.MapToStruct(configs, c)
+	c := getDefaultKafkaConf()
+	err := c.configure(configs)
 	failpoint.Inject("kafkaErr", func(val failpoint.Value) {
 		err = mockKakfaSourceErr(val.(int), castConfErr)
 	})
@@ -149,7 +159,9 @@ func (k *KafkaSink) buildKafkaWriter() error {
 		AllowAutoTopicCreation: true,
 		MaxAttempts:            k.kc.MaxAttempts,
 		RequiredAcks:           kafkago.RequiredAcks(k.kc.RequiredACKs),
-		BatchSize:              1,
+		BatchSize:              k.kc.BatchSize,
+		BatchBytes:             k.kc.BatchBytes,
+		BatchTimeout:           k.kc.BatchTimeout,
 		Transport: &kafkago.Transport{
 			SASL: k.mechanism,
 			TLS:  k.tlsConfig,
@@ -176,29 +188,21 @@ func (k *KafkaSink) Connect(ctx api.StreamContext, sch api.StatusChangeHandler)
 
 func (k *KafkaSink) Collect(ctx api.StreamContext, item api.MessageTuple) (err error) {
 	defer func() {
-		if err != nil {
-			KafkaCounter.WithLabelValues(LblException, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
-		}
+		metrics.IOCounter.WithLabelValues(LblKafka, metrics.LblSinkIO, metrics.GetStatusValue(err), ctx.GetRuleId(), ctx.GetOpId()).Inc()
 	}()
 	msgs, err := k.collect(ctx, item)
 	if err != nil {
 		return err
 	}
-	KafkaCounter.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
-	KafkaCounter.WithLabelValues(LblMessage, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Add(float64(len(msgs)))
 	start := time.Now()
 	defer func() {
-		KafkaHist.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
+		metrics.IODurationHist.WithLabelValues(LblKafka, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
 	}()
+	metrics.IOCounter.WithLabelValues(LblKafka, metrics.LblSinkIO, metrics.GetStatusValue(err), ctx.GetRuleId(), ctx.GetOpId()).Add(float64(len(msgs)))
 	return k.writer.WriteMessages(ctx, msgs...)
 }
 
 func (k *KafkaSink) CollectList(ctx api.StreamContext, items api.MessageTupleList) (err error) {
-	defer func() {
-		if err != nil {
-			KafkaCounter.WithLabelValues(LblException, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
-		}
-	}()
 	allMsgs := make([]kafkago.Message, 0)
 	items.RangeOfTuples(func(index int, tuple api.MessageTuple) bool {
 		msgs, err := k.collect(ctx, tuple)
@@ -208,13 +212,13 @@ func (k *KafkaSink) CollectList(ctx api.StreamContext, items api.MessageTupleLis
 		allMsgs = append(allMsgs, msgs...)
 		return true
 	})
-	KafkaCounter.WithLabelValues(LblMessage, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Add(float64(len(allMsgs)))
-	KafkaCounter.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
 	start := time.Now()
 	defer func() {
-		KafkaHist.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
+		metrics.IODurationHist.WithLabelValues(LblKafka, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
 	}()
-	return k.writer.WriteMessages(ctx, allMsgs...)
+	err = k.writer.WriteMessages(ctx, allMsgs...)
+	metrics.IOCounter.WithLabelValues(LblKafka, metrics.LblSinkIO, metrics.GetStatusValue(err), ctx.GetRuleId(), ctx.GetOpId()).Add(float64(len(allMsgs)))
+	return err
 }
 
 func (k *KafkaSink) collect(ctx api.StreamContext, item api.MessageTuple) ([]kafkago.Message, error) {
@@ -338,3 +342,26 @@ var (
 	_ api.TupleCollector = &KafkaSink{}
 	_ util.PingableConn  = &KafkaSink{}
 )
+
+func getDefaultKafkaConf() *kafkaConf {
+	c := &kafkaConf{
+		RequiredACKs: -1,
+		MaxAttempts:  1,
+	}
+	c.kafkaWriterConf = kafkaWriterConf{
+		BatchSize:    100,
+		BatchTimeout: time.Microsecond,
+		BatchBytes:   10485760, // 10MB
+	}
+	return c
+}
+
+func (kc *kafkaConf) configure(props map[string]interface{}) error {
+	if err := cast.MapToStruct(props, kc); err != nil {
+		return err
+	}
+	if err := cast.MapToStruct(props, &kc.kafkaWriterConf); err != nil {
+		return err
+	}
+	return nil
+}
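The three new writer fields map one-to-one onto segmentio/kafka-go's batching options. Below is a standalone sketch using the same defaults as `getDefaultKafkaConf`; the broker address and topic are placeholders:

```go
package main

import (
	"context"
	"time"

	kafkago "github.com/segmentio/kafka-go"
)

func main() {
	w := &kafkago.Writer{
		Addr:                   kafkago.TCP("localhost:9092"), // placeholder broker
		Topic:                  "demo",                        // placeholder topic
		AllowAutoTopicCreation: true,
		BatchSize:              100,              // flush after 100 buffered messages...
		BatchBytes:             10485760,         // ...or 10MB of payload...
		BatchTimeout:           time.Microsecond, // ...or after 1µs, whichever comes first
	}
	defer w.Close()

	// With batching enabled, a multi-message write becomes one produce request
	// instead of one request per message (the old BatchSize: 1 behavior).
	_ = w.WriteMessages(context.Background(),
		kafkago.Message{Value: []byte("hello")},
		kafkago.Message{Value: []byte("world")},
	)
}
```

Keeping `BatchTimeout` at one microsecond means a lone message still flushes almost immediately, so the default stays close to the old `BatchSize: 1` writer while letting bursts coalesce.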
diff --git a/extensions/impl/kafka/source.go b/extensions/impl/kafka/source.go
index c3b25d706f..b16aa8c1e7 100644
--- a/extensions/impl/kafka/source.go
+++ b/extensions/impl/kafka/source.go
@@ -194,7 +194,7 @@ func (k *KafkaSource) Subscribe(ctx api.StreamContext, ingest api.BytesIngest, i
 			ingestError(ctx, err)
 			continue
 		}
-		KafkaCounter.WithLabelValues(LblMessage, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
+		metrics.IOCounter.WithLabelValues(LblKafka, metrics.LblSourceIO, LblMsg, ctx.GetRuleId(), ctx.GetOpId()).Inc()
 		ingest(ctx, msg.Value, nil, timex.GetNow())
 	}
 }
diff --git a/extensions/impl/sql/metrics.go b/extensions/impl/sql/metrics.go
deleted file mode 100644
index 17f6296d3d..0000000000
--- a/extensions/impl/sql/metrics.go
+++ /dev/null
@@ -1,43 +0,0 @@
-// Copyright 2024 EMQ Technologies Co., Ltd.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package sql
-
-import (
-	"github.com/prometheus/client_golang/prometheus"
-
-	"github.com/lf-edge/ekuiper/v2/metrics"
-)
-
-const (
-	LblRequest   = "req"
-	LblReconn    = "reconnect"
-	LblException = "exception"
-)
-
-var (
-	SQLCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
-		Namespace: "kuiper",
-		Subsystem: "io",
-		Name:      "sql_count",
-		Help:      "counter of SQL IO",
-	}, []string{metrics.LblType, metrics.LblIOType, metrics.LblRuleIDType, metrics.LblOpIDType})
-
-	SQLHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{
-		Namespace: "kuiper",
-		Subsystem: "io",
-		Name:      "sql_duration",
-		Help:      "Historgram of Kafka IO",
-	}, []string{metrics.LblType, metrics.LblIOType, metrics.LblRuleIDType, metrics.LblOpIDType})
-)
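On the source side, the new counter is bumped once per delivered record. A rough standalone sketch of that loop, assuming a kafka-go `Reader`; the rule/op IDs are hard-coded stand-ins for the ctx values:

```go
package main

import (
	"context"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	kafkago "github.com/segmentio/kafka-go"
)

var ioCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
	Namespace: "kuiper", Subsystem: "io", Name: "counter", Help: "counter of IO",
}, []string{"type", "io_type", "status", "rule_id", "op_id"})

// consume mirrors the instrumented subscribe loop: one "msg" increment per
// delivered record.
func consume(ctx context.Context, r *kafkago.Reader) error {
	for {
		msg, err := r.ReadMessage(ctx)
		if err != nil {
			return err // the real source routes this through ingestError
		}
		ioCounter.WithLabelValues("kafka", "source", "msg", "rule1", "op1").Inc()
		fmt.Printf("got %d bytes\n", len(msg.Value)) // the real source calls ingest()
	}
}

func main() {
	prometheus.MustRegister(ioCounter)
	r := kafkago.NewReader(kafkago.ReaderConfig{
		Brokers: []string{"localhost:9092"}, // placeholder broker
		Topic:   "demo",                     // placeholder topic
		GroupID: "demo-group",
	})
	defer r.Close()
	_ = consume(context.Background(), r)
}
```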
diff --git a/extensions/impl/sql/sink.go b/extensions/impl/sql/sink.go
index 638066e4a3..16dd186372 100644
--- a/extensions/impl/sql/sink.go
+++ b/extensions/impl/sql/sink.go
@@ -33,6 +33,12 @@ import (
 	"github.com/lf-edge/ekuiper/v2/pkg/errorx"
 )
 
+const (
+	LblInsert = "insert"
+	LblUpdate = "update"
+	LblDel    = "del"
+)
+
 type SQLSinkConnector struct {
 	config *sqlSinkConfig
 	cw     *connection.ConnWrapper
@@ -156,10 +162,10 @@ func (s *SQLSinkConnector) Close(ctx api.StreamContext) error {
 func (s *SQLSinkConnector) Collect(ctx api.StreamContext, item api.MessageTuple) (err error) {
 	defer func() {
 		if err != nil {
-			SQLCounter.WithLabelValues(LblException, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
+			metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSinkIO, LblException, ctx.GetRuleId(), ctx.GetOpId()).Inc()
 		}
 	}()
-	SQLCounter.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
+	metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSinkIO, LblReq, ctx.GetRuleId(), ctx.GetOpId()).Inc()
 	return s.collect(ctx, item.ToMap())
 }
 
@@ -185,10 +191,10 @@ func (s *SQLSinkConnector) collect(ctx api.StreamContext, item map[string]any) (
 func (s *SQLSinkConnector) CollectList(ctx api.StreamContext, items api.MessageTupleList) (err error) {
 	defer func() {
 		if err != nil {
-			SQLCounter.WithLabelValues(LblException, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
+			metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSinkIO, LblException, ctx.GetRuleId(), ctx.GetOpId()).Inc()
 		}
 	}()
-	SQLCounter.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
+	metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSinkIO, LblReq, ctx.GetRuleId(), ctx.GetOpId()).Inc()
 	return s.collectList(ctx, items.ToMaps())
 }
 
@@ -283,7 +289,7 @@ func (s *SQLSinkConnector) save(ctx api.StreamContext, table string, data map[st
 func (s *SQLSinkConnector) writeToDB(ctx api.StreamContext, sqlStr string) error {
 	ctx.GetLogger().Debugf(sqlStr)
 	if s.needReconnect {
-		SQLCounter.WithLabelValues(LblReconn, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
+		metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSinkIO, LblReconn, ctx.GetRuleId(), ctx.GetOpId()).Inc()
 		err := s.conn.Reconnect()
 		if err != nil {
 			return errorx.NewIOErr(err.Error())
@@ -298,7 +304,7 @@ func (s *SQLSinkConnector) writeToDB(ctx api.StreamContext, sqlStr string) error
 		s.needReconnect = true
 		return errorx.NewIOErr(err.Error())
 	}
-	SQLHist.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
+	metrics.IODurationHist.WithLabelValues(LblSql, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
 	s.needReconnect = false
 	d, err := r.RowsAffected()
 	if err != nil {
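The sink's write path observes the Exec latency on the shared histogram only after a successful write. A hedged sketch of that pattern against a bare `database/sql` handle; the DSN, statement, and label values are illustrative:

```go
package main

import (
	"database/sql"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var ioDurationHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Namespace: "kuiper", Subsystem: "io", Name: "duration_hist", Help: "Histogram Duration of IO",
}, []string{"type", "io_type", "rule_id", "op_id"})

// writeToDB sketches the instrumented write path: execute the statement, then
// record the elapsed microseconds on the success path only, as the sink does.
func writeToDB(db *sql.DB, stmt string) (int64, error) {
	start := time.Now()
	r, err := db.Exec(stmt)
	if err != nil {
		return 0, err
	}
	ioDurationHist.WithLabelValues("sql", "sink", "rule1", "op1").
		Observe(float64(time.Since(start).Microseconds()))
	return r.RowsAffected()
}

func main() {
	prometheus.MustRegister(ioDurationHist)
	// A real caller opens a *sql.DB with a driver and passes it in, e.g.:
	//   db, err := sql.Open("mysql", dsn)
	//   n, err := writeToDB(db, "INSERT INTO t(a) VALUES (1)")
}
```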
diff --git a/extensions/impl/sql/source.go b/extensions/impl/sql/source.go
index 5a67959bb3..98eade7906 100644
--- a/extensions/impl/sql/source.go
+++ b/extensions/impl/sql/source.go
@@ -35,6 +35,14 @@ import (
 	"github.com/lf-edge/ekuiper/v2/pkg/modules"
 )
 
+const (
+	LblSql       = "sql"
+	LblReq       = "req"
+	LblReconn    = "reconn"
+	LblException = "exception"
+	LblRecv      = "recv"
+)
+
 type SQLSourceConnector struct {
 	id   string
 	conf *SQLConf
@@ -128,19 +136,19 @@ func (s *SQLSourceConnector) Close(ctx api.StreamContext) error {
 }
 
 func (s *SQLSourceConnector) Pull(ctx api.StreamContext, recvTime time.Time, ingest api.TupleIngest, ingestError api.ErrorIngest) {
-	SQLCounter.WithLabelValues(LblRequest, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
+	metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSourceIO, LblReq, ctx.GetRuleId(), ctx.GetOpId()).Inc()
 	s.queryData(ctx, recvTime, ingest, ingestError)
 }
 
 func (s *SQLSourceConnector) queryData(ctx api.StreamContext, rcvTime time.Time, ingest api.TupleIngest, ingestError api.ErrorIngest) {
 	logger := ctx.GetLogger()
 	if s.needReconnect {
-		SQLCounter.WithLabelValues(LblReconn, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
+		metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSourceIO, LblReconn, ctx.GetRuleId(), ctx.GetOpId()).Inc()
 		err := s.conn.Reconnect()
 		if err != nil {
 			logger.Errorf("reconnect db error %v", err)
 			ingestError(ctx, err)
-			SQLCounter.WithLabelValues(LblException, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
+			metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSourceIO, LblException, ctx.GetRuleId(), ctx.GetOpId()).Inc()
 			return
 		}
 	}
@@ -151,7 +159,7 @@ func (s *SQLSourceConnector) queryData(ctx api.StreamContext, rcvTime time.Time,
 	if err != nil {
 		logger.Errorf("Get sql query error %v", err)
 		ingestError(ctx, err)
-		SQLCounter.WithLabelValues(LblException, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
+		metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSourceIO, LblException, ctx.GetRuleId(), ctx.GetOpId()).Inc()
 		return
 	}
 	logger.Debugf("Query the database with %s", query)
@@ -160,12 +168,12 @@ func (s *SQLSourceConnector) queryData(ctx api.StreamContext, rcvTime time.Time,
 	failpoint.Inject("QueryErr", func() {
 		err = errors.New("QueryErr")
 	})
-	SQLHist.WithLabelValues(LblRequest, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
+	metrics.IODurationHist.WithLabelValues(LblSql, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
 	if err != nil {
 		logger.Errorf("query sql error %v", err)
 		s.needReconnect = true
 		ingestError(ctx, err)
-		SQLCounter.WithLabelValues(LblException, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
+		metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSourceIO, LblException, ctx.GetRuleId(), ctx.GetOpId()).Inc()
 		return
 	} else if s.needReconnect {
 		s.needReconnect = false
@@ -178,7 +186,7 @@ func (s *SQLSourceConnector) queryData(ctx api.StreamContext, rcvTime time.Time,
 	if err != nil {
 		logger.Errorf("query %v row ColumnTypes error %v", query, err)
 		ingestError(ctx, err)
-		SQLCounter.WithLabelValues(LblException, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
+		metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSourceIO, LblException, ctx.GetRuleId(), ctx.GetOpId()).Inc()
 		return
 	}
 	for rows.Next() {
@@ -192,12 +200,13 @@ func (s *SQLSourceConnector) queryData(ctx api.StreamContext, rcvTime time.Time,
 		if err != nil {
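The source-side accounting follows one shape: one `req` per pull, an `exception` on any failure, and one `recv` per ingested row. A simplified standalone sketch of that flow; label values and the query are illustrative:

```go
package main

import (
	"database/sql"

	"github.com/prometheus/client_golang/prometheus"
)

var ioCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
	Namespace: "kuiper", Subsystem: "io", Name: "counter", Help: "counter of IO",
}, []string{"type", "io_type", "status", "rule_id", "op_id"})

// pull sketches the instrumented query path: count the request up front,
// count exceptions on each failure branch, and count every row handed off.
func pull(db *sql.DB, query string) error {
	ioCounter.WithLabelValues("sql", "source", "req", "rule1", "op1").Inc()
	rows, err := db.Query(query)
	if err != nil {
		ioCounter.WithLabelValues("sql", "source", "exception", "rule1", "op1").Inc()
		return err
	}
	defer rows.Close()
	for rows.Next() {
		var id int
		if err := rows.Scan(&id); err != nil {
			ioCounter.WithLabelValues("sql", "source", "exception", "rule1", "op1").Inc()
			return err
		}
		// emit the row downstream, then count it as received
		ioCounter.WithLabelValues("sql", "source", "recv", "rule1", "op1").Inc()
	}
	return rows.Err()
}

func main() { prometheus.MustRegister(ioCounter) }
```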
logger.Errorf("Run sql scan(%s) error %v", query, err) ingestError(ctx, err) - SQLCounter.WithLabelValues(LblException, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc() + metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSourceIO, LblException, ctx.GetRuleId(), ctx.GetOpId()).Inc() return } scanIntoMap(data, columns, cols) s.Query.UpdateMaxIndexValue(data) ingest(ctx, data, nil, rcvTime) + metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSourceIO, LblRecv, ctx.GetRuleId(), ctx.GetOpId()).Inc() } } diff --git a/internal/topo/planner/planner_sink.go b/internal/topo/planner/planner_sink.go index d0a5067e17..35e8a23318 100644 --- a/internal/topo/planner/planner_sink.go +++ b/internal/topo/planner/planner_sink.go @@ -170,7 +170,6 @@ func findTemplateProps(props map[string]any) []string { func splitSink(tp *topo.Topo, s api.Sink, sinkName string, options *def.RuleOption, sc *node.SinkConf, templates []string) ([]node.TopNode, error) { index := 0 result := make([]node.TopNode, 0) - // Batch enabled if sc.BatchSize > 0 || sc.LingerInterval > 0 { batchOp, err := node.NewBatchOp(fmt.Sprintf("%s_%d_batch", sinkName, index), options, sc.BatchSize, time.Duration(sc.LingerInterval)) if err != nil { diff --git a/metrics/io_metrics.go b/metrics/io_metrics.go new file mode 100644 index 0000000000..1267f2f647 --- /dev/null +++ b/metrics/io_metrics.go @@ -0,0 +1,38 @@ +// Copyright 2025 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import "github.com/prometheus/client_golang/prometheus" + +var ( + IOCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "kuiper", + Subsystem: "io", + Name: "counter", + Help: "counter of IO", + }, []string{LblType, LblIOType, LblStatusType, LblRuleIDType, LblOpIDType}) + + IODurationHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "kuiper", + Subsystem: "io", + Name: "duration_hist", + Help: "Historgram Duration of IO", + }, []string{LblType, LblIOType, LblRuleIDType, LblOpIDType}) +) + +func init() { + prometheus.MustRegister(IOCounter) + prometheus.MustRegister(IODurationHist) +} diff --git a/metrics/metrics.go b/metrics/metrics.go index 641a301b1a..aab4a60338 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -27,8 +27,17 @@ const ( LblRuleStop = "stop" LblSourceIO = "source" LblSinkIO = "sink" + LblException = "err" + LblSuccess = "success" ) +func GetStatusValue(err error) string { + if err == nil { + return LblSuccess + } + return LblException +} + var ( RuleStatusCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "kuiper",