-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(summary): added update metrics metadata api #7235
base: feat/7076_1
Are you sure you want to change the base?
Changes from all commits
487fe7f
0d23b80
88b16c5
d6f4e64
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -96,6 +96,9 @@ const ( | |
signozTSLocalTableNameV41Week = "time_series_v4_1week" | ||
signozTSTableNameV41Week = "distributed_time_series_v4_1week" | ||
|
||
signozUpdatedMetricsMetadataLocalTable = "updated_metadata" | ||
signozUpdatedMetricsMetadataTable = "distributed_updated_metadata" | ||
|
||
minTimespanForProgressiveSearch = time.Hour | ||
minTimespanForProgressiveSearchMargin = time.Minute | ||
maxProgressiveSteps = 4 | ||
|
@@ -2889,8 +2892,22 @@ func (r *ClickHouseReader) GetTotalLogs(ctx context.Context) (uint64, error) { | |
} | ||
|
||
func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNames []string) (map[string]map[v3.Temporality]bool, error) { | ||
|
||
metricNameToTemporality := make(map[string]map[v3.Temporality]bool) | ||
var metricNamesToQuery []string | ||
for _, metricName := range metricNames { | ||
updatedMetadata, apiErr := r.GetUpdatedMetricsMetadata(ctx, metricName) | ||
if apiErr != nil { | ||
return nil, apiErr.Err | ||
} | ||
if updatedMetadata != nil { | ||
if _, exists := metricNameToTemporality[metricName]; !exists { | ||
metricNameToTemporality[metricName] = make(map[v3.Temporality]bool) | ||
} | ||
metricNameToTemporality[metricName][updatedMetadata.Temporality] = true | ||
} else { | ||
metricNamesToQuery = append(metricNamesToQuery, metricName) | ||
} | ||
} | ||
|
||
query := fmt.Sprintf(`SELECT DISTINCT metric_name, temporality FROM %s.%s WHERE metric_name IN $1`, signozMetricDBName, signozTSTableNameV41Day) | ||
|
||
|
@@ -3720,6 +3737,11 @@ func (r *ClickHouseReader) GetMetricAggregateAttributes( | |
continue | ||
} | ||
|
||
metadata, apiError := r.GetUpdatedMetricsMetadata(ctx, metricName) | ||
if apiError == nil && metadata != nil { | ||
typ = string(metadata.MetricType) | ||
} | ||
|
||
// Non-monotonic cumulative sums are treated as gauges | ||
if typ == "Sum" && !isMonotonic && temporality == string(v3.Cumulative) { | ||
typ = "Gauge" | ||
|
@@ -3840,6 +3862,16 @@ func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, metricName, se | |
deltaExists = true | ||
} | ||
} | ||
metadata, apiError := r.GetUpdatedMetricsMetadata(ctx, metricName) | ||
if apiError == nil && metadata != nil { | ||
metricType = string(metadata.MetricType) | ||
temporality = string(metadata.Temporality) | ||
if temporality == string(v3.Delta) { | ||
deltaExists = true | ||
} | ||
isMonotonic = metadata.IsMonotonic | ||
description = metadata.Description | ||
} | ||
|
||
query = fmt.Sprintf("SELECT JSONExtractString(labels, 'le') as le from %s.%s WHERE metric_name=$1 AND unix_milli >= $2 AND type = 'Histogram' AND JSONExtractString(labels, 'service_name') = $3 GROUP BY le ORDER BY le", signozMetricDBName, signozTSTableNameV41Day) | ||
rows, err = r.db.Query(ctx, query, metricName, unixMilli, serviceName) | ||
|
@@ -6569,3 +6601,130 @@ LIMIT 40`, | |
|
||
return fingerprints, nil | ||
} | ||
|
||
func (r *ClickHouseReader) DeleteMetricsMetadata(ctx context.Context, metricName string) *model.ApiError { | ||
delQuery := fmt.Sprintf(`ALTER TABLE %s.%s DELETE WHERE metric_name = ?;`, signozMetricDBName, signozUpdatedMetricsMetadataLocalTable) | ||
err := r.db.Exec(ctx, delQuery, metricName) | ||
if err != nil { | ||
return &model.ApiError{Typ: "ClickHouseError", Err: err} | ||
} | ||
r.cache.Remove(ctx, constants.UpdatedMetricsMetadataCachePrefix+metricName) | ||
return nil | ||
} | ||
|
||
func (r *ClickHouseReader) UpdateMetricsMetadata(ctx context.Context, req *model.UpdateMetricsMetadata) *model.ApiError { | ||
if req.MetricType == v3.MetricTypeHistogram { | ||
labels := []string{"le"} | ||
hasLabels, apiError := r.CheckForLabelsInMetric(ctx, req.MetricName, labels) | ||
if apiError != nil { | ||
return apiError | ||
} | ||
if !hasLabels { | ||
return &model.ApiError{ | ||
Typ: model.ErrorBadData, | ||
Err: fmt.Errorf("metric '%s' cannot be set as histogram type", req.MetricName), | ||
} | ||
} | ||
} | ||
|
||
if req.MetricType == v3.MetricTypeSummary { | ||
labels := []string{"quantile"} | ||
hasLabels, apiError := r.CheckForLabelsInMetric(ctx, req.MetricName, labels) | ||
if apiError != nil { | ||
return apiError | ||
} | ||
if !hasLabels { | ||
return &model.ApiError{ | ||
Typ: model.ErrorBadData, | ||
Err: fmt.Errorf("metric '%s' cannot be set as summary type", req.MetricName), | ||
} | ||
} | ||
} | ||
|
||
apiErr := r.DeleteMetricsMetadata(ctx, req.MetricName) | ||
if apiErr != nil { | ||
return apiErr | ||
} | ||
insertQuery := fmt.Sprintf(`INSERT INTO %s.%s (metric_name, temporality, is_monotonic, type, description, unit, created_at) | ||
VALUES ( ?, ?, ?, ?, ?, ?, ?);`, signozMetricDBName, signozUpdatedMetricsMetadataTable) | ||
err := r.db.Exec(ctx, insertQuery, req.MetricName, req.Temporality, req.IsMonotonic, req.MetricType, req.Description, req.Unit, req.CreatedAt) | ||
if err != nil { | ||
return &model.ApiError{Typ: "ClickHouseError", Err: err} | ||
} | ||
err = r.cache.Store(ctx, constants.UpdatedMetricsMetadataCachePrefix+req.MetricName, req, -1) | ||
if err != nil { | ||
return &model.ApiError{Typ: "CachingErr", Err: err} | ||
} | ||
return nil | ||
} | ||
|
||
func (r *ClickHouseReader) CheckForLabelsInMetric(ctx context.Context, metricName string, labels []string) (bool, *model.ApiError) { | ||
if len(labels) == 0 { | ||
return true, nil | ||
} | ||
|
||
conditions := "metric_name = ?" | ||
for range labels { | ||
conditions += " AND JSONHas(labels, ?) = 1" | ||
} | ||
|
||
query := fmt.Sprintf(` | ||
SELECT count(*) > 0 as has_le | ||
FROM %s.%s | ||
WHERE %s | ||
LIMIT 1`, signozMetricDBName, signozTSTableNameV41Day, conditions) | ||
|
||
args := make([]interface{}, 0, len(labels)+1) | ||
args = append(args, metricName) | ||
for _, label := range labels { | ||
args = append(args, label) | ||
} | ||
|
||
var hasLE bool | ||
err := r.db.QueryRow(ctx, query, args...).Scan(&hasLE) | ||
if err != nil { | ||
return false, &model.ApiError{ | ||
Typ: "ClickHouseError", | ||
Err: fmt.Errorf("error checking summary labels: %v", err), | ||
} | ||
} | ||
return hasLE, nil | ||
} | ||
|
||
func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, metricName string) (*model.UpdateMetricsMetadata, *model.ApiError) { | ||
metricsMetadata := new(model.UpdateMetricsMetadata) | ||
retrieve, err := r.cache.Retrieve(ctx, constants.UpdatedMetricsMetadataCachePrefix+metricName, metricsMetadata, true) | ||
if err != nil { | ||
query := fmt.Sprintf(`SELECT metric_name, type, description | ||
FROM %s.%s | ||
WHERE metric_name = ?;`, signozMetricDBName, signozUpdatedMetricsMetadataTable) | ||
err := r.db.Select(ctx, query, metricName) | ||
if err != nil { | ||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} | ||
} | ||
} | ||
if retrieve == cache.RetrieveStatusHit { | ||
return metricsMetadata, nil | ||
} | ||
return nil, nil | ||
} | ||
|
||
func (r *ClickHouseReader) PreloadMetricsMetadata(ctx context.Context) []error { | ||
var allMetricsMetadata []model.UpdateMetricsMetadata | ||
var errorList []error | ||
// Fetch all rows from ClickHouse | ||
query := fmt.Sprintf(`SELECT metric_name, type, description FROM %s.%s;`, signozMetricDBName, signozUpdatedMetricsMetadataTable) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need a description in cache? Shouldn't there be temporality? |
||
err := r.db.Select(ctx, &allMetricsMetadata, query) | ||
if err != nil { | ||
errorList = append(errorList, err) | ||
return errorList | ||
} | ||
for _, m := range allMetricsMetadata { | ||
err := r.cache.Store(ctx, constants.UpdatedMetricsMetadataCachePrefix+m.MetricName, &m, -1) | ||
if err != nil { | ||
errorList = append(errorList, err) | ||
} | ||
} | ||
|
||
return errorList | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -3,9 +3,12 @@ package metricsexplorer | |||||
import ( | ||||||
"encoding/json" | ||||||
"fmt" | ||||||
"github.com/gorilla/mux" | ||||||
"net/http" | ||||||
"strconv" | ||||||
|
||||||
"go.signoz.io/signoz/pkg/query-service/constants" | ||||||
|
||||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3" | ||||||
|
||||||
"go.signoz.io/signoz/pkg/query-service/model" | ||||||
|
@@ -82,8 +85,66 @@ func ParseInspectMetricsParams(r *http.Request) (*metrics_explorer.InspectMetric | |||||
if err := json.NewDecoder(r.Body).Decode(&inspectMetricParams); err != nil { | ||||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)} | ||||||
} | ||||||
if inspectMetricParams.End-inspectMetricParams.Start > 1800000 { // half hour only | ||||||
if inspectMetricParams.End-inspectMetricParams.Start > constants.InspectMetricsMaxTimeDiff { // half hour only | ||||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("time duration shouldn't be more than 30 mins")} | ||||||
} | ||||||
return &inspectMetricParams, nil | ||||||
} | ||||||
|
||||||
func ParseUpdateMetricsMetadataParams(r *http.Request) (*metrics_explorer.UpdateMetricsMetadataRequest, *model.ApiError) { | ||||||
var updateMetricsMetadataReq metrics_explorer.UpdateMetricsMetadataRequest | ||||||
if err := json.NewDecoder(r.Body).Decode(&updateMetricsMetadataReq); err != nil { | ||||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)} | ||||||
} | ||||||
updateMetricsMetadataReq.MetricName = mux.Vars(r)["metric_name"] | ||||||
|
||||||
if updateMetricsMetadataReq.Description == "" { | ||||||
return nil, &model.ApiError{ | ||||||
Typ: model.ErrorBadData, | ||||||
Err: fmt.Errorf("description is required"), | ||||||
} | ||||||
} | ||||||
|
||||||
switch updateMetricsMetadataReq.MetricType { | ||||||
case v3.MetricTypeSum: | ||||||
if updateMetricsMetadataReq.Temporality == "" { | ||||||
return nil, &model.ApiError{ | ||||||
Typ: model.ErrorBadData, | ||||||
Err: fmt.Errorf("temporality is required when metric type is Sum"), | ||||||
} | ||||||
} | ||||||
|
||||||
if updateMetricsMetadataReq.Temporality != v3.Cumulative && updateMetricsMetadataReq.Temporality != v3.Delta { | ||||||
return nil, &model.ApiError{ | ||||||
Typ: model.ErrorBadData, | ||||||
Err: fmt.Errorf("invalid value for temporality"), | ||||||
} | ||||||
} | ||||||
case v3.MetricTypeHistogram: | ||||||
if updateMetricsMetadataReq.Temporality == "" { | ||||||
return nil, &model.ApiError{ | ||||||
Typ: model.ErrorBadData, | ||||||
Err: fmt.Errorf("temporality is required when metric type is Sum"), | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the histogram case, the error message refers to 'Sum' instead of 'Histogram'. Update the error message to state that temporality is required when metric type is Histogram.
Suggested change
|
||||||
} | ||||||
} | ||||||
|
||||||
if updateMetricsMetadataReq.IsMonotonic && updateMetricsMetadataReq.Temporality == v3.Delta { | ||||||
return nil, &model.ApiError{ | ||||||
Typ: model.ErrorBadData, | ||||||
Err: fmt.Errorf("histogram metrics with Delta temporality can't be monotonically increased"), | ||||||
} | ||||||
} | ||||||
case v3.MetricTypeGauge: | ||||||
case v3.MetricTypeSummary: | ||||||
if updateMetricsMetadataReq.Temporality == "" { | ||||||
updateMetricsMetadataReq.Temporality = v3.Unspecified | ||||||
} | ||||||
|
||||||
default: | ||||||
return nil, &model.ApiError{ | ||||||
Typ: model.ErrorBadData, | ||||||
Err: fmt.Errorf("invalid metric type"), | ||||||
} | ||||||
} | ||||||
return &updateMetricsMetadataReq, nil | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's add validation here. The update metric type should be one of the valid metric types, same for temporality. A gauge can't have temporality. If a metric is being updated to Histogram, and if there are not buckets in the data, we should reject etc... all sensible validations. |
||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is incomplete? What are we reading the result into and returning when there is a cache miss?