diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 38c047e18b..505464a6ee 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -6,7 +6,6 @@ import ( "encoding/json" "errors" "fmt" - queues2 "go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/queues" "io" "math" "net/http" @@ -30,6 +29,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/app/explorer" "go.signoz.io/signoz/pkg/query-service/app/inframetrics" "go.signoz.io/signoz/pkg/query-service/app/integrations" + queues2 "go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/queues" "go.signoz.io/signoz/pkg/query-service/app/logs" logsv3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" logsv4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4" diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go index 57ab3a1fff..f5a669755f 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -79,9 +79,10 @@ func buildBuilderQueriesProducerBytes( Type: v3.AttributeKeyType("Gauge"), IsColumn: true, }, - Temporality: v3.Unspecified, - TimeAggregation: v3.TimeAggregationAvg, - SpaceAggregation: v3.SpaceAggregationAvg, + AggregateOperator: v3.AggregateOperatorAvg, + Temporality: v3.Unspecified, + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationAvg, Filters: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{ diff --git a/pkg/query-service/app/integrations/messagingQueues/queues/sql.go b/pkg/query-service/app/integrations/messagingQueues/queues/sql.go index fcdc0b5a15..68aa85ea22 100644 --- a/pkg/query-service/app/integrations/messagingQueues/queues/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/queues/sql.go @@ -12,19 +12,13 @@ func generateOverviewSQL(start, end int64, filters *QueueFilters) string { startSeconds := float64(start) / 1e9 endSeconds := float64(end) / 1e9 - // Compute time range difference in Go timeRangeSecs := endSeconds - startSeconds - // Example ts_bucket boundaries (could be your own logic) tsBucketStart := startSeconds - 1800 tsBucketEnd := endSeconds - // Build WHERE clauses for optional filters - // We always require messaging_system IN ('kafka', 'celery'), but - // we add additional AND conditions only if the slices are non-empty. var whereClauses []string - // Mandatory base filter: show only kafka/celery whereClauses = append(whereClauses, "messaging_system IN ('kafka', 'celery')") if len(filters.ServiceName) > 0 { @@ -34,7 +28,6 @@ func generateOverviewSQL(start, end int64, filters *QueueFilters) string { whereClauses = append(whereClauses, inClause("span_name", filters.SpanName)) } if len(filters.Queue) > 0 { - // "queue" in the struct refers to the messaging_system in the DB whereClauses = append(whereClauses, inClause("messaging_system", filters.Queue)) } if len(filters.Destination) > 0 { @@ -51,8 +44,6 @@ func generateOverviewSQL(start, end int64, filters *QueueFilters) string { whereSQL = fmt.Sprintf("AND %s", whereSQL) } - // Final query string - // Note the use of %f for float64 values in fmt.Sprintf query := fmt.Sprintf(` WITH processed_traces AS (