diff --git a/snuba/clickhouse/translators/snuba/mappers.py b/snuba/clickhouse/translators/snuba/mappers.py
index 07f6384870e..452d140ef09 100644
--- a/snuba/clickhouse/translators/snuba/mappers.py
+++ b/snuba/clickhouse/translators/snuba/mappers.py
@@ -1,6 +1,6 @@
 from abc import ABC, abstractmethod
 from dataclasses import dataclass
-from typing import Mapping, Optional, Tuple
+from typing import Optional, Tuple
 
 from snuba.clickhouse.translators.snuba import SnubaClickhouseStrictTranslator
 from snuba.clickhouse.translators.snuba.allowed import (
@@ -9,7 +9,7 @@
     SubscriptableReferenceMapper,
     ValidColumnMappings,
 )
-from snuba.query.dsl import arrayElement, column, literal
+from snuba.query.dsl import arrayElement
 from snuba.query.expressions import Column as ColumnExpr
 from snuba.query.expressions import CurriedFunctionCall, Expression
 from snuba.query.expressions import FunctionCall as FunctionCallExpr
@@ -239,12 +239,6 @@ class SubscriptableHashBucketMapper(SubscriptableReferenceMapper):
     from_column_name: str
     to_col_table: Optional[str]
     to_col_name: str
-    # the result is cast to this type
-    data_type: str
-    # if you add {'sentry.span_id': 'span_id'} here, then if the user requests attr_blah[sentry.span_id],
-    # this mapper will return a reference to the actual column instead of attr_str.
-    # if specified, data_type must also be specified.
-    normalized_columns: Optional[Mapping[str, str]] = None
 
     def attempt_map(
         self,
@@ -263,24 +257,12 @@ def attempt_map(
             return None
 
         bucket_idx = fnv_1a(key.value.encode("utf-8")) % ATTRIBUTE_BUCKETS
-        expr: Expression = arrayElement(
-            None,
+        return arrayElement(
+            expression.alias,
             ColumnExpr(None, self.to_col_table, f"{self.to_col_name}_{bucket_idx}"),
             key,
         )
-
-        if self.normalized_columns and key.value in self.normalized_columns:
-            expr = column(self.normalized_columns[key.value])
-
-        return FunctionCallExpr(
-            expression.alias,
-            "CAST",
-            (
-                expr,
-                literal(self.data_type),
-            ),
-        )
 
 
 @dataclass(frozen=True)
 class ColumnToMapping(ColumnToExpression):
diff --git a/snuba/query/processors/logical/eap_map_access_remapper.py b/snuba/query/processors/logical/eap_map_access_remapper.py
new file mode 100644
index 00000000000..0eb315945e7
--- /dev/null
+++ b/snuba/query/processors/logical/eap_map_access_remapper.py
@@ -0,0 +1,102 @@
+from typing import Sequence
+
+from snuba.query.expressions import Column, Expression, FunctionCall, Literal
+from snuba.query.logical import Query
+from snuba.query.processors.logical import LogicalQueryProcessor
+from snuba.query.query_settings import QuerySettings
+from snuba.utils.constants import ATTRIBUTE_BUCKETS
+from snuba.utils.hashes import fnv_1a
+
+
+class HashBucketFunctionTransformer(LogicalQueryProcessor):
+    """
+    In eap_spans, we split up map columns for better performance.
+    In the entity, attr_str Map(String, String) becomes
+    attr_str_0 Map(String, String),
+    attr_str_1 Map(String, String),
+    etc.
+
+    This transformer converts mapKeys(attr_str) to arrayConcat(mapKeys(attr_str_0), mapKeys(attr_str_1), ...),
+    and the same for mapValues.
+
+    It converts mapContains(attr_str, 'blah') to mapContains(attr_str_{hash('blah')%20}, 'blah').
+    """
+
+    def __init__(self, hash_bucket_names: Sequence[str]):
+        super().__init__()
+        self.hash_bucket_names = set(hash_bucket_names)
+
+    def process_query(self, query: Query, query_settings: QuerySettings) -> None:
+        def transform_map_keys_and_values_expression(exp: Expression) -> Expression:
+            if not isinstance(exp, FunctionCall):
+                return exp
+
+            if len(exp.parameters) != 1:
+                return exp
+
+            param = exp.parameters[0]
+            if not isinstance(param, Column):
+                return exp
+
+            if param.column_name not in self.hash_bucket_names:
+                return exp
+
+            if exp.function_name not in ("mapKeys", "mapValues"):
+                return exp
+
+            return FunctionCall(
+                alias=exp.alias,
+                function_name="arrayConcat",
+                parameters=tuple(
+                    FunctionCall(
+                        None,
+                        function_name=exp.function_name,
+                        parameters=(
+                            Column(
+                                None,
+                                column_name=f"{param.column_name}_{i}",
+                                table_name=param.table_name,
+                            ),
+                        ),
+                    )
+                    for i in range(ATTRIBUTE_BUCKETS)
+                ),
+            )
+
+        def transform_map_contains_expression(exp: Expression) -> Expression:
+            if not isinstance(exp, FunctionCall):
+                return exp
+
+            if len(exp.parameters) != 2:
+                return exp
+
+            column = exp.parameters[0]
+            if not isinstance(column, Column):
+                return exp
+
+            if column.column_name not in self.hash_bucket_names:
+                return exp
+
+            if exp.function_name != "mapContains":
+                return exp
+
+            key = exp.parameters[1]
+            if not isinstance(key, Literal) or not isinstance(key.value, str):
+                return exp
+
+            bucket_idx = fnv_1a(key.value.encode("utf-8")) % ATTRIBUTE_BUCKETS
+            return FunctionCall(
+                alias=exp.alias,
+                function_name=exp.function_name,
+                parameters=(
+                    Column(
+                        None,
+                        None,
+                        f"{column.column_name}_{bucket_idx}",
+                    ),
+                    key,
+                ),
+            )
+
+        query.transform_expressions(transform_map_keys_and_values_expression)
+        query.transform_expressions(transform_map_contains_expression)
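For reviewers who want to sanity-check the key-to-bucket routing without a full snuba checkout, here is a minimal, self-contained sketch. It inlines 32-bit FNV-1a (intended to mirror `snuba.utils.hashes.fnv_1a`) and assumes `ATTRIBUTE_BUCKETS == 20`, as the `%20` in the docstring suggests; `bucket_for` is a hypothetical helper for illustration only, not part of this diff.

```python
ATTRIBUTE_BUCKETS = 20  # assumed value; the real constant lives in snuba.utils.constants


def fnv_1a(data: bytes) -> int:
    # 32-bit FNV-1a; intended to match snuba.utils.hashes.fnv_1a
    res = 2166136261  # FNV-1a 32-bit offset basis
    for byte in data:
        res = ((res ^ byte) * 16777619) & 0xFFFFFFFF  # FNV-1a 32-bit prime, truncated to 32 bits
    return res


def bucket_for(key: str, column: str = "attr_str") -> str:
    # Hypothetical helper: name of the physical column that stores column[key]
    return f"{column}_{fnv_1a(key.encode('utf-8')) % ATTRIBUTE_BUCKETS}"


if __name__ == "__main__":
    for key in ("blah", "sentry.span_id", "http.status_code"):
        # each key is served by exactly one of attr_str_0 .. attr_str_19
        print(f"attr_str[{key!r}] -> {bucket_for(key)}")
```

Both `SubscriptableHashBucketMapper.attempt_map` and `transform_map_contains_expression` compute the same `fnv_1a(key) % ATTRIBUTE_BUCKETS` index, so point lookups touch a single bucket column, while `mapKeys`/`mapValues` must fan out across all buckets via `arrayConcat`.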