Skip to content

Commit

Permalink
feat: modify API to give worker counts
Browse files Browse the repository at this point in the history
Signed-off-by: Shivanshu Raj Shrivastava <[email protected]>
  • Loading branch information
shivanshuraj1333 committed Jan 8, 2025
1 parent 66d56b2 commit 5213297
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 17 deletions.
31 changes: 30 additions & 1 deletion pkg/query-service/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4995,7 +4995,15 @@ func (aH *APIHandler) getCeleryOverview(w http.ResponseWriter, r *http.Request)
return
}

queryRangeParams, err := mq.CeleryClickHouseQuery(messagingQueue, "celeryoverview")
filters, err := mq.GetCeleryFilters(messagingQueue.Variables)

if err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}

queryRangeParams, err := mq.CeleryClickHouseQuery(messagingQueue, "celeryoverview", filters)

if err != nil {
zap.L().Error(err.Error())
Expand All @@ -5016,6 +5024,27 @@ func (aH *APIHandler) getCeleryOverview(w http.ResponseWriter, r *http.Request)
return
}

if filters.QueryFor[0] == "worker" {

var workerNames = make([]string, 0)
var workerCount int

for _, res := range resultFetchLatency {
for _, series := range res.Series {
if workerName, ok := series.Labels["worker"]; ok {
workerNames = append(workerNames, workerName)
workerCount++
}
}
}

aH.Respond(w, mq.WorkerResponse{
Count: workerCount,
Names: workerNames,
})
return
}

resp := v3.QueryRangeResponse{
Result: resultFetchLatency,
}
Expand Down
12 changes: 10 additions & 2 deletions pkg/query-service/app/integrations/messagingQueues/kafka/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ type OnboardingResponse struct {
Status string `json:"status"`
}

type WorkerResponse struct {
Count int `json:"active_workers"`
Names []string `json:"worker_names"`
}

// QueueFilters
// ToDo: add capability of dynamic filtering based on any of the filters
type QueueFilters struct {
Expand All @@ -31,8 +36,11 @@ type QueueFilters struct {
Queue []string
Destination []string
Kind []string
QueryFor string
Status string

QueryFor []string
Status []string

TaskName []string
}

type CeleryTask struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,27 +127,22 @@ func buildBuilderQueriesProducerBytes(

func CeleryClickHouseQuery(
messagingQueue *MessagingQueue,
queryContext string,
queryContext string, filters *QueueFilters,
) (*v3.QueryRangeParamsV3, error) {

//start := messagingQueue.Start
//end := messagingQueue.End

unixMilliStart := messagingQueue.Start / 1000000
unixMilliEnd := messagingQueue.End / 1000000

kind := messagingQueue.Variables["kind"]

var cq *v3.CompositeQuery

switch queryContext {
case "celeryoverview":

metrics := ""

if kind == "worker" {
if filters.QueryFor[0] == "worker" {
metrics = "flower_worker_online"
} else if kind == "tasks" {
} else if filters.QueryFor[0] == "tasks" {
metrics = "flower_worker_number_of_currently_executing_tasks"
}

Expand Down Expand Up @@ -407,14 +402,43 @@ func BuildQRParamsWithCache(
return queryRangeParams, err
}

func GetCeleryFilters(variables map[string]string) (*QueueFilters, error) {
filters := getFilters(variables)
if len(filters.QueryFor) != 1 || len(filters.Status) != 1 {
return nil, fmt.Errorf("query_for and status not found in the request")
}
return filters, nil
}

func getFilters(variables map[string]string) *QueueFilters {
return &QueueFilters{
ServiceName: parseFilter(variables["service_name"]),
SpanName: parseFilter(variables["span_name"]),
Queue: parseFilter(variables["queue"]),
Destination: parseFilter(variables["destination"]),
Kind: parseFilter(variables["kind"]),
var filters QueueFilters

if val, ok := variables["service_name"]; ok && val != "" {
filters.ServiceName = parseFilter(val)
}
if val, ok := variables["span_name"]; ok && val != "" {
filters.SpanName = parseFilter(val)
}
if val, ok := variables["queue"]; ok && val != "" {
filters.Queue = parseFilter(val)
}
if val, ok := variables["destination"]; ok && val != "" {
filters.Destination = parseFilter(val)
}
if val, ok := variables["kind"]; ok && val != "" {
filters.Kind = parseFilter(val)
}
if val, ok := variables["query_for"]; ok && val != "" {
filters.QueryFor = parseFilter(val)
}
if val, ok := variables["status"]; ok && val != "" {
filters.Status = parseFilter(val)
}
if val, ok := variables["task_name"]; ok && val != "" {
filters.TaskName = parseFilter(val)
}

return &filters
}

// parseFilter splits a comma-separated string into a []string.
Expand Down

0 comments on commit 5213297

Please sign in to comment.