Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: added series aggregation for group by with value type panel #6744

Merged
merged 6 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions pkg/query-service/app/metrics/v4/helpers/series_agg_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package helpers

import (
"fmt"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)

func AddSeriesAggregation(seriesAggregator v3.SeriesAggregation, query string) string {
queryImpl := "SELECT %s as aggregated_value, ts" +
" FROM (%s)" +
" GROUP BY ts" +
" ORDER BY ts"

var op string
switch seriesAggregator {
case v3.SeriesAggregationAvg:
op = "avg(value)"
query = fmt.Sprintf(queryImpl, op, query)
case v3.SeriesAggregationSum:
op = "sum(value)"
query = fmt.Sprintf(queryImpl, op, query)
case v3.SeriesAggregationMin:
op = "min(value)"
query = fmt.Sprintf(queryImpl, op, query)
case v3.SeriesAggregationMax:
op = "max(value)"
query = fmt.Sprintf(queryImpl, op, query)
}
return query
}
4 changes: 4 additions & 0 deletions pkg/query-service/app/metrics/v4/query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P
mq.SpaceAggregation = percentileOperator
}

if panelType == v3.PanelTypeValue && len(mq.GroupBy) > 0 {
query = helpers.AddSeriesAggregation(mq.SeriesAggregation, query)
}

return query, nil
}

Expand Down
146 changes: 146 additions & 0 deletions pkg/query-service/app/metrics/v4/query_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,3 +614,149 @@ func TestPrepareMetricQueryGauge(t *testing.T) {
})
}
}

func TestPrepareMetricQueryValueTypePanelWithGroupBY(t *testing.T) {
t.Setenv("USE_METRICS_PRE_AGGREGATION", "false")
testCases := []struct {
name string
builderQuery *v3.BuilderQuery
expectedQueryContains string
}{
{
name: "test temporality = cumulative, panel = value, series agg = max group by state",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorMin,
AggregateAttribute: v3.AttributeKey{
Key: "system_memory_usage",
DataType: v3.AttributeKeyDataTypeFloat64,
Type: v3.AttributeKeyType("Gauge"),
IsColumn: true,
},
Temporality: v3.Delta,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationAvg,
SeriesAggregation: v3.SeriesAggregationMax,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "os_type",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: false,
},
Operator: v3.FilterOperatorEqual,
Value: "linux",
},
},
},
Expression: "A",
Disabled: false,
StepInterval: 60,
OrderBy: []v3.OrderBy{
{
ColumnName: "state",
Order: v3.DirectionDesc,
},
},
GroupBy: []v3.AttributeKey{
{
Key: "state",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: false,
},
},
Legend: "",
ReduceTo: v3.ReduceToOperatorSum,
Having: []v3.Having{
{
ColumnName: "AVG(system_memory_usage)",
Operator: v3.HavingOperatorGreaterThan,
Value: 5,
},
},
},
expectedQueryContains: "SELECT max(value) as aggregated_value, ts FROM (SELECT state, ts, avg(per_series_value) as value FROM (SELECT fingerprint, any(state) as state, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, anyLast(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'state') as state, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'system_memory_usage' AND temporality = 'Delta' AND unix_milli >= 1735891200000 AND unix_milli < 1735894800000 AND JSONExtractString(labels, 'os_type') = 'linux') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND unix_milli >= 1735891800000 AND unix_milli < 1735894800000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY state, ts ORDER BY state desc, ts ASC) GROUP BY ts ORDER BY ts",
},
{
name: "test temporality = cumulative, panel = value, series agg = max group by state, host_name",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorMin,
AggregateAttribute: v3.AttributeKey{
Key: "system_memory_usage",
DataType: v3.AttributeKeyDataTypeFloat64,
Type: v3.AttributeKeyType("Gauge"),
IsColumn: true,
},
Temporality: v3.Cumulative,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationAvg,
SeriesAggregation: v3.SeriesAggregationMax,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "os_type",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: false,
},
Operator: v3.FilterOperatorEqual,
Value: "linux",
},
},
},
Expression: "A",
Disabled: false,
StepInterval: 60,
OrderBy: []v3.OrderBy{
{
ColumnName: "state",
Order: v3.DirectionDesc,
},
},
GroupBy: []v3.AttributeKey{
{
Key: "state",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: false,
},
{
Key: "host_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: false,
},
},
Legend: "",
ReduceTo: v3.ReduceToOperatorSum,
Having: []v3.Having{
{
ColumnName: "AVG(system_memory_usage)",
Operator: v3.HavingOperatorGreaterThan,
Value: 5,
},
},
},
expectedQueryContains: "SELECT max(value) as aggregated_value, ts FROM (SELECT state, host_name, ts, avg(per_series_value) as value FROM (SELECT fingerprint, any(state) as state, any(host_name) as host_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, anyLast(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'state') as state, JSONExtractString(labels, 'host_name') as host_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'system_memory_usage' AND temporality = 'Cumulative' AND unix_milli >= 1735891200000 AND unix_milli < 1735894800000 AND JSONExtractString(labels, 'os_type') = 'linux') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND unix_milli >= 1735891800000 AND unix_milli < 1735894800000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY state, host_name, ts ORDER BY state desc, host_name ASC, ts ASC) GROUP BY ts ORDER BY ts",
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
// 1735891811000 - Friday, 3 January 2025 13:40:11 GMT+05:30
// 1735894811000 - Friday, 3 January 2025 14:30:11 GMT+05:30
query, err := PrepareMetricQuery(1735891811000, 1735894811000, v3.QueryTypeBuilder, v3.PanelTypeValue, testCase.builderQuery, metricsV3.Options{})
assert.Nil(t, err)
assert.Contains(t, query, testCase.expectedQueryContains)
})
}
}
3 changes: 3 additions & 0 deletions pkg/query-service/app/queryBuilder/query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,9 @@ func (c *cacheKeyGenerator) GenerateKeys(params *v3.QueryRangeParamsV3) map[stri
for idx, groupBy := range query.GroupBy {
parts = append(parts, fmt.Sprintf("groupBy-%d=%s", idx, groupBy.CacheKey()))
}
if params.CompositeQuery.PanelType == v3.PanelTypeValue {
parts = append(parts, fmt.Sprintf("seriesAggregation=%s", query.SeriesAggregation))
}
}

