[feat] Add overview APIs for Celery and Kafka in messaging queues integration (#6756)

* feat: added queue overview api for generic messaging system
Signed-off-by: Shivanshu Raj Shrivastava <[email protected]>
shivanshuraj1333 authored Jan 8, 2025
1 parent 80740f6 commit 505757b
Showing 4 changed files with 327 additions and 38 deletions.
67 changes: 64 additions & 3 deletions pkg/query-service/app/http_handler.go
@@ -2501,32 +2501,57 @@ func (aH *APIHandler) WriteJSON(w http.ResponseWriter, r *http.Request, response
// RegisterMessagingQueuesRoutes adds messaging-queues routes
func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *AuthMiddleware) {

// SubRouter for kafka
kafkaRouter := router.PathPrefix("/api/v1/messaging-queues/kafka").Subrouter()
// Main messaging queues router
messagingQueuesRouter := router.PathPrefix("/api/v1/messaging-queues").Subrouter()

// Queue Overview route
messagingQueuesRouter.HandleFunc("/queue-overview", am.ViewAccess(aH.getQueueOverview)).Methods(http.MethodPost)

// -------------------------------------------------
// Kafka-specific routes
kafkaRouter := messagingQueuesRouter.PathPrefix("/kafka").Subrouter()

onboardingRouter := kafkaRouter.PathPrefix("/onboarding").Subrouter()

onboardingRouter.HandleFunc("/producers", am.ViewAccess(aH.onboardProducers)).Methods(http.MethodPost)
onboardingRouter.HandleFunc("/consumers", am.ViewAccess(aH.onboardConsumers)).Methods(http.MethodPost)
onboardingRouter.HandleFunc("/kafka", am.ViewAccess(aH.onboardKafka)).Methods(http.MethodPost)

partitionLatency := kafkaRouter.PathPrefix("/partition-latency").Subrouter()

partitionLatency.HandleFunc("/overview", am.ViewAccess(aH.getPartitionOverviewLatencyData)).Methods(http.MethodPost)
partitionLatency.HandleFunc("/consumer", am.ViewAccess(aH.getConsumerPartitionLatencyData)).Methods(http.MethodPost)

consumerLagRouter := kafkaRouter.PathPrefix("/consumer-lag").Subrouter()

consumerLagRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost)
consumerLagRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost)
consumerLagRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost)

topicThroughput := kafkaRouter.PathPrefix("/topic-throughput").Subrouter()

topicThroughput.HandleFunc("/producer", am.ViewAccess(aH.getProducerThroughputOverview)).Methods(http.MethodPost)
topicThroughput.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerThroughputDetails)).Methods(http.MethodPost)
topicThroughput.HandleFunc("/consumer", am.ViewAccess(aH.getConsumerThroughputOverview)).Methods(http.MethodPost)
topicThroughput.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerThroughputDetails)).Methods(http.MethodPost)

spanEvaluation := kafkaRouter.PathPrefix("/span").Subrouter()

spanEvaluation.HandleFunc("/evaluation", am.ViewAccess(aH.getProducerConsumerEval)).Methods(http.MethodPost)

// -------------------------------------------------
// Celery-specific routes
celeryRouter := messagingQueuesRouter.PathPrefix("/celery").Subrouter()

// Celery overview routes
celeryRouter.HandleFunc("/overview", am.ViewAccess(aH.getCeleryOverview)).Methods(http.MethodPost)

// Celery tasks routes
celeryRouter.HandleFunc("/tasks", am.ViewAccess(aH.getCeleryTasks)).Methods(http.MethodPost)

// Celery performance routes
celeryRouter.HandleFunc("/performance", am.ViewAccess(aH.getCeleryPerformance)).Methods(http.MethodPost)

// for other messaging queues, add SubRouters here
}

@@ -4196,7 +4221,7 @@ func (aH *APIHandler) autocompleteAggregateAttributes(w http.ResponseWriter, r *

switch req.DataSource {
case v3.DataSourceMetrics:
response, err = aH.reader.GetMetricAggregateAttributes(r.Context(), req, false)
response, err = aH.reader.GetMetricAggregateAttributes(r.Context(), req, true)
case v3.DataSourceLogs:
response, err = aH.reader.GetLogAggregateAttributes(r.Context(), req)
case v3.DataSourceTraces:
@@ -4936,3 +4961,39 @@ func (aH *APIHandler) updateTraceField(w http.ResponseWriter, r *http.Request) {
}
aH.WriteJSON(w, r, field)
}

func (aH *APIHandler) getQueueOverview(w http.ResponseWriter, r *http.Request) {
// ToDo: add capability of dynamic filtering based on any of the filters using QueueFilters

messagingQueue, apiErr := ParseMessagingQueueBody(r)

if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
RespondError(w, apiErr, nil)
return
}

chq, err := mq.BuildClickHouseQuery(messagingQueue, "", "overview")

if err != nil {
zap.L().Error(err.Error())
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}

results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
if err != nil {
zap.L().Error(err.Error())
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}

aH.Respond(w, results)
}

func (aH *APIHandler) getCeleryOverview(w http.ResponseWriter, r *http.Request) {
// TODO: Implement celery overview logic for both worker and tasks types
}

func (aH *APIHandler) getCeleryTasks(w http.ResponseWriter, r *http.Request) {
// TODO: Implement celery tasks logic for both state and list types
}

func (aH *APIHandler) getCeleryPerformance(w http.ResponseWriter, r *http.Request) {
// TODO: Implement celery performance logic for error, rate, and latency types
}
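
As a rough illustration of the new endpoint, here is a minimal Go client sketch. The route path comes from the registration above; the localhost:8080 address, the start/end nanosecond fields in the JSON body, and the omission of auth headers (the route sits behind ViewAccess) are assumptions for illustration, not part of this change.

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
    "time"
)

func main() {
    end := time.Now().UnixNano()
    start := end - (30 * time.Minute).Nanoseconds()

    // Assumed request body: start/end in nanoseconds, mirroring the
    // payload parsed by ParseMessagingQueueBody (not shown in this diff).
    payload, _ := json.Marshal(map[string]int64{
        "start": start,
        "end":   end,
    })

    // Assumed base URL; the route itself is registered above as
    // POST /api/v1/messaging-queues/queue-overview.
    resp, err := http.Post(
        "http://localhost:8080/api/v1/messaging-queues/queue-overview",
        "application/json",
        bytes.NewReader(payload),
    )
    if err != nil {
        fmt.Println("request failed:", err)
        return
    }
    defer resp.Body.Close()
    fmt.Println("status:", resp.Status)
}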
34 changes: 34 additions & 0 deletions pkg/query-service/app/integrations/messagingQueues/kafka/model.go
@@ -22,3 +22,37 @@ type OnboardingResponse struct {
Message string `json:"error_message"`
Status string `json:"status"`
}

// QueueFilters holds the optional filters applied to the queue-overview query.
// ToDo: add capability of dynamic filtering based on any of the filters
type QueueFilters struct {
ServiceName []string
SpanName []string
Queue []string
Destination []string
Kind []string
}

// CeleryTask captures the kind and status of a Celery task span.
type CeleryTask struct {
kind string
status string
}

// CeleryTasks describes read and write access to a Celery task's kind and status.
type CeleryTasks interface {
GetKind() string
GetStatus() string
Set(string, string)
}

func (r *CeleryTask) GetKind() string {
return r.kind
}

func (r *CeleryTask) GetStatus() string {
return r.status
}

func (r *CeleryTask) Set(kind, status string) {
r.kind = kind
r.status = status
}
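
A brief package-internal sketch (e.g. in a _test.go file alongside model.go) of how the new CeleryTask type and CeleryTasks interface fit together; the "worker"/"SUCCESS" values are placeholders.

package kafka

import "testing"

// TestCeleryTaskAccessors is a minimal sketch of the expected usage of
// the CeleryTasks interface.
func TestCeleryTaskAccessors(t *testing.T) {
    // *CeleryTask satisfies CeleryTasks via GetKind, GetStatus and Set.
    var task CeleryTasks = &CeleryTask{}

    // Illustrative values; real callers would derive these from span attributes.
    task.Set("worker", "SUCCESS")

    if task.GetKind() != "worker" || task.GetStatus() != "SUCCESS" {
        t.Fatalf("unexpected task state: %s/%s", task.GetKind(), task.GetStatus())
    }
}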
134 changes: 134 additions & 0 deletions pkg/query-service/app/integrations/messagingQueues/kafka/sql.go
@@ -2,6 +2,7 @@ package kafka

import (
"fmt"
"strings"
)

func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queueType string) string {
@@ -318,6 +319,139 @@ GROUP BY
return query
}

// generateOverviewSQL builds the ClickHouse SQL query with optional filters.
// If a filter slice is empty, the query does not constrain on that field.
func generateOverviewSQL(start, end int64, filters *QueueFilters) string {
// Convert from nanoseconds to float seconds in Go to avoid decimal overflow in ClickHouse
startSeconds := float64(start) / 1e9
endSeconds := float64(end) / 1e9

// Compute time range difference in Go
timeRangeSecs := endSeconds - startSeconds

// ts_bucket boundaries: pad the start by 30 minutes so spans whose bucket began before the query window are still matched
tsBucketStart := startSeconds - 1800
tsBucketEnd := endSeconds

// Build WHERE clauses for optional filters
// We always require messaging_system IN ('kafka', 'celery'), but
// we add additional AND conditions only if the slices are non-empty.
var whereClauses []string

// Mandatory base filter: show only kafka/celery
whereClauses = append(whereClauses, "messaging_system IN ('kafka', 'celery')")

if len(filters.ServiceName) > 0 {
whereClauses = append(whereClauses, inClause("service_name", filters.ServiceName))
}
if len(filters.SpanName) > 0 {
whereClauses = append(whereClauses, inClause("span_name", filters.SpanName))
}
if len(filters.Queue) > 0 {
// "queue" in the struct refers to the messaging_system in the DB
whereClauses = append(whereClauses, inClause("messaging_system", filters.Queue))
}
if len(filters.Destination) > 0 {
whereClauses = append(whereClauses, inClause("destination", filters.Destination))
}
if len(filters.Kind) > 0 {
whereClauses = append(whereClauses, inClause("kind_string", filters.Kind))
}

// Combine all WHERE clauses with AND
whereSQL := strings.Join(whereClauses, "\n AND ")

if len(whereSQL) > 0 {
whereSQL = fmt.Sprintf("AND %s", whereSQL)
}

// Final query string
// Note the use of %f for float64 values in fmt.Sprintf
query := fmt.Sprintf(`
WITH
processed_traces AS (
SELECT
resource_string_service$$name AS service_name,
name AS span_name,
CASE
WHEN attribute_string_messaging$$system != '' THEN attribute_string_messaging$$system
WHEN (has(attributes_string, 'celery.action') OR has(attributes_string, 'celery.task_name')) THEN 'celery'
ELSE 'undefined'
END AS messaging_system,
kind_string,
COALESCE(
NULLIF(attributes_string['messaging.destination.name'], ''),
NULLIF(attributes_string['messaging.destination'], '')
) AS destination,
durationNano,
status_code
FROM signoz_traces.distributed_signoz_index_v3
WHERE
timestamp >= toDateTime64(%f, 9)
AND timestamp <= toDateTime64(%f, 9)
AND ts_bucket_start >= toDateTime64(%f, 9)
AND ts_bucket_start <= toDateTime64(%f, 9)
AND (
attribute_string_messaging$$system = 'kafka'
OR has(attributes_string, 'celery.action')
OR has(attributes_string, 'celery.task_name')
)
%s
),
aggregated_metrics AS (
SELECT
service_name,
span_name,
messaging_system,
destination,
kind_string,
count(*) AS total_count,
sumIf(1, status_code = 2) AS error_count,
quantile(0.95)(durationNano) / 1000000 AS p95_latency -- Convert to ms
FROM
processed_traces
GROUP BY
service_name,
span_name,
messaging_system,
destination,
kind_string
)
SELECT
aggregated_metrics.service_name,
aggregated_metrics.span_name,
aggregated_metrics.messaging_system,
aggregated_metrics.destination,
aggregated_metrics.kind_string,
COALESCE(aggregated_metrics.total_count / %f, 0) AS throughput,
COALESCE((aggregated_metrics.error_count * 100.0) / aggregated_metrics.total_count, 0) AS error_percentage,
aggregated_metrics.p95_latency
FROM
aggregated_metrics
ORDER BY
aggregated_metrics.service_name,
aggregated_metrics.span_name;
`,
startSeconds, endSeconds,
tsBucketStart, tsBucketEnd,
whereSQL, timeRangeSecs,
)

return query
}

// inClause returns SQL like "fieldName IN ('val1','val2','val3')"
func inClause(fieldName string, values []string) string {
// Quote and escape each value for safety
var quoted []string
for _, v := range values {
// Simple escape: replace any single quotes in v
safeVal := strings.ReplaceAll(v, "'", "''")
quoted = append(quoted, fmt.Sprintf("'%s'", safeVal))
}
return fmt.Sprintf("%s IN (%s)", fieldName, strings.Join(quoted, ","))
}
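
To show how the overview query builder composes its optional filters, a small package-internal sketch follows; the service name, span kind, and 30-minute window are placeholders.

package kafka

import (
    "fmt"
    "time"
)

// exampleOverviewQuery shows how generateOverviewSQL and inClause fit together.
func exampleOverviewQuery() {
    end := time.Now().UnixNano()
    start := end - (30 * time.Minute).Nanoseconds()

    // Placeholder filters; empty slices are simply skipped by generateOverviewSQL.
    filters := &QueueFilters{
        ServiceName: []string{"checkout-service"},
        Kind:        []string{"Consumer"},
    }

    // inClause quotes and escapes each value, e.g.
    // service_name IN ('checkout-service')
    fmt.Println(inClause("service_name", filters.ServiceName))

    // The generated SQL constrains on kafka/celery spans and applies the
    // non-empty filters as additional AND conditions.
    fmt.Println(generateOverviewSQL(start, end, filters))
}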

func generateProducerSQL(start, end int64, topic, partition, queueType string) string {
timeRange := (end - start) / 1000000000
tsBucketStart := (start / 1000000000) - 1800
