big refactor: simplify a lot of the remapping logic
colin-sentry committed Jan 7, 2025
1 parent 005de34 · commit 5eb4974
Showing 7 changed files with 231 additions and 288 deletions.
37 changes: 0 additions & 37 deletions snuba/clickhouse/translators/snuba/mappers.py
@@ -25,8 +25,6 @@
Param,
String,
)
from snuba.utils.constants import ATTRIBUTE_BUCKETS
from snuba.utils.hashes import fnv_1a


# This is a workaround for a mypy bug, found here: https://github.com/python/mypy/issues/5374
@@ -229,41 +227,6 @@ def attempt_map(
return None


@dataclass(frozen=True)
class SubscriptableHashBucketMapper(SubscriptableReferenceMapper):
"""
Maps a key into the appropriate bucket by hashing the key. For example, hello[test] might go to attr_str_22['test']
"""

from_column_table: Optional[str]
from_column_name: str
to_col_table: Optional[str]
to_col_name: str

def attempt_map(
self,
expression: SubscriptableReference,
children_translator: SnubaClickhouseStrictTranslator,
) -> Optional[FunctionCallExpr]:
if (
expression.column.table_name != self.from_column_table
or expression.column.column_name != self.from_column_name
):
return None
key = expression.key.accept(children_translator)
if not isinstance(key, LiteralExpr):
return None
if not isinstance(key.value, str):
return None

bucket_idx = fnv_1a(key.value.encode("utf-8")) % ATTRIBUTE_BUCKETS
return arrayElement(
expression.alias,
ColumnExpr(None, self.to_col_table, f"{self.to_col_name}_{bucket_idx}"),
key,
)


@dataclass(frozen=True)
class ColumnToMapping(ColumnToExpression):
"""
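For reference, here is a minimal sketch of the bucketing scheme the removed SubscriptableHashBucketMapper implemented: the attribute key is hashed with FNV-1a, and the hash modulo the bucket count picks the sharded column. The 32-bit FNV-1a constants and `ATTRIBUTE_BUCKETS = 20` are assumptions inferred from the old docstrings in this commit (the `% 20` mentioned further down), not values taken from this diff.

```python
# Illustrative only: assumes snuba's fnv_1a is a standard 32-bit FNV-1a hash
# and that ATTRIBUTE_BUCKETS is 20 (inferred from the "% 20" in the old
# HashBucketFunctionTransformer docstring later in this commit).

ATTRIBUTE_BUCKETS = 20  # assumed value


def fnv_1a(data: bytes) -> int:
    """32-bit FNV-1a hash, shown here only to illustrate the bucketing."""
    fnv_prime = 16777619
    result = 2166136261  # offset basis
    for byte in data:
        result ^= byte
        result = (result * fnv_prime) & 0xFFFFFFFF
    return result


def bucketed_column(base_name: str, key: str) -> str:
    """Pick the sharded map column a key lands in, e.g. attr_str -> attr_str_<idx>."""
    bucket_idx = fnv_1a(key.encode("utf-8")) % ATTRIBUTE_BUCKETS
    return f"{base_name}_{bucket_idx}"


# attr_str['test'] is then read via arrayElement(<bucketed column>, 'test'),
# which is what the removed attempt_map above produced.
```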
@@ -48,22 +48,6 @@ storages:
from_col_name: exclusive_time_ms
to_col_name: exclusive_time_micro

subscriptables:
- mapper: SubscriptableHashBucketMapper
args:
from_column_table: null
from_column_name: attr_str
to_col_table: null
to_col_name: attr_str
data_type: String
- mapper: SubscriptableHashBucketMapper
args:
from_column_table: null
from_column_name: attr_num
to_col_table: null
to_col_name: attr_num
data_type: Float64

storage_selector:
selector: DefaultQueryStorageSelector

@@ -90,11 +74,16 @@ query_processors:
curried_aggregation_names:
- quantile
- quantileTDigestWeighted
- processor: HashBucketFunctionTransformer
- processor: EAPMapSharder
args:
src_bucket_name: attr_str
dest_bucket_name: attr_str
data_type: String
- processor: EAPMapSharder
args:
hash_bucket_name_mapping:
attr_str: attr_str
attr_num: attr_num
src_bucket_name: attr_num
dest_bucket_name: attr_num
data_type: Float64

validate_data_model: do_nothing # in order to reference aliased columns, we shouldn't validate columns purely based on the entity schema
validators:
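The EAPMapSharder processors added above take over for the removed subscriptable mappers: each rewrites accesses to a virtual map (`src_bucket_name`) into reads of the sharded physical columns derived from `dest_bucket_name`, producing a value of `data_type`. EAPMapSharder's implementation is not part of this diff, so the sketch below only approximates the resulting ClickHouse expression, based on the comments in the sibling entity config (which write the lookup as `arrayIndex`) and the removed mapper above (which used `arrayElement`); the helper name and SQL shape are assumptions.

```python
# Rough illustration, not EAPMapSharder's real code. The bucket index comes
# from hashing the key (see the FNV-1a sketch above).

def sharded_access_sql(dest_bucket: str, bucket_idx: int, key: str, data_type: str) -> str:
    """Approximate the ClickHouse expression a virtual map access turns into."""
    return f"CAST(arrayElement({dest_bucket}_{bucket_idx}, '{key}'), '{data_type}')"


print(sharded_access_sql("attr_num", 5, "duration", "Float64"))
# CAST(arrayElement(attr_num_5, 'duration'), 'Float64')
```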
@@ -43,69 +43,64 @@ storages:
to_table_name: null
to_col_name: _sort_timestamp

subscriptables:
- mapper: SubscriptableHashBucketMapper
args:
from_column_table: null
from_column_name: attr_str
to_col_table: null
to_col_name: attr_str
data_type: String
normalized_columns:
sentry.name: name
sentry.service: service
sentry.span_id: span_id
sentry.parent_span_id: parent_span_id
sentry.segment_id: segment_id
sentry.segment_name: segment_name
sentry.start_timestamp: start_timestamp
sentry.end_timestamp: end_timestamp
sentry.timestamp: _sort_timestamp
- mapper: SubscriptableHashBucketMapper
args:
from_column_table: null
from_column_name: attr_f64
to_col_table: null
to_col_name: attr_num
data_type: Float64
normalized_columns:
sentry.exclusive_time_micro: exclusive_time_micro
sentry.duration_micro: duration_micro
- mapper: SubscriptableHashBucketMapper
args:
from_column_table: null
from_column_name: attr_i64
to_col_table: null
to_col_name: attr_num
data_type: Int64
normalized_columns:
sentry.organization_id: organization_id
sentry.project_id: project_id

storage_selector:
selector: DefaultQueryStorageSelector

query_processors:
# a processor that creates a virtual 'time' column to group by for generating timeseries.
- processor: TimeSeriesProcessor
args:
time_group_columns:
time: timestamp
time_parse_columns:
- start_timestamp
- end_timestamp
- processor: HashBucketFunctionTransformer
# maps (e.g.) attr_str[sentry.name] to the clickhouse column 'name'
- processor: EAPClickhouseColumnRemapper
args:
hash_bucket_name: attr_str
data_type: String
keys:
sentry.name: name
sentry.service: service
sentry.parent_span_id: parent_span_id
sentry.segment_name: segment_name
sentry.start_timestamp: start_timestamp
sentry.end_timestamp: end_timestamp
sentry.timestamp: timestamp
# maps attr_str[span_id] to hex(span_id)
- processor: EAPClickhouseColumnRemapper
args:
hash_bucket_name_mapping:
attr_str: attr_str
attr_f64: attr_num
attr_i64: attr_num
hash_bucket_name: attr_str
data_type: hex
keys:
sentry.span_id: span_id
sentry.segment_id: segment_id
# maps attr_f64[duration_ms] to CAST(duration_ms, Float64)
- processor: EAPClickhouseColumnRemapper
args:
hash_bucket_name: attr_f64
data_type: Float64
keys:
sentry.exclusive_time_ms: exclusive_time_ms
sentry.duration_ms: duration_ms
# maps attr_i64[project_id] to CAST(project_id, Int64)
- processor: EAPClickhouseColumnRemapper
args:
hash_bucket_name: attr_i64
data_type: Int64
keys:
sentry.exclusive_time_micro: exclusive_time_micro
sentry.duration_micro: duration_micro
sentry.organization_id: organization_id
sentry.project_id: project_id
# maps avg(attr_i64[hello]) to avgIf(attr_i64['hello'], mapContains(attr_i64, 'hello'))
- processor: OptionalAttributeAggregationTransformer
args:
attribute_column_names:
- attr_f64
- attr_i64
aggregation_names:
- sum
- count
- avg
- avgWeighted
@@ -115,6 +110,26 @@
curried_aggregation_names:
- quantile
- quantileTDigestWeighted
# maps a few things:
# - attr_i64['blah'] to CAST(arrayIndex(attr_num_5, 'blah'), 'Int64')
# - mapContains(attr_str, blah) to mapContains(attr_str_5, blah)
# - mapKeys(attr_str) to arrayConcat(mapKeys(attr_str_0), mapKeys(attr_str_1), ...)
# - mapValues(attr_str) to arrayConcat(mapValues(attr_str_0), mapValues(attr_str_1), ...)
- processor: EAPMapSharder
args:
src_bucket_name: attr_str
dest_bucket_name: attr_str
data_type: String
- processor: EAPMapSharder
args:
src_bucket_name: attr_f64
dest_bucket_name: attr_num
data_type: Float64
- processor: EAPMapSharder
args:
src_bucket_name: attr_i64
dest_bucket_name: attr_num
data_type: Int64

validate_data_model: do_nothing # in order to reference aliased columns, we shouldn't validate columns purely based on the entity schema
validators:
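Taken together, the EAPClickhouseColumnRemapper entries above route well-known attribute keys to real ClickHouse columns, while every other key stays in the sharded virtual maps handled by EAPMapSharder. A string-level illustration of that routing follows; it only covers a subset of the configured keys, and the real processor emits snuba AST nodes (`hex(...)` or `CAST(..., '<type>')`, per the code in the next file), not SQL strings.

```python
# String-level illustration of the remapping declared above; a subset of keys.

REMAPPED: dict[tuple[str, str], tuple[str, str]] = {
    # (virtual map, key): (real column, data_type)
    ("attr_str", "sentry.name"): ("name", "String"),
    ("attr_str", "sentry.span_id"): ("span_id", "hex"),
    ("attr_f64", "sentry.duration_ms"): ("duration_ms", "Float64"),
    ("attr_i64", "sentry.project_id"): ("project_id", "Int64"),
}


def resolve(bucket: str, key: str) -> str:
    """Approximate the expression a map access resolves to under this config."""
    if (bucket, key) not in REMAPPED:
        return f"{bucket}[{key}]"  # unknown keys keep the map access as-is
    column_name, data_type = REMAPPED[(bucket, key)]
    if data_type == "hex":
        return f"hex({column_name})"
    return f"CAST({column_name}, '{data_type}')"


print(resolve("attr_f64", "sentry.duration_ms"))  # CAST(duration_ms, 'Float64')
print(resolve("attr_str", "sentry.span_id"))      # hex(span_id)
print(resolve("attr_str", "derp"))                # attr_str[derp]
```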
106 changes: 30 additions & 76 deletions snuba/query/processors/logical/eap_map_access_remapper.py
@@ -1,102 +1,56 @@
from typing import Sequence
from typing import Mapping

from snuba.query.expressions import Column, Expression, FunctionCall, Literal
from snuba.query.dsl import column, literal
from snuba.query.expressions import Expression, FunctionCall, SubscriptableReference
from snuba.query.logical import Query
from snuba.query.processors.logical import LogicalQueryProcessor
from snuba.query.query_settings import QuerySettings
from snuba.utils.constants import ATTRIBUTE_BUCKETS
from snuba.utils.hashes import fnv_1a


class HashBucketFunctionTransformer(LogicalQueryProcessor):
class EAPClickhouseColumnRemapper(LogicalQueryProcessor):
"""
In eap_spans, we split up map columns for better performance.
In the entity, attr_str Map(String, String) becomes
attr_str_0 Map(String, String),
attr_str_1 Map(String, String),
etc.
In EAP entities, all attributes are hidden behind some virtual maps: attr_str, attr_i64, etc
This transformer converts mapKeys(attr_str) to arrayConcat(mapKeys(attr_str_0), mapKeys(attr_str_1), ...)
and the same for mapValues
Sometimes a map access should refer to a 'real' column.
For example, you can use this processor to convert
attr_i64[duration_ms] to CAST(duration_ms, 'Int64')
It converts mapExists(attr_str, 'blah') to mapExists(attr_str_{hash('blah')%20}, 'blah')
If data_type is the special value 'hex', the result is converted with the 'hex' function instead.
If there is no matching column, the map access remains as-is:
attr_str[derp] remains attr_str[derp]
"""

def __init__(self, hash_bucket_names: Sequence[str]):
def __init__(self, hash_bucket_name: str, keys: Mapping[str, str], data_type: str):
super().__init__()
self.hash_bucket_names = set(hash_bucket_names)
self.hash_bucket_name = hash_bucket_name
self.keys = keys
self.data_type = data_type

def process_query(self, query: Query, query_settings: QuerySettings) -> None:
def transform_map_keys_and_values_expression(exp: Expression) -> Expression:
if not isinstance(exp, FunctionCall):
return exp

if len(exp.parameters) != 1:
return exp

param = exp.parameters[0]
if not isinstance(param, Column):
return exp

if param.column_name not in self.hash_bucket_names:
return exp

if exp.function_name not in ("mapKeys", "mapValues"):
return exp

return FunctionCall(
alias=exp.alias,
function_name="arrayConcat",
parameters=tuple(
FunctionCall(
None,
function_name=exp.function_name,
parameters=(
Column(
None,
column_name=f"{param.column_name}_{i}",
table_name=param.table_name,
),
),
)
for i in range(ATTRIBUTE_BUCKETS)
),
)

def transform_map_contains_expression(exp: Expression) -> Expression:
if not isinstance(exp, FunctionCall):
return exp

if len(exp.parameters) != 2:
return exp

column = exp.parameters[0]
if not isinstance(column, Column):
return exp

if column.column_name not in self.hash_bucket_names:
def transform(exp: Expression) -> Expression:
if not isinstance(exp, SubscriptableReference):
return exp

if exp.function_name != "mapContains":
if exp.column.column_name != self.hash_bucket_name:
return exp

key = exp.parameters[1]
if not isinstance(key, Literal) or not isinstance(key.value, str):
if exp.key.value not in self.keys:
return exp

bucket_idx = fnv_1a(key.value.encode("utf-8")) % ATTRIBUTE_BUCKETS
if self.data_type == "hex":
return FunctionCall(
alias=exp.alias,
function_name="hex",
parameters=(column(self.keys[exp.key.value]),),
)
return FunctionCall(
alias=exp.alias,
function_name=exp.function_name,
function_name="CAST",
parameters=(
Column(
None,
None,
f"{column.column_name}_{bucket_idx}",
),
key,
column(self.keys[exp.key.value]),
literal(self.data_type),
),
)

query.transform_expressions(transform_map_keys_and_values_expression)
query.transform_expressions(transform_map_contains_expression)
query.transform_expressions(transform)
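A brief usage sketch of the new processor follows. The constructor signature and the resulting rewrite come from the code in this file and the entity YAML above; the example key and the surrounding query setup are illustrative only (building a full logical Query is elided).

```python
# Usage sketch (within the snuba codebase). The constructor arguments mirror
# the `args` block in the entity YAML.

from snuba.query.processors.logical.eap_map_access_remapper import (
    EAPClickhouseColumnRemapper,
)

duration_remapper = EAPClickhouseColumnRemapper(
    hash_bucket_name="attr_f64",
    keys={"sentry.duration_ms": "duration_ms"},
    data_type="Float64",
)

# Given a query that references attr_f64['sentry.duration_ms'],
# duration_remapper.process_query(query, settings) rewrites that subscript to
# CAST(duration_ms, 'Float64'); any other attr_f64[...] access is left as-is.
```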