diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go
index 4161a665ce..030f715dbe 100644
--- a/pkg/query-service/app/http_handler.go
+++ b/pkg/query-service/app/http_handler.go
@@ -2501,32 +2501,62 @@ func (aH *APIHandler) WriteJSON(w http.ResponseWriter, r *http.Request, response

 // RegisterMessagingQueuesRoutes adds messaging-queues routes
 func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *AuthMiddleware) {
-	// SubRouter for kafka
-	kafkaRouter := router.PathPrefix("/api/v1/messaging-queues/kafka").Subrouter()
+	// Main messaging queues router
+	messagingQueuesRouter := router.PathPrefix("/api/v1/messaging-queues").Subrouter()
+
+	// Queue Overview route
+	messagingQueuesRouter.HandleFunc("/queue-overview", am.ViewAccess(aH.getQueueOverview)).Methods(http.MethodPost)
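+	// The overview handler parses a MessagingQueue JSON body; an illustrative
+	// payload (timestamps in unix nanoseconds, filter values comma-separated;
+	// field names assume the MessagingQueue model's json tags):
+	//   {"start": 1700000000000000000, "end": 1700003600000000000,
+	//    "variables": {"service_name": "svc-a,svc-b", "kind": "Consumer"}}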
+
+	// -------------------------------------------------
+	// Kafka-specific routes
+	kafkaRouter := messagingQueuesRouter.PathPrefix("/kafka").Subrouter()

 	onboardingRouter := kafkaRouter.PathPrefix("/onboarding").Subrouter()
+
 	onboardingRouter.HandleFunc("/producers", am.ViewAccess(aH.onboardProducers)).Methods(http.MethodPost)
 	onboardingRouter.HandleFunc("/consumers", am.ViewAccess(aH.onboardConsumers)).Methods(http.MethodPost)
 	onboardingRouter.HandleFunc("/kafka", am.ViewAccess(aH.onboardKafka)).Methods(http.MethodPost)

 	partitionLatency := kafkaRouter.PathPrefix("/partition-latency").Subrouter()
+
 	partitionLatency.HandleFunc("/overview", am.ViewAccess(aH.getPartitionOverviewLatencyData)).Methods(http.MethodPost)
 	partitionLatency.HandleFunc("/consumer", am.ViewAccess(aH.getConsumerPartitionLatencyData)).Methods(http.MethodPost)

 	consumerLagRouter := kafkaRouter.PathPrefix("/consumer-lag").Subrouter()
+
 	consumerLagRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost)
 	consumerLagRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost)
 	consumerLagRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost)

 	topicThroughput := kafkaRouter.PathPrefix("/topic-throughput").Subrouter()
+
 	topicThroughput.HandleFunc("/producer", am.ViewAccess(aH.getProducerThroughputOverview)).Methods(http.MethodPost)
 	topicThroughput.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerThroughputDetails)).Methods(http.MethodPost)
 	topicThroughput.HandleFunc("/consumer", am.ViewAccess(aH.getConsumerThroughputOverview)).Methods(http.MethodPost)
 	topicThroughput.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerThroughputDetails)).Methods(http.MethodPost)

 	spanEvaluation := kafkaRouter.PathPrefix("/span").Subrouter()
+
 	spanEvaluation.HandleFunc("/evaluation", am.ViewAccess(aH.getProducerConsumerEval)).Methods(http.MethodPost)

+	// -------------------------------------------------
+	// Celery-specific routes
+	celeryRouter := messagingQueuesRouter.PathPrefix("/celery").Subrouter()
+
+	// Celery overview routes
+	celeryRouter.HandleFunc("/overview", am.ViewAccess(aH.getCeleryOverview)).Methods(http.MethodPost)
+
+	// Celery tasks routes
+	celeryRouter.HandleFunc("/tasks", am.ViewAccess(aH.getCeleryTasks)).Methods(http.MethodPost)
+
+	// Celery performance routes
+	celeryRouter.HandleFunc("/performance", am.ViewAccess(aH.getCeleryPerformance)).Methods(http.MethodPost)
+
 	// for other messaging queues, add SubRouters here
 }

@@ -4196,7 +4226,7 @@ func (aH *APIHandler) autocompleteAggregateAttributes(w http.ResponseWriter, r *
 	switch req.DataSource {
 	case v3.DataSourceMetrics:
-		response, err = aH.reader.GetMetricAggregateAttributes(r.Context(), req, false)
+		response, err = aH.reader.GetMetricAggregateAttributes(r.Context(), req, true)
 	case v3.DataSourceLogs:
 		response, err = aH.reader.GetLogAggregateAttributes(r.Context(), req)
 	case v3.DataSourceTraces:
@@ -4936,3 +4966,44 @@ func (aH *APIHandler) updateTraceField(w http.ResponseWriter, r *http.Request) {
 	}
 	aH.WriteJSON(w, r, field)
 }
+
+func (aH *APIHandler) getQueueOverview(w http.ResponseWriter, r *http.Request) {
+	// ToDo: add capability of dynamic filtering based on any of the filters using QueueFilters
+
+	messagingQueue, apiErr := ParseMessagingQueueBody(r)
+
+	if apiErr != nil {
+		zap.L().Error(apiErr.Err.Error())
+		RespondError(w, apiErr, nil)
+		return
+	}
+
+	chq, err := mq.BuildClickHouseQuery(messagingQueue, "", "overview")
+
+	if err != nil {
+		zap.L().Error(err.Error())
+		RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
+		return
+	}
+
+	results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
+	if err != nil {
+		zap.L().Error(err.Error())
+		RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
+		return
+	}
+
+	aH.Respond(w, results)
+}
+
+func (aH *APIHandler) getCeleryOverview(w http.ResponseWriter, r *http.Request) {
+	// TODO: Implement celery overview logic for both worker and tasks types
+}
+
+func (aH *APIHandler) getCeleryTasks(w http.ResponseWriter, r *http.Request) {
+	// TODO: Implement celery tasks logic for both state and list types
+}
+
+func (aH *APIHandler) getCeleryPerformance(w http.ResponseWriter, r *http.Request) {
+	// TODO: Implement celery performance logic for error, rate, and latency types
+}
diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go
index de5d83487b..08b13a1ffb 100644
--- a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go
+++ b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go
@@ -22,3 +22,45 @@ type OnboardingResponse struct {
 	Message string `json:"error_message"`
 	Status  string `json:"status"`
 }
+
+// QueueFilters holds the optional filter values parsed from the request variables.
+// ToDo: add capability of dynamic filtering based on any of the filters
+type QueueFilters struct {
+	ServiceName []string
+	SpanName    []string
+	Queue       []string
+	Destination []string
+	Kind        []string
+}
+
+type CeleryTask struct {
+	kind   string
+	status string
+}
+
+type CeleryTasks interface {
+	GetKind() string
+	GetStatus() string
+	Set(string, string)
+}
+
+func (r *CeleryTask) GetKind() string {
+	return r.kind
+}
+
+func (r *CeleryTask) GetStatus() string {
+	return r.status
+}
+
+func (r *CeleryTask) Set(kind, status string) {
+	r.kind = kind
+	r.status = status
+}
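+
+// Illustrative use of the CeleryTasks interface; the kind and status
+// values here are hypothetical:
+//
+//	var t CeleryTasks = &CeleryTask{}
+//	t.Set("worker", "SUCCESS")
+//	_ = t.GetKind()   // "worker"
+//	_ = t.GetStatus() // "SUCCESS"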
diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go
index 9b943acbc8..8f1e010939 100644
--- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go
+++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go
@@ -2,6 +2,7 @@ package kafka

 import (
 	"fmt"
+	"strings"
 )

 func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queueType string) string {
@@ -318,6 +319,142 @@ GROUP BY
 	return query
 }

+// generateOverviewSQL builds the ClickHouse SQL query with optional filters.
+// If a filter slice is empty, the query does not constrain on that field.
+func generateOverviewSQL(start, end int64, filters *QueueFilters) string {
+	// Convert from nanoseconds to float seconds in Go to avoid decimal overflow in ClickHouse
+	startSeconds := float64(start) / 1e9
+	endSeconds := float64(end) / 1e9
+
+	// Time-range width in seconds; used below as the throughput divisor
+	timeRangeSecs := endSeconds - startSeconds
+
+	// ts_bucket boundaries: pull the start back 30 minutes so spans bucketed just before the window are included
+	tsBucketStart := startSeconds - 1800
+	tsBucketEnd := endSeconds
+
+	// Build WHERE clauses for optional filters
+	// We always require messaging_system IN ('kafka', 'celery'), but
+	// we add additional AND conditions only if the slices are non-empty.
+	var whereClauses []string
+
+	// Mandatory base filter: show only kafka/celery
+	whereClauses = append(whereClauses, "messaging_system IN ('kafka', 'celery')")
+
+	if len(filters.ServiceName) > 0 {
+		whereClauses = append(whereClauses, inClause("service_name", filters.ServiceName))
+	}
+	if len(filters.SpanName) > 0 {
+		whereClauses = append(whereClauses, inClause("span_name", filters.SpanName))
+	}
+	if len(filters.Queue) > 0 {
+		// "queue" in the struct refers to the messaging_system in the DB
+		whereClauses = append(whereClauses, inClause("messaging_system", filters.Queue))
+	}
+	if len(filters.Destination) > 0 {
+		whereClauses = append(whereClauses, inClause("destination", filters.Destination))
+	}
+	if len(filters.Kind) > 0 {
+		whereClauses = append(whereClauses, inClause("kind_string", filters.Kind))
+	}
+
+	// Combine all WHERE clauses with AND
+	whereSQL := strings.Join(whereClauses, "\n AND ")
+
+	if len(whereSQL) > 0 {
+		whereSQL = fmt.Sprintf("AND %s", whereSQL)
+	}
+
+	// Final query string
+	// Note the use of %f for float64 values in fmt.Sprintf
+	query := fmt.Sprintf(`
+WITH
+    processed_traces AS (
+        SELECT
+            resource_string_service$$name AS service_name,
+            name AS span_name,
+            CASE
+                WHEN attribute_string_messaging$$system != '' THEN attribute_string_messaging$$system
+                WHEN (has(attributes_string, 'celery.action') OR has(attributes_string, 'celery.task_name')) THEN 'celery'
+                ELSE 'undefined'
+            END AS messaging_system,
+            kind_string,
+            COALESCE(
+                NULLIF(attributes_string['messaging.destination.name'], ''),
+                NULLIF(attributes_string['messaging.destination'], '')
+            ) AS destination,
+            durationNano,
+            status_code
+        FROM signoz_traces.distributed_signoz_index_v3
+        WHERE
+            timestamp >= toDateTime64(%f, 9)
+            AND timestamp <= toDateTime64(%f, 9)
+            AND ts_bucket_start >= toDateTime64(%f, 9)
+            AND ts_bucket_start <= toDateTime64(%f, 9)
+            AND (
+                attribute_string_messaging$$system = 'kafka'
+                OR has(attributes_string, 'celery.action')
+                OR has(attributes_string, 'celery.task_name')
+            )
+            %s
+    ),
+    aggregated_metrics AS (
+        SELECT
+            service_name,
+            span_name,
+            messaging_system,
+            destination,
+            kind_string,
+            count(*) AS total_count,
+            sumIf(1, status_code = 2) AS error_count,
+            quantile(0.95)(durationNano) / 1000000 AS p95_latency -- Convert to ms
+        FROM
+            processed_traces
+        GROUP BY
+            service_name,
+            span_name,
+            messaging_system,
+            destination,
+            kind_string
+    )
+SELECT
+    aggregated_metrics.service_name,
+    aggregated_metrics.span_name,
+    aggregated_metrics.messaging_system,
+    aggregated_metrics.destination,
+    aggregated_metrics.kind_string,
+    COALESCE(aggregated_metrics.total_count / %f, 0) AS throughput,
+    COALESCE((aggregated_metrics.error_count * 100.0) / aggregated_metrics.total_count, 0) AS error_percentage,
+    aggregated_metrics.p95_latency
+FROM
+    aggregated_metrics
+ORDER BY
+    aggregated_metrics.service_name,
+    aggregated_metrics.span_name;
+`,
+		startSeconds, endSeconds,
+		tsBucketStart, tsBucketEnd,
+		whereSQL, timeRangeSecs,
+	)
+
+	return query
+}
+
+// inClause returns SQL like "fieldName IN ('val1','val2','val3')"
+func inClause(fieldName string, values []string) string {
+	// Quote and escape each value for safety
+	var quoted []string
+	for _, v := range values {
+		// Simple escape: replace any single quotes in v
+		safeVal := strings.ReplaceAll(v, "'", "''")
+		quoted = append(quoted, fmt.Sprintf("'%s'", safeVal))
+	}
+	return fmt.Sprintf("%s IN (%s)", fieldName, strings.Join(quoted, ","))
+}
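+
+// For example, inClause("service_name", []string{"svc-a", "o'brien"})
+// returns: service_name IN ('svc-a','o''brien')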
+
 func generateProducerSQL(start, end int64, topic, partition, queueType string) string {
 	timeRange := (end - start) / 1000000000
 	tsBucketStart := (start / 1000000000) - 1800
diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go
index 1a60b90e56..b5fca5cf29 100644
--- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go
+++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go
@@ -2,9 +2,11 @@ package kafka

 import (
 	"fmt"
+	"go.signoz.io/signoz/pkg/query-service/common"
 	"go.signoz.io/signoz/pkg/query-service/constants"
 	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
+	"strings"
 )

 var defaultStepInterval int64 = 60

@@ -14,18 +16,19 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string)
 	if constants.KafkaSpanEval == "false" && queryContext == "producer-consumer-eval" {
 		return nil, fmt.Errorf("span evaluation feature is disabled and is experimental")
 	}
+	// ToDo: propagate this through APIs when there are different handlers
 	queueType := KafkaQueue

 	chq, err := BuildClickHouseQuery(messagingQueue, queueType, queryContext)
-
 	if err != nil {
 		return nil, err
 	}
-	var cq *v3.CompositeQuery
-
-	cq, err = buildCompositeQuery(chq, queryContext)
+	cq, err := buildCompositeQuery(chq, queryContext)
+	if err != nil {
+		return nil, err
+	}

 	queryRangeParams := &v3.QueryRangeParamsV3{
 		Start:          messagingQueue.Start,
@@ -42,6 +45,7 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string)
 func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) {
 	start := messagingQueue.Start
 	end := messagingQueue.End
+
 	consumerGroup, ok := messagingQueue.Variables["consumer_group"]
 	if !ok {
 		return nil, fmt.Errorf("consumer_group not found in the request")
@@ -53,15 +57,18 @@ func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType strin
 	}

 	query := generateNetworkLatencyThroughputSQL(start, end, consumerGroup, partitionID, queueType)
-
 	return &v3.ClickHouseQuery{
 		Query: query,
 	}, nil
 }

-func buildBuilderQueriesProducerBytes(unixMilliStart, unixMilliEnd int64, attributeCache *Clients) (map[string]*v3.BuilderQuery, error) {
+func buildBuilderQueriesProducerBytes(
+	unixMilliStart, unixMilliEnd int64,
+	attributeCache *Clients,
+) (map[string]*v3.BuilderQuery, error) {
+
 	bq := make(map[string]*v3.BuilderQuery)
-	queryName := fmt.Sprintf("byte_rate")
+	queryName := "byte_rate"

 	chq := &v3.BuilderQuery{
 		QueryName:    queryName,
@@ -102,11 +109,12 @@ func buildBuilderQueriesProducerBytes(unixMilliStart, unixMilliEnd int64, attrib
 		},
 		Expression: queryName,
 		ReduceTo:   v3.ReduceToOperatorAvg,
-		GroupBy: []v3.AttributeKey{{
-			Key:      "service_name",
-			DataType: v3.AttributeKeyDataTypeString,
-			Type:     v3.AttributeKeyTypeTag,
-		},
+		GroupBy: []v3.AttributeKey{
+			{
+				Key:      "service_name",
+				DataType: v3.AttributeKeyDataTypeString,
+				Type:     v3.AttributeKeyTypeTag,
+			},
 			{
 				Key:      "topic",
 				DataType: v3.AttributeKeyDataTypeString,
@@ -118,9 +126,13 @@ func buildBuilderQueriesProducerBytes(unixMilliStart, unixMilliEnd int64, attrib
 	return bq, nil
 }

-func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCache *Clients) (map[string]*v3.BuilderQuery, error) {
+func buildBuilderQueriesNetwork(
+	unixMilliStart, unixMilliEnd int64,
+	attributeCache *Clients,
+) (map[string]*v3.BuilderQuery, error) {
+
 	bq := make(map[string]*v3.BuilderQuery)
-	queryName := fmt.Sprintf("latency")
+	queryName := "latency"

 	chq := &v3.BuilderQuery{
 		QueryName:    queryName,
@@ -167,11 +179,12 @@ func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCac
 		},
 		Expression: queryName,
 		ReduceTo:   v3.ReduceToOperatorAvg,
-		GroupBy: []v3.AttributeKey{{
-			Key:      "service_name",
-			DataType: v3.AttributeKeyDataTypeString,
-			Type:     v3.AttributeKeyTypeTag,
-		},
+		GroupBy: []v3.AttributeKey{
+			{
+				Key:      "service_name",
+				DataType: v3.AttributeKeyDataTypeString,
+				Type:     v3.AttributeKeyTypeTag,
+			},
 			{
 				Key:      "client_id",
 				DataType: v3.AttributeKeyDataTypeString,
@@ -189,6 +202,7 @@ func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCac
 }

 func BuildBuilderQueriesKafkaOnboarding(messagingQueue *MessagingQueue) (*v3.QueryRangeParamsV3, error) {
+
 	bq := make(map[string]*v3.BuilderQuery)

 	unixMilliStart := messagingQueue.Start / 1000000
@@ -242,7 +256,11 @@ func BuildBuilderQueriesKafkaOnboarding(messagingQueue *MessagingQueue) (*v3.Que
 	return queryRangeParams, nil
 }

-func BuildQRParamsWithCache(messagingQueue *MessagingQueue, queryContext string, attributeCache *Clients) (*v3.QueryRangeParamsV3, error) {
+func BuildQRParamsWithCache(
+	messagingQueue *MessagingQueue,
+	queryContext string,
+	attributeCache *Clients,
+) (*v3.QueryRangeParamsV3, error) {

 	queueType := KafkaQueue

@@ -254,11 +272,9 @@ func BuildQRParamsWithCache(messagingQueue *MessagingQueue, queryContext string,

 	if queryContext == "throughput" {
 		chq, err := buildClickHouseQueryNetwork(messagingQueue, queueType)
-
 		if err != nil {
 			return nil, err
 		}
-
 		cq, err = buildCompositeQuery(chq, queryContext)

 	} else if queryContext == "fetch-latency" {
@@ -271,14 +287,15 @@ func BuildQRParamsWithCache(messagingQueue *MessagingQueue, queryContext string,
 			BuilderQueries: bhq,
 			PanelType:      v3.PanelTypeTable,
 		}
+
 	} else if queryContext == "producer-throughput-overview" {
 		start := messagingQueue.Start
 		end := messagingQueue.End
 		query := generateProducerPartitionThroughputSQL(start, end, queueType)
-
 		cq, err = buildCompositeQuery(&v3.ClickHouseQuery{
 			Query: query,
 		}, queryContext)
+
 	} else if queryContext == "producer-throughput-overview-byte-rate" {
 		bhq, err := buildBuilderQueriesProducerBytes(unixMilliStart, unixMilliEnd, attributeCache)
 		if err != nil {
@@ -304,22 +321,64 @@ func BuildQRParamsWithCache(messagingQueue *MessagingQueue, queryContext string,
 	return queryRangeParams, err
 }

-func BuildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) {
+func getFilters(variables map[string]string) *QueueFilters {
+	return &QueueFilters{
+		ServiceName: parseFilter(variables["service_name"]),
+		SpanName:    parseFilter(variables["span_name"]),
+		Queue:       parseFilter(variables["queue"]),
+		Destination: parseFilter(variables["destination"]),
+		Kind:        parseFilter(variables["kind"]),
+	}
+}
+
+// parseFilter splits a comma-separated string into a []string.
+// Returns an empty slice if the input is blank.
+func parseFilter(val string) []string {
+	if val == "" {
+		return []string{}
+	}
+	// Split on commas, trim whitespace around each part
+	parts := strings.Split(val, ",")
+	var out []string
+	for _, p := range parts {
+		trimmed := strings.TrimSpace(p)
+		if trimmed != "" {
+			out = append(out, trimmed)
+		}
+	}
+	return out
+}
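+
+// For example, getFilters(map[string]string{"service_name": "svc-a, svc-b"})
+// yields &QueueFilters{ServiceName: []string{"svc-a", "svc-b"}}; the other
+// filter slices stay empty, so generateOverviewSQL adds no constraint for them.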
+
+func BuildClickHouseQuery(
+	messagingQueue *MessagingQueue,
+	queueType string,
+	queryContext string,
+) (*v3.ClickHouseQuery, error) {
+
 	start := messagingQueue.Start
 	end := messagingQueue.End

 	var topic, partition string
+
 	if queryContext == "producer" ||
 		queryContext == "consumer" ||
 		queryContext == "consumer_partition_latency" ||
 		queryContext == "producer-throughput-details" ||
 		queryContext == "consumer-throughput-details" {
+
 		var ok bool
 		topic, ok = messagingQueue.Variables["topic"]
 		if !ok {
 			return nil, fmt.Errorf("invalid type for Topic")
 		}
-		if !(queryContext == "consumer-throughput-details" || queryContext == "producer-throughput-details") {
+
+		if !(queryContext == "consumer-throughput-details" ||
+			queryContext == "producer-throughput-details") {
+
 			partition, ok = messagingQueue.Variables["partition"]
 			if !ok {
 				return nil, fmt.Errorf("invalid type for Partition")
@@ -328,39 +387,46 @@ func BuildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, quer
 	}

 	var query string
-	if queryContext == "producer" {
+
+	switch queryContext {
+	case "overview":
+		query = generateOverviewSQL(start, end, getFilters(messagingQueue.Variables))
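+		// Unlike the kafka-specific contexts, the overview query takes no
+		// queueType; all of its filtering comes from the request variables.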
+	case "producer":
 		query = generateProducerSQL(start, end, topic, partition, queueType)
-	} else if queryContext == "consumer" {
+	case "consumer":
 		consumerGroup, ok := messagingQueue.Variables["consumer_group"]
 		if !ok {
 			return nil, fmt.Errorf("invalid type for consumer group")
 		}
 		query = generateConsumerSQL(start, end, topic, partition, consumerGroup, queueType)
-	} else if queryContext == "producer-topic-throughput" {
+	case "producer-topic-throughput":
 		query = generatePartitionLatencySQL(start, end, queueType)
-	} else if queryContext == "consumer_partition_latency" {
+	case "consumer_partition_latency":
 		query = generateConsumerPartitionLatencySQL(start, end, topic, partition, queueType)
-	} else if queryContext == "producer-throughput-details" {
+	case "producer-throughput-details":
 		svcName, ok := messagingQueue.Variables["service_name"]
 		if !ok {
 			return nil, fmt.Errorf("invalid type for service")
 		}
 		query = generateProducerTopicLatencySQL(start, end, topic, svcName, queueType)
-	} else if queryContext == "consumer-throughput-overview" {
+	case "consumer-throughput-overview":
 		query = generateConsumerLatencySQL(start, end, queueType)
-	} else if queryContext == "consumer-throughput-details" {
+	case "consumer-throughput-details":
 		svcName, ok := messagingQueue.Variables["service_name"]
 		if !ok {
 			return nil, fmt.Errorf("invalid type for service")
 		}
 		query = generateConsumerServiceLatencySQL(start, end, topic, svcName, queueType)
-	} else if queryContext == "producer-consumer-eval" {
+	case "producer-consumer-eval":
 		query = generateProducerConsumerEvalSQL(start, end, queueType, messagingQueue.EvalTime)
-	} else if queryContext == "onboard_producers" {
+	case "onboard_producers":
 		query = onboardProducersSQL(start, end, queueType)
-	} else if queryContext == "onboard_consumers" {
+	case "onboard_consumers":
 		query = onboardConsumerSQL(start, end, queueType)
 	}
+
 	return &v3.ClickHouseQuery{
 		Query: query,
 	}, nil