Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] Add overview APIs for Celery and Kafka in messaging queues integration #6756

Merged
merged 5 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 64 additions & 3 deletions pkg/query-service/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2501,32 +2501,57 @@ func (aH *APIHandler) WriteJSON(w http.ResponseWriter, r *http.Request, response
// RegisterMessagingQueuesRoutes adds messaging-queues routes under
// /api/v1/messaging-queues. All routes are POST and require view access.
func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *AuthMiddleware) {

	// Main messaging queues router; queue-system-specific subrouters hang off it.
	// (Bug fix: the old top-level kafkaRouter declaration duplicated the one
	// below and has been removed.)
	messagingQueuesRouter := router.PathPrefix("/api/v1/messaging-queues").Subrouter()

	// Queue overview route, shared across all supported queue systems.
	messagingQueuesRouter.HandleFunc("/queue-overview", am.ViewAccess(aH.getQueueOverview)).Methods(http.MethodPost)

	// -------------------------------------------------
	// Kafka-specific routes
	kafkaRouter := messagingQueuesRouter.PathPrefix("/kafka").Subrouter()

	onboardingRouter := kafkaRouter.PathPrefix("/onboarding").Subrouter()

	onboardingRouter.HandleFunc("/producers", am.ViewAccess(aH.onboardProducers)).Methods(http.MethodPost)
	onboardingRouter.HandleFunc("/consumers", am.ViewAccess(aH.onboardConsumers)).Methods(http.MethodPost)
	onboardingRouter.HandleFunc("/kafka", am.ViewAccess(aH.onboardKafka)).Methods(http.MethodPost)

	partitionLatency := kafkaRouter.PathPrefix("/partition-latency").Subrouter()

	partitionLatency.HandleFunc("/overview", am.ViewAccess(aH.getPartitionOverviewLatencyData)).Methods(http.MethodPost)
	partitionLatency.HandleFunc("/consumer", am.ViewAccess(aH.getConsumerPartitionLatencyData)).Methods(http.MethodPost)

	consumerLagRouter := kafkaRouter.PathPrefix("/consumer-lag").Subrouter()

	consumerLagRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost)
	consumerLagRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost)
	consumerLagRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost)

	topicThroughput := kafkaRouter.PathPrefix("/topic-throughput").Subrouter()

	topicThroughput.HandleFunc("/producer", am.ViewAccess(aH.getProducerThroughputOverview)).Methods(http.MethodPost)
	topicThroughput.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerThroughputDetails)).Methods(http.MethodPost)
	topicThroughput.HandleFunc("/consumer", am.ViewAccess(aH.getConsumerThroughputOverview)).Methods(http.MethodPost)
	topicThroughput.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerThroughputDetails)).Methods(http.MethodPost)

	spanEvaluation := kafkaRouter.PathPrefix("/span").Subrouter()

	spanEvaluation.HandleFunc("/evaluation", am.ViewAccess(aH.getProducerConsumerEval)).Methods(http.MethodPost)

	// -------------------------------------------------
	// Celery-specific routes
	celeryRouter := messagingQueuesRouter.PathPrefix("/celery").Subrouter()

	// Celery overview routes
	celeryRouter.HandleFunc("/overview", am.ViewAccess(aH.getCeleryOverview)).Methods(http.MethodPost)

	// Celery tasks routes
	celeryRouter.HandleFunc("/tasks", am.ViewAccess(aH.getCeleryTasks)).Methods(http.MethodPost)

	// Celery performance routes
	celeryRouter.HandleFunc("/performance", am.ViewAccess(aH.getCeleryPerformance)).Methods(http.MethodPost)

	// for other messaging queues, add SubRouters here
}

Expand Down Expand Up @@ -4196,7 +4221,7 @@ func (aH *APIHandler) autocompleteAggregateAttributes(w http.ResponseWriter, r *

switch req.DataSource {
case v3.DataSourceMetrics:
response, err = aH.reader.GetMetricAggregateAttributes(r.Context(), req, false)
response, err = aH.reader.GetMetricAggregateAttributes(r.Context(), req, true)
case v3.DataSourceLogs:
response, err = aH.reader.GetLogAggregateAttributes(r.Context(), req)
case v3.DataSourceTraces:
Expand Down Expand Up @@ -4936,3 +4961,39 @@ func (aH *APIHandler) updateTraceField(w http.ResponseWriter, r *http.Request) {
}
aH.WriteJSON(w, r, field)
}

// getQueueOverview serves POST /api/v1/messaging-queues/queue-overview.
// It builds the "overview" ClickHouse query from the request body and
// returns per-service/span throughput, error percentage, and p95 latency.
func (aH *APIHandler) getQueueOverview(w http.ResponseWriter, r *http.Request) {
	// ToDo: add capability of dynamic filtering based on any of the filters using QueueFilters

	messagingQueue, apiErr := ParseMessagingQueueBody(r)
	if apiErr != nil {
		zap.L().Error(apiErr.Err.Error())
		RespondError(w, apiErr, nil)
		return
	}

	chq, err := mq.BuildClickHouseQuery(messagingQueue, "", "overview")
	if err != nil {
		zap.L().Error(err.Error())
		// Bug fix: apiErr is guaranteed nil here (the guard above returned
		// otherwise), so responding with it sent the client an empty error.
		// Wrap the actual build error instead.
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
		return
	}

	results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
	if err != nil {
		// Bug fix: the query-execution error was previously ignored, which
		// could respond with nil results and no indication of failure.
		zap.L().Error(err.Error())
		RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
		return
	}

	aH.Respond(w, results)
}

// getCeleryOverview serves POST /api/v1/messaging-queues/celery/overview.
// Currently a stub: it responds with an empty 200 body.
func (aH *APIHandler) getCeleryOverview(w http.ResponseWriter, r *http.Request) {
	// TODO: Implement celery overview logic for both worker and tasks types
}

// getCeleryTasks serves POST /api/v1/messaging-queues/celery/tasks.
// Currently a stub: it responds with an empty 200 body.
func (aH *APIHandler) getCeleryTasks(w http.ResponseWriter, r *http.Request) {
	// TODO: Implement celery tasks logic for both state and list types
}

// getCeleryPerformance serves POST /api/v1/messaging-queues/celery/performance.
// Currently a stub: it responds with an empty 200 body.
func (aH *APIHandler) getCeleryPerformance(w http.ResponseWriter, r *http.Request) {
	// TODO: Implement celery performance logic for error, rate, and latency types
}
34 changes: 34 additions & 0 deletions pkg/query-service/app/integrations/messagingQueues/kafka/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,37 @@ type OnboardingResponse struct {
Message string `json:"error_message"`
Status string `json:"status"`
}

// QueueFilters holds optional filters for the queue-overview query.
// An empty slice means "no constraint" on that field (see generateOverviewSQL).
// ToDo: add capability of dynamic filtering based on any of the filters
type QueueFilters struct {
	ServiceName []string // resource service names to match
	SpanName    []string // span names to match
	Queue       []string // messaging systems (maps to the messaging_system column)
	Destination []string // messaging destinations (topic / queue names)
	Kind        []string // span kind strings
}
shivanshuraj1333 marked this conversation as resolved.
Show resolved Hide resolved

// CeleryTask is the concrete CeleryTasks implementation, holding a
// task's kind and status as plain strings.
type CeleryTask struct {
	kind   string
	status string
}

// CeleryTasks provides read and write access to a task's kind and status.
type CeleryTasks interface {
	GetKind() string
	GetStatus() string
	Set(string, string)
}

// GetKind reports the task kind.
func (c *CeleryTask) GetKind() string {
	return c.kind
}

// GetStatus reports the task status.
func (c *CeleryTask) GetStatus() string {
	return c.status
}

// Set assigns both the kind and the status in a single call.
func (c *CeleryTask) Set(kind, status string) {
	c.kind = kind
	c.status = status
}
134 changes: 134 additions & 0 deletions pkg/query-service/app/integrations/messagingQueues/kafka/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kafka

import (
"fmt"
"strings"
)

func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queueType string) string {
Expand Down Expand Up @@ -318,6 +319,139 @@ GROUP BY
return query
}

// generateOverviewSQL builds the ClickHouse SQL query with optional filters.
// If a filter slice is empty, the query does not constrain on that field.
// (Fix: removed GitHub review-UI residue lines that were pasted into the
// function body and into the SQL template, which made the code uncompilable
// and the SQL invalid.)
func generateOverviewSQL(start, end int64, filters *QueueFilters) string {
	// Convert from nanoseconds to float seconds in Go to avoid decimal overflow in ClickHouse
	startSeconds := float64(start) / 1e9
	endSeconds := float64(end) / 1e9

	// Time range in seconds; used as the throughput denominator.
	timeRangeSecs := endSeconds - startSeconds

	// ts_bucket_start is bucketed, so widen the lower bound by 30 minutes to
	// catch spans whose bucket started before the query window.
	tsBucketStart := startSeconds - 1800
	tsBucketEnd := endSeconds

	// Build WHERE clauses for optional filters.
	// We always require messaging_system IN ('kafka', 'celery'), but
	// we add additional AND conditions only if the slices are non-empty.
	var whereClauses []string

	// Mandatory base filter: show only kafka/celery
	whereClauses = append(whereClauses, "messaging_system IN ('kafka', 'celery')")

	if len(filters.ServiceName) > 0 {
		whereClauses = append(whereClauses, inClause("service_name", filters.ServiceName))
	}
	if len(filters.SpanName) > 0 {
		whereClauses = append(whereClauses, inClause("span_name", filters.SpanName))
	}
	if len(filters.Queue) > 0 {
		// "queue" in the struct refers to the messaging_system in the DB
		whereClauses = append(whereClauses, inClause("messaging_system", filters.Queue))
	}
	if len(filters.Destination) > 0 {
		whereClauses = append(whereClauses, inClause("destination", filters.Destination))
	}
	if len(filters.Kind) > 0 {
		whereClauses = append(whereClauses, inClause("kind_string", filters.Kind))
	}

	// Combine all WHERE clauses with AND.
	whereSQL := strings.Join(whereClauses, "\n AND ")

	// whereSQL is never empty (the base filter is always present), so this
	// always prefixes the combined clauses with AND for the template below.
	if len(whereSQL) > 0 {
		whereSQL = fmt.Sprintf("AND %s", whereSQL)
	}

	// Final query string.
	// Note the use of %f for float64 values in fmt.Sprintf.
	query := fmt.Sprintf(`
WITH
    processed_traces AS (
        SELECT
            resource_string_service$$name AS service_name,
            name AS span_name,
            CASE
                WHEN attribute_string_messaging$$system != '' THEN attribute_string_messaging$$system
                WHEN (has(attributes_string, 'celery.action') OR has(attributes_string, 'celery.task_name')) THEN 'celery'
                ELSE 'undefined'
            END AS messaging_system,
            kind_string,
            COALESCE(
                NULLIF(attributes_string['messaging.destination.name'], ''),
                NULLIF(attributes_string['messaging.destination'], '')
            ) AS destination,
            durationNano,
            status_code
        FROM signoz_traces.distributed_signoz_index_v3
        WHERE
            timestamp >= toDateTime64(%f, 9)
            AND timestamp <= toDateTime64(%f, 9)
            AND ts_bucket_start >= toDateTime64(%f, 9)
            AND ts_bucket_start <= toDateTime64(%f, 9)
            AND (
                attribute_string_messaging$$system = 'kafka'
                OR has(attributes_string, 'celery.action')
                OR has(attributes_string, 'celery.task_name')
            )
            %s
    ),
    aggregated_metrics AS (
        SELECT
            service_name,
            span_name,
            messaging_system,
            destination,
            kind_string,
            count(*) AS total_count,
            sumIf(1, status_code = 2) AS error_count,
            quantile(0.95)(durationNano) / 1000000 AS p95_latency -- Convert to ms
        FROM
            processed_traces
        GROUP BY
            service_name,
            span_name,
            messaging_system,
            destination,
            kind_string
    )
SELECT
    aggregated_metrics.service_name,
    aggregated_metrics.span_name,
    aggregated_metrics.messaging_system,
    aggregated_metrics.destination,
    aggregated_metrics.kind_string,
    COALESCE(aggregated_metrics.total_count / %f, 0) AS throughput,
    COALESCE((aggregated_metrics.error_count * 100.0) / aggregated_metrics.total_count, 0) AS error_percentage,
    aggregated_metrics.p95_latency
FROM
    aggregated_metrics
ORDER BY
    aggregated_metrics.service_name,
    aggregated_metrics.span_name;
`,
		startSeconds, endSeconds,
		tsBucketStart, tsBucketEnd,
		whereSQL, timeRangeSecs,
	)

	return query
}

// inClause renders an SQL membership test such as
// "fieldName IN ('val1','val2','val3')".
func inClause(fieldName string, values []string) string {
	// Quote each value, doubling any embedded single quotes so the value
	// stays safely inside its SQL string literal.
	quoted := make([]string, 0, len(values))
	for _, raw := range values {
		quoted = append(quoted, "'"+strings.ReplaceAll(raw, "'", "''")+"'")
	}
	return fieldName + " IN (" + strings.Join(quoted, ",") + ")"
}
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved

func generateProducerSQL(start, end int64, topic, partition, queueType string) string {
timeRange := (end - start) / 1000000000
tsBucketStart := (start / 1000000000) - 1800
Expand Down
Loading
Loading