Skip to content

Commit

Permalink
feat: add filtering
Browse files Browse the repository at this point in the history
Signed-off-by: Shivanshu Raj Shrivastava <[email protected]>
  • Loading branch information
shivanshuraj1333 committed Jan 6, 2025
1 parent c2fd105 commit a91a30b
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ type OnboardingResponse struct {
// QueueFilters holds the optional filter values applied to the messaging
// queue overview query. Each exported field is a list of accepted values;
// an empty list means the query is not constrained on that field.
// ToDo: add capability of dynamic filtering based on any of the filters
type QueueFilters struct {
// NOTE(review): the unexported single-value fields below look like the
// earlier form of the exported slice fields that follow; nothing visible
// here reads them - confirm whether they are still needed.
serviceName string
spanName string
queue string
destination string
kind string
// ServiceName lists the service names to match (service_name column).
ServiceName []string
// SpanName lists the span names to match (span_name column).
SpanName []string
// Queue lists the messaging systems to match; the overview query compares
// it against the messaging_system column.
Queue []string
// Destination lists the destinations to match (destination column).
Destination []string
// Kind lists the span kinds to match (kind_string column).
Kind []string
}

type CeleryTask struct {
Expand Down
93 changes: 69 additions & 24 deletions pkg/query-service/app/integrations/messagingQueues/kafka/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kafka

import (
"fmt"
"strings"
)

func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queueType string) string {
Expand Down Expand Up @@ -318,19 +319,50 @@ GROUP BY
return query
}

func generateOverviewSQL(start, end int64) string {
// Convert from nanoseconds to float seconds in Go
// to avoid decimal overflow in ClickHouse
// generateOverviewSQL builds the ClickHouse SQL query with optional filters.
// If a filter slice is empty, the query does not constrain on that field.
func generateOverviewSQL(start, end int64, filters *QueueFilters) string {
// Convert from nanoseconds to float seconds in Go to avoid decimal overflow in ClickHouse
startSeconds := float64(start) / 1e9
endSeconds := float64(end) / 1e9

// For timeRange, if you want the difference in the DB:
timeRangeSecs := endSeconds - startSeconds // float64 difference in Go
// Compute time range difference in Go
timeRangeSecs := endSeconds - startSeconds

// If you really need tsBucketStart, do the math in Go as well
tsBucketStart := startSeconds - 1800 // example
// Example ts_bucket boundaries (could be your own logic)
tsBucketStart := startSeconds - 1800
tsBucketEnd := endSeconds

// Build WHERE clauses for optional filters
// We always require messaging_system IN ('kafka', 'celery'), but
// we add additional AND conditions only if the slices are non-empty.
var whereClauses []string

// Mandatory base filter: show only kafka/celery
whereClauses = append(whereClauses, "aggregated_metrics.messaging_system IN ('kafka', 'celery')")

if len(filters.ServiceName) > 0 {
whereClauses = append(whereClauses, inClause("aggregated_metrics.service_name", filters.ServiceName))
}
if len(filters.SpanName) > 0 {
whereClauses = append(whereClauses, inClause("aggregated_metrics.span_name", filters.SpanName))
}
if len(filters.Queue) > 0 {
// "queue" in the struct refers to the messaging_system in the DB
whereClauses = append(whereClauses, inClause("aggregated_metrics.messaging_system", filters.Queue))
}
if len(filters.Destination) > 0 {
whereClauses = append(whereClauses, inClause("aggregated_metrics.destination", filters.Destination))
}
if len(filters.Kind) > 0 {
whereClauses = append(whereClauses, inClause("aggregated_metrics.kind_string", filters.Kind))
}

// Combine all WHERE clauses with AND
whereSQL := strings.Join(whereClauses, "\n AND ")

// Final query string
// Note the use of %f for float64 values in fmt.Sprintf
query := fmt.Sprintf(`
WITH
timeRange AS (
Expand All @@ -345,7 +377,7 @@ WITH
WHEN (has(attributes_string, 'celery.action') OR has(attributes_string, 'celery.task_name')) THEN 'celery'
ELSE 'other'
END AS messaging_system,
kind_string AS kind,
kind_string,
COALESCE(
NULLIF(attributes_string['messaging.destination.name'], ''),
NULLIF(attributes_string['messaging.destination'], '')
Expand All @@ -356,7 +388,6 @@ WITH
WHERE
timestamp >= toDateTime64(%f, 9)
AND timestamp <= toDateTime64(%f, 9)
-- Only if your schema has ts_bucket_start (Float64 or DateTime64)
AND ts_bucket_start >= toDateTime64(%f, 9)
AND ts_bucket_start <= toDateTime64(%f, 9)
AND (
Expand All @@ -371,7 +402,7 @@ WITH
span_name,
messaging_system,
destination,
kind,
kind_string,
count(*) AS total_count,
sumIf(1, status_code = 2) AS error_count,
quantile(0.95)(durationNano) / 1000000 AS p95_latency -- Convert to ms
Expand All @@ -382,33 +413,47 @@ WITH
span_name,
messaging_system,
destination,
kind
kind_string
)
SELECT
service_name,
span_name,
messaging_system,
destination,
kind,
COALESCE(total_count / timeRange.seconds, 0) AS throughput,
COALESCE((error_count * 100.0) / total_count, 0) AS error_percentage,
p95_latency
aggregated_metrics.service_name,
aggregated_metrics.span_name,
aggregated_metrics.messaging_system,
aggregated_metrics.destination,
aggregated_metrics.kind_string,
COALESCE(aggregated_metrics.total_count / timeRange.seconds, 0) AS throughput,
COALESCE((aggregated_metrics.error_count * 100.0) / aggregated_metrics.total_count, 0) AS error_percentage,
aggregated_metrics.p95_latency
FROM
aggregated_metrics,
timeRange
aggregated_metrics
CROSS JOIN timeRange
WHERE
messaging_system IN ('kafka', 'celery')
%s
ORDER BY
service_name,
span_name;`,
aggregated_metrics.service_name,
aggregated_metrics.span_name;
`,
timeRangeSecs, // timeRange AS (SELECT %f AS seconds)
startSeconds, endSeconds,
tsBucketStart, tsBucketEnd,
whereSQL,
)

return query
}

// inClause renders a SQL membership predicate of the form
// "fieldName IN ('val1','val2','val3')".
//
// fieldName is written into the result verbatim, so it must be a trusted
// column reference. Every entry of values is emitted as a single-quoted
// string literal in which backslashes are doubled and single quotes are
// doubled. ClickHouse honours backslash escape sequences inside string
// literals, so escaping only the quote is not enough: an input such as
// `\'` would otherwise turn into `\''` and terminate the literal early.
//
// values is expected to be non-empty (callers check the length first);
// an empty slice yields "fieldName IN ()".
func inClause(fieldName string, values []string) string {
	quoted := make([]string, 0, len(values))
	for _, v := range values {
		// Neutralise backslashes first so that no escape sequence from
		// the input survives, then double the quotes.
		safeVal := strings.ReplaceAll(v, `\`, `\\`)
		safeVal = strings.ReplaceAll(safeVal, "'", "''")
		quoted = append(quoted, fmt.Sprintf("'%s'", safeVal))
	}
	return fmt.Sprintf("%s IN (%s)", fieldName, strings.Join(quoted, ","))
}

func generateProducerSQL(start, end int64, topic, partition, queueType string) string {
timeRange := (end - start) / 1000000000
tsBucketStart := (start / 1000000000) - 1800
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"strings"
)

var defaultStepInterval int64 = 60
Expand Down Expand Up @@ -320,6 +321,34 @@ func BuildQRParamsWithCache(
return queryRangeParams, err
}

// getFilters builds a QueueFilters from the request variables. Each of the
// keys "service_name", "span_name", "queue", "destination" and "kind" is
// looked up in variables and its value is run through parseFilter; a key
// that is absent is parsed as the empty string.
func getFilters(variables map[string]string) *QueueFilters {
	filters := new(QueueFilters)
	filters.ServiceName = parseFilter(variables["service_name"])
	filters.SpanName = parseFilter(variables["span_name"])
	filters.Queue = parseFilter(variables["queue"])
	filters.Destination = parseFilter(variables["destination"])
	filters.Kind = parseFilter(variables["kind"])
	return filters
}

// parseFilter breaks a comma-separated filter value into its individual
// entries. Whitespace around every entry is stripped, and entries that are
// empty after stripping are discarded. An empty input string gives an
// empty slice.
func parseFilter(val string) []string {
	if len(val) == 0 {
		return []string{}
	}

	var entries []string
	for _, raw := range strings.Split(val, ",") {
		entry := strings.TrimSpace(raw)
		if entry == "" {
			// Skip gaps such as "a,,b" or trailing commas.
			continue
		}
		entries = append(entries, entry)
	}
	return entries
}

func BuildClickHouseQuery(
messagingQueue *MessagingQueue,
queueType string,
Expand Down Expand Up @@ -357,7 +386,7 @@ func BuildClickHouseQuery(

switch queryContext {
case "overview":
query = generateOverviewSQL(start, end)
query = generateOverviewSQL(start, end, getFilters(messagingQueue.Variables))
case "producer":
query = generateProducerSQL(start, end, topic, partition, queueType)
case "consumer":
Expand Down

0 comments on commit a91a30b

Please sign in to comment.