Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implemented Incident.io Sink #1645

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/robusta/core/sinks/incidentio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from robusta.core.sinks.incidentio.incidentio_sink import IncidentioSink
from robusta.core.sinks.incidentio.incidentio_sink_params import IncidentioSinkConfigWrapper, IncidentioSinkParams
21 changes: 21 additions & 0 deletions src/robusta/core/sinks/incidentio/incidentio_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from urllib.parse import urljoin

class AlertEventsApi:
"""
Class to interact with the incident.io alert events API.
https://api-docs.incident.io/tag/Alert-Events-V2
"""

# API Endpoint
_endpoint = 'alert_events/http'

def __init__ (self, base_url: str, source_config_id: str):
self.base_url = base_url
self.source_config_id = source_config_id

def build_url(self) -> str:
"""
Build the full URL for the change_events API.
"""
return urljoin(self.base_url, f'{self._endpoint}/{self.source_config_id}')

18 changes: 18 additions & 0 deletions src/robusta/core/sinks/incidentio/incidentio_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import requests
import json

class IncidentIoClient:
def __init__(self, base_url: str, token: str):
self.base_url = base_url
self.headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}

def request(self, method: str, url: str, payload: dict) -> requests.Response:
"""
Perform an HTTP request to the Incident.io API.
"""
response = requests.request(method, url, headers=self.headers, data=json.dumps(payload))

return response
115 changes: 115 additions & 0 deletions src/robusta/core/sinks/incidentio/incidentio_sink.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import logging
from typing import Optional, Dict, List, Any
from urllib.parse import urljoin
from robusta.core.sinks.incidentio.incidentio_client import IncidentIoClient
from robusta.core.sinks.incidentio.incidentio_api import AlertEventsApi

import requests

from robusta.core.reporting.base import BaseBlock, Finding, FindingSeverity, Enrichment, Link, LinkType
from robusta.core.reporting.blocks import (
HeaderBlock,
JsonBlock,
LinksBlock,
ListBlock,
MarkdownBlock,
TableBlock,
KubernetesDiffBlock,
)
from robusta.core.sinks.sink_base import SinkBase
from robusta.core.sinks.sink_config import SinkConfigBase
from robusta.core.sinks.incidentio.incidentio_sink_params import IncidentioSinkParams


class IncidentioSink(SinkBase):
def __init__(self, sink_config: SinkConfigBase, registry):
igorhrcek marked this conversation as resolved.
Show resolved Hide resolved
super().__init__(sink_config.incidentio_sink, registry)
self.source_config_id = sink_config.incidentio_sink.source_config_id
self.client = IncidentIoClient(
base_url=sink_config.incidentio_sink.base_url,
token=sink_config.incidentio_sink.token
)

@staticmethod
def __to_incidentio_status_type(title: str) -> str:
# Map finding title to incident.io status
if title.startswith("[RESOLVED]"):
return "resolved"
return "firing"


def __send_event_to_incidentio(self, finding: Finding, platform_enabled: bool) -> dict:
metadata: Dict[str, Any] = {}
links: List[Dict[str, str]] = []

# Add Robusta links if platform is enabled
if platform_enabled:
links.append(
{
"text": "🔎 Investigate in Robusta",
"href": finding.get_investigate_uri(self.account_id, self.cluster_name),
}
)
else:
links.append(
{
"text": "🔎 Enable Robusta UI to investigate",
"href": "https://bit.ly/robusta-ui-incidentio",
igorhrcek marked this conversation as resolved.
Show resolved Hide resolved
}
)

# Collect metadata
metadata["resource"] = finding.subject.name
metadata["namespace"] = finding.subject.namespace
metadata["cluster"] = self.cluster_name
metadata["severity"] = finding.severity.name
metadata["description"] = finding.description or ""
metadata["source"] = finding.source.name
metadata["fingerprint_id"] = finding.fingerprint

# Convert blocks to metadata
for enrichment in finding.enrichments:
for block in enrichment.blocks:
text = self.__to_unformatted_text(block)
if text:
metadata["additional_info"] = metadata.get("additional_info", "") + text + "\n"

return {
"deduplication_key": finding.fingerprint,
"title": finding.title,
"description": finding.description or "No description provided.",
"status": self.__to_incidentio_status_type(finding.title),
"metadata": metadata,
"source_url": finding.get_investigate_uri(self.account_id, self.cluster_name),
"links": links,
}

def write_finding(self, finding: Finding, platform_enabled: bool) -> None:
payload = self.__send_event_to_incidentio(finding, platform_enabled)

response = self.client.request(
"POST",
AlertEventsApi(self.client.base_url, self.source_config_id).build_url(),
payload
)

if not response.ok:
logging.error(
f"Error sending alert to Incident.io: {response.status_code}, {response.text}"
)

@staticmethod
def __to_unformatted_text(block: BaseBlock) -> Optional[str]:
if isinstance(block, HeaderBlock):
return block.text
elif isinstance(block, TableBlock):
return block.to_table_string()
elif isinstance(block, ListBlock):
return "\n".join(block.items)
elif isinstance(block, MarkdownBlock):
return block.text
elif isinstance(block, JsonBlock):
return block.json_str
elif isinstance(block, KubernetesDiffBlock):
return "\n".join(diff.formatted_path for diff in block.diffs)
return None
45 changes: 45 additions & 0 deletions src/robusta/core/sinks/incidentio/incidentio_sink_params.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import re
from typing import Optional
from urllib.parse import urlparse
from robusta.core.playbooks.playbook_utils import get_env_replacement

from pydantic import validator

from robusta.core.sinks.sink_base_params import SinkBaseParams
from robusta.core.sinks.sink_config import SinkConfigBase

class IncidentioSinkParams(SinkBaseParams):
base_url: Optional[str] = "https://api.incident.io/v2/"
alert_events_api: Optional[str] = "alert_events/http"
token: str
source_config_id: str

@classmethod
def _get_sink_type(cls):
return "incidentio"

@validator("base_url")
def validate_base_url(cls, base_url):
parsed_url = urlparse(base_url)
if not parsed_url.scheme or not parsed_url.netloc:
raise ValueError(f"Invalid base_url: {base_url}. It must include a scheme and netloc (e.g., https://api.incident.io).")
return base_url

@validator("source_config_id")
def validate_source_config_id(cls, source_config_id):
"""
Ensures source_config_id matches the expected format.
"""
pattern = r"^[A-Z0-9]{26}$"
source_config_id = get_env_replacement(source_config_id)
if not re.match(pattern, source_config_id):
raise ValueError(
f"Invalid source_config_id: {source_config_id}. It must be a 26-character string of uppercase letters and digits."
)
return source_config_id

class IncidentioSinkConfigWrapper(SinkConfigBase):
incidentio_sink: IncidentioSinkParams

def get_params(self) -> SinkBaseParams:
return self.incidentio_sink
5 changes: 4 additions & 1 deletion src/robusta/core/sinks/sink_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from robusta.core.sinks.yamessenger import YaMessengerSink, YaMessengerSinkConfigWrapper
from robusta.core.sinks.pushover import PushoverSink, PushoverSinkConfigWrapper
from robusta.core.sinks.zulip import ZulipSink, ZulipSinkConfigWrapper
from robusta.core.sinks.incidentio.incidentio_sink import IncidentioSink
from robusta.core.sinks.incidentio.incidentio_sink_params import IncidentioSinkConfigWrapper

class SinkFactory:
__sink_config_mapping: Dict[Type[SinkConfigBase], Type[SinkBase]] = {
Expand All @@ -53,7 +55,8 @@ class SinkFactory:
PushoverSinkConfigWrapper: PushoverSink,
GoogleChatSinkConfigWrapper: GoogleChatSink,
ServiceNowSinkConfigWrapper: ServiceNowSink,
ZulipSinkConfigWrapper: ZulipSink
ZulipSinkConfigWrapper: ZulipSink,
IncidentioSinkConfigWrapper: IncidentioSink
}

@classmethod
Expand Down