Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added series aggregation for group by with value type panel | 5949 #6744

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions pkg/query-service/app/metrics/v4/helpers/series_agg_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package helpers

import (
"fmt"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)

// AddSecondaryAggregation wraps the given inner query in an outer SELECT that
// collapses all grouped series into a single value per timestamp, using the
// requested secondary (series) aggregation. This is used for value-type
// panels that have a group-by clause, where only one number can be shown.
//
// If seriesAggregator is unspecified or unrecognized, the query is returned
// unchanged.
func AddSecondaryAggregation(seriesAggregator v3.SecondaryAggregation, query string) string {
	// Map the aggregation kind to its ClickHouse aggregate expression once;
	// the wrapping template is identical for every supported operator.
	var op string
	switch seriesAggregator {
	case v3.SecondaryAggregationAvg:
		op = "avg(value)"
	case v3.SecondaryAggregationSum:
		op = "sum(value)"
	case v3.SecondaryAggregationMin:
		op = "min(value)"
	case v3.SecondaryAggregationMax:
		op = "max(value)"
	default:
		// Unknown/unspecified aggregation: leave the query untouched,
		// matching the previous fall-through behavior.
		return query
	}

	queryImpl := "SELECT %s as aggregated_value, ts" +
		" FROM (%s)" +
		" GROUP BY ts" +
		" ORDER BY ts"
	return fmt.Sprintf(queryImpl, op, query)
}
4 changes: 4 additions & 0 deletions pkg/query-service/app/metrics/v4/query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P
mq.SpaceAggregation = percentileOperator
}

if panelType == v3.PanelTypeValue && len(mq.GroupBy) > 0 {
query = helpers.AddSecondaryAggregation(mq.SecondaryAggregation, query)
}

return query, nil
}

Expand Down
146 changes: 146 additions & 0 deletions pkg/query-service/app/metrics/v4/query_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,3 +614,149 @@ func TestPrepareMetricQueryGauge(t *testing.T) {
})
}
}

// TestPrepareMetricQueryValueTypePanelWithGroupBY verifies that metric
// queries built for value-type panels with a group-by clause are wrapped in a
// secondary (series) aggregation, producing a single aggregated_value per
// timestamp across all grouped series.
func TestPrepareMetricQueryValueTypePanelWithGroupBY(t *testing.T) {
	t.Setenv("USE_METRICS_PRE_AGGREGATION", "false")
	testCases := []struct {
		name                  string
		builderQuery          *v3.BuilderQuery
		expectedQueryContains string
	}{
		{
			// NOTE(review): this case uses v3.Delta; the name previously
			// said "cumulative", which was misleading.
			name: "test temporality = delta, panel = value, series agg = max group by state",
			builderQuery: &v3.BuilderQuery{
				QueryName:         "A",
				DataSource:        v3.DataSourceMetrics,
				AggregateOperator: v3.AggregateOperatorMin,
				AggregateAttribute: v3.AttributeKey{
					Key:      "system_memory_usage",
					DataType: v3.AttributeKeyDataTypeFloat64,
					Type:     v3.AttributeKeyType("Gauge"),
					IsColumn: true,
				},
				Temporality:          v3.Delta,
				TimeAggregation:      v3.TimeAggregationAnyLast,
				SpaceAggregation:     v3.SpaceAggregationAvg,
				SecondaryAggregation: v3.SecondaryAggregationMax,
				Filters: &v3.FilterSet{
					Operator: "AND",
					Items: []v3.FilterItem{
						{
							Key: v3.AttributeKey{
								Key:      "os_type",
								DataType: v3.AttributeKeyDataTypeString,
								Type:     v3.AttributeKeyTypeTag,
								IsColumn: false,
							},
							Operator: v3.FilterOperatorEqual,
							Value:    "linux",
						},
					},
				},
				Expression:   "A",
				Disabled:     false,
				StepInterval: 60,
				OrderBy: []v3.OrderBy{
					{
						ColumnName: "state",
						Order:      v3.DirectionDesc,
					},
				},
				GroupBy: []v3.AttributeKey{
					{
						Key:      "state",
						DataType: v3.AttributeKeyDataTypeString,
						Type:     v3.AttributeKeyTypeTag,
						IsColumn: false,
					},
				},
				Legend:   "",
				ReduceTo: v3.ReduceToOperatorSum,
				Having: []v3.Having{
					{
						ColumnName: "AVG(system_memory_usage)",
						Operator:   v3.HavingOperatorGreaterThan,
						Value:      5,
					},
				},
			},
			expectedQueryContains: "SELECT max(value) as aggregated_value, ts FROM (SELECT state, ts, avg(per_series_value) as value FROM (SELECT fingerprint, any(state) as state, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, anyLast(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'state') as state, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'system_memory_usage' AND temporality = 'Delta' AND unix_milli >= 1735891200000 AND unix_milli < 1735894800000 AND JSONExtractString(labels, 'os_type') = 'linux') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND unix_milli >= 1735891800000 AND unix_milli < 1735894800000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY state, ts ORDER BY state desc, ts ASC) GROUP BY ts ORDER BY ts",
		},
		{
			name: "test temporality = cumulative, panel = value, series agg = max group by state, host_name",
			builderQuery: &v3.BuilderQuery{
				QueryName:         "A",
				DataSource:        v3.DataSourceMetrics,
				AggregateOperator: v3.AggregateOperatorMin,
				AggregateAttribute: v3.AttributeKey{
					Key:      "system_memory_usage",
					DataType: v3.AttributeKeyDataTypeFloat64,
					Type:     v3.AttributeKeyType("Gauge"),
					IsColumn: true,
				},
				Temporality:          v3.Cumulative,
				TimeAggregation:      v3.TimeAggregationAnyLast,
				SpaceAggregation:     v3.SpaceAggregationAvg,
				SecondaryAggregation: v3.SecondaryAggregationMax,
				Filters: &v3.FilterSet{
					Operator: "AND",
					Items: []v3.FilterItem{
						{
							Key: v3.AttributeKey{
								Key:      "os_type",
								DataType: v3.AttributeKeyDataTypeString,
								Type:     v3.AttributeKeyTypeTag,
								IsColumn: false,
							},
							Operator: v3.FilterOperatorEqual,
							Value:    "linux",
						},
					},
				},
				Expression:   "A",
				Disabled:     false,
				StepInterval: 60,
				OrderBy: []v3.OrderBy{
					{
						ColumnName: "state",
						Order:      v3.DirectionDesc,
					},
				},
				GroupBy: []v3.AttributeKey{
					{
						Key:      "state",
						DataType: v3.AttributeKeyDataTypeString,
						Type:     v3.AttributeKeyTypeTag,
						IsColumn: false,
					},
					{
						Key:      "host_name",
						DataType: v3.AttributeKeyDataTypeString,
						Type:     v3.AttributeKeyTypeTag,
						IsColumn: false,
					},
				},
				Legend:   "",
				ReduceTo: v3.ReduceToOperatorSum,
				Having: []v3.Having{
					{
						ColumnName: "AVG(system_memory_usage)",
						Operator:   v3.HavingOperatorGreaterThan,
						Value:      5,
					},
				},
			},
			expectedQueryContains: "SELECT max(value) as aggregated_value, ts FROM (SELECT state, host_name, ts, avg(per_series_value) as value FROM (SELECT fingerprint, any(state) as state, any(host_name) as host_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, anyLast(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'state') as state, JSONExtractString(labels, 'host_name') as host_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'system_memory_usage' AND temporality = 'Cumulative' AND unix_milli >= 1735891200000 AND unix_milli < 1735894800000 AND JSONExtractString(labels, 'os_type') = 'linux') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND unix_milli >= 1735891800000 AND unix_milli < 1735894800000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY state, host_name, ts ORDER BY state desc, host_name ASC, ts ASC) GROUP BY ts ORDER BY ts",
		},
	}

	for _, testCase := range testCases {
		t.Run(testCase.name, func(t *testing.T) {
			// 1735891811000 - Friday, 3 January 2025 13:40:11 GMT+05:30
			// 1735894811000 - Friday, 3 January 2025 14:30:11 GMT+05:30
			query, err := PrepareMetricQuery(1735891811000, 1735894811000, v3.QueryTypeBuilder, v3.PanelTypeValue, testCase.builderQuery, metricsV3.Options{})
			assert.Nil(t, err)
			assert.Contains(t, query, testCase.expectedQueryContains)
		})
	}
}
3 changes: 3 additions & 0 deletions pkg/query-service/app/queryBuilder/query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,9 @@ func (c *cacheKeyGenerator) GenerateKeys(params *v3.QueryRangeParamsV3) map[stri
for idx, groupBy := range query.GroupBy {
parts = append(parts, fmt.Sprintf("groupBy-%d=%s", idx, groupBy.CacheKey()))
}
if params.CompositeQuery.PanelType == v3.PanelTypeValue {
parts = append(parts, fmt.Sprintf("secondaryAggregation=%s", query.SecondaryAggregation))
}
}

if len(query.Having) > 0 {
Expand Down
17 changes: 9 additions & 8 deletions pkg/query-service/app/queryBuilder/query_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1300,13 +1300,14 @@ func TestGenerateCacheKeysMetricsBuilder(t *testing.T) {
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorSumRate,
Expression: "A",
AggregateAttribute: v3.AttributeKey{Key: "signoz_latency_bucket"},
Temporality: v3.Delta,
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorSumRate,
SecondaryAggregation: v3.SecondaryAggregationMax,
Expression: "A",
AggregateAttribute: v3.AttributeKey{Key: "signoz_latency_bucket"},
Temporality: v3.Delta,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
Expand All @@ -1333,7 +1334,7 @@ func TestGenerateCacheKeysMetricsBuilder(t *testing.T) {
},
},
expectedCacheKeys: map[string]string{
"A": "source=metrics&step=60&aggregate=sum_rate&timeAggregation=&spaceAggregation=&aggregateAttribute=signoz_latency_bucket---false&filter-0=key:service_name---false,op:=,value:A&groupBy-0=service_name---false&groupBy-1=le---false&having-0=column:value,op:>,value:100",
"A": "source=metrics&step=60&aggregate=sum_rate&timeAggregation=&spaceAggregation=&aggregateAttribute=signoz_latency_bucket---false&filter-0=key:service_name---false,op:=,value:A&groupBy-0=service_name---false&groupBy-1=le---false&secondaryAggregation=max&having-0=column:value,op:>,value:100",
},
},
{
Expand Down
71 changes: 50 additions & 21 deletions pkg/query-service/model/v3/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,28 @@ func GetPercentileFromOperator(operator SpaceAggregation) float64 {
}
}

// SecondaryAggregation describes how multiple grouped series are collapsed
// into one value per timestamp; it is applied on top of the time/space
// aggregations for value-type panels that use a group-by clause.
type SecondaryAggregation string

// Supported secondary (series) aggregation operators.
const (
	SecondaryAggregationUnspecified SecondaryAggregation = ""
	SecondaryAggregationSum         SecondaryAggregation = "sum"
	SecondaryAggregationAvg         SecondaryAggregation = "avg"
	SecondaryAggregationMin         SecondaryAggregation = "min"
	SecondaryAggregationMax         SecondaryAggregation = "max"
)

// Validate returns nil when s is one of the supported aggregation operators
// and an error otherwise; the unspecified (empty) value is rejected.
func (s SecondaryAggregation) Validate() error {
	switch s {
	case SecondaryAggregationSum, SecondaryAggregationAvg, SecondaryAggregationMin, SecondaryAggregationMax:
		return nil
	}
	return fmt.Errorf("invalid series aggregation: %s", s)
}

type FunctionName string

const (
Expand Down Expand Up @@ -784,27 +806,28 @@ func (m *MetricValueFilter) Clone() *MetricValueFilter {
}

type BuilderQuery struct {
QueryName string `json:"queryName"`
StepInterval int64 `json:"stepInterval"`
DataSource DataSource `json:"dataSource"`
AggregateOperator AggregateOperator `json:"aggregateOperator"`
AggregateAttribute AttributeKey `json:"aggregateAttribute,omitempty"`
Temporality Temporality `json:"temporality,omitempty"`
Filters *FilterSet `json:"filters,omitempty"`
GroupBy []AttributeKey `json:"groupBy,omitempty"`
Expression string `json:"expression"`
Disabled bool `json:"disabled"`
Having []Having `json:"having,omitempty"`
Legend string `json:"legend,omitempty"`
Limit uint64 `json:"limit"`
Offset uint64 `json:"offset"`
PageSize uint64 `json:"pageSize"`
OrderBy []OrderBy `json:"orderBy,omitempty"`
ReduceTo ReduceToOperator `json:"reduceTo,omitempty"`
SelectColumns []AttributeKey `json:"selectColumns,omitempty"`
TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"`
SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"`
Functions []Function `json:"functions,omitempty"`
QueryName string `json:"queryName"`
StepInterval int64 `json:"stepInterval"`
DataSource DataSource `json:"dataSource"`
AggregateOperator AggregateOperator `json:"aggregateOperator"`
AggregateAttribute AttributeKey `json:"aggregateAttribute,omitempty"`
Temporality Temporality `json:"temporality,omitempty"`
Filters *FilterSet `json:"filters,omitempty"`
GroupBy []AttributeKey `json:"groupBy,omitempty"`
Expression string `json:"expression"`
Disabled bool `json:"disabled"`
Having []Having `json:"having,omitempty"`
Legend string `json:"legend,omitempty"`
Limit uint64 `json:"limit"`
Offset uint64 `json:"offset"`
PageSize uint64 `json:"pageSize"`
OrderBy []OrderBy `json:"orderBy,omitempty"`
ReduceTo ReduceToOperator `json:"reduceTo,omitempty"`
SelectColumns []AttributeKey `json:"selectColumns,omitempty"`
TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"`
SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"`
SecondaryAggregation SecondaryAggregation `json:"seriesAggregation,omitempty"`
Functions []Function `json:"functions,omitempty"`
ShiftBy int64
IsAnomaly bool
QueriesUsedInFormula []string
Expand Down Expand Up @@ -958,6 +981,12 @@ func (b *BuilderQuery) Validate(panelType PanelType) error {
// return fmt.Errorf("group by is not supported for list panel type")
// }

if panelType == PanelTypeValue {
if err := b.SecondaryAggregation.Validate(); err != nil {
return fmt.Errorf("series aggregation is required for value type panel with group by: %w", err)
}
}
Copy link
Member

@srikanthccv srikanthccv Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The panels are going to throw an error because you are making it mandatory to have a valid aggregation input for secondary aggregation for value panel types, please make it optional

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it's inside `if b.GroupBy != nil {}`, so it will only validate when groupBy is not nil and the panel type is value.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

groupBy can be a non-nil but empty list.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added the panelType == PanelTypeValue && len(b.GroupBy) > 0 check


for _, groupBy := range b.GroupBy {
if err := groupBy.Validate(); err != nil {
return fmt.Errorf("group by is invalid %w", err)
Expand Down
Loading