if len(query.Having) > 0 {
Expand Down
3 changes: 2 additions & 1 deletion pkg/query-service/app/queryBuilder/query_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1304,6 +1304,7 @@ func TestGenerateCacheKeysMetricsBuilder(t *testing.T) {
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorSumRate,
SeriesAggregation: v3.SeriesAggregationMax,
Expression: "A",
AggregateAttribute: v3.AttributeKey{Key: "signoz_latency_bucket"},
Temporality: v3.Delta,
Expand Down Expand Up @@ -1333,7 +1334,7 @@ func TestGenerateCacheKeysMetricsBuilder(t *testing.T) {
},
},
expectedCacheKeys: map[string]string{
"A": "source=metrics&step=60&aggregate=sum_rate&timeAggregation=&spaceAggregation=&aggregateAttribute=signoz_latency_bucket---false&filter-0=key:service_name---false,op:=,value:A&groupBy-0=service_name---false&groupBy-1=le---false&having-0=column:value,op:>,value:100",
"A": "source=metrics&step=60&aggregate=sum_rate&timeAggregation=&spaceAggregation=&aggregateAttribute=signoz_latency_bucket---false&filter-0=key:service_name---false,op:=,value:A&groupBy-0=service_name---false&groupBy-1=le---false&seriesAggregation=max&having-0=column:value,op:>,value:100",
},
},
{
Expand Down
29 changes: 29 additions & 0 deletions pkg/query-service/model/v3/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,28 @@ func GetPercentileFromOperator(operator SpaceAggregation) float64 {
}
}

type SeriesAggregation string

const (
SeriesAggregationUnspecified SeriesAggregation = ""
SeriesAggregationSum SeriesAggregation = "sum"
SeriesAggregationAvg SeriesAggregation = "avg"
SeriesAggregationMin SeriesAggregation = "min"
SeriesAggregationMax SeriesAggregation = "max"
)

func (s SeriesAggregation) Validate() error {
switch s {
case SeriesAggregationSum,
SeriesAggregationAvg,
SeriesAggregationMin,
SeriesAggregationMax:
return nil
default:
return fmt.Errorf("invalid series aggregation: %s", s)
}
}

type FunctionName string

const (
Expand Down Expand Up @@ -804,6 +826,7 @@ type BuilderQuery struct {
SelectColumns []AttributeKey `json:"selectColumns,omitempty"`
TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"`
SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"`
SeriesAggregation SeriesAggregation `json:"seriesAggregation,omitempty"`
Functions []Function `json:"functions,omitempty"`
ShiftBy int64
IsAnomaly bool
Expand Down Expand Up @@ -958,6 +981,12 @@ func (b *BuilderQuery) Validate(panelType PanelType) error {
// return fmt.Errorf("group by is not supported for list panel type")
// }

if panelType == PanelTypeValue {
if err := b.SeriesAggregation.Validate(); err != nil {
return fmt.Errorf("series aggregation is required for value type panel with group by: %w", err)
}
}

for _, groupBy := range b.GroupBy {
if err := groupBy.Validate(); err != nil {
return fmt.Errorf("group by is invalid %w", err)
Expand Down
Loading