diff --git a/deploy/docker-swarm/clickhouse-setup/docker-compose.yaml b/deploy/docker-swarm/clickhouse-setup/docker-compose.yaml index 0b9171260e..becc6fcda3 100644 --- a/deploy/docker-swarm/clickhouse-setup/docker-compose.yaml +++ b/deploy/docker-swarm/clickhouse-setup/docker-compose.yaml @@ -130,7 +130,7 @@ services: restart_policy: condition: on-failure query-service: - image: signoz/query-service:0.67.0 + image: signoz/query-service:0.67.1 command: ["-config=/root/config/prometheus.yml", "--use-logs-new-schema=true", "--use-trace-new-schema=true"] # ports: # - "6060:6060" # pprof port @@ -158,7 +158,7 @@ services: condition: on-failure !!merge <<: *db-depend frontend: - image: signoz/frontend:0.67.0 + image: signoz/frontend:0.67.1 deploy: restart_policy: condition: on-failure diff --git a/deploy/docker/clickhouse-setup/docker-compose-minimal.yaml b/deploy/docker/clickhouse-setup/docker-compose-minimal.yaml index 4f56e9889c..5b3c2c9966 100644 --- a/deploy/docker/clickhouse-setup/docker-compose-minimal.yaml +++ b/deploy/docker/clickhouse-setup/docker-compose-minimal.yaml @@ -145,7 +145,7 @@ services: - --storage.path=/data # Notes for Maintainers/Contributors who will change Line Numbers of Frontend & Query-Section. Please Update Line Numbers in `./scripts/commentLinesForSetup.sh` & `./CONTRIBUTING.md` query-service: - image: signoz/query-service:${DOCKER_TAG:-0.67.0} + image: signoz/query-service:${DOCKER_TAG:-0.67.1} container_name: signoz-query-service command: ["-config=/root/config/prometheus.yml", "--use-logs-new-schema=true", "--use-trace-new-schema=true"] # ports: @@ -172,7 +172,7 @@ services: retries: 3 !!merge <<: *db-depend frontend: - image: signoz/frontend:${DOCKER_TAG:-0.67.0} + image: signoz/frontend:${DOCKER_TAG:-0.67.1} container_name: signoz-frontend restart: on-failure depends_on: diff --git a/deploy/docker/clickhouse-setup/docker-compose.testing.yaml b/deploy/docker/clickhouse-setup/docker-compose.testing.yaml index 7f6f124c76..f87ef1c07c 100644 --- a/deploy/docker/clickhouse-setup/docker-compose.testing.yaml +++ b/deploy/docker/clickhouse-setup/docker-compose.testing.yaml @@ -148,7 +148,7 @@ services: - --storage.path=/data # Notes for Maintainers/Contributors who will change Line Numbers of Frontend & Query-Section. Please Update Line Numbers in `./scripts/commentLinesForSetup.sh` & `./CONTRIBUTING.md` query-service: - image: signoz/query-service:${DOCKER_TAG:-0.67.0} + image: signoz/query-service:${DOCKER_TAG:-0.67.1} container_name: signoz-query-service command: ["-config=/root/config/prometheus.yml", "-gateway-url=https://api.staging.signoz.cloud", "--use-logs-new-schema=true", "--use-trace-new-schema=true"] # ports: @@ -176,7 +176,7 @@ services: retries: 3 !!merge <<: *db-depend frontend: - image: signoz/frontend:${DOCKER_TAG:-0.67.0} + image: signoz/frontend:${DOCKER_TAG:-0.67.1} container_name: signoz-frontend restart: on-failure depends_on: diff --git a/frontend/public/locales/en/billings.json b/frontend/public/locales/en/billings.json index fb706e002f..ca368eeed6 100644 --- a/frontend/public/locales/en/billings.json +++ b/frontend/public/locales/en/billings.json @@ -3,7 +3,9 @@ "billing": "Billing", "manage_billing_and_costs": "Manage your billing information, invoices, and monitor costs.", "enterprise_cloud": "Enterprise Cloud", + "teams_cloud": "Teams Cloud", "enterprise": "Enterprise", + "teams": "Teams", "card_details_recieved_and_billing_info": "We have received your card details, your billing will only start after the end of your free trial period.", "upgrade_plan": "Upgrade Plan", "manage_billing": "Manage Billing", diff --git a/frontend/src/container/BillingContainer/BillingContainer.tsx b/frontend/src/container/BillingContainer/BillingContainer.tsx index 9ddbd8fa92..55e35deaa6 100644 --- a/frontend/src/container/BillingContainer/BillingContainer.tsx +++ b/frontend/src/container/BillingContainer/BillingContainer.tsx @@ -430,7 +430,7 @@ export default function BillingContainer(): JSX.Element { - {isCloudUserVal ? t('enterprise_cloud') : t('enterprise')}{' '} + {isCloudUserVal ? t('teams_cloud') : t('teams')}{' '} {isFreeTrial ? Free Trial : ''} {!isLoading && !isFetchingBillingData ? ( diff --git a/frontend/src/container/GridCardLayout/GridCard/index.tsx b/frontend/src/container/GridCardLayout/GridCard/index.tsx index 66ce70fb86..09b6b65e1a 100644 --- a/frontend/src/container/GridCardLayout/GridCard/index.tsx +++ b/frontend/src/container/GridCardLayout/GridCard/index.tsx @@ -44,6 +44,7 @@ function GridCardGraph({ toScrollWidgetId, setToScrollWidgetId, variablesToGetUpdated, + setDashboardQueryRangeCalled, } = useDashboard(); const { minTime, maxTime, selectedTime: globalSelectedInterval } = useSelector< AppState, @@ -202,11 +203,13 @@ function GridCardGraph({ refetchOnMount: false, onError: (error) => { setErrorMessage(error.message); + setDashboardQueryRangeCalled(true); }, onSettled: (data) => { dataAvailable?.( isDataAvailableByPanelType(data?.payload?.data, widget?.panelTypes), ); + setDashboardQueryRangeCalled(true); }, }, ); diff --git a/frontend/src/container/GridCardLayout/GridCardLayout.tsx b/frontend/src/container/GridCardLayout/GridCardLayout.tsx index 180539fb84..fa125091e6 100644 --- a/frontend/src/container/GridCardLayout/GridCardLayout.tsx +++ b/frontend/src/container/GridCardLayout/GridCardLayout.tsx @@ -1,5 +1,6 @@ import './GridCardLayout.styles.scss'; +import * as Sentry from '@sentry/react'; import { Color } from '@signozhq/design-tokens'; import { Button, Form, Input, Modal, Typography } from 'antd'; import { useForm } from 'antd/es/form/Form'; @@ -61,6 +62,8 @@ function GraphLayout(props: GraphLayoutProps): JSX.Element { setPanelMap, setSelectedDashboard, isDashboardLocked, + dashboardQueryRangeCalled, + setDashboardQueryRangeCalled, } = useDashboard(); const { data } = selectedDashboard || {}; const { pathname } = useLocation(); @@ -124,6 +127,25 @@ function GraphLayout(props: GraphLayoutProps): JSX.Element { setDashboardLayout(sortLayout(layouts)); }, [layouts]); + useEffect(() => { + setDashboardQueryRangeCalled(false); + // eslint-disable-next-line react-hooks/exhaustive-deps + }, []); + + useEffect(() => { + const timeoutId = setTimeout(() => { + // Send Sentry event if query_range is not called within expected timeframe (2 mins) when there are widgets + if (!dashboardQueryRangeCalled && data?.widgets?.length) { + Sentry.captureEvent({ + message: `Dashboard query range not called within expected timeframe even when there are ${data?.widgets?.length} widgets`, + level: 'warning', + }); + } + }, 120000); + + return (): void => clearTimeout(timeoutId); + }, [dashboardQueryRangeCalled, data?.widgets?.length]); + const logEventCalledRef = useRef(false); useEffect(() => { if (!logEventCalledRef.current && !isUndefined(data)) { diff --git a/frontend/src/container/NewDashboard/DashboardVariablesSelection/DashboardVariableSelection.tsx b/frontend/src/container/NewDashboard/DashboardVariablesSelection/DashboardVariableSelection.tsx index 813185e0b6..8276266cec 100644 --- a/frontend/src/container/NewDashboard/DashboardVariablesSelection/DashboardVariableSelection.tsx +++ b/frontend/src/container/NewDashboard/DashboardVariablesSelection/DashboardVariableSelection.tsx @@ -1,9 +1,19 @@ import { Row } from 'antd'; -import { isNull } from 'lodash-es'; +import { isEmpty } from 'lodash-es'; import { useDashboard } from 'providers/Dashboard/Dashboard'; import { memo, useEffect, useState } from 'react'; +import { useSelector } from 'react-redux'; +import { AppState } from 'store/reducers'; import { IDashboardVariable } from 'types/api/dashboard/getAll'; - +import { GlobalReducer } from 'types/reducer/globalTime'; + +import { + buildDependencies, + buildDependencyGraph, + buildParentDependencyGraph, + IDependencyData, + onUpdateVariableNode, +} from './util'; import VariableItem from './VariableItem'; function DashboardVariableSelection(): JSX.Element | null { @@ -21,6 +31,14 @@ function DashboardVariableSelection(): JSX.Element | null { const [variablesTableData, setVariablesTableData] = useState([]); + const [dependencyData, setDependencyData] = useState( + null, + ); + + const { maxTime, minTime } = useSelector( + (state) => state.globalTime, + ); + useEffect(() => { if (variables) { const tableRowData = []; @@ -43,35 +61,46 @@ function DashboardVariableSelection(): JSX.Element | null { } }, [variables]); - const onVarChanged = (name: string): void => { - /** - * this function takes care of adding the dependent variables to current update queue and removing - * the updated variable name from the queue - */ - const dependentVariables = variablesTableData - ?.map((variable: any) => { - if (variable.type === 'QUERY') { - const re = new RegExp(`\\{\\{\\s*?\\.${name}\\s*?\\}\\}`); // regex for `{{.var}}` - const queryValue = variable.queryValue || ''; - const dependVarReMatch = queryValue.match(re); - if (dependVarReMatch !== null && dependVarReMatch.length > 0) { - return variable.name; - } - } - return null; - }) - .filter((val: string | null) => !isNull(val)); - setVariablesToGetUpdated((prev) => [ - ...prev.filter((v) => v !== name), - ...dependentVariables, - ]); - }; + useEffect(() => { + if (variablesTableData.length > 0) { + const depGrp = buildDependencies(variablesTableData); + const { order, graph } = buildDependencyGraph(depGrp); + const parentDependencyGraph = buildParentDependencyGraph(graph); + + // cleanup order to only include variables that are of type 'QUERY' + const cleanedOrder = order.filter((variable) => { + const variableData = variablesTableData.find( + (v: IDashboardVariable) => v.name === variable, + ); + return variableData?.type === 'QUERY'; + }); + + setDependencyData({ + order: cleanedOrder, + graph, + parentDependencyGraph, + }); + } + }, [setVariablesToGetUpdated, variables, variablesTableData]); + + // this handles the case where the dependency order changes i.e. variable list updated via creation or deletion etc. and we need to refetch the variables + // also trigger when the global time changes + useEffect( + () => { + if (!isEmpty(dependencyData?.order)) { + setVariablesToGetUpdated(dependencyData?.order || []); + } + }, + // eslint-disable-next-line react-hooks/exhaustive-deps + [JSON.stringify(dependencyData?.order), minTime, maxTime], + ); const onValueUpdate = ( name: string, id: string, value: IDashboardVariable['selectedValue'], allSelected: boolean, + // isMountedCall?: boolean, // eslint-disable-next-line sonarjs/cognitive-complexity ): void => { if (id) { @@ -111,7 +140,20 @@ function DashboardVariableSelection(): JSX.Element | null { }); } - onVarChanged(name); + if (dependencyData) { + const updatedVariables: string[] = []; + onUpdateVariableNode( + name, + dependencyData.graph, + dependencyData.order, + (node) => updatedVariables.push(node), + ); + setVariablesToGetUpdated((prev) => [ + ...new Set([...prev, ...updatedVariables.filter((v) => v !== name)]), + ]); + } else { + setVariablesToGetUpdated((prev) => prev.filter((v) => v !== name)); + } } }; @@ -139,6 +181,7 @@ function DashboardVariableSelection(): JSX.Element | null { onValueUpdate={onValueUpdate} variablesToGetUpdated={variablesToGetUpdated} setVariablesToGetUpdated={setVariablesToGetUpdated} + dependencyData={dependencyData} /> ))} diff --git a/frontend/src/container/NewDashboard/DashboardVariablesSelection/VariableItem.test.tsx b/frontend/src/container/NewDashboard/DashboardVariablesSelection/VariableItem.test.tsx index 1cb89d6b95..823cf53923 100644 --- a/frontend/src/container/NewDashboard/DashboardVariablesSelection/VariableItem.test.tsx +++ b/frontend/src/container/NewDashboard/DashboardVariablesSelection/VariableItem.test.tsx @@ -49,6 +49,11 @@ describe('VariableItem', () => { onValueUpdate={mockOnValueUpdate} variablesToGetUpdated={[]} setVariablesToGetUpdated={(): void => {}} + dependencyData={{ + order: [], + graph: {}, + parentDependencyGraph: {}, + }} /> , ); @@ -65,6 +70,11 @@ describe('VariableItem', () => { onValueUpdate={mockOnValueUpdate} variablesToGetUpdated={[]} setVariablesToGetUpdated={(): void => {}} + dependencyData={{ + order: [], + graph: {}, + parentDependencyGraph: {}, + }} /> , ); @@ -80,6 +90,11 @@ describe('VariableItem', () => { onValueUpdate={mockOnValueUpdate} variablesToGetUpdated={[]} setVariablesToGetUpdated={(): void => {}} + dependencyData={{ + order: [], + graph: {}, + parentDependencyGraph: {}, + }} /> , ); @@ -109,6 +124,11 @@ describe('VariableItem', () => { onValueUpdate={mockOnValueUpdate} variablesToGetUpdated={[]} setVariablesToGetUpdated={(): void => {}} + dependencyData={{ + order: [], + graph: {}, + parentDependencyGraph: {}, + }} /> , ); @@ -133,6 +153,11 @@ describe('VariableItem', () => { onValueUpdate={mockOnValueUpdate} variablesToGetUpdated={[]} setVariablesToGetUpdated={(): void => {}} + dependencyData={{ + order: [], + graph: {}, + parentDependencyGraph: {}, + }} /> , ); @@ -149,6 +174,11 @@ describe('VariableItem', () => { onValueUpdate={mockOnValueUpdate} variablesToGetUpdated={[]} setVariablesToGetUpdated={(): void => {}} + dependencyData={{ + order: [], + graph: {}, + parentDependencyGraph: {}, + }} /> , ); diff --git a/frontend/src/container/NewDashboard/DashboardVariablesSelection/VariableItem.tsx b/frontend/src/container/NewDashboard/DashboardVariablesSelection/VariableItem.tsx index 398ade8259..7e6c050653 100644 --- a/frontend/src/container/NewDashboard/DashboardVariablesSelection/VariableItem.tsx +++ b/frontend/src/container/NewDashboard/DashboardVariablesSelection/VariableItem.tsx @@ -35,12 +35,10 @@ import { popupContainer } from 'utils/selectPopupContainer'; import { variablePropsToPayloadVariables } from '../utils'; import { SelectItemStyle } from './styles'; -import { areArraysEqual } from './util'; +import { areArraysEqual, checkAPIInvocation, IDependencyData } from './util'; const ALL_SELECT_VALUE = '__ALL__'; -const variableRegexPattern = /\{\{\s*?\.([^\s}]+)\s*?\}\}/g; - enum ToggleTagValue { Only = 'Only', All = 'All', @@ -57,6 +55,7 @@ interface VariableItemProps { ) => void; variablesToGetUpdated: string[]; setVariablesToGetUpdated: React.Dispatch>; + dependencyData: IDependencyData | null; } const getSelectValue = ( @@ -79,6 +78,7 @@ function VariableItem({ onValueUpdate, variablesToGetUpdated, setVariablesToGetUpdated, + dependencyData, }: VariableItemProps): JSX.Element { const [optionsData, setOptionsData] = useState<(string | number | boolean)[]>( [], @@ -88,60 +88,20 @@ function VariableItem({ (state) => state.globalTime, ); - useEffect(() => { - if (variableData.allSelected && variableData.type === 'QUERY') { - setVariablesToGetUpdated((prev) => { - const variablesQueue = [...prev.filter((v) => v !== variableData.name)]; - if (variableData.name) { - variablesQueue.push(variableData.name); - } - return variablesQueue; - }); + const validVariableUpdate = (): boolean => { + if (!variableData.name) { + return false; } - // eslint-disable-next-line react-hooks/exhaustive-deps - }, [minTime, maxTime]); - const [errorMessage, setErrorMessage] = useState(null); - - const getDependentVariables = (queryValue: string): string[] => { - const matches = queryValue.match(variableRegexPattern); - - // Extract variable names from the matches array without {{ . }} - return matches - ? matches.map((match) => match.replace(variableRegexPattern, '$1')) - : []; - }; - - const getQueryKey = (variableData: IDashboardVariable): string[] => { - let dependentVariablesStr = ''; - - const dependentVariables = getDependentVariables( - variableData.queryValue || '', + // variableData.name is present as the top element or next in the queue - variablesToGetUpdated + return Boolean( + variablesToGetUpdated.length && + variablesToGetUpdated[0] === variableData.name, ); - - const variableName = variableData.name || ''; - - dependentVariables?.forEach((element) => { - const [, variable] = - Object.entries(existingVariables).find( - ([, value]) => value.name === element, - ) || []; - - dependentVariablesStr += `${element}${variable?.selectedValue}`; - }); - - const variableKey = dependentVariablesStr.replace(/\s/g, ''); - - // added this time dependency for variables query as API respects the passed time range now - return [ - REACT_QUERY_KEY.DASHBOARD_BY_ID, - variableName, - variableKey, - `${minTime}`, - `${maxTime}`, - ]; }; + const [errorMessage, setErrorMessage] = useState(null); + // eslint-disable-next-line sonarjs/cognitive-complexity const getOptions = (variablesRes: VariableResponseProps | null): void => { if (variablesRes && variableData.type === 'QUERY') { @@ -184,9 +144,7 @@ function VariableItem({ if ( variableData.type === 'QUERY' && variableData.name && - (variablesToGetUpdated.includes(variableData.name) || - valueNotInList || - variableData.allSelected) + (validVariableUpdate() || valueNotInList || variableData.allSelected) ) { let value = variableData.selectedValue; let allSelected = false; @@ -224,36 +182,64 @@ function VariableItem({ } }; - const { isLoading } = useQuery(getQueryKey(variableData), { - enabled: variableData && variableData.type === 'QUERY', - queryFn: () => - dashboardVariablesQuery({ - query: variableData.queryValue || '', - variables: variablePropsToPayloadVariables(existingVariables), - }), - refetchOnWindowFocus: false, - onSuccess: (response) => { - getOptions(response.payload); - }, - onError: (error: { - details: { - error: string; - }; - }) => { - const { details } = error; - - if (details.error) { - let message = details.error; - if (details.error.includes('Syntax error:')) { - message = - 'Please make sure query is valid and dependent variables are selected'; + const { isLoading } = useQuery( + [ + REACT_QUERY_KEY.DASHBOARD_BY_ID, + variableData.name || '', + `${minTime}`, + `${maxTime}`, + JSON.stringify(dependencyData?.order), + ], + { + enabled: + variableData && + variableData.type === 'QUERY' && + checkAPIInvocation( + variablesToGetUpdated, + variableData, + dependencyData?.parentDependencyGraph, + ), + queryFn: () => + dashboardVariablesQuery({ + query: variableData.queryValue || '', + variables: variablePropsToPayloadVariables(existingVariables), + }), + refetchOnWindowFocus: false, + onSuccess: (response) => { + getOptions(response.payload); + setVariablesToGetUpdated((prev) => + prev.filter((v) => v !== variableData.name), + ); + }, + onError: (error: { + details: { + error: string; + }; + }) => { + const { details } = error; + + if (details.error) { + let message = details.error; + if (details.error.includes('Syntax error:')) { + message = + 'Please make sure query is valid and dependent variables are selected'; + } + setErrorMessage(message); } - setErrorMessage(message); - } + }, }, - }); + ); const handleChange = (value: string | string[]): void => { + // if value is equal to selected value then return + if ( + value === variableData.selectedValue || + (Array.isArray(value) && + Array.isArray(variableData.selectedValue) && + areArraysEqual(value, variableData.selectedValue)) + ) { + return; + } if (variableData.name) { if ( value === ALL_SELECT_VALUE || diff --git a/frontend/src/container/NewDashboard/DashboardVariablesSelection/__test__/dashboardVariables.test.tsx b/frontend/src/container/NewDashboard/DashboardVariablesSelection/__test__/dashboardVariables.test.tsx new file mode 100644 index 0000000000..0add4c5cad --- /dev/null +++ b/frontend/src/container/NewDashboard/DashboardVariablesSelection/__test__/dashboardVariables.test.tsx @@ -0,0 +1,241 @@ +import { + buildDependencies, + buildDependencyGraph, + buildParentDependencyGraph, + checkAPIInvocation, + onUpdateVariableNode, + VariableGraph, +} from '../util'; +import { + buildDependenciesMock, + buildGraphMock, + checkAPIInvocationMock, + onUpdateVariableNodeMock, +} from './mock'; + +describe('dashboardVariables - utilities and processors', () => { + describe('onUpdateVariableNode', () => { + const { graph, topologicalOrder } = onUpdateVariableNodeMock; + const testCases = [ + { + scenario: 'root element', + nodeToUpdate: 'deployment_environment', + expected: [ + 'deployment_environment', + 'service_name', + 'endpoint', + 'http_status_code', + ], + }, + { + scenario: 'middle child', + nodeToUpdate: 'k8s_node_name', + expected: ['k8s_node_name', 'k8s_namespace_name'], + }, + { + scenario: 'leaf element', + nodeToUpdate: 'http_status_code', + expected: ['http_status_code'], + }, + { + scenario: 'node not in graph', + nodeToUpdate: 'unknown', + expected: [], + }, + { + scenario: 'node not in topological order', + nodeToUpdate: 'unknown', + expected: [], + }, + ]; + + test.each(testCases)( + 'should update variable node when $scenario', + ({ nodeToUpdate, expected }) => { + const updatedVariables: string[] = []; + const callback = (node: string): void => { + updatedVariables.push(node); + }; + + onUpdateVariableNode(nodeToUpdate, graph, topologicalOrder, callback); + + expect(updatedVariables).toEqual(expected); + }, + ); + + it('should return empty array when topological order is empty', () => { + const updatedVariables: string[] = []; + onUpdateVariableNode('http_status_code', graph, [], (node) => + updatedVariables.push(node), + ); + expect(updatedVariables).toEqual([]); + }); + }); + + describe('checkAPIInvocation', () => { + const { + variablesToGetUpdated, + variableData, + parentDependencyGraph, + } = checkAPIInvocationMock; + + const mockRootElement = { + name: 'deployment_environment', + key: '036a47cd-9ffc-47de-9f27-0329198964a8', + id: '036a47cd-9ffc-47de-9f27-0329198964a8', + modificationUUID: '5f71b591-f583-497c-839d-6a1590c3f60f', + selectedValue: 'production', + type: 'QUERY', + // ... other properties omitted for brevity + } as any; + + describe('edge cases', () => { + it('should return false when variableData is empty', () => { + expect( + checkAPIInvocation( + variablesToGetUpdated, + variableData, + parentDependencyGraph, + ), + ).toBeFalsy(); + }); + + it('should return true when parentDependencyGraph is empty', () => { + expect( + checkAPIInvocation(variablesToGetUpdated, variableData, {}), + ).toBeFalsy(); + }); + }); + + describe('variable sequences', () => { + it('should return true for valid sequence', () => { + expect( + checkAPIInvocation( + ['k8s_node_name', 'k8s_namespace_name'], + variableData, + parentDependencyGraph, + ), + ).toBeTruthy(); + }); + + it('should return false for invalid sequence', () => { + expect( + checkAPIInvocation( + ['k8s_cluster_name', 'k8s_node_name', 'k8s_namespace_name'], + variableData, + parentDependencyGraph, + ), + ).toBeFalsy(); + }); + + it('should return false when variableData is not in sequence', () => { + expect( + checkAPIInvocation( + ['deployment_environment', 'service_name', 'endpoint'], + variableData, + parentDependencyGraph, + ), + ).toBeFalsy(); + }); + }); + + describe('root element behavior', () => { + it('should return true for valid root element sequence', () => { + expect( + checkAPIInvocation( + [ + 'deployment_environment', + 'service_name', + 'endpoint', + 'http_status_code', + ], + mockRootElement, + parentDependencyGraph, + ), + ).toBeTruthy(); + }); + + it('should return true for empty variablesToGetUpdated array', () => { + expect( + checkAPIInvocation([], mockRootElement, parentDependencyGraph), + ).toBeTruthy(); + }); + }); + }); + + describe('Graph Building Utilities', () => { + const { graph } = buildGraphMock; + const { variables } = buildDependenciesMock; + + describe('buildParentDependencyGraph', () => { + it('should build parent dependency graph with correct relationships', () => { + const expected = { + deployment_environment: [], + service_name: ['deployment_environment'], + endpoint: ['deployment_environment', 'service_name'], + http_status_code: ['endpoint'], + k8s_cluster_name: [], + k8s_node_name: ['k8s_cluster_name'], + k8s_namespace_name: ['k8s_cluster_name', 'k8s_node_name'], + environment: [], + }; + + expect(buildParentDependencyGraph(graph)).toEqual(expected); + }); + + it('should handle empty graph', () => { + expect(buildParentDependencyGraph({})).toEqual({}); + }); + }); + + describe('buildDependencyGraph', () => { + it('should build complete dependency graph with correct structure and order', () => { + const expected = { + graph: { + deployment_environment: ['service_name', 'endpoint'], + service_name: ['endpoint'], + endpoint: ['http_status_code'], + http_status_code: [], + k8s_cluster_name: ['k8s_node_name', 'k8s_namespace_name'], + k8s_node_name: ['k8s_namespace_name'], + k8s_namespace_name: [], + environment: [], + }, + order: [ + 'deployment_environment', + 'k8s_cluster_name', + 'environment', + 'service_name', + 'k8s_node_name', + 'endpoint', + 'k8s_namespace_name', + 'http_status_code', + ], + }; + + expect(buildDependencyGraph(graph)).toEqual(expected); + }); + }); + + describe('buildDependencies', () => { + it('should build dependency map from variables array', () => { + const expected: VariableGraph = { + deployment_environment: ['service_name', 'endpoint'], + service_name: ['endpoint'], + endpoint: ['http_status_code'], + http_status_code: [], + k8s_cluster_name: ['k8s_node_name', 'k8s_namespace_name'], + k8s_node_name: ['k8s_namespace_name'], + k8s_namespace_name: [], + environment: [], + }; + + expect(buildDependencies(variables)).toEqual(expected); + }); + + it('should handle empty variables array', () => { + expect(buildDependencies([])).toEqual({}); + }); + }); + }); +}); diff --git a/frontend/src/container/NewDashboard/DashboardVariablesSelection/__test__/mock.ts b/frontend/src/container/NewDashboard/DashboardVariablesSelection/__test__/mock.ts new file mode 100644 index 0000000000..c39841fcf4 --- /dev/null +++ b/frontend/src/container/NewDashboard/DashboardVariablesSelection/__test__/mock.ts @@ -0,0 +1,251 @@ +/* eslint-disable sonarjs/no-duplicate-string */ +export const checkAPIInvocationMock = { + variablesToGetUpdated: [], + variableData: { + name: 'k8s_node_name', + key: '4d71d385-beaf-4434-8dbf-c62be68049fc', + allSelected: false, + customValue: '', + description: '', + id: '4d71d385-beaf-4434-8dbf-c62be68049fc', + modificationUUID: '77233d3c-96d7-4ccb-aa9d-11b04d563068', + multiSelect: false, + order: 6, + queryValue: + "SELECT JSONExtractString(labels, 'k8s_node_name') AS k8s_node_name\nFROM signoz_metrics.distributed_time_series_v4_1day\nWHERE metric_name = 'k8s_node_cpu_time' AND JSONExtractString(labels, 'k8s_cluster_name') = {{.k8s_cluster_name}}\nGROUP BY k8s_node_name", + selectedValue: 'gke-signoz-saas-si-consumer-bsc-e2sd4-a6d430fa-gvm2', + showALLOption: false, + sort: 'DISABLED', + textboxValue: '', + type: 'QUERY', + }, + parentDependencyGraph: { + deployment_environment: [], + service_name: ['deployment_environment'], + endpoint: ['deployment_environment', 'service_name'], + http_status_code: ['endpoint'], + k8s_cluster_name: [], + environment: [], + k8s_node_name: ['k8s_cluster_name'], + k8s_namespace_name: ['k8s_cluster_name', 'k8s_node_name'], + }, +} as any; + +export const onUpdateVariableNodeMock = { + nodeToUpdate: 'deployment_environment', + graph: { + deployment_environment: ['service_name', 'endpoint'], + service_name: ['endpoint'], + endpoint: ['http_status_code'], + http_status_code: [], + k8s_cluster_name: ['k8s_node_name', 'k8s_namespace_name'], + environment: [], + k8s_node_name: ['k8s_namespace_name'], + k8s_namespace_name: [], + }, + topologicalOrder: [ + 'deployment_environment', + 'k8s_cluster_name', + 'environment', + 'service_name', + 'k8s_node_name', + 'endpoint', + 'k8s_namespace_name', + 'http_status_code', + ], + callback: jest.fn(), +}; + +export const buildGraphMock = { + graph: { + deployment_environment: ['service_name', 'endpoint'], + service_name: ['endpoint'], + endpoint: ['http_status_code'], + http_status_code: [], + k8s_cluster_name: ['k8s_node_name', 'k8s_namespace_name'], + environment: [], + k8s_node_name: ['k8s_namespace_name'], + k8s_namespace_name: [], + }, +}; + +export const buildDependenciesMock = { + variables: [ + { + key: '036a47cd-9ffc-47de-9f27-0329198964a8', + name: 'deployment_environment', + allSelected: false, + customValue: '', + description: '', + id: '036a47cd-9ffc-47de-9f27-0329198964a8', + modificationUUID: '5f71b591-f583-497c-839d-6a1590c3f60f', + multiSelect: false, + order: 0, + queryValue: + "SELECT DISTINCT JSONExtractString(labels, 'deployment_environment') AS deployment_environment\nFROM signoz_metrics.distributed_time_series_v4_1day\nWHERE metric_name = 'signoz_calls_total'", + selectedValue: 'production', + showALLOption: false, + sort: 'DISABLED', + textboxValue: '', + type: 'QUERY', + }, + { + key: 'eed5c917-1860-4c7e-bf6d-a05b97bafbc9', + name: 'service_name', + allSelected: true, + customValue: '', + description: '', + id: 'eed5c917-1860-4c7e-bf6d-a05b97bafbc9', + modificationUUID: '85db928b-ac9b-4e9f-b274-791112102fdf', + multiSelect: true, + order: 1, + queryValue: + "SELECT DISTINCT JSONExtractString(labels, 'service_name') FROM signoz_metrics.distributed_time_series_v4_1day\n WHERE metric_name = 'signoz_calls_total' and JSONExtractString(labels, 'deployment_environment') = {{.deployment_environment}}", + selectedValue: ['otelgateway'], + showALLOption: true, + sort: 'ASC', + textboxValue: '', + type: 'QUERY', + }, + { + key: '4022d3c1-e845-4952-8984-78f25f575c7a', + name: 'endpoint', + allSelected: true, + customValue: '', + description: '', + id: '4022d3c1-e845-4952-8984-78f25f575c7a', + modificationUUID: 'c0107fa1-ebb7-4dd3-aa9d-6ba08ecc594d', + multiSelect: true, + order: 2, + queryValue: + "SELECT DISTINCT JSONExtractString(labels, 'operation') FROM signoz_metrics.distributed_time_series_v4_1day\n WHERE metric_name = 'signoz_calls_total' AND JSONExtractString(labels, 'service_name') IN {{.service_name}} and JSONExtractString(labels, 'deployment_environment') = {{.deployment_environment}}", + selectedValue: [ + '//v1/traces', + '/logs/heroku', + '/logs/json', + '/logs/vector', + '/v1/logs', + '/v1/metrics', + '/v1/traces', + 'SELECT', + 'exporter/signozkafka/logs', + 'exporter/signozkafka/metrics', + 'exporter/signozkafka/traces', + 'extension/signozkeyauth/Authenticate', + 'get', + 'hmget', + 'opentelemetry.proto.collector.logs.v1.LogsService/Export', + 'opentelemetry.proto.collector.metrics.v1.MetricsService/Export', + 'opentelemetry.proto.collector.trace.v1.TraceService/Export', + 'processor/signozlimiter/LogsProcessed', + 'processor/signozlimiter/MetricsProcessed', + 'processor/signozlimiter/TracesProcessed', + 'receiver/otlp/LogsReceived', + 'receiver/otlp/MetricsReceived', + 'receiver/otlp/TraceDataReceived', + 'receiver/signozhttplog/heroku/LogsReceived', + 'receiver/signozhttplog/json/LogsReceived', + 'receiver/signozhttplog/vector/LogsReceived', + 'redis.dial', + 'redis.pipeline eval', + 'sadd', + 'set', + 'sismember', + ], + showALLOption: true, + sort: 'ASC', + textboxValue: '', + type: 'QUERY', + }, + { + key: '5e8a3cd9-3cd9-42df-a76c-79471a0f75bd', + name: 'http_status_code', + customValue: '', + description: '', + id: '5e8a3cd9-3cd9-42df-a76c-79471a0f75bd', + modificationUUID: '9a4021cc-a80a-4f15-8899-78892b763ca7', + multiSelect: true, + order: 3, + queryValue: + "SELECT DISTINCT JSONExtractString(labels, 'http_status_code') FROM signoz_metrics.distributed_time_series_v4_1day\n WHERE metric_name = 'signoz_calls_total' AND JSONExtractString(labels, 'operation') IN {{.endpoint}}", + showALLOption: true, + sort: 'ASC', + textboxValue: '', + type: 'QUERY', + selectedValue: ['', '200', '301', '400', '401', '405', '415', '429'], + allSelected: true, + }, + { + key: '48e9aa64-05ca-41c2-a1bd-6c8aeca659f1', + name: 'k8s_cluster_name', + allSelected: false, + customValue: 'test-1,\ntest-2,\ntest-3', + description: '', + id: '48e9aa64-05ca-41c2-a1bd-6c8aeca659f1', + modificationUUID: '44722322-368c-4613-bb7f-d0b12867d57a', + multiSelect: false, + order: 4, + queryValue: + "SELECT JSONExtractString(labels, 'k8s_cluster_name') AS k8s_cluster_name\nFROM signoz_metrics.distributed_time_series_v4_1day\nWHERE metric_name = 'k8s_node_cpu_time'\nGROUP BY k8s_cluster_name", + selectedValue: 'saasmonitor-cluster', + showALLOption: false, + sort: 'DISABLED', + textboxValue: '', + type: 'QUERY', + }, + { + key: '3ea18ba2-30cf-4220-b03b-720b5eaf35f8', + name: 'environment', + allSelected: false, + customValue: '', + description: '', + id: '3ea18ba2-30cf-4220-b03b-720b5eaf35f8', + modificationUUID: '9f76cb06-1b9f-460f-a174-0b210bb3cf93', + multiSelect: false, + order: 5, + queryValue: + "SELECT DISTINCT JSONExtractString(labels, 'deployment_environment') AS environment\nFROM signoz_metrics.distributed_time_series_v4_1day\nWHERE metric_name = 'signoz_calls_total'", + selectedValue: 'production', + showALLOption: false, + sort: 'DISABLED', + textboxValue: '', + type: 'QUERY', + }, + { + key: '4d71d385-beaf-4434-8dbf-c62be68049fc', + name: 'k8s_node_name', + allSelected: false, + customValue: '', + description: '', + id: '4d71d385-beaf-4434-8dbf-c62be68049fc', + modificationUUID: '77233d3c-96d7-4ccb-aa9d-11b04d563068', + multiSelect: false, + order: 6, + queryValue: + "SELECT JSONExtractString(labels, 'k8s_node_name') AS k8s_node_name\nFROM signoz_metrics.distributed_time_series_v4_1day\nWHERE metric_name = 'k8s_node_cpu_time' AND JSONExtractString(labels, 'k8s_cluster_name') = {{.k8s_cluster_name}}\nGROUP BY k8s_node_name", + selectedValue: 'gke-signoz-saas-si-consumer-bsc-e2sd4-a6d430fa-gvm2', + showALLOption: false, + sort: 'DISABLED', + textboxValue: '', + type: 'QUERY', + }, + { + key: '937ecbae-b24b-4d6d-8cc4-5d5b8d53569b', + name: 'k8s_namespace_name', + customValue: '', + description: '', + id: '937ecbae-b24b-4d6d-8cc4-5d5b8d53569b', + modificationUUID: '8ad2442d-8b4d-4c64-848e-af847d1d0eec', + multiSelect: false, + order: 7, + queryValue: + "SELECT JSONExtractString(labels, 'k8s_namespace_name') AS k8s_namespace_name\nFROM signoz_metrics.distributed_time_series_v4_1day\nWHERE metric_name = 'k8s_pod_cpu_time' AND JSONExtractString(labels, 'k8s_cluster_name') = {{.k8s_cluster_name}} AND JSONExtractString(labels, 'k8s_node_name') IN {{.k8s_node_name}}\nGROUP BY k8s_namespace_name", + showALLOption: false, + sort: 'DISABLED', + textboxValue: '', + type: 'QUERY', + selectedValue: 'saasmonitor', + allSelected: false, + }, + ] as any, +}; diff --git a/frontend/src/container/NewDashboard/DashboardVariablesSelection/util.ts b/frontend/src/container/NewDashboard/DashboardVariablesSelection/util.ts index a3fe59ccd8..3b79111bfa 100644 --- a/frontend/src/container/NewDashboard/DashboardVariablesSelection/util.ts +++ b/frontend/src/container/NewDashboard/DashboardVariablesSelection/util.ts @@ -1,3 +1,4 @@ +import { isEmpty, isNull } from 'lodash-es'; import { Dashboard, IDashboardVariable } from 'types/api/dashboard/getAll'; export function areArraysEqual( @@ -29,3 +30,185 @@ export const convertVariablesToDbFormat = ( result[id] = obj; return result; }, {}); + +const getDependentVariablesBasedOnVariableName = ( + variableName: string, + variables: IDashboardVariable[], +): string[] => { + if (!variables || !Array.isArray(variables)) { + return []; + } + + return variables + ?.map((variable: any) => { + if (variable.type === 'QUERY') { + // Combined pattern for all formats + // {{.variable_name}} - original format + // $variable_name - dollar prefix format + // [[variable_name]] - square bracket format + // {{variable_name}} - without dot format + const patterns = [ + `\\{\\{\\s*?\\.${variableName}\\s*?\\}\\}`, // {{.var}} + `\\{\\{\\s*${variableName}\\s*\\}\\}`, // {{var}} + `\\$${variableName}\\b`, // $var + `\\[\\[\\s*${variableName}\\s*\\]\\]`, // [[var]] + ]; + const combinedRegex = new RegExp(patterns.join('|')); + + const queryValue = variable.queryValue || ''; + const dependVarReMatch = queryValue.match(combinedRegex); + if (dependVarReMatch !== null && dependVarReMatch.length > 0) { + return variable.name; + } + } + return null; + }) + .filter((val: string | null) => !isNull(val)); +}; +export type VariableGraph = Record; + +export const buildDependencies = ( + variables: IDashboardVariable[], +): VariableGraph => { + const graph: VariableGraph = {}; + + // Initialize empty arrays for all variables first + variables.forEach((variable) => { + if (variable.name) { + graph[variable.name] = []; + } + }); + + // For each QUERY variable, add it as a dependent to its referenced variables + variables.forEach((variable) => { + if (variable.name) { + const dependentVariables = getDependentVariablesBasedOnVariableName( + variable.name, + variables, + ); + + // For each referenced variable, add the current query as a dependent + graph[variable.name] = dependentVariables; + } + }); + + return graph; +}; + +// Function to build the dependency graph +export const buildDependencyGraph = ( + dependencies: VariableGraph, +): { order: string[]; graph: VariableGraph } => { + const inDegree: Record = {}; + const adjList: VariableGraph = {}; + + // Initialize in-degree and adjacency list + Object.keys(dependencies).forEach((node) => { + if (!inDegree[node]) inDegree[node] = 0; + if (!adjList[node]) adjList[node] = []; + dependencies[node].forEach((child) => { + if (!inDegree[child]) inDegree[child] = 0; + inDegree[child]++; + adjList[node].push(child); + }); + }); + + // Topological sort using Kahn's Algorithm + const queue: string[] = Object.keys(inDegree).filter( + (node) => inDegree[node] === 0, + ); + const topologicalOrder: string[] = []; + + while (queue.length > 0) { + const current = queue.shift(); + if (current === undefined) { + break; + } + topologicalOrder.push(current); + + adjList[current].forEach((neighbor) => { + inDegree[neighbor]--; + if (inDegree[neighbor] === 0) queue.push(neighbor); + }); + } + + if (topologicalOrder.length !== Object.keys(dependencies).length) { + console.error('Cycle detected in the dependency graph!'); + } + + return { order: topologicalOrder, graph: adjList }; +}; + +export const onUpdateVariableNode = ( + nodeToUpdate: string, + graph: VariableGraph, + topologicalOrder: string[], + callback: (node: string) => void, +): void => { + const visited = new Set(); + + // Start processing from the node to update + topologicalOrder.forEach((node) => { + if (node === nodeToUpdate || visited.has(node)) { + visited.add(node); + callback(node); + (graph[node] || []).forEach((child) => { + visited.add(child); + }); + } + }); +}; + +export const buildParentDependencyGraph = ( + graph: VariableGraph, +): VariableGraph => { + const parentGraph: VariableGraph = {}; + + // Initialize empty arrays for all nodes + Object.keys(graph).forEach((node) => { + parentGraph[node] = []; + }); + + // For each node and its children in the original graph + Object.entries(graph).forEach(([node, children]) => { + // For each child, add the current node as its parent + children.forEach((child) => { + parentGraph[child].push(node); + }); + }); + + return parentGraph; +}; + +export const checkAPIInvocation = ( + variablesToGetUpdated: string[], + variableData: IDashboardVariable, + parentDependencyGraph?: VariableGraph, +): boolean => { + if (isEmpty(variableData.name)) { + return false; + } + + if (isEmpty(parentDependencyGraph)) { + return false; + } + + // if no dependency then true + const haveDependency = + parentDependencyGraph?.[variableData.name || '']?.length > 0; + if (!haveDependency) { + return true; + } + + // if variable is in the list and has dependency then check if its the top element in the queue then true else false + return ( + variablesToGetUpdated.length > 0 && + variablesToGetUpdated[0] === variableData.name + ); +}; + +export interface IDependencyData { + order: string[]; + graph: VariableGraph; + parentDependencyGraph: VariableGraph; +} diff --git a/frontend/src/pages/InfrastructureMonitoring/InfrastructureMonitoringPage.tsx b/frontend/src/pages/InfrastructureMonitoring/InfrastructureMonitoringPage.tsx index a86f7ec56e..99f0455be9 100644 --- a/frontend/src/pages/InfrastructureMonitoring/InfrastructureMonitoringPage.tsx +++ b/frontend/src/pages/InfrastructureMonitoring/InfrastructureMonitoringPage.tsx @@ -5,12 +5,12 @@ import { TabRoutes } from 'components/RouteTab/types'; import history from 'lib/history'; import { useLocation } from 'react-use'; -import { Hosts, Kubernetes } from './constants'; +import { Hosts } from './constants'; export default function InfrastructureMonitoringPage(): JSX.Element { const { pathname } = useLocation(); - const routes: TabRoutes[] = [Hosts, Kubernetes]; + const routes: TabRoutes[] = [Hosts]; return (
diff --git a/frontend/src/providers/Dashboard/Dashboard.tsx b/frontend/src/providers/Dashboard/Dashboard.tsx index 8714cdba2c..ee3d196e01 100644 --- a/frontend/src/providers/Dashboard/Dashboard.tsx +++ b/frontend/src/providers/Dashboard/Dashboard.tsx @@ -69,6 +69,8 @@ const DashboardContext = createContext({ updateLocalStorageDashboardVariables: () => {}, variablesToGetUpdated: [], setVariablesToGetUpdated: () => {}, + dashboardQueryRangeCalled: false, + setDashboardQueryRangeCalled: () => {}, }); interface Props { @@ -85,6 +87,11 @@ export function DashboardProvider({ const [isDashboardLocked, setIsDashboardLocked] = useState(false); + const [ + dashboardQueryRangeCalled, + setDashboardQueryRangeCalled, + ] = useState(false); + const isDashboardPage = useRouteMatch({ path: ROUTES.DASHBOARD, exact: true, @@ -407,6 +414,8 @@ export function DashboardProvider({ updateLocalStorageDashboardVariables, variablesToGetUpdated, setVariablesToGetUpdated, + dashboardQueryRangeCalled, + setDashboardQueryRangeCalled, }), // eslint-disable-next-line react-hooks/exhaustive-deps [ @@ -424,6 +433,8 @@ export function DashboardProvider({ currentDashboard, variablesToGetUpdated, setVariablesToGetUpdated, + dashboardQueryRangeCalled, + setDashboardQueryRangeCalled, ], ); diff --git a/frontend/src/providers/Dashboard/types.ts b/frontend/src/providers/Dashboard/types.ts index e19c00e422..8ca214cfaf 100644 --- a/frontend/src/providers/Dashboard/types.ts +++ b/frontend/src/providers/Dashboard/types.ts @@ -43,4 +43,6 @@ export interface IDashboardContext { ) => void; variablesToGetUpdated: string[]; setVariablesToGetUpdated: React.Dispatch>; + dashboardQueryRangeCalled: boolean; + setDashboardQueryRangeCalled: (value: boolean) => void; } diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 2fcc35a9fb..cf8724f78f 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -30,6 +30,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/app/explorer" "go.signoz.io/signoz/pkg/query-service/app/inframetrics" "go.signoz.io/signoz/pkg/query-service/app/integrations" + queues2 "go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/queues" "go.signoz.io/signoz/pkg/query-service/app/logs" logsv3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" logsv4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4" @@ -51,11 +52,11 @@ import ( "go.uber.org/zap" - mq "go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/kafka" + "go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/kafka" "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline" "go.signoz.io/signoz/pkg/query-service/dao" am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager" - signozio "go.signoz.io/signoz/pkg/query-service/integrations/signozio" + "go.signoz.io/signoz/pkg/query-service/integrations/signozio" "go.signoz.io/signoz/pkg/query-service/interfaces" "go.signoz.io/signoz/pkg/query-service/model" "go.signoz.io/signoz/pkg/query-service/rules" @@ -2572,14 +2573,14 @@ func (aH *APIHandler) onboardProducers( w http.ResponseWriter, r *http.Request, ) { - messagingQueue, apiErr := ParseMessagingQueueBody(r) + messagingQueue, apiErr := ParseKafkaQueueBody(r) if apiErr != nil { zap.L().Error(apiErr.Err.Error()) RespondError(w, apiErr, nil) return } - chq, err := mq.BuildClickHouseQuery(messagingQueue, mq.KafkaQueue, "onboard_producers") + chq, err := kafka.BuildClickHouseQuery(messagingQueue, kafka.KafkaQueue, "onboard_producers") if err != nil { zap.L().Error(err.Error()) @@ -2595,7 +2596,7 @@ func (aH *APIHandler) onboardProducers( return } - var entries []mq.OnboardingResponse + var entries []kafka.OnboardingResponse for _, result := range results { @@ -2608,7 +2609,7 @@ func (aH *APIHandler) onboardProducers( attribute = "telemetry ingestion" if intValue != 0 { entries = nil - entry := mq.OnboardingResponse{ + entry := kafka.OnboardingResponse{ Attribute: attribute, Message: "No data available in the given time range", Status: "0", @@ -2652,7 +2653,7 @@ func (aH *APIHandler) onboardProducers( } } - entry := mq.OnboardingResponse{ + entry := kafka.OnboardingResponse{ Attribute: attribute, Message: message, Status: status, @@ -2674,14 +2675,14 @@ func (aH *APIHandler) onboardConsumers( w http.ResponseWriter, r *http.Request, ) { - messagingQueue, apiErr := ParseMessagingQueueBody(r) + messagingQueue, apiErr := ParseKafkaQueueBody(r) if apiErr != nil { zap.L().Error(apiErr.Err.Error()) RespondError(w, apiErr, nil) return } - chq, err := mq.BuildClickHouseQuery(messagingQueue, mq.KafkaQueue, "onboard_consumers") + chq, err := kafka.BuildClickHouseQuery(messagingQueue, kafka.KafkaQueue, "onboard_consumers") if err != nil { zap.L().Error(err.Error()) @@ -2697,7 +2698,7 @@ func (aH *APIHandler) onboardConsumers( return } - var entries []mq.OnboardingResponse + var entries []kafka.OnboardingResponse for _, result := range result { for key, value := range result.Data { @@ -2709,7 +2710,7 @@ func (aH *APIHandler) onboardConsumers( attribute = "telemetry ingestion" if intValue != 0 { entries = nil - entry := mq.OnboardingResponse{ + entry := kafka.OnboardingResponse{ Attribute: attribute, Message: "No data available in the given time range", Status: "0", @@ -2793,7 +2794,7 @@ func (aH *APIHandler) onboardConsumers( } } - entry := mq.OnboardingResponse{ + entry := kafka.OnboardingResponse{ Attribute: attribute, Message: message, Status: status, @@ -2814,14 +2815,14 @@ func (aH *APIHandler) onboardKafka( w http.ResponseWriter, r *http.Request, ) { - messagingQueue, apiErr := ParseMessagingQueueBody(r) + messagingQueue, apiErr := ParseKafkaQueueBody(r) if apiErr != nil { zap.L().Error(apiErr.Err.Error()) RespondError(w, apiErr, nil) return } - queryRangeParams, err := mq.BuildBuilderQueriesKafkaOnboarding(messagingQueue) + queryRangeParams, err := kafka.BuildBuilderQueriesKafkaOnboarding(messagingQueue) if err != nil { zap.L().Error(err.Error()) @@ -2836,7 +2837,7 @@ func (aH *APIHandler) onboardKafka( return } - var entries []mq.OnboardingResponse + var entries []kafka.OnboardingResponse var fetchLatencyState, consumerLagState bool @@ -2860,7 +2861,7 @@ func (aH *APIHandler) onboardKafka( } if !fetchLatencyState && !consumerLagState { - entries = append(entries, mq.OnboardingResponse{ + entries = append(entries, kafka.OnboardingResponse{ Attribute: "telemetry ingestion", Message: "No data available in the given time range", Status: "0", @@ -2868,26 +2869,26 @@ func (aH *APIHandler) onboardKafka( } if !fetchLatencyState { - entries = append(entries, mq.OnboardingResponse{ + entries = append(entries, kafka.OnboardingResponse{ Attribute: "kafka_consumer_fetch_latency_avg", Message: "Metric kafka_consumer_fetch_latency_avg is not present in the given time range.", Status: "0", }) } else { - entries = append(entries, mq.OnboardingResponse{ + entries = append(entries, kafka.OnboardingResponse{ Attribute: "kafka_consumer_fetch_latency_avg", Status: "1", }) } if !consumerLagState { - entries = append(entries, mq.OnboardingResponse{ + entries = append(entries, kafka.OnboardingResponse{ Attribute: "kafka_consumer_group_lag", Message: "Metric kafka_consumer_group_lag is not present in the given time range.", Status: "0", }) } else { - entries = append(entries, mq.OnboardingResponse{ + entries = append(entries, kafka.OnboardingResponse{ Attribute: "kafka_consumer_group_lag", Status: "1", }) @@ -2899,10 +2900,10 @@ func (aH *APIHandler) onboardKafka( func (aH *APIHandler) getNetworkData( w http.ResponseWriter, r *http.Request, ) { - attributeCache := &mq.Clients{ + attributeCache := &kafka.Clients{ Hash: make(map[string]struct{}), } - messagingQueue, apiErr := ParseMessagingQueueBody(r) + messagingQueue, apiErr := ParseKafkaQueueBody(r) if apiErr != nil { zap.L().Error(apiErr.Err.Error()) @@ -2910,7 +2911,7 @@ func (aH *APIHandler) getNetworkData( return } - queryRangeParams, err := mq.BuildQRParamsWithCache(messagingQueue, "throughput", attributeCache) + queryRangeParams, err := kafka.BuildQRParamsWithCache(messagingQueue, "throughput", attributeCache) if err != nil { zap.L().Error(err.Error()) RespondError(w, apiErr, nil) @@ -2949,7 +2950,7 @@ func (aH *APIHandler) getNetworkData( } } - queryRangeParams, err = mq.BuildQRParamsWithCache(messagingQueue, "fetch-latency", attributeCache) + queryRangeParams, err = kafka.BuildQRParamsWithCache(messagingQueue, "fetch-latency", attributeCache) if err != nil { zap.L().Error(err.Error()) RespondError(w, apiErr, nil) @@ -2999,7 +3000,7 @@ func (aH *APIHandler) getProducerData( w http.ResponseWriter, r *http.Request, ) { // parse the query params to retrieve the messaging queue struct - messagingQueue, apiErr := ParseMessagingQueueBody(r) + messagingQueue, apiErr := ParseKafkaQueueBody(r) if apiErr != nil { zap.L().Error(apiErr.Err.Error()) @@ -3007,7 +3008,7 @@ func (aH *APIHandler) getProducerData( return } - queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "producer") + queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "producer") if err != nil { zap.L().Error(err.Error()) RespondError(w, apiErr, nil) @@ -3040,7 +3041,7 @@ func (aH *APIHandler) getProducerData( func (aH *APIHandler) getConsumerData( w http.ResponseWriter, r *http.Request, ) { - messagingQueue, apiErr := ParseMessagingQueueBody(r) + messagingQueue, apiErr := ParseKafkaQueueBody(r) if apiErr != nil { zap.L().Error(apiErr.Err.Error()) @@ -3048,7 +3049,7 @@ func (aH *APIHandler) getConsumerData( return } - queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "consumer") + queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "consumer") if err != nil { zap.L().Error(err.Error()) RespondError(w, apiErr, nil) @@ -3082,7 +3083,7 @@ func (aH *APIHandler) getConsumerData( func (aH *APIHandler) getPartitionOverviewLatencyData( w http.ResponseWriter, r *http.Request, ) { - messagingQueue, apiErr := ParseMessagingQueueBody(r) + messagingQueue, apiErr := ParseKafkaQueueBody(r) if apiErr != nil { zap.L().Error(apiErr.Err.Error()) @@ -3090,7 +3091,7 @@ func (aH *APIHandler) getPartitionOverviewLatencyData( return } - queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "producer-topic-throughput") + queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "producer-topic-throughput") if err != nil { zap.L().Error(err.Error()) RespondError(w, apiErr, nil) @@ -3124,7 +3125,7 @@ func (aH *APIHandler) getPartitionOverviewLatencyData( func (aH *APIHandler) getConsumerPartitionLatencyData( w http.ResponseWriter, r *http.Request, ) { - messagingQueue, apiErr := ParseMessagingQueueBody(r) + messagingQueue, apiErr := ParseKafkaQueueBody(r) if apiErr != nil { zap.L().Error(apiErr.Err.Error()) @@ -3132,7 +3133,7 @@ func (aH *APIHandler) getConsumerPartitionLatencyData( return } - queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "consumer_partition_latency") + queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "consumer_partition_latency") if err != nil { zap.L().Error(err.Error()) RespondError(w, apiErr, nil) @@ -3169,7 +3170,7 @@ func (aH *APIHandler) getConsumerPartitionLatencyData( func (aH *APIHandler) getProducerThroughputOverview( w http.ResponseWriter, r *http.Request, ) { - messagingQueue, apiErr := ParseMessagingQueueBody(r) + messagingQueue, apiErr := ParseKafkaQueueBody(r) if apiErr != nil { zap.L().Error(apiErr.Err.Error()) @@ -3177,11 +3178,11 @@ func (aH *APIHandler) getProducerThroughputOverview( return } - attributeCache := &mq.Clients{ + attributeCache := &kafka.Clients{ Hash: make(map[string]struct{}), } - producerQueryRangeParams, err := mq.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview", attributeCache) + producerQueryRangeParams, err := kafka.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview", attributeCache) if err != nil { zap.L().Error(err.Error()) RespondError(w, apiErr, nil) @@ -3219,7 +3220,7 @@ func (aH *APIHandler) getProducerThroughputOverview( } } - queryRangeParams, err := mq.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview-byte-rate", attributeCache) + queryRangeParams, err := kafka.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview-byte-rate", attributeCache) if err != nil { zap.L().Error(err.Error()) RespondError(w, apiErr, nil) @@ -3272,7 +3273,7 @@ func (aH *APIHandler) getProducerThroughputOverview( func (aH *APIHandler) getProducerThroughputDetails( w http.ResponseWriter, r *http.Request, ) { - messagingQueue, apiErr := ParseMessagingQueueBody(r) + messagingQueue, apiErr := ParseKafkaQueueBody(r) if apiErr != nil { zap.L().Error(apiErr.Err.Error()) @@ -3280,7 +3281,7 @@ func (aH *APIHandler) getProducerThroughputDetails( return } - queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "producer-throughput-details") + queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "producer-throughput-details") if err != nil { zap.L().Error(err.Error()) RespondError(w, apiErr, nil) @@ -3314,7 +3315,7 @@ func (aH *APIHandler) getProducerThroughputDetails( func (aH *APIHandler) getConsumerThroughputOverview( w http.ResponseWriter, r *http.Request, ) { - messagingQueue, apiErr := ParseMessagingQueueBody(r) + messagingQueue, apiErr := ParseKafkaQueueBody(r) if apiErr != nil { zap.L().Error(apiErr.Err.Error()) @@ -3322,7 +3323,7 @@ func (aH *APIHandler) getConsumerThroughputOverview( return } - queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "consumer-throughput-overview") + queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "consumer-throughput-overview") if err != nil { zap.L().Error(err.Error()) RespondError(w, apiErr, nil) @@ -3356,7 +3357,7 @@ func (aH *APIHandler) getConsumerThroughputOverview( func (aH *APIHandler) getConsumerThroughputDetails( w http.ResponseWriter, r *http.Request, ) { - messagingQueue, apiErr := ParseMessagingQueueBody(r) + messagingQueue, apiErr := ParseKafkaQueueBody(r) if apiErr != nil { zap.L().Error(apiErr.Err.Error()) @@ -3364,7 +3365,7 @@ func (aH *APIHandler) getConsumerThroughputDetails( return } - queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "consumer-throughput-details") + queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "consumer-throughput-details") if err != nil { zap.L().Error(err.Error()) RespondError(w, apiErr, nil) @@ -3401,7 +3402,7 @@ func (aH *APIHandler) getConsumerThroughputDetails( func (aH *APIHandler) getProducerConsumerEval( w http.ResponseWriter, r *http.Request, ) { - messagingQueue, apiErr := ParseMessagingQueueBody(r) + messagingQueue, apiErr := ParseKafkaQueueBody(r) if apiErr != nil { zap.L().Error(apiErr.Err.Error()) @@ -3409,7 +3410,7 @@ func (aH *APIHandler) getProducerConsumerEval( return } - queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "producer-consumer-eval") + queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "producer-consumer-eval") if err != nil { zap.L().Error(err.Error()) RespondError(w, apiErr, nil) @@ -3438,15 +3439,24 @@ func (aH *APIHandler) getProducerConsumerEval( aH.Respond(w, resp) } -// ParseMessagingQueueBody parse for messaging queue params -func ParseMessagingQueueBody(r *http.Request) (*mq.MessagingQueue, *model.ApiError) { - messagingQueue := new(mq.MessagingQueue) +// ParseKafkaQueueBody parse for messaging queue params +func ParseKafkaQueueBody(r *http.Request) (*kafka.MessagingQueue, *model.ApiError) { + messagingQueue := new(kafka.MessagingQueue) if err := json.NewDecoder(r.Body).Decode(messagingQueue); err != nil { return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)} } return messagingQueue, nil } +// ParseQueueBody parses for any queue +func ParseQueueBody(r *http.Request) (*queues2.QueueListRequest, *model.ApiError) { + queue := new(queues2.QueueListRequest) + if err := json.NewDecoder(r.Body).Decode(queue); err != nil { + return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)} + } + return queue, nil +} + // Preferences func (aH *APIHandler) getUserPreference( @@ -5121,9 +5131,8 @@ func (aH *APIHandler) updateTraceField(w http.ResponseWriter, r *http.Request) { } func (aH *APIHandler) getQueueOverview(w http.ResponseWriter, r *http.Request) { - // ToDo: add capability of dynamic filtering based on any of the filters using QueueFilters - messagingQueue, apiErr := ParseMessagingQueueBody(r) + queueListRequest, apiErr := ParseQueueBody(r) if apiErr != nil { zap.L().Error(apiErr.Err.Error()) @@ -5131,11 +5140,11 @@ func (aH *APIHandler) getQueueOverview(w http.ResponseWriter, r *http.Request) { return } - chq, err := mq.BuildClickHouseQuery(messagingQueue, "", "overview") + chq, err := queues2.BuildOverviewQuery(queueListRequest) if err != nil { zap.L().Error(err.Error()) - RespondError(w, apiErr, nil) + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil) return } @@ -5145,7 +5154,6 @@ func (aH *APIHandler) getQueueOverview(w http.ResponseWriter, r *http.Request) { } func (aH *APIHandler) getCeleryOverview(w http.ResponseWriter, r *http.Request) { - // TODO: Implement celery overview logic for both worker and tasks types } func (aH *APIHandler) getCeleryTasks(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/query-service/app/integrations/messagingQueues/celery/translator.go b/pkg/query-service/app/integrations/messagingQueues/celery/translator.go new file mode 100644 index 0000000000..bf7e67e150 --- /dev/null +++ b/pkg/query-service/app/integrations/messagingQueues/celery/translator.go @@ -0,0 +1 @@ +package celery diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go index 08b13a1ffb..de5d83487b 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go @@ -22,37 +22,3 @@ type OnboardingResponse struct { Message string `json:"error_message"` Status string `json:"status"` } - -// QueueFilters -// ToDo: add capability of dynamic filtering based on any of the filters -type QueueFilters struct { - ServiceName []string - SpanName []string - Queue []string - Destination []string - Kind []string -} - -type CeleryTask struct { - kind string - status string -} - -type CeleryTasks interface { - GetKind() string - GetStatus() string - Set(string, string) -} - -func (r *CeleryTask) GetKind() string { - return r.kind -} - -func (r *CeleryTask) GetStatus() string { - return r.status -} - -func (r *CeleryTask) Set(kind, status string) { - r.kind = kind - r.status = status -} diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index 8f1e010939..9b943acbc8 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -2,7 +2,6 @@ package kafka import ( "fmt" - "strings" ) func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queueType string) string { @@ -319,139 +318,6 @@ GROUP BY return query } -// generateOverviewSQL builds the ClickHouse SQL query with optional filters. -// If a filter slice is empty, the query does not constrain on that field. -func generateOverviewSQL(start, end int64, filters *QueueFilters) string { - // Convert from nanoseconds to float seconds in Go to avoid decimal overflow in ClickHouse - startSeconds := float64(start) / 1e9 - endSeconds := float64(end) / 1e9 - - // Compute time range difference in Go - timeRangeSecs := endSeconds - startSeconds - - // Example ts_bucket boundaries (could be your own logic) - tsBucketStart := startSeconds - 1800 - tsBucketEnd := endSeconds - - // Build WHERE clauses for optional filters - // We always require messaging_system IN ('kafka', 'celery'), but - // we add additional AND conditions only if the slices are non-empty. - var whereClauses []string - - // Mandatory base filter: show only kafka/celery - whereClauses = append(whereClauses, "messaging_system IN ('kafka', 'celery')") - - if len(filters.ServiceName) > 0 { - whereClauses = append(whereClauses, inClause("service_name", filters.ServiceName)) - } - if len(filters.SpanName) > 0 { - whereClauses = append(whereClauses, inClause("span_name", filters.SpanName)) - } - if len(filters.Queue) > 0 { - // "queue" in the struct refers to the messaging_system in the DB - whereClauses = append(whereClauses, inClause("messaging_system", filters.Queue)) - } - if len(filters.Destination) > 0 { - whereClauses = append(whereClauses, inClause("destination", filters.Destination)) - } - if len(filters.Kind) > 0 { - whereClauses = append(whereClauses, inClause("kind_string", filters.Kind)) - } - - // Combine all WHERE clauses with AND - whereSQL := strings.Join(whereClauses, "\n AND ") - - if len(whereSQL) > 0 { - whereSQL = fmt.Sprintf("AND %s", whereSQL) - } - - // Final query string - // Note the use of %f for float64 values in fmt.Sprintf - query := fmt.Sprintf(` -WITH - processed_traces AS ( - SELECT - resource_string_service$$name AS service_name, - name AS span_name, - CASE - WHEN attribute_string_messaging$$system != '' THEN attribute_string_messaging$$system - WHEN (has(attributes_string, 'celery.action') OR has(attributes_string, 'celery.task_name')) THEN 'celery' - ELSE 'undefined' - END AS messaging_system, - kind_string, - COALESCE( - NULLIF(attributes_string['messaging.destination.name'], ''), - NULLIF(attributes_string['messaging.destination'], '') - ) AS destination, - durationNano, - status_code - FROM signoz_traces.distributed_signoz_index_v3 - WHERE - timestamp >= toDateTime64(%f, 9) - AND timestamp <= toDateTime64(%f, 9) - AND ts_bucket_start >= toDateTime64(%f, 9) - AND ts_bucket_start <= toDateTime64(%f, 9) - AND ( - attribute_string_messaging$$system = 'kafka' - OR has(attributes_string, 'celery.action') - OR has(attributes_string, 'celery.task_name') - ) - %s - ), - aggregated_metrics AS ( - SELECT - service_name, - span_name, - messaging_system, - destination, - kind_string, - count(*) AS total_count, - sumIf(1, status_code = 2) AS error_count, - quantile(0.95)(durationNano) / 1000000 AS p95_latency -- Convert to ms - FROM - processed_traces - GROUP BY - service_name, - span_name, - messaging_system, - destination, - kind_string - ) -SELECT - aggregated_metrics.service_name, - aggregated_metrics.span_name, - aggregated_metrics.messaging_system, - aggregated_metrics.destination, - aggregated_metrics.kind_string, - COALESCE(aggregated_metrics.total_count / %f, 0) AS throughput, - COALESCE((aggregated_metrics.error_count * 100.0) / aggregated_metrics.total_count, 0) AS error_percentage, - aggregated_metrics.p95_latency -FROM - aggregated_metrics -ORDER BY - aggregated_metrics.service_name, - aggregated_metrics.span_name; -`, - startSeconds, endSeconds, - tsBucketStart, tsBucketEnd, - whereSQL, timeRangeSecs, - ) - - return query -} - -// inClause returns SQL like "fieldName IN ('val1','val2','val3')" -func inClause(fieldName string, values []string) string { - // Quote and escape each value for safety - var quoted []string - for _, v := range values { - // Simple escape: replace any single quotes in v - safeVal := strings.ReplaceAll(v, "'", "''") - quoted = append(quoted, fmt.Sprintf("'%s'", safeVal)) - } - return fmt.Sprintf("%s IN (%s)", fieldName, strings.Join(quoted, ",")) -} - func generateProducerSQL(start, end int64, topic, partition, queueType string) string { timeRange := (end - start) / 1000000000 tsBucketStart := (start / 1000000000) - 1800 diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go index b5fca5cf29..f5a669755f 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -2,11 +2,9 @@ package kafka import ( "fmt" - "go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/constants" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" - "strings" ) var defaultStepInterval int64 = 60 @@ -21,6 +19,7 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) queueType := KafkaQueue chq, err := BuildClickHouseQuery(messagingQueue, queueType, queryContext) + if err != nil { return nil, err } @@ -321,34 +320,6 @@ func BuildQRParamsWithCache( return queryRangeParams, err } -func getFilters(variables map[string]string) *QueueFilters { - return &QueueFilters{ - ServiceName: parseFilter(variables["service_name"]), - SpanName: parseFilter(variables["span_name"]), - Queue: parseFilter(variables["queue"]), - Destination: parseFilter(variables["destination"]), - Kind: parseFilter(variables["kind"]), - } -} - -// parseFilter splits a comma-separated string into a []string. -// Returns an empty slice if the input is blank. -func parseFilter(val string) []string { - if val == "" { - return []string{} - } - // Split on commas, trim whitespace around each part - parts := strings.Split(val, ",") - var out []string - for _, p := range parts { - trimmed := strings.TrimSpace(p) - if trimmed != "" { - out = append(out, trimmed) - } - } - return out -} - func BuildClickHouseQuery( messagingQueue *MessagingQueue, queueType string, @@ -385,8 +356,6 @@ func BuildClickHouseQuery( var query string switch queryContext { - case "overview": - query = generateOverviewSQL(start, end, getFilters(messagingQueue.Variables)) case "producer": query = generateProducerSQL(start, end, topic, partition, queueType) case "consumer": diff --git a/pkg/query-service/app/integrations/messagingQueues/queues/model.go b/pkg/query-service/app/integrations/messagingQueues/queues/model.go new file mode 100644 index 0000000000..eca7bf870c --- /dev/null +++ b/pkg/query-service/app/integrations/messagingQueues/queues/model.go @@ -0,0 +1,27 @@ +package queues + +import ( + "fmt" + + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +type QueueListRequest struct { + Start int64 `json:"start"` // unix nano + End int64 `json:"end"` // unix nano + Filters *v3.FilterSet `json:"filters"` + Limit int `json:"limit"` +} + +func (qr *QueueListRequest) Validate() error { + + err := qr.Filters.Validate() + if err != nil { + return err + } + + if qr.Start < 0 || qr.End < 0 { + return fmt.Errorf("start and end must be unixnano time") + } + return nil +} diff --git a/pkg/query-service/app/integrations/messagingQueues/queues/queueOverview.go b/pkg/query-service/app/integrations/messagingQueues/queues/queueOverview.go new file mode 100644 index 0000000000..cbfe0ad0d1 --- /dev/null +++ b/pkg/query-service/app/integrations/messagingQueues/queues/queueOverview.go @@ -0,0 +1,19 @@ +package queues + +import ( + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +func BuildOverviewQuery(queueList *QueueListRequest) (*v3.ClickHouseQuery, error) { + + err := queueList.Validate() + if err != nil { + return nil, err + } + + query := generateOverviewSQL(queueList.Start, queueList.End, queueList.Filters.Items) + + return &v3.ClickHouseQuery{ + Query: query, + }, nil +} diff --git a/pkg/query-service/app/integrations/messagingQueues/queues/sql.go b/pkg/query-service/app/integrations/messagingQueues/queues/sql.go new file mode 100644 index 0000000000..450c9d0773 --- /dev/null +++ b/pkg/query-service/app/integrations/messagingQueues/queues/sql.go @@ -0,0 +1,117 @@ +package queues + +import ( + "fmt" + "strings" + + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + format "go.signoz.io/signoz/pkg/query-service/utils" +) + +// generateOverviewSQL builds the ClickHouse SQL query with optional filters. +// If a filter slice is empty, the query does not constrain on that field. +func generateOverviewSQL(start, end int64, item []v3.FilterItem) string { + // Convert from nanoseconds to float seconds in Go to avoid decimal overflow in ClickHouse + startSeconds := float64(start) / 1e9 + endSeconds := float64(end) / 1e9 + + timeRangeSecs := endSeconds - startSeconds + + tsBucketStart := startSeconds - 1800 + tsBucketEnd := endSeconds + + var whereClauses []string + + whereClauses = append(whereClauses, fmt.Sprintf("timestamp >= toDateTime64(%f, 9)", startSeconds)) + whereClauses = append(whereClauses, fmt.Sprintf("timestamp <= toDateTime64(%f, 9)", endSeconds)) + + for _, filter := range item { + switch filter.Key.Key { + case "service.name": + whereClauses = append(whereClauses, fmt.Sprintf("%s IN (%s)", "service_name", format.ClickHouseFormattedValue(filter.Value))) + case "name": + whereClauses = append(whereClauses, fmt.Sprintf("%s IN (%s)", "span_name", format.ClickHouseFormattedValue(filter.Value))) + case "destination": + whereClauses = append(whereClauses, fmt.Sprintf("%s IN (%s)", "destination", format.ClickHouseFormattedValue(filter.Value))) + case "queue": + whereClauses = append(whereClauses, fmt.Sprintf("%s IN (%s)", "messaging_system", format.ClickHouseFormattedValue(filter.Value))) + case "kind_string": + whereClauses = append(whereClauses, fmt.Sprintf("%s IN (%s)", "kind_string", format.ClickHouseFormattedValue(filter.Value))) + } + } + + // Combine all WHERE clauses with AND + whereSQL := strings.Join(whereClauses, "\n AND ") + + if len(whereSQL) > 0 { + whereSQL = fmt.Sprintf("AND %s", whereSQL) + } + + query := fmt.Sprintf(` +WITH + processed_traces AS ( + SELECT + resource_string_service$$name AS service_name, + name AS span_name, + CASE + WHEN attribute_string_messaging$$system != '' THEN attribute_string_messaging$$system + WHEN (has(attributes_string, 'celery.action') OR has(attributes_string, 'celery.task_name')) THEN 'celery' + ELSE 'undefined' + END AS messaging_system, + kind_string, + COALESCE( + NULLIF(attributes_string['messaging.destination.name'], ''), + NULLIF(attributes_string['messaging.destination'], '') + ) AS destination, + durationNano, + status_code + FROM signoz_traces.distributed_signoz_index_v3 + WHERE + ts_bucket_start >= toDateTime64(%f, 9) + AND ts_bucket_start <= toDateTime64(%f, 9) + AND ( + attribute_string_messaging$$system = 'kafka' + OR has(attributes_string, 'celery.action') + OR has(attributes_string, 'celery.task_name') + ) + %s + ), + aggregated_metrics AS ( + SELECT + service_name, + span_name, + messaging_system, + destination, + kind_string, + count(*) AS total_count, + sumIf(1, status_code = 2) AS error_count, + quantile(0.95)(durationNano) / 1000000 AS p95_latency -- Convert to ms + FROM + processed_traces + GROUP BY + service_name, + span_name, + messaging_system, + destination, + kind_string + ) +SELECT + aggregated_metrics.service_name, + aggregated_metrics.span_name, + aggregated_metrics.messaging_system, + aggregated_metrics.destination, + aggregated_metrics.kind_string, + COALESCE(aggregated_metrics.total_count / %f, 0) AS throughput, + COALESCE((aggregated_metrics.error_count * 100.0) / aggregated_metrics.total_count, 0) AS error_percentage, + aggregated_metrics.p95_latency +FROM + aggregated_metrics +ORDER BY + aggregated_metrics.service_name, + aggregated_metrics.span_name; +`, tsBucketStart, tsBucketEnd, + whereSQL, timeRangeSecs, + ) + + return query +}