Skip to content

Commit

Permalink
feat: celery overview
Browse files Browse the repository at this point in the history
Signed-off-by: Shivanshu Raj Shrivastava <[email protected]>
  • Loading branch information
shivanshuraj1333 committed Jan 6, 2025
1 parent b1d4d38 commit 4f9a90e
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 5 deletions.
34 changes: 33 additions & 1 deletion pkg/query-service/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4987,7 +4987,39 @@ func (aH *APIHandler) getQueueOverview(w http.ResponseWriter, r *http.Request) {
}

func (aH *APIHandler) getCeleryOverview(w http.ResponseWriter, r *http.Request) {
// TODO: Implement celery overview logic for both worker and tasks types
messagingQueue, apiErr := ParseMessagingQueueBody(r)

if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
RespondError(w, apiErr, nil)
return
}

queryRangeParams, err := mq.CeleryClickHouseQuery(messagingQueue, "celery-overview")

if err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}

resultFetchLatency, errQueriesByNameFetchLatency, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams)

if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQueriesByNameFetchLatency)
return
}

resp := v3.QueryRangeResponse{
Result: resultFetchLatency,
}
aH.Respond(w, resp)
}

func (aH *APIHandler) getCeleryTasks(w http.ResponseWriter, r *http.Request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ type QueueFilters struct {
type CeleryTask struct {
kind string
status string
name string
}

type CeleryTasks interface {
GetKind() string
GetStatus() string
Set(string, string)
GetName() string
}

func (r *CeleryTask) GetKind() string {
Expand All @@ -52,7 +53,6 @@ func (r *CeleryTask) GetStatus() string {
return r.status
}

func (r *CeleryTask) Set(kind, status string) {
r.kind = kind
r.status = status
func (r *CeleryTask) GetName() string {
return r.name
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kafka

import (
"fmt"
"go.uber.org/zap"

"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/constants"
Expand Down Expand Up @@ -125,6 +126,101 @@ func buildBuilderQueriesProducerBytes(
return bq, nil
}

func CeleryClickHouseQuery(
messagingQueue *MessagingQueue,
queryContext string,
) (*v3.QueryRangeParamsV3, error) {

//start := messagingQueue.Start
//end := messagingQueue.End

unixMilliStart := messagingQueue.Start / 1000000
unixMilliEnd := messagingQueue.End / 1000000

var kind string
//var status string
//var taskName string

if value, ok := messagingQueue.Variables["kind"]; !ok {
zap.L().Error("kind not found in celery /api/v1/messaging-queues/celery/tasks api call")
kind = value
}
if _, ok := messagingQueue.Variables["status"]; !ok {
zap.L().Error("status not found in celery /api/v1/messaging-queues/celery/tasks api call")
//status = value
}
//taskName, _ = messagingQueue.Variables["status"]

var cq *v3.CompositeQuery

switch queryContext {
case "celery-overview":

metrics := ""

if kind == "worker" {
metrics = "flower_worker_online"
} else if kind == "tasks" {
metrics = "flower_worker_number_of_currently_executing_tasks"
}

query, err := buildCeleryOverviewQuery(metrics, queryContext)
if err != nil {
return nil, err
}
cq = &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
BuilderQueries: query,
PanelType: v3.PanelTypeGraph,
FillGaps: false,
}
}
queryRangeParams := &v3.QueryRangeParamsV3{
Start: unixMilliStart,
End: unixMilliEnd,
Step: defaultStepInterval,
CompositeQuery: cq,
Version: "v4",
FormatForWeb: true,
}

return queryRangeParams, nil

}

func buildCeleryOverviewQuery(metrics string, queryContext string) (map[string]*v3.BuilderQuery, error) {
bq := make(map[string]*v3.BuilderQuery)

chq := &v3.BuilderQuery{
QueryName: queryContext,
DataSource: v3.DataSourceMetrics,

AggregateAttribute: v3.AttributeKey{
Key: metrics, // flower_worker_online
DataType: v3.AttributeKeyDataTypeFloat64,
Type: v3.AttributeKeyType("Gauge"),
IsColumn: true,
IsJSON: false,
},

Temporality: v3.Unspecified,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationAvg,

Expression: queryContext,
ReduceTo: v3.ReduceToOperatorAvg,
GroupBy: []v3.AttributeKey{
{
Key: "worker",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
},
}
bq[queryContext] = chq
return bq, nil
}

func buildBuilderQueriesNetwork(
unixMilliStart, unixMilliEnd int64,
attributeCache *Clients,
Expand Down

0 comments on commit 4f9a90e

Please sign in to comment.