From 440acc7c58d68e7d8384837b2d4855bcaad19073 Mon Sep 17 00:00:00 2001 From: Robert Szefler Date: Tue, 9 Apr 2024 13:38:52 +0200 Subject: [PATCH] summaries --- src/robusta/core/sinks/sink_base.py | 26 +++++---- src/robusta/core/sinks/slack/slack_sink.py | 64 ++++++++++++++-------- src/robusta/integrations/slack/sender.py | 30 +++++++++- 3 files changed, 85 insertions(+), 35 deletions(-) diff --git a/src/robusta/core/sinks/sink_base.py b/src/robusta/core/sinks/sink_base.py index 0d04d450d..b64efd9ad 100644 --- a/src/robusta/core/sinks/sink_base.py +++ b/src/robusta/core/sinks/sink_base.py @@ -1,7 +1,7 @@ -import logging import threading from abc import abstractmethod, ABC -from typing import Any, List, Dict, Tuple +from collections import defaultdict +from typing import Any, List, Dict, Tuple, DefaultDict from robusta.core.model.k8s_operation_type import K8sOperationType from robusta.core.reporting.base import Finding @@ -15,12 +15,12 @@ class SinkBase(ABC): # The tuples in the types below holds all the attributes we are aggregating on. 
finding_group_start_ts: Dict[Tuple, float] # timestamps for message groups - finding_group_n_ignored: Dict[Tuple, int] # number of messages ignored for each group + finding_group_n_received: DefaultDict[Tuple, int] # number of messages received for each group finding_group_heads: Dict[Tuple, str] # a mapping from a set of parameters to the head of a thread # Summary groups - finding_sgroup_header: List # descriptive header for the summary table - finding_sgroup_counts: Dict[Tuple, Dict[Tuple, int]] # rows of the summary table + finding_summary_header: List[str] # descriptive header for the summary table + finding_summary_counts: Dict[Tuple, Dict[Tuple, Tuple[int, int]]] # rows of the summary table finding_group_lock: threading.Lock = threading.Lock() @@ -39,17 +39,13 @@ def __init__(self, sink_params: SinkBaseParams, registry): if sink_params.grouping: self.grouping_enabled = True - self.finding_group_start_ts = {} - self.finding_group_n_ignored = {} - self.finding_group_heads = {} if sink_params.grouping.notification_mode.summary: self.grouping_summary_mode = True - self.finding_sgroup_header = [] - self.finding_sgroup_counts = {} + self.finding_summary_header = [] if sink_params.grouping.notification_mode.summary.by: for attr in sink_params.grouping.notification_mode.summary.by: if isinstance(attr, str): - self.finding_sgroup_header.append("event" if attr == "identifier" else attr) + self.finding_summary_header.append("event" if attr == "identifier" else attr) elif isinstance(attr, dict): keys = list(attr.keys()) if len(keys) > 1: @@ -64,12 +60,18 @@ def __init__(self, sink_params: SinkBaseParams, registry): "(only labels/attributes allowed)" ) for label_or_attr_name in attr[key]: - self.finding_sgroup_header.append(f"{key[:-1]}:{label_or_attr_name}") + self.finding_summary_header.append(f"{key[:-1]}:{label_or_attr_name}") else: self.grouping_summary_mode = False else: self.grouping_enabled = False + self.reset_grouping_data() + def reset_grouping_data(self): + 
self.finding_group_start_ts = {} + self.finding_group_n_received = defaultdict(int) + self.finding_group_heads = {} + self.finding_summary_counts = {} def _build_time_slices_from_params(self, params: ActivityParams): if params is None: diff --git a/src/robusta/core/sinks/slack/slack_sink.py b/src/robusta/core/sinks/slack/slack_sink.py index 6478fae62..de57972fa 100644 --- a/src/robusta/core/sinks/slack/slack_sink.py +++ b/src/robusta/core/sinks/slack/slack_sink.py @@ -22,59 +22,72 @@ def write_finding(self, finding: Finding, platform_enabled: bool) -> None: self.slack_sender.send_finding_to_slack(finding, self.params, platform_enabled) def handle_notification_grouping(self, finding: Finding, platform_enabled: bool) -> None: - # TODO support notification_mode.regular.ignore_first + # TODO support firing/resolved timestamp = time.time() finding_data = finding.attribute_map # The following will be e.g. Deployment, Job, etc. Sometimes it's undefined. - finding_data["workload"] = finding.service.resource_type if finding.service else None - summary_classification = self.classify_finding( - finding_data, self.params.grouping.notification_mode.summary.by - ) - group_by_classification = self.classify_finding(finding_data, self.params.grouping.group_by) - logging.warning(f"****** {group_by_classification=}") + finding_data["workload"] = finding.service.resource_type if finding.service else "-" + group_by_classification, _ = self.classify_finding(finding_data, self.params.grouping.group_by) with self.finding_group_lock: - if ( - summary_classification not in self.finding_group_start_ts - or self.finding_group_start_ts[group_by_classification] - timestamp > self.params.grouping.interval - ): + if timestamp - self.finding_group_start_ts.get(group_by_classification, timestamp) > self.params.grouping.interval: + self.reset_grouping_data() + if group_by_classification not in self.finding_group_start_ts: # Create a new group/thread self.finding_group_start_ts[group_by_classification] = timestamp
slack_thread_ts = None else: - slack_thread_ts = self.finding_group_heads[group_by_classification] + slack_thread_ts = self.finding_group_heads.get(group_by_classification) + self.finding_group_n_received[group_by_classification] += 1 + if ( + self.params.grouping.notification_mode.regular is not None and + self.finding_group_n_received[group_by_classification] <= self.params.grouping.notification_mode.regular.ignore_first + ): + return if self.grouping_summary_mode: - logging.warning(f"****** {summary_classification=}") - slack_thread_ts = None + summary_classification, summary_classification_header = self.classify_finding( + finding_data, self.params.grouping.notification_mode.summary.by + ) if slack_thread_ts is not None: # Continue emitting findings in an already existing Slack thread if not self.grouping_summary_mode or self.params.grouping.notification_mode.summary.threaded: - logging.warning(f"Appending to Slack thread {slack_thread_ts}") + logging.info(f"Appending to Slack thread {slack_thread_ts}") self.slack_sender.send_finding_to_slack( finding, self.params, platform_enabled, thread_ts=slack_thread_ts ) if self.grouping_summary_mode: - pass + logging.info(f"Updating summaries in Slack thread {slack_thread_ts}") # TODO update totals in the summary message else: # Create the first Slack message if self.grouping_summary_mode: - # TODO - pass + self.finding_summary_counts[group_by_classification] = {group_by_classification: (1, 0)} + logging.info("Creating first Slack summarised thread") + slack_thread_ts = self.slack_sender.send_summary_message( + summary_classification_header, + self.finding_summary_header, + self.finding_summary_counts[group_by_classification], + self.params, + platform_enabled + ) + if self.params.grouping.notification_mode.summary.threaded: + self.slack_sender.send_finding_to_slack(finding, self.params, platform_enabled) else: slack_thread_ts = self.slack_sender.send_finding_to_slack(finding, self.params, platform_enabled) - self.finding_group_heads[summary_classification] = slack_thread_ts - 
logging.warning(f"Created new Slack thread {slack_thread_ts}") + self.finding_group_heads[group_by_classification] = slack_thread_ts + logging.info(f"Created new Slack thread {slack_thread_ts}") - def classify_finding(self, finding_data: Dict, attributes: List) -> Tuple: + def classify_finding(self, finding_data: Dict, attributes: List) -> Tuple[Tuple[str], List[str]]: values = () + descriptions = [] for attr in attributes: if isinstance(attr, str): if attr not in finding_data: logging.warning(f"Notification grouping: tried to group on non-existent attribute {attr}") continue values += (finding_data.get(attr), ) + descriptions.append(f"{attr}={finding_data.get(attr)}") elif isinstance(attr, dict): if list(attr.keys()) not in [["labels"], ["attributes"]]: logging.warning(f"Notification grouping: tried to group on non-existent attribute(s) {attr}") @@ -83,4 +96,11 @@ def classify_finding(self, finding_data: Dict, attributes: List) -> Tuple: finding_data.get(top_level_attr_name, {}).get(subitem_name) for subitem_name in attr[top_level_attr_name] ) - return values + descriptions += [ + "%s=%s"%( + f"{top_level_attr_name}:{subitem_name}", + finding_data.get(top_level_attr_name, {}).get(subitem_name) + ) + for subitem_name in attr[top_level_attr_name] + ] + return values, descriptions diff --git a/src/robusta/integrations/slack/sender.py b/src/robusta/integrations/slack/sender.py index 2a80b629a..f30938b59 100644 --- a/src/robusta/integrations/slack/sender.py +++ b/src/robusta/integrations/slack/sender.py @@ -1,7 +1,7 @@ import logging import ssl import tempfile -from typing import Any, Dict, List, Set +from typing import Any, Dict, List, Set, Tuple import certifi from slack_sdk import WebClient @@ -382,3 +382,31 @@ def send_finding_to_slack( sink_params.get_slack_channel(self.cluster_name, finding.subject.labels, finding.subject.annotations), thread_ts ) + + def send_summary_message( + self, + summary_classification_header: List[str], + finding_summary_header: List[str]
+ summary_table: Dict[tuple, Tuple[int, int]], + sink_params: SlackSinkParams, + platform_enabled: bool, + msg_ts: str = None # message identifier (for updates) + ): + """Create or update a summary message with tabular information about the amount of events + firing/resolved and a header describing the event group that this information concerns.""" + logging.debug(f"send_summary_message {summary_classification_header} {finding_summary_header} {summary_table}") + + # TODO contents + try: + resp = self.slack_client.chat_postMessage( + # TODO: for the purpose of the summary, we pretend labels and annotations are empty. Is this okay? + channel=sink_params.get_slack_channel(self.cluster_name, {}, {}), + text="A summary message", + # blocks=output_blocks, + display_as_bot=True, + ) + return resp["ts"] + except Exception as e: + logging.error( + f"error sending summary message to slack\ne={e}" + )