Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(summary): added update metrics metadata api #7235

Open
wants to merge 4 commits into
base: feat/7076_1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ee/query-service/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,11 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
&opAmpModel.AllAgents, agentConfMgr,
)

errorList := qb.PreloadMetricsMetadata(context.Background())
for _, er := range errorList {
zap.L().Error("failed to preload metrics metadata", zap.Error(er))
}

return s, nil
}

Expand Down
161 changes: 160 additions & 1 deletion pkg/query-service/app/clickhouseReader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ const (
signozTSLocalTableNameV41Week = "time_series_v4_1week"
signozTSTableNameV41Week = "distributed_time_series_v4_1week"

signozUpdatedMetricsMetadataLocalTable = "updated_metadata"
signozUpdatedMetricsMetadataTable = "distributed_updated_metadata"

minTimespanForProgressiveSearch = time.Hour
minTimespanForProgressiveSearchMargin = time.Minute
maxProgressiveSteps = 4
Expand Down Expand Up @@ -2889,8 +2892,22 @@ func (r *ClickHouseReader) GetTotalLogs(ctx context.Context) (uint64, error) {
}

func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNames []string) (map[string]map[v3.Temporality]bool, error) {

metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
var metricNamesToQuery []string
for _, metricName := range metricNames {
updatedMetadata, apiErr := r.GetUpdatedMetricsMetadata(ctx, metricName)
if apiErr != nil {
return nil, apiErr.Err
}
if updatedMetadata != nil {
if _, exists := metricNameToTemporality[metricName]; !exists {
metricNameToTemporality[metricName] = make(map[v3.Temporality]bool)
}
metricNameToTemporality[metricName][updatedMetadata.Temporality] = true
} else {
metricNamesToQuery = append(metricNamesToQuery, metricName)
}
}

query := fmt.Sprintf(`SELECT DISTINCT metric_name, temporality FROM %s.%s WHERE metric_name IN $1`, signozMetricDBName, signozTSTableNameV41Day)

Expand Down Expand Up @@ -3720,6 +3737,11 @@ func (r *ClickHouseReader) GetMetricAggregateAttributes(
continue
}

metadata, apiError := r.GetUpdatedMetricsMetadata(ctx, metricName)
if apiError == nil && metadata != nil {
typ = string(metadata.MetricType)
}

// Non-monotonic cumulative sums are treated as gauges
if typ == "Sum" && !isMonotonic && temporality == string(v3.Cumulative) {
typ = "Gauge"
Expand Down Expand Up @@ -3840,6 +3862,16 @@ func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, metricName, se
deltaExists = true
}
}
metadata, apiError := r.GetUpdatedMetricsMetadata(ctx, metricName)
if apiError == nil && metadata != nil {
metricType = string(metadata.MetricType)
temporality = string(metadata.Temporality)
if temporality == string(v3.Delta) {
deltaExists = true
}
isMonotonic = metadata.IsMonotonic
description = metadata.Description
}

query = fmt.Sprintf("SELECT JSONExtractString(labels, 'le') as le from %s.%s WHERE metric_name=$1 AND unix_milli >= $2 AND type = 'Histogram' AND JSONExtractString(labels, 'service_name') = $3 GROUP BY le ORDER BY le", signozMetricDBName, signozTSTableNameV41Day)
rows, err = r.db.Query(ctx, query, metricName, unixMilli, serviceName)
Expand Down Expand Up @@ -6569,3 +6601,130 @@ LIMIT 40`,

return fingerprints, nil
}

// DeleteMetricsMetadata removes any user-updated metadata row for metricName
// from the local ClickHouse table and evicts the corresponding cache entry so
// stale metadata is not served afterwards.
func (r *ClickHouseReader) DeleteMetricsMetadata(ctx context.Context, metricName string) *model.ApiError {
	query := fmt.Sprintf(`ALTER TABLE %s.%s DELETE WHERE metric_name = ?;`, signozMetricDBName, signozUpdatedMetricsMetadataLocalTable)
	if err := r.db.Exec(ctx, query, metricName); err != nil {
		return &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	r.cache.Remove(ctx, constants.UpdatedMetricsMetadataCachePrefix+metricName)
	return nil
}

// UpdateMetricsMetadata persists user-supplied metadata for a metric.
// Histogram/summary types are only accepted when the metric's series actually
// carry the label that type requires ("le" / "quantile"). Any previous row for
// the metric is deleted first, then the new row is inserted and cached.
func (r *ClickHouseReader) UpdateMetricsMetadata(ctx context.Context, req *model.UpdateMetricsMetadata) *model.ApiError {
	// Required label and human-readable type name for types that need one.
	var requiredLabel, typeName string
	switch req.MetricType {
	case v3.MetricTypeHistogram:
		requiredLabel, typeName = "le", "histogram"
	case v3.MetricTypeSummary:
		requiredLabel, typeName = "quantile", "summary"
	}
	if requiredLabel != "" {
		hasLabels, apiError := r.CheckForLabelsInMetric(ctx, req.MetricName, []string{requiredLabel})
		if apiError != nil {
			return apiError
		}
		if !hasLabels {
			return &model.ApiError{
				Typ: model.ErrorBadData,
				Err: fmt.Errorf("metric '%s' cannot be set as %s type", req.MetricName, typeName),
			}
		}
	}

	// Replace any existing metadata row before inserting the new one.
	if apiErr := r.DeleteMetricsMetadata(ctx, req.MetricName); apiErr != nil {
		return apiErr
	}
	insertQuery := fmt.Sprintf(`INSERT INTO %s.%s (metric_name, temporality, is_monotonic, type, description, unit, created_at)
VALUES ( ?, ?, ?, ?, ?, ?, ?);`, signozMetricDBName, signozUpdatedMetricsMetadataTable)
	if err := r.db.Exec(ctx, insertQuery, req.MetricName, req.Temporality, req.IsMonotonic, req.MetricType, req.Description, req.Unit, req.CreatedAt); err != nil {
		return &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	// Cache with no expiry so readers see the override immediately.
	if err := r.cache.Store(ctx, constants.UpdatedMetricsMetadataCachePrefix+req.MetricName, req, -1); err != nil {
		return &model.ApiError{Typ: "CachingErr", Err: err}
	}
	return nil
}

// CheckForLabelsInMetric reports whether every label in labels is present on
// the given metric's series (per the 1-day time-series table). An empty label
// list trivially passes. Used to validate histogram ("le") and summary
// ("quantile") type changes.
func (r *ClickHouseReader) CheckForLabelsInMetric(ctx context.Context, metricName string, labels []string) (bool, *model.ApiError) {
	if len(labels) == 0 {
		return true, nil
	}

	// One JSONHas predicate per requested label, all ANDed together.
	conditions := "metric_name = ?"
	for range labels {
		conditions += " AND JSONHas(labels, ?) = 1"
	}

	query := fmt.Sprintf(`
		SELECT count(*) > 0 as has_le
		FROM %s.%s
		WHERE %s
		LIMIT 1`, signozMetricDBName, signozTSTableNameV41Day, conditions)

	args := make([]interface{}, 0, len(labels)+1)
	args = append(args, metricName)
	for _, label := range labels {
		args = append(args, label)
	}

	var hasLabels bool
	err := r.db.QueryRow(ctx, query, args...).Scan(&hasLabels)
	if err != nil {
		return false, &model.ApiError{
			Typ: "ClickHouseError",
			// Generic wording: this check runs for histogram labels too,
			// not only summary labels.
			Err: fmt.Errorf("error checking metric labels: %v", err),
		}
	}
	return hasLabels, nil
}

func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, metricName string) (*model.UpdateMetricsMetadata, *model.ApiError) {
metricsMetadata := new(model.UpdateMetricsMetadata)
retrieve, err := r.cache.Retrieve(ctx, constants.UpdatedMetricsMetadataCachePrefix+metricName, metricsMetadata, true)
if err != nil {
query := fmt.Sprintf(`SELECT metric_name, type, description
FROM %s.%s
WHERE metric_name = ?;`, signozMetricDBName, signozUpdatedMetricsMetadataTable)
err := r.db.Select(ctx, query, metricName)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is incomplete? What are we reading the result into and returning when there is a cache miss?

if err != nil {
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
}
}
if retrieve == cache.RetrieveStatusHit {
return metricsMetadata, nil
}
return nil, nil
}

func (r *ClickHouseReader) PreloadMetricsMetadata(ctx context.Context) []error {
var allMetricsMetadata []model.UpdateMetricsMetadata
var errorList []error
// Fetch all rows from ClickHouse
query := fmt.Sprintf(`SELECT metric_name, type, description FROM %s.%s;`, signozMetricDBName, signozUpdatedMetricsMetadataTable)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a description in cache? Shouldn't there be temporality?

err := r.db.Select(ctx, &allMetricsMetadata, query)
if err != nil {
errorList = append(errorList, err)
return errorList
}
for _, m := range allMetricsMetadata {
err := r.cache.Store(ctx, constants.UpdatedMetricsMetadataCachePrefix+m.MetricName, &m, -1)
if err != nil {
errorList = append(errorList, err)
}
}

return errorList
}
4 changes: 3 additions & 1 deletion pkg/query-service/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
statefulsetsRepo := inframetrics.NewStatefulSetsRepo(opts.Reader, querierv2)
jobsRepo := inframetrics.NewJobsRepo(opts.Reader, querierv2)
pvcsRepo := inframetrics.NewPvcsRepo(opts.Reader, querierv2)
//explorerCache := metricsexplorer.NewExplorerCache(metricsexplorer.WithCache(opts.Cache))
summaryService := metricsexplorer.NewSummaryService(opts.Reader, opts.RuleManager)

aH := &APIHandler{
Expand Down Expand Up @@ -634,6 +633,9 @@ func (ah *APIHandler) MetricExplorerRoutes(router *mux.Router, am *AuthMiddlewar
router.HandleFunc("/api/v1/metrics/inspect",
am.ViewAccess(ah.GetInspectMetricsData)).
Methods(http.MethodPost)
router.HandleFunc("/api/v1/metrics/{metric_name}/metadata",
am.ViewAccess(ah.UpdateMetricsMetadataData)).
Methods(http.MethodPost)
}

func Intersection(a, b []int) (c []int) {
Expand Down
63 changes: 62 additions & 1 deletion pkg/query-service/app/metricsexplorer/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package metricsexplorer
import (
"encoding/json"
"fmt"
"github.com/gorilla/mux"
"net/http"
"strconv"

"go.signoz.io/signoz/pkg/query-service/constants"

v3 "go.signoz.io/signoz/pkg/query-service/model/v3"

"go.signoz.io/signoz/pkg/query-service/model"
Expand Down Expand Up @@ -82,8 +85,66 @@ func ParseInspectMetricsParams(r *http.Request) (*metrics_explorer.InspectMetric
if err := json.NewDecoder(r.Body).Decode(&inspectMetricParams); err != nil {
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
}
if inspectMetricParams.End-inspectMetricParams.Start > 1800000 { // half hour only
if inspectMetricParams.End-inspectMetricParams.Start > constants.InspectMetricsMaxTimeDiff { // half hour only
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("time duration shouldn't be more than 30 mins")}
}
return &inspectMetricParams, nil
}

func ParseUpdateMetricsMetadataParams(r *http.Request) (*metrics_explorer.UpdateMetricsMetadataRequest, *model.ApiError) {
var updateMetricsMetadataReq metrics_explorer.UpdateMetricsMetadataRequest
if err := json.NewDecoder(r.Body).Decode(&updateMetricsMetadataReq); err != nil {
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
}
updateMetricsMetadataReq.MetricName = mux.Vars(r)["metric_name"]

if updateMetricsMetadataReq.Description == "" {
return nil, &model.ApiError{
Typ: model.ErrorBadData,
Err: fmt.Errorf("description is required"),
}
}

switch updateMetricsMetadataReq.MetricType {
case v3.MetricTypeSum:
if updateMetricsMetadataReq.Temporality == "" {
return nil, &model.ApiError{
Typ: model.ErrorBadData,
Err: fmt.Errorf("temporality is required when metric type is Sum"),
}
}

if updateMetricsMetadataReq.Temporality != v3.Cumulative && updateMetricsMetadataReq.Temporality != v3.Delta {
return nil, &model.ApiError{
Typ: model.ErrorBadData,
Err: fmt.Errorf("invalid value for temporality"),
}
}
case v3.MetricTypeHistogram:
if updateMetricsMetadataReq.Temporality == "" {
return nil, &model.ApiError{
Typ: model.ErrorBadData,
Err: fmt.Errorf("temporality is required when metric type is Sum"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the histogram case, the error message refers to 'Sum' instead of 'Histogram'. Update the error message to state that temporality is required when metric type is Histogram.

Suggested change
Err: fmt.Errorf("temporality is required when metric type is Sum"),
Err: fmt.Errorf("temporality is required when metric type is Histogram"),

}
}

if updateMetricsMetadataReq.IsMonotonic && updateMetricsMetadataReq.Temporality == v3.Delta {
return nil, &model.ApiError{
Typ: model.ErrorBadData,
Err: fmt.Errorf("histogram metrics with Delta temporality can't be monotonically increased"),
}
}
case v3.MetricTypeGauge:
case v3.MetricTypeSummary:
if updateMetricsMetadataReq.Temporality == "" {
updateMetricsMetadataReq.Temporality = v3.Unspecified
}

default:
return nil, &model.ApiError{
Typ: model.ErrorBadData,
Err: fmt.Errorf("invalid metric type"),
}
}
return &updateMetricsMetadataReq, nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add validation here. The update metric type should be one of the valid metric types, same for temporality. A gauge can't have temporality. If a metric is being updated to Histogram, and if there are not buckets in the data, we should reject etc... all sensible validations.

}
20 changes: 20 additions & 0 deletions pkg/query-service/app/metricsexplorer/summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,3 +530,23 @@ func (receiver *SummaryService) GetInspectMetrics(ctx context.Context, params *m

return baseResponse, nil
}

// UpdateMetricsMetadata normalizes the requested metadata and persists it via
// the reader. A non-monotonic cumulative Sum is rewritten to Gauge, mirroring
// the convention used elsewhere when deriving types from ingested data.
func (receiver *SummaryService) UpdateMetricsMetadata(ctx context.Context, params *metrics_explorer.UpdateMetricsMetadataRequest) *model.ApiError {
	if params.MetricType == v3.MetricTypeSum && !params.IsMonotonic && params.Temporality == v3.Cumulative {
		params.MetricType = v3.MetricTypeGauge
	}
	metadata := model.UpdateMetricsMetadata{
		MetricName:  params.MetricName,
		MetricType:  params.MetricType,
		Temporality: params.Temporality,
		Unit:        params.Unit,
		Description: params.Description,
		IsMonotonic: params.IsMonotonic,
		CreatedAt:   time.Now(),
	}
	// Return the reader's result directly; no extra nil check needed.
	return receiver.reader.UpdateMetricsMetadata(ctx, &metadata)
}
8 changes: 8 additions & 0 deletions pkg/query-service/app/querier/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,14 @@ func (q *querier) runBuilderQuery(
return
}

if builderQuery.DataSource == v3.DataSourceMetrics {
metadata, apiError := q.reader.GetUpdatedMetricsMetadata(ctx, builderQuery.AggregateAttribute.Key)
if apiError == nil && metadata != nil {
builderQuery.AggregateAttribute.Type = v3.AttributeKeyType(metadata.MetricType)
builderQuery.Temporality = metadata.Temporality
}
}

// What is happening here?
// We are only caching the graph panel queries. A non-existent cache key means that the query is not cached.
// If the query is not cached, we execute the query and return the result without caching it.
Expand Down
8 changes: 8 additions & 0 deletions pkg/query-service/app/querier/v2/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,14 @@ func (q *querier) runBuilderQuery(
return
}

if builderQuery.DataSource == v3.DataSourceMetrics {
metadata, apiError := q.reader.GetUpdatedMetricsMetadata(ctx, builderQuery.AggregateAttribute.Key)
if apiError == nil && metadata != nil {
builderQuery.AggregateAttribute.Type = v3.AttributeKeyType(metadata.MetricType)
builderQuery.Temporality = metadata.Temporality
}
}

// What is happening here?
// We are only caching the graph panel queries. A non-existent cache key means that the query is not cached.
// If the query is not cached, we execute the query and return the result without caching it.
Expand Down
5 changes: 5 additions & 0 deletions pkg/query-service/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,11 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
&opAmpModel.AllAgents, agentConfMgr,
)

errorList := reader.PreloadMetricsMetadata(context.Background())
for _, er := range errorList {
zap.L().Error("preload metrics updated metadata failed", zap.Error(er))
}

return s, nil
}

Expand Down
Loading
Loading