-
Notifications
You must be signed in to change notification settings - Fork 263
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Implemented Incident.io Sink (#1645)
* feat: Implemented Incident.io Sink * feat: Remove unnecessary links.append * feat: Tidy up * feat: Pass IncidentioSinkConfigWrapper to the Sink --------- Co-authored-by: arik <[email protected]>
- Loading branch information
Showing
6 changed files
with
195 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
from robusta.core.sinks.incidentio.incidentio_sink import IncidentioSink | ||
from robusta.core.sinks.incidentio.incidentio_sink_params import IncidentioSinkConfigWrapper, IncidentioSinkParams |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
from urllib.parse import urljoin | ||
|
||
class AlertEventsApi: | ||
""" | ||
Class to interact with the incident.io alert events API. | ||
https://api-docs.incident.io/tag/Alert-Events-V2 | ||
""" | ||
|
||
# API Endpoint | ||
_endpoint = 'alert_events/http' | ||
|
||
def __init__ (self, base_url: str, source_config_id: str): | ||
self.base_url = base_url | ||
self.source_config_id = source_config_id | ||
|
||
def build_url(self) -> str: | ||
""" | ||
Build the full URL for the change_events API. | ||
""" | ||
return urljoin(self.base_url, f'{self._endpoint}/{self.source_config_id}') | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
import requests | ||
import json | ||
|
||
class IncidentIoClient: | ||
def __init__(self, base_url: str, token: str): | ||
self.base_url = base_url | ||
self.headers = { | ||
'Authorization': f'Bearer {token}', | ||
'Content-Type': 'application/json' | ||
} | ||
|
||
def request(self, method: str, url: str, payload: dict) -> requests.Response: | ||
""" | ||
Perform an HTTP request to the Incident.io API. | ||
""" | ||
response = requests.request(method, url, headers=self.headers, data=json.dumps(payload)) | ||
|
||
return response |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
import logging | ||
from typing import Optional, Dict, List, Any | ||
from robusta.core.sinks.incidentio.incidentio_client import IncidentIoClient | ||
from robusta.core.sinks.incidentio.incidentio_sink_params import IncidentioSinkParams, IncidentioSinkConfigWrapper | ||
from robusta.core.sinks.incidentio.incidentio_api import AlertEventsApi | ||
from robusta.core.sinks.sink_base import SinkBase | ||
|
||
from robusta.core.reporting.base import BaseBlock, Finding, FindingSeverity, Enrichment, Link, LinkType | ||
from robusta.core.reporting.blocks import ( | ||
HeaderBlock, | ||
JsonBlock, | ||
LinksBlock, | ||
ListBlock, | ||
MarkdownBlock, | ||
TableBlock, | ||
KubernetesDiffBlock, | ||
) | ||
|
||
|
||
class IncidentioSink(SinkBase): | ||
params: IncidentioSinkParams | ||
|
||
def __init__(self, sink_config: IncidentioSinkConfigWrapper, registry): | ||
super().__init__(sink_config.incidentio_sink, registry) | ||
self.source_config_id = sink_config.incidentio_sink.source_config_id | ||
self.client = IncidentIoClient( | ||
base_url=sink_config.incidentio_sink.base_url, | ||
token=sink_config.incidentio_sink.token | ||
) | ||
|
||
@staticmethod | ||
def __to_incidentio_status_type(title: str) -> str: | ||
# Map finding title to incident.io status | ||
if title.startswith("[RESOLVED]"): | ||
return "resolved" | ||
return "firing" | ||
|
||
|
||
def __send_event_to_incidentio(self, finding: Finding, platform_enabled: bool) -> dict: | ||
metadata: Dict[str, Any] = {} | ||
links: List[Dict[str, str]] = [] | ||
|
||
# Add Robusta links if platform is enabled | ||
if platform_enabled: | ||
links.append( | ||
{ | ||
"text": "🔎 Investigate in Robusta", | ||
"href": finding.get_investigate_uri(self.account_id, self.cluster_name), | ||
} | ||
) | ||
|
||
# Collect metadata | ||
metadata["resource"] = finding.subject.name | ||
metadata["namespace"] = finding.subject.namespace | ||
metadata["cluster"] = self.cluster_name | ||
metadata["severity"] = finding.severity.name | ||
metadata["description"] = finding.description or "" | ||
metadata["source"] = finding.source.name | ||
metadata["fingerprint_id"] = finding.fingerprint | ||
|
||
# Convert blocks to metadata | ||
for enrichment in finding.enrichments: | ||
for block in enrichment.blocks: | ||
text = self.__to_unformatted_text(block) | ||
if text: | ||
metadata["additional_info"] = metadata.get("additional_info", "") + text + "\n" | ||
|
||
return { | ||
"deduplication_key": finding.fingerprint, | ||
"title": finding.title, | ||
"description": finding.description or "No description provided.", | ||
"status": self.__to_incidentio_status_type(finding.title), | ||
"metadata": metadata, | ||
"source_url": finding.get_investigate_uri(self.account_id, self.cluster_name), | ||
"links": links, | ||
} | ||
|
||
def write_finding(self, finding: Finding, platform_enabled: bool) -> None: | ||
payload = self.__send_event_to_incidentio(finding, platform_enabled) | ||
|
||
response = self.client.request( | ||
"POST", | ||
AlertEventsApi(self.client.base_url, self.source_config_id).build_url(), | ||
payload | ||
) | ||
|
||
if not response.ok: | ||
logging.error( | ||
f"Error sending alert to Incident.io: {response.status_code}, {response.text}" | ||
) | ||
|
||
@staticmethod | ||
def __to_unformatted_text(block: BaseBlock) -> Optional[str]: | ||
if isinstance(block, HeaderBlock): | ||
return block.text | ||
elif isinstance(block, TableBlock): | ||
return block.to_table_string() | ||
elif isinstance(block, ListBlock): | ||
return "\n".join(block.items) | ||
elif isinstance(block, MarkdownBlock): | ||
return block.text | ||
elif isinstance(block, JsonBlock): | ||
return block.json_str | ||
elif isinstance(block, KubernetesDiffBlock): | ||
return "\n".join(diff.formatted_path for diff in block.diffs) | ||
return None |
44 changes: 44 additions & 0 deletions
44
src/robusta/core/sinks/incidentio/incidentio_sink_params.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
import re | ||
from typing import Optional | ||
from urllib.parse import urlparse | ||
from robusta.core.playbooks.playbook_utils import get_env_replacement | ||
|
||
from pydantic import validator | ||
|
||
from robusta.core.sinks.sink_base_params import SinkBaseParams | ||
from robusta.core.sinks.sink_config import SinkConfigBase | ||
|
||
class IncidentioSinkParams(SinkBaseParams): | ||
base_url: Optional[str] = "https://api.incident.io/v2/" | ||
token: str | ||
source_config_id: str | ||
|
||
@classmethod | ||
def _get_sink_type(cls): | ||
return "incidentio" | ||
|
||
@validator("base_url") | ||
def validate_base_url(cls, base_url): | ||
parsed_url = urlparse(base_url) | ||
if not parsed_url.scheme or not parsed_url.netloc: | ||
raise ValueError(f"Invalid base_url: {base_url}. It must include a scheme and netloc (e.g., https://api.incident.io).") | ||
return base_url | ||
|
||
@validator("source_config_id") | ||
def validate_source_config_id(cls, source_config_id): | ||
""" | ||
Ensures source_config_id matches the expected format. | ||
""" | ||
pattern = r"^[A-Z0-9]{26}$" | ||
source_config_id = get_env_replacement(source_config_id) | ||
if not re.match(pattern, source_config_id): | ||
raise ValueError( | ||
f"Invalid source_config_id: {source_config_id}. It must be a 26-character string of uppercase letters and digits." | ||
) | ||
return source_config_id | ||
|
||
class IncidentioSinkConfigWrapper(SinkConfigBase): | ||
incidentio_sink: IncidentioSinkParams | ||
|
||
def get_params(self) -> SinkBaseParams: | ||
return self.incidentio_sink |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters