Skip to content

Commit

Permalink
feat: celery performance APIs
Browse files Browse the repository at this point in the history
Signed-off-by: Shivanshu Raj Shrivastava <[email protected]>
  • Loading branch information
shivanshuraj1333 committed Jan 7, 2025
1 parent 53ba68a commit 8078cbb
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 3 deletions.
44 changes: 42 additions & 2 deletions pkg/query-service/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5052,9 +5052,49 @@ func (aH *APIHandler) getCeleryOverview(w http.ResponseWriter, r *http.Request)
}

func (aH *APIHandler) getCeleryTasks(w http.ResponseWriter, r *http.Request) {
// TODO: Implement celery tasks logic for both state and list types

}

func (aH *APIHandler) getCeleryPerformance(w http.ResponseWriter, r *http.Request) {
// TODO: Implement celery performance logic for error, rate, and latency types
messagingQueue, apiErr := ParseMessagingQueueBody(r)

if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
RespondError(w, apiErr, nil)
return
}

filters, err := mq.GetCeleryFilters(messagingQueue.Variables)

if err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}

queryRangeParams, err := mq.CeleryClickHouseQuery(messagingQueue, "celeryperformance", filters)

if err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}

resultFetchLatency, errQueriesByNameFetchLatency, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams)

if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQueriesByNameFetchLatency)
return
}

resp := v3.QueryRangeResponse{
Result: resultFetchLatency,
}
aH.Respond(w, resp)
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type QueueFilters struct {
Status []string

TaskName []string

LatencyType []string
}

type CeleryTask struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,18 @@ func CeleryClickHouseQuery(
PanelType: v3.PanelTypeGraph,
FillGaps: false,
}
case "celeryperformance":

query, err := buildCeleryPerformanceQuery(filters, queryContext, unixMilliStart, unixMilliEnd)
if err != nil {
return nil, err
}
cq = &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
BuilderQueries: query,
PanelType: v3.PanelTypeGraph,
FillGaps: false,
}
}
queryRangeParams := &v3.QueryRangeParamsV3{
Start: unixMilliStart,
Expand All @@ -169,6 +181,81 @@ func CeleryClickHouseQuery(

}

func buildCeleryPerformanceQuery(filters *QueueFilters, queryContext string, unixMilliStart, unixMilliEnd int64) (map[string]*v3.BuilderQuery, error) {
bq := make(map[string]*v3.BuilderQuery)

var AttibuteKeys v3.AttributeKey
var AggregateOperator v3.AggregateOperator
var TimeAggregation = v3.TimeAggregationRate
var GroupByKey string
var filterSet = &v3.FilterSet{Operator: "AND"}
var filterItems = make([]v3.FilterItem, 0)

// latency
if filters.QueryFor[0] == "latency" {
AttibuteKeys = v3.AttributeKey{
Key: "duration_nano",
DataType: v3.AttributeKeyDataTypeFloat64,
IsColumn: true,
IsJSON: false,
}
TimeAggregation = v3.TimeAggregation(filters.LatencyType[0])
AggregateOperator = v3.AggregateOperator(filters.LatencyType[0])
if filters.Status[0] == "task" {
GroupByKey = "celery.task_name"
} else if filters.Status[0] == "worker" {
GroupByKey = "celery.hostname"
}
} else if filters.QueryFor[0] == "task" {
TimeAggregation = v3.TimeAggregationRate
AggregateOperator = v3.AggregateOperatorRate
GroupByKey = "celery.hostname"
if filters.Status[0] == "error" {
filterItems = append(filterItems, v3.FilterItem{Key: v3.AttributeKey{Key: "has_error", DataType: v3.AttributeKeyDataTypeBool, IsColumn: true}, Operator: v3.FilterOperatorEqual, Value: "true"})
}

}
// filter by task name
if len(filters.TaskName) > 0 {
filterItems = append(filterItems, v3.FilterItem{Key: v3.AttributeKey{Key: "celery.task_name", DataType: v3.AttributeKeyDataTypeString}, Operator: v3.FilterOperatorIn, Value: filters.TaskName})
}

filterSet.Items = filterItems

chq := &v3.BuilderQuery{
QueryName: queryContext,
DataSource: v3.DataSourceTraces,

StepInterval: common.MinAllowedStepInterval(unixMilliStart, unixMilliEnd),

AggregateOperator: AggregateOperator,

AggregateAttribute: AttibuteKeys,

Temporality: v3.Unspecified,

TimeAggregation: TimeAggregation,
SpaceAggregation: v3.SpaceAggregationSum,

Expression: queryContext,

Filters: filterSet,

ReduceTo: v3.ReduceToOperatorAvg,

GroupBy: []v3.AttributeKey{
{
Key: GroupByKey,
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
},
Limit: 10,
}
bq[queryContext] = chq
return bq, nil
}

func buildCeleryOverviewQuery(metrics string, queryContext string, unixMilliStart, unixMilliEnd int64) (map[string]*v3.BuilderQuery, error) {
bq := make(map[string]*v3.BuilderQuery)

Expand Down Expand Up @@ -404,7 +491,7 @@ func BuildQRParamsWithCache(
func GetCeleryFilters(variables map[string]string) (*QueueFilters, error) {
filters := getFilters(variables)
if len(filters.QueryFor) != 1 || len(filters.Status) != 1 {
return nil, fmt.Errorf("query_for and status not found in the request")
return nil, fmt.Errorf("query_for, status, or latency type is not properly set in the request")
}
return filters, nil
}
Expand Down Expand Up @@ -436,6 +523,9 @@ func getFilters(variables map[string]string) *QueueFilters {
if val, ok := variables["task_name"]; ok && val != "" {
filters.TaskName = parseFilter(val)
}
if val, ok := variables["latency_type"]; ok && val != "" {
filters.LatencyType = parseFilter(val)
}

return &filters
}
Expand Down

0 comments on commit 8078cbb

Please sign in to comment.