diff --git a/acto/cli/collect_system_state.py b/acto/cli/collect_system_state.py new file mode 100644 index 0000000000..59ee9233a4 --- /dev/null +++ b/acto/cli/collect_system_state.py @@ -0,0 +1,58 @@ +import argparse +import logging +import os + +from acto.common import kubernetes_client +from acto.system_state.kubernetes_system_state import KubernetesSystemState + + +def main(): + """Main function""" + parser = argparse.ArgumentParser( + description="Collect the system state of a Kubernetes cluster under a namespace" + " and dump it to a file. Check the health of the system state." + ) + parser.add_argument( + "--output", + required=False, + default="system_state.json", + help="Path to dump the system state to", + ) + parser.add_argument( + "--kubeconfig", + required=False, + default=f"{os.environ['HOME']}/.kube/config", + help="Path to the kubeconfig file", + ) + parser.add_argument( + "--kubecontext", + required=False, + default="kind-kind", + help="Name of the Kubernetes context to use", + ) + parser.add_argument( + "--namespace", + required=False, + default="default", + help="Namespace to collect the system state under", + ) + args = parser.parse_args() + + api_client = kubernetes_client(args.kubeconfig, args.kubecontext) + + system_state = KubernetesSystemState.from_api_client( + api_client, args.namespace + ) + system_state.dump(args.output) + logging.info("System state dumped to %s", args.output) + + health_status = system_state.check_health() + if health_status.is_healthy() is False: + logging.error( + "System state is not healthy with errors: \n%s", + str(health_status), + ) + + +if __name__ == "__main__": + main() diff --git a/acto/system_state/__init__.py b/acto/system_state/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/acto/system_state/cluster_role.py b/acto/system_state/cluster_role.py new file mode 100644 index 0000000000..f786162b6f --- /dev/null +++ b/acto/system_state/cluster_role.py @@ -0,0 +1,34 @@ +"""ClusterRole state model""" + +import kubernetes +import kubernetes.client.models as kubernetes_models +import pydantic +from typing_extensions import Self + +from acto.system_state.kubernetes_object import ( + KubernetesDictObject, + list_object_helper, +) + + +class ClusterRoleState(KubernetesDictObject): + """ClusterRole state object.""" + + root: dict[str, kubernetes_models.V1ClusterRole] + + @classmethod + def from_api_client(cls, api_client: kubernetes.client.ApiClient) -> Self: + data = list_object_helper( + kubernetes.client.RbacAuthorizationV1Api( + api_client + ).list_cluster_role, + ) + return cls.model_validate(data) + + def check_health(self) -> tuple[bool, str]: + """Check ClusterRole health""" + return True, "" + + @pydantic.model_serializer + def serialize(self): + return {key: value.to_dict() for key, value in self.root.items()} diff --git a/acto/system_state/cluster_role_binding.py b/acto/system_state/cluster_role_binding.py new file mode 100644 index 0000000000..097ea6fc50 --- /dev/null +++ b/acto/system_state/cluster_role_binding.py @@ -0,0 +1,35 @@ +"""ClusterRoleBinding state model""" + +import kubernetes +import kubernetes.client.models as kubernetes_models +import pydantic +from kubernetes.client.api_client import ApiClient +from typing_extensions import Self + +from acto.system_state.kubernetes_object import ( + KubernetesDictObject, + list_object_helper, +) + + +class ClusterRoleBindingState(KubernetesDictObject): + """ClusterRoleBinding state object.""" + + root: dict[str, kubernetes_models.V1ClusterRoleBinding] + + @classmethod + def from_api_client(cls, api_client: ApiClient) -> Self: + data = list_object_helper( + kubernetes.client.RbacAuthorizationV1Api( + api_client + ).list_cluster_role_binding, + ) + return cls.model_validate(data) + + def check_health(self) -> tuple[bool, str]: + """Check ClusterRoleBinding health""" + return True, "" + + @pydantic.model_serializer + def serialize(self): + return {key: value.to_dict() for key, value in self.root.items()} diff --git a/acto/system_state/config_map.py b/acto/system_state/config_map.py new file mode 100644 index 0000000000..bfa9fd45ab --- /dev/null +++ b/acto/system_state/config_map.py @@ -0,0 +1,35 @@ +"""ConfigMap state model""" + +import kubernetes +import kubernetes.client.models as kubernetes_models +import pydantic +from typing_extensions import Self + +from acto.system_state.kubernetes_object import ( + KubernetesNamespacedDictObject, + list_namespaced_object_helper, +) + + +class ConfigMapState(KubernetesNamespacedDictObject): + """ConfigMap state object.""" + + root: dict[str, kubernetes_models.V1ConfigMap] + + @classmethod + def from_api_client_namespaced( + cls, api_client: kubernetes.client.ApiClient, namespace: str + ) -> Self: + data = list_namespaced_object_helper( + kubernetes.client.CoreV1Api(api_client).list_namespaced_config_map, + namespace, + ) + return cls.model_validate(data) + + def check_health(self) -> tuple[bool, str]: + """Check if ConfigMap is healthy""" + return True, "" + + @pydantic.model_serializer + def serialize(self): + return {key: value.to_dict() for key, value in self.root.items()} diff --git a/acto/system_state/cron_job.py b/acto/system_state/cron_job.py new file mode 100644 index 0000000000..b51fc9df75 --- /dev/null +++ b/acto/system_state/cron_job.py @@ -0,0 +1,35 @@ +"""CronJob state model""" + +import kubernetes +import kubernetes.client.models as kubernetes_models +import pydantic +from typing_extensions import Self + +from acto.system_state.kubernetes_object import ( + KubernetesNamespacedDictObject, + list_namespaced_object_helper, +) + + +class CronJobState(KubernetesNamespacedDictObject): + """CronJob state object.""" + + root: dict[str, kubernetes_models.V1CronJob] + + @classmethod + def from_api_client_namespaced( + cls, api_client: kubernetes.client.ApiClient, namespace: str + ) -> Self: + data = list_namespaced_object_helper( + kubernetes.client.BatchV1Api(api_client).list_namespaced_cron_job, + namespace, + ) + return cls.model_validate(data) + + def check_health(self) -> tuple[bool, str]: + """Check CronJob health""" + return True, "" + + @pydantic.model_serializer + def serialize(self): + return {key: value.to_dict() for key, value in self.root.items()} diff --git a/acto/system_state/daemon_set.py b/acto/system_state/daemon_set.py new file mode 100644 index 0000000000..0f2fef6477 --- /dev/null +++ b/acto/system_state/daemon_set.py @@ -0,0 +1,68 @@ +"""DaemonSet state model""" +import kubernetes +import kubernetes.client.models as kubernetes_models +import pydantic +from typing_extensions import Self + +from acto.system_state.kubernetes_object import ( + KubernetesNamespacedDictObject, + list_namespaced_object_helper, +) + + +class DaemonSetState(KubernetesNamespacedDictObject): + """DaemonSet state object.""" + + root: dict[str, kubernetes_models.V1DaemonSet] + + @classmethod + def from_api_client_namespaced( + cls, api_client: kubernetes.client.ApiClient, namespace: str + ) -> Self: + data = list_namespaced_object_helper( + kubernetes.client.AppsV1Api(api_client).list_namespaced_daemon_set, + namespace, + ) + return cls.model_validate(data) + + def check_health(self) -> tuple[bool, str]: + """Check if DaemonSet is healthy + + Returns: + tuple[bool, str]: (is_healthy, reason) + """ + + for name, daemon_set in self.root.items(): + if ( + daemon_set.status.observed_generation + != daemon_set.metadata.generation + ): + return False, f"DaemonSet[{name}] generation mismatch" + + if ( + daemon_set.status.desired_number_scheduled + != daemon_set.status.number_ready + ): + return ( + False, + f"DaemonSet[{name}] replicas mismatch, " + + f"desired[{daemon_set.status.desired_number_scheduled}] " + + f"!= ready[{daemon_set.status.number_ready}]", + ) + + if daemon_set.status.conditions is not None: + for condition in daemon_set.status.conditions: + if ( + condition.type == "Progressing" + and condition.status != "True" + ): + return ( + False, + f"DaemonSet[{name}] is not progressing: {condition.message}", + ) + + return True, "" + + @pydantic.model_serializer + def serialize(self): + return {key: value.to_dict() for key, value in self.root.items()} diff --git a/acto/system_state/deployment.py b/acto/system_state/deployment.py new file mode 100644 index 0000000000..76189ae9f1 --- /dev/null +++ b/acto/system_state/deployment.py @@ -0,0 +1,74 @@ +"""Deployment state model""" +import kubernetes +import kubernetes.client.models as kubernetes_models +import pydantic +from typing_extensions import Self + +from acto.system_state.kubernetes_object import ( + KubernetesNamespacedDictObject, + list_namespaced_object_helper, +) + + +class DeploymentState(KubernetesNamespacedDictObject): + """Deployment state model""" + + root: dict[str, kubernetes_models.V1Deployment] + + @classmethod + def from_api_client_namespaced( + cls, api_client: kubernetes.client.ApiClient, namespace: str + ) -> Self: + data = list_namespaced_object_helper( + kubernetes.client.AppsV1Api(api_client).list_namespaced_deployment, + namespace, + ) + return cls.model_validate(data) + + def check_health(self) -> tuple[bool, str]: + """Check if deployment is healthy + + Returns: + tuple[bool, str]: (is_healthy, reason) + """ + + for name, deployment in self.root.items(): + if ( + deployment.status.observed_generation + != deployment.metadata.generation + ): + return False, f"Deployment[{name}] generation mismatch" + + if deployment.spec.replicas != deployment.status.ready_replicas: + return False, f"Deployment[{name}] replicas mismatch" + + if deployment.status.conditions is not None: + for condition in deployment.status.conditions: + if ( + condition.type == "Available" + and condition.status != "True" + ): + return False, f"Deployment[{name}] is not available" + if ( + condition.type == "Progressing" + and condition.status != "True" + ): + return False, f"Deployment[{name}] is not progressing" + + if deployment.status.replicas != deployment.status.ready_replicas: + return False, f"Deployment[{name}] replicas mismatch" + + if ( + deployment.status.unavailable_replicas != 0 + and deployment.status.unavailable_replicas is not None + ): + return ( + False, + f"[{name}] [{deployment.status.unavailable_replicas}] pods are unavailable", + ) + + return True, "" + + @pydantic.model_serializer + def serialize(self): + return {key: value.to_dict() for key, value in self.root.items()} diff --git a/acto/system_state/endpoints.py b/acto/system_state/endpoints.py new file mode 100644 index 0000000000..f2ae0bc006 --- /dev/null +++ b/acto/system_state/endpoints.py @@ -0,0 +1,35 @@ +"""Endpoints state model""" + +import kubernetes +import kubernetes.client.models as kubernetes_models +import pydantic +from typing_extensions import Self + +from acto.system_state.kubernetes_object import ( + KubernetesNamespacedDictObject, + list_namespaced_object_helper, +) + + +class EndpointsState(KubernetesNamespacedDictObject): + """Endpoints state object.""" + + root: dict[str, kubernetes_models.V1Endpoints] + + @classmethod + def from_api_client_namespaced( + cls, api_client: kubernetes.client.ApiClient, namespace: str + ) -> Self: + data = list_namespaced_object_helper( + kubernetes.client.CoreV1Api(api_client).list_namespaced_endpoints, + namespace, + ) + return cls.model_validate(data) + + def check_health(self) -> tuple[bool, str]: + """Check Endpoints health""" + return True, "" + + @pydantic.model_serializer + def serialize(self): + return {key: value.to_dict() for key, value in self.root.items()} diff --git a/acto/system_state/ingress.py b/acto/system_state/ingress.py new file mode 100644 index 0000000000..ceb3f93a1f --- /dev/null +++ b/acto/system_state/ingress.py @@ -0,0 +1,41 @@ +"""Ingress state model""" +import kubernetes +import kubernetes.client.models as kubernetes_models +import pydantic +from typing_extensions import Self + +from acto.system_state.kubernetes_object import ( + KubernetesNamespacedDictObject, + list_namespaced_object_helper, +) + + +class IngressState(KubernetesNamespacedDictObject): + """Ingress state object.""" + + root: dict[str, kubernetes_models.V1Ingress] + + @classmethod + def from_api_client_namespaced( + cls, api_client: kubernetes.client.ApiClient, namespace: str + ) -> Self: + data = list_namespaced_object_helper( + kubernetes.client.NetworkingV1Api( + api_client + ).list_namespaced_ingress, + namespace, + ) + return cls.model_validate(data) + + def check_health(self) -> tuple[bool, str]: + """Check if Ingress is healthy + + Returns: + tuple[bool, str]: (is_healthy, reason) + """ + + return True, "" + + @pydantic.model_serializer + def serialize(self): + return {key: value.to_dict() for key, value in self.root.items()} diff --git a/acto/system_state/job.py b/acto/system_state/job.py new file mode 100644 index 0000000000..e87c8a0975 --- /dev/null +++ b/acto/system_state/job.py @@ -0,0 +1,35 @@ +"""Job state model""" + +import kubernetes +import kubernetes.client.models as kubernetes_models +import pydantic +from typing_extensions import Self + +from acto.system_state.kubernetes_object import ( + KubernetesNamespacedDictObject, + list_namespaced_object_helper, +) + + +class JobState(KubernetesNamespacedDictObject): + """Job state object.""" + + root: dict[str, kubernetes_models.V1Job] + + @classmethod + def from_api_client_namespaced( + cls, api_client: kubernetes.client.ApiClient, namespace: str + ) -> Self: + data = list_namespaced_object_helper( + kubernetes.client.BatchV1Api(api_client).list_namespaced_job, + namespace, + ) + return cls.model_validate(data) + + def check_health(self) -> tuple[bool, str]: + """Check Job health""" + return True, "" + + @pydantic.model_serializer + def serialize(self): + return {key: value.to_dict() for key, value in self.root.items()} diff --git a/acto/system_state/kubernetes_object.py b/acto/system_state/kubernetes_object.py new file mode 100644 index 0000000000..9738a235a1 --- /dev/null +++ b/acto/system_state/kubernetes_object.py @@ -0,0 +1,250 @@ +"""Base class for all Kubernetes objects.""" + + +import abc +from typing import Any, Callable, Literal, TypeAlias + +import deepdiff +import deepdiff.model as deepdiff_model +import kubernetes +import kubernetes.client +import kubernetes.client.models as kubernetes_models +import pydantic +from deepdiff.helper import NotPresent +from typing_extensions import Self + +from acto.common import ( + EXCLUDE_PATH_REGEX, + Diff, + PropertyPath, + flatten_dict, + flatten_list, +) + + +class KubernetesResourceInterface(pydantic.BaseModel): + """Helper interface shared by all Kubernetes resources""" + + model_config = pydantic.ConfigDict(arbitrary_types_allowed=True) + + metadata: kubernetes_models.V1ObjectMeta + + +class KubernetesListNamespacedObjectMethodReturnType(abc.ABC): + """Kubernetes list namespaced object method return type""" + + @property + @abc.abstractmethod + def items(self) -> list[KubernetesResourceInterface]: + """Return Kubernetes list namespaced object method items""" + + +KubernetesListNamespacedObjectMethod: TypeAlias = Callable[ + ..., KubernetesListNamespacedObjectMethodReturnType +] + +DiffType: TypeAlias = Literal[ + "type_changes", + "values_changed", + "iterable_item_added", + "iterable_item_removed", + "dictionary_item_added", + "dictionary_item_removed", + "set_item_added", + "set_item_removed", + "attribute_added", + "attribute_removed", +] + +# ObjectDiff: TypeAlias = dict[DiffType, dict[str, Diff]] + + +class ObjectDiff(pydantic.RootModel): + """Object diff, based on TreeView of deepdiff""" + + root: dict[DiffType, dict[str, Diff]] + + @classmethod + def from_deepdiff(cls, diff: deepdiff_model.TreeResult) -> Self: + """Create ObjectDiff from deepdiff.DeepDiff""" + data: dict[TypeAlias, dict[str, Diff]] = {} + + for category, changes in diff.items(): + data[category] = {} + for change in changes: + # Heuristic + # When an entire dict/list is added or removed, flatten this + # dict/list to help field matching and value comparison in oracle + + if (isinstance(change.t1, (dict, list))) and ( + change.t2 is None or isinstance(change.t2, NotPresent) + ): + if isinstance(change.t1, dict): + flattened_changes = flatten_dict(change.t1, []) + else: + flattened_changes = flatten_list(change.t1, []) + for path, value in flattened_changes: + if value is None or isinstance(value, NotPresent): + continue + str_path = change.path() + for i in path: + str_path += f"[{i}]" + data[category][str_path] = Diff( + prev=value, + curr=change.t2, + path=PropertyPath( + change.path(output_format="list") + path + ), + ) + elif (isinstance(change.t2, (dict, list))) and ( + change.t1 is None or isinstance(change.t1, NotPresent) + ): + if isinstance(change.t2, dict): + flattened_changes = flatten_dict(change.t2, []) + else: + flattened_changes = flatten_list(change.t2, []) + for path, value in flattened_changes: + if value is None or isinstance(value, NotPresent): + continue + str_path = change.path() + for i in path: + str_path += f"[{i}]" + data[category][str_path] = Diff( + prev=change.t1, + curr=value, + path=PropertyPath( + change.path(output_format="list") + path + ), + ) + else: + data[category][change.path()] = Diff( + prev=change.t1, + curr=change.t2, + path=PropertyPath(change.path(output_format="list")), + ) + + return cls.model_validate(data) + + +class KubernetesObject(abc.ABC, pydantic.RootModel): + """Base class for all Kubernetes objects.""" + + model_config = pydantic.ConfigDict(arbitrary_types_allowed=True) + + root: Any + + @classmethod + @abc.abstractmethod + def from_api_client(cls, api_client: kubernetes.client.ApiClient) -> Self: + """Create Kubernetes object from ApiClient""" + raise NotImplementedError() + + def diff_from(self, other: Self) -> ObjectDiff: + """Diff with other Kubernetes object""" + return ObjectDiff.from_deepdiff( + deepdiff.DeepDiff( + self, + other, + exclude_regex_paths=EXCLUDE_PATH_REGEX, + view="tree", + ) + ) + + @abc.abstractmethod + def check_health(self) -> tuple[bool, str]: + """Check if object is healthy + + Returns: + tuple[bool, str]: (is_healthy, reason) + """ + raise NotImplementedError() + + @pydantic.model_serializer + def serialize(self) -> dict: + """Serialize Kubernetes object""" + raise NotImplementedError() + + +class KubernetesNamespacedObject(KubernetesObject): + """Base class for all Kubernetes namespaced objects.""" + + @classmethod + @abc.abstractmethod + def from_api_client_namespaced( + cls, api_client: kubernetes.client.ApiClient, namespace: str + ) -> Self: + """Create Kubernetes object from ApiClient""" + raise NotImplementedError() + + @classmethod + def from_api_client(cls, api_client: kubernetes.client.ApiClient) -> Self: + return cls.from_api_client_namespaced(api_client, "default") + + +class KubernetesListObject(KubernetesObject): + """Base class for all Kubernetes objects stored as a list.""" + + root: list[Any] + + def __iter__(self): + return iter(self.root) + + def __getitem__(self, key: int) -> Any: + return self.root[key] + + +class KubernetesDictObject(KubernetesObject): + """Base class for all Kubernetes objects stored as a dict.""" + + root: dict[str, Any] + + def __iter__(self): + return iter(self.root) + + def __getitem__(self, key: str) -> Any: + return self.root[key] + + +class KubernetesNamespacedListObject(KubernetesNamespacedObject): + """Base class for all Kubernetes namespaced objects stored as a list.""" + + root: list[Any] + + def __iter__(self): + return iter(self.root) + + def __getitem__(self, key: int) -> Any: + return self.root[key] + + +class KubernetesNamespacedDictObject(KubernetesNamespacedObject): + """Base class for all Kubernetes namespaced objects stored as a dict.""" + + root: dict[str, Any] + + def __iter__(self): + return iter(self.root) + + def __getitem__(self, key: str) -> Any: + return self.root[key] + + +def list_object_helper( + method: KubernetesListNamespacedObjectMethod, +) -> dict[str, KubernetesResourceInterface]: + """List object helper""" + result = {} + for obj in method(watch=False).items: + result[obj.metadata.name] = obj + return result + + +def list_namespaced_object_helper( + method: KubernetesListNamespacedObjectMethod, + namespace: str, +) -> dict[str, KubernetesResourceInterface]: + """List namespaced object helper""" + result = {} + for obj in method(namespace=namespace, watch=False).items: + result[obj.metadata.name] = obj + return result diff --git a/acto/system_state/kubernetes_system_state.py b/acto/system_state/kubernetes_system_state.py new file mode 100644 index 0000000000..7205f9bf94 --- /dev/null +++ b/acto/system_state/kubernetes_system_state.py @@ -0,0 +1,234 @@ +"""Kubernetes system state model""" + +import json + +import kubernetes +import pydantic +from typing_extensions import Self + +from acto.serialization import ActoEncoder +from acto.system_state.cluster_role import ClusterRoleState +from acto.system_state.cluster_role_binding import ClusterRoleBindingState +from acto.system_state.config_map import ConfigMapState +from acto.system_state.cron_job import CronJobState +from acto.system_state.daemon_set import DaemonSetState +from acto.system_state.deployment import DeploymentState +from acto.system_state.endpoints import EndpointsState +from acto.system_state.ingress import IngressState +from acto.system_state.job import JobState +from acto.system_state.kubernetes_object import ObjectDiff +from acto.system_state.network_policy import NetworkPolicyState +from acto.system_state.persistent_volume import PersistentVolumeState +from acto.system_state.persistent_volume_claim import PersistentVolumeClaimState +from acto.system_state.pod import PodState +from acto.system_state.replica_set import ReplicaSetState +from acto.system_state.role import RoleState +from acto.system_state.role_binding import RoleBindingState +from acto.system_state.secret import SecretState +from acto.system_state.service import ServiceState +from acto.system_state.service_account import ServiceAccountState +from acto.system_state.stateful_set import StatefulSetState +from acto.system_state.storage_class import StorageClassState + + +class KubernetesSystemDiff(pydantic.BaseModel): + """Kubernetes system diff model""" + + cluster_role_binding: ObjectDiff + cluster_role: ObjectDiff + config_map: ObjectDiff + cron_job: ObjectDiff + daemon_set: ObjectDiff + deployment: ObjectDiff + endpoint: ObjectDiff + ingress: ObjectDiff + job: ObjectDiff + network_policy: ObjectDiff + persistent_volume_claim: ObjectDiff + persistent_volume: ObjectDiff + pod: ObjectDiff + replica_set: ObjectDiff + role_binding: ObjectDiff + role: ObjectDiff + secret: ObjectDiff + service_account: ObjectDiff + service: ObjectDiff + stateful_set: ObjectDiff + storage_class: ObjectDiff + + +class KubernetesSystemHealth(pydantic.BaseModel): + """Kubernetes system health status model""" + + daemon_set: tuple[bool, str] + deployment: tuple[bool, str] + job: tuple[bool, str] + pod: tuple[bool, str] + replica_set: tuple[bool, str] + stateful_set: tuple[bool, str] + + def is_healthy(self) -> bool: + """Check if Kubernetes system is healthy""" + return all( + [ + self.daemon_set[0], + self.deployment[0], + self.job[0], + self.pod[0], + self.replica_set[0], + self.stateful_set[0], + ] + ) + + def __str__(self) -> str: + ret = "" + if not self.daemon_set[0]: + ret += f"DaemonSet: {self.daemon_set[1]}\n" + if not self.deployment[0]: + ret += f"Deployment: {self.deployment[1]}\n" + if not self.job[0]: + ret += f"Job: {self.job[1]}\n" + if not self.pod[0]: + ret += f"Pod: {self.pod[1]}\n" + if not self.replica_set[0]: + ret += f"ReplicaSet: {self.replica_set[1]}\n" + if not self.stateful_set[0]: + ret += f"StatefulSet: {self.stateful_set[1]}\n" + return ret + + +class KubernetesSystemState(pydantic.BaseModel): + """System state of the cluster, including all Kubernetes resources""" + + cluster_role_binding: ClusterRoleBindingState + cluster_role: ClusterRoleState + config_map: ConfigMapState + cron_job: CronJobState + daemon_set: DaemonSetState + deployment: DeploymentState + endpoint: EndpointsState + ingress: IngressState + job: JobState + network_policy: NetworkPolicyState + persistent_volume_claim: PersistentVolumeClaimState + persistent_volume: PersistentVolumeState + pod: PodState + replica_set: ReplicaSetState + role_binding: RoleBindingState + role: RoleState + secret: SecretState + service_account: ServiceAccountState + service: ServiceState + stateful_set: StatefulSetState + storage_class: StorageClassState + + @classmethod + def from_api_client( + cls, api_client: kubernetes.client.ApiClient, namespace: str + ) -> Self: + """Initialize Kubernetes system state by fetching all objects from + Kubernetes API server. + """ + return cls( + cluster_role_binding=ClusterRoleBindingState.from_api_client( + api_client + ), + cluster_role=ClusterRoleState.from_api_client(api_client), + config_map=ConfigMapState.from_api_client_namespaced( + api_client, namespace + ), + cron_job=CronJobState.from_api_client_namespaced( + api_client, namespace + ), + daemon_set=DaemonSetState.from_api_client_namespaced( + api_client, namespace + ), + deployment=DeploymentState.from_api_client_namespaced( + api_client, namespace + ), + endpoint=EndpointsState.from_api_client_namespaced( + api_client, namespace + ), + ingress=IngressState.from_api_client_namespaced( + api_client, namespace + ), + job=JobState.from_api_client_namespaced(api_client, namespace), + network_policy=NetworkPolicyState.from_api_client_namespaced( + api_client, namespace + ), + persistent_volume_claim=PersistentVolumeClaimState.from_api_client_namespaced( + api_client, namespace + ), + persistent_volume=PersistentVolumeState.from_api_client(api_client), + pod=PodState.from_api_client_namespaced(api_client, namespace), + replica_set=ReplicaSetState.from_api_client_namespaced( + api_client, namespace + ), + role_binding=RoleBindingState.from_api_client_namespaced( + api_client, namespace + ), + role=RoleState.from_api_client_namespaced(api_client, namespace), + secret=SecretState.from_api_client_namespaced( + api_client, namespace + ), + service_account=ServiceAccountState.from_api_client_namespaced( + api_client, namespace + ), + service=ServiceState.from_api_client_namespaced( + api_client, namespace + ), + stateful_set=StatefulSetState.from_api_client_namespaced( + api_client, namespace + ), + storage_class=StorageClassState.from_api_client(api_client), + ) + + def diff_from(self, other: Self) -> KubernetesSystemDiff: + """Diff with other Kubernetes system state""" + return KubernetesSystemDiff( + cluster_role_binding=self.cluster_role_binding.diff_from( + other.cluster_role_binding + ), + cluster_role=self.cluster_role.diff_from(other.cluster_role), + config_map=self.config_map.diff_from(other.config_map), + cron_job=self.cron_job.diff_from(other.cron_job), + daemon_set=self.daemon_set.diff_from(other.daemon_set), + deployment=self.deployment.diff_from(other.deployment), + endpoint=self.endpoint.diff_from(other.endpoint), + ingress=self.ingress.diff_from(other.ingress), + job=self.job.diff_from(other.job), + network_policy=self.network_policy.diff_from(other.network_policy), + persistent_volume_claim=self.persistent_volume_claim.diff_from( + other.persistent_volume_claim + ), + persistent_volume=self.persistent_volume.diff_from( + other.persistent_volume + ), + pod=self.pod.diff_from(other.pod), + replica_set=self.replica_set.diff_from(other.replica_set), + role_binding=self.role_binding.diff_from(other.role_binding), + role=self.role.diff_from(other.role), + secret=self.secret.diff_from(other.secret), + service_account=self.service_account.diff_from( + other.service_account + ), + service=self.service.diff_from(other.service), + stateful_set=self.stateful_set.diff_from(other.stateful_set), + storage_class=self.storage_class.diff_from(other.storage_class), + ) + + def dump(self, path: str) -> None: + """Dump Kubernetes system state to a file""" + with open(path, "w", encoding="utf-8") as file: + json.dump(self.model_dump(), file, indent=4, cls=ActoEncoder) + + def check_health(self) -> KubernetesSystemHealth: + """Check if Kubernetes system state is healthy""" + return KubernetesSystemHealth( + daemon_set=self.daemon_set.check_health(), + deployment=self.deployment.check_health(), + job=self.job.check_health(), + pod=self.pod.check_health(), + replica_set=self.replica_set.check_health(), + stateful_set=self.stateful_set.check_health(), + ) diff --git a/acto/system_state/network_policy.py b/acto/system_state/network_policy.py new file mode 100644 index 0000000000..9b11c1bcb6 --- /dev/null +++ b/acto/system_state/network_policy.py @@ -0,0 +1,37 @@ +"""NetworkPolicy state model""" + +import kubernetes +import kubernetes.client.models as kubernetes_models +import pydantic +from typing_extensions import Self + +from acto.system_state.kubernetes_object import ( + KubernetesNamespacedDictObject, + list_namespaced_object_helper, +) + + +class NetworkPolicyState(KubernetesNamespacedDictObject): + """NetworkPolicy state object.""" + + root: dict[str, kubernetes_models.V1NetworkPolicy] + + @classmethod + def from_api_client_namespaced( + cls, api_client: kubernetes.client.ApiClient, namespace: str + ) -> Self: + data = list_namespaced_object_helper( + kubernetes.client.NetworkingV1Api( + api_client + ).list_namespaced_network_policy, + namespace, + ) + return cls.model_validate(data) + + def check_health(self) -> tuple[bool, str]: + """Check NetworkPolicy health""" + return True, "" + + @pydantic.model_serializer + def serialize(self): + return {key: value.to_dict() for key, value in self.root.items()} diff --git a/acto/system_state/persistent_volume.py b/acto/system_state/persistent_volume.py new file mode 100644 index 0000000000..5012bd7560 --- /dev/null +++ b/acto/system_state/persistent_volume.py @@ -0,0 +1,32 @@ +"""PersistentVolume state model""" + +import kubernetes +import kubernetes.client.models as kubernetes_models +import pydantic +from typing_extensions import Self + +from acto.system_state.kubernetes_object import ( + KubernetesDictObject, + list_object_helper, +) + + +class PersistentVolumeState(KubernetesDictObject): + """PersistentVolume state object.""" + + root: dict[str, kubernetes_models.V1PersistentVolume] + + @classmethod + def from_api_client(cls, api_client: kubernetes.client.ApiClient) -> Self: + data = list_object_helper( + kubernetes.client.CoreV1Api(api_client).list_persistent_volume, + ) + return cls.model_validate(data) + + def check_health(self) -> tuple[bool, str]: + """Check if PersistentVolume is healthy""" + return True, "" + + @pydantic.model_serializer + def serialize(self): + return {key: value.to_dict() for key, value in self.root.items()} diff --git a/acto/system_state/persistent_volume_claim.py b/acto/system_state/persistent_volume_claim.py new file mode 100644 index 0000000000..3cbee72adc --- /dev/null +++ b/acto/system_state/persistent_volume_claim.py @@ -0,0 +1,37 @@ +"""PersistentVolumeClaim state model""" + +import kubernetes +import kubernetes.client.models as kubernetes_models +import pydantic +from typing_extensions import Self + +from acto.system_state.kubernetes_object import ( + KubernetesNamespacedDictObject, + list_namespaced_object_helper, +) + + +class PersistentVolumeClaimState(KubernetesNamespacedDictObject): + """PersistentVolumeClaim state object.""" + + root: dict[str, kubernetes_models.V1PersistentVolumeClaim] + + @classmethod + def from_api_client_namespaced( + cls, api_client: kubernetes.client.ApiClient, namespace: str + ) -> Self: + data = list_namespaced_object_helper( + kubernetes.client.CoreV1Api( + api_client + ).list_namespaced_persistent_volume_claim, + namespace, + ) + return cls.model_validate(data) + + def check_health(self) -> tuple[bool, str]: + """Check health of PersistentVolumeClaim""" + return True, "" + + @pydantic.model_serializer + def serialize(self): + return {key: value.to_dict() for key, value in self.root.items()} diff --git a/acto/system_state/pod.py b/acto/system_state/pod.py new file mode 100644 index 0000000000..0b47babef5 --- /dev/null +++ b/acto/system_state/pod.py @@ -0,0 +1,64 @@ +"""Pod state model""" +import kubernetes +import kubernetes.client.models as kubernetes_models +import pydantic +from typing_extensions import Self + +from acto.system_state.kubernetes_object import ( + KubernetesNamespacedDictObject, + list_namespaced_object_helper, +) + + +class PodState(KubernetesNamespacedDictObject): + """Pod state model""" + + root: dict[str, kubernetes_models.V1Pod] + + @classmethod + def from_api_client_namespaced( + cls, api_client: kubernetes.client.ApiClient, namespace: str + ) -> Self: + data = list_namespaced_object_helper( + kubernetes.client.CoreV1Api(api_client).list_namespaced_pod, + namespace, + ) + return cls.model_validate(data) + + def check_health(self) -> tuple[bool, str]: + """Check if pod is healthy + + Returns: + tuple[bool, str]: (is_healthy, reason) + """ + + for name, pod in self.root.items(): + if pod.status.conditions is not None: + for condition in pod.status.conditions: + if condition.type == "Ready" and condition.status != "True": + return ( + False, + f"Pod[{name}] is not ready: {condition.message}", + ) + + if pod.status.container_statuses is not None: + for container_status in pod.status.container_statuses: + if container_status.ready is not True: + return ( + False, + f"Container {container_status.name} is not ready", + ) + + if pod.status.init_container_statuses is not None: + for container_status in pod.status.init_container_statuses: + if container_status.ready is not True: + return ( + False, + f"Init container {container_status.name} is not ready", + ) + + return True, "" + + @pydantic.model_serializer + def serialize(self): + return {key: value.to_dict() for key, value in self.root.items()} diff --git a/acto/system_state/replica_set.py b/acto/system_state/replica_set.py new file mode 100644 index 0000000000..7cf40816ef --- /dev/null +++ b/acto/system_state/replica_set.py @@ -0,0 +1,56 @@ +"""ReplicaSet state model.""" +import kubernetes +import kubernetes.client.models as kubernetes_models +import pydantic +from typing_extensions import Self + +from acto.system_state.kubernetes_object import ( + KubernetesNamespacedDictObject, + list_namespaced_object_helper, +) + + +class ReplicaSetState(KubernetesNamespacedDictObject): + """ReplicaSet state object.""" + + root: dict[str, kubernetes_models.V1ReplicaSet] + + @classmethod + def from_api_client_namespaced( + cls, api_client: kubernetes.client.ApiClient, namespace: str + ) -> Self: + data = list_namespaced_object_helper( + kubernetes.client.AppsV1Api(api_client).list_namespaced_replica_set, + namespace, + ) + return cls.model_validate(data) + + def check_health(self) -> tuple[bool, str]: + """Check if ReplicaSet is healthy + + Returns: + tuple[bool, str]: (is_healthy, reason) + """ + for name, replica_set in self.root.items(): + if ( + replica_set.status.observed_generation + != replica_set.metadata.generation + ): + return False, f"ReplicaSet[{name}] generation mismatch" + + if replica_set.spec.replicas != replica_set.status.ready_replicas: + return False, f"ReplicaSet[{name}] replicas mismatch" + + if replica_set.status.conditions is not None: + for condition in replica_set.status.conditions: + if ( + condition.type == "Available" + and condition.status != "True" + ): + return False, f"ReplicaSet[{name}] is not available" + + return True, "" + + @pydantic.model_serializer + def serialize(self): + return {key: value.to_dict() for key, value in self.root.items()} diff --git a/acto/system_state/role.py b/acto/system_state/role.py new file mode 100644 index 0000000000..ff63b60f31 --- /dev/null +++ b/acto/system_state/role.py @@ -0,0 +1,37 @@ +"""Role state model""" + +import kubernetes +import kubernetes.client.models as kubernetes_models +import pydantic +from typing_extensions import Self + +from acto.system_state.kubernetes_object import ( + KubernetesNamespacedDictObject, + list_namespaced_object_helper, +) + + +class RoleState(KubernetesNamespacedDictObject): + """Role state object.""" + + root: dict[str, kubernetes_models.V1Role] + + @classmethod + def from_api_client_namespaced( + cls, api_client: kubernetes.client.ApiClient, namespace: str + ) -> Self: + data = list_namespaced_object_helper( + kubernetes.client.RbacAuthorizationV1Api( + api_client + ).list_namespaced_role, + namespace, + ) + return cls.model_validate(data) + + def check_health(self) -> tuple[bool, str]: + """Check Role health""" + return True, "" + + @pydantic.model_serializer + def serialize(self): + return {key: value.to_dict() for key, value in self.root.items()} diff --git a/acto/system_state/role_binding.py b/acto/system_state/role_binding.py new file mode 100644 index 0000000000..19b6f5eccc --- /dev/null +++ b/acto/system_state/role_binding.py @@ -0,0 +1,37 @@ +"""RoleBinding state model""" + +import kubernetes +import kubernetes.client.models as kubernetes_models +import pydantic +from typing_extensions import Self + +from acto.system_state.kubernetes_object import ( + KubernetesNamespacedDictObject, + list_namespaced_object_helper, +) + + +class RoleBindingState(KubernetesNamespacedDictObject): + """RoleBinding state object.""" + + root: dict[str, kubernetes_models.V1RoleBinding] + + @classmethod + def from_api_client_namespaced( + cls, api_client: kubernetes.client.ApiClient, namespace: str + ) -> Self: + data = list_namespaced_object_helper( + kubernetes.client.RbacAuthorizationV1Api( + api_client + ).list_namespaced_role_binding, + namespace, + ) + return cls.model_validate(data) + + def check_health(self) -> tuple[bool, str]: + """Check RoleBinding health""" + return True, "" + + @pydantic.model_serializer + def serialize(self): + return {key: value.to_dict() for key, value in self.root.items()} diff --git a/acto/system_state/secret.py b/acto/system_state/secret.py new file mode 100644 index 0000000000..57e44ece7f --- /dev/null +++ b/acto/system_state/secret.py @@ -0,0 +1,35 @@ +"""Secret state model""" + +import kubernetes +import kubernetes.client.models as kubernetes_models +import pydantic +from typing_extensions import Self + +from acto.system_state.kubernetes_object import ( + KubernetesNamespacedDictObject, + list_namespaced_object_helper, +) + + +class SecretState(KubernetesNamespacedDictObject): + """Secret state object.""" + + root: dict[str, kubernetes_models.V1Secret] + + @classmethod + def from_api_client_namespaced( + cls, api_client: kubernetes.client.ApiClient, namespace: str + ) -> Self: + data = list_namespaced_object_helper( + kubernetes.client.CoreV1Api(api_client).list_namespaced_secret, + namespace, + ) + return cls.model_validate(data) + + def check_health(self) -> tuple[bool, str]: + """Check if Secret is healthy""" + return True, "" + + @pydantic.model_serializer + def serialize(self): + return {key: value.to_dict() for key, value in self.root.items()} diff --git a/acto/system_state/service.py b/acto/system_state/service.py new file mode 100644 index 0000000000..b31eda30b3 --- /dev/null +++ b/acto/system_state/service.py @@ -0,0 +1,40 @@ +"""Service state model""" + +import kubernetes +import kubernetes.client.models as kubernetes_models +import pydantic +from typing_extensions import Self + +from acto.system_state.kubernetes_object import ( + KubernetesNamespacedDictObject, + list_namespaced_object_helper, +) + + +class ServiceState(KubernetesNamespacedDictObject): + """Service state object.""" + + root: dict[str, kubernetes_models.V1Service] + + @classmethod + def from_api_client_namespaced( + cls, api_client: kubernetes.client.ApiClient, namespace: str + ) -> Self: + data = list_namespaced_object_helper( + kubernetes.client.CoreV1Api(api_client).list_namespaced_service, + namespace, + ) + return cls.model_validate(data) + + def check_health(self) -> tuple[bool, str]: + """Check if Service is healthy + + Returns: + tuple[bool, str]: (is_healthy, reason) + """ + + return True, "" + + @pydantic.model_serializer + def serialize(self): + return {key: value.to_dict() for key, value in self.root.items()} diff --git a/acto/system_state/service_account.py b/acto/system_state/service_account.py new file mode 100644 index 0000000000..44441bb4b0 --- /dev/null +++ b/acto/system_state/service_account.py @@ -0,0 +1,37 @@ +"""ServiceAccount state model""" + +import kubernetes +import kubernetes.client.models as kubernetes_models +import pydantic +from typing_extensions import Self + +from acto.system_state.kubernetes_object import ( + KubernetesNamespacedDictObject, + list_namespaced_object_helper, +) + + +class ServiceAccountState(KubernetesNamespacedDictObject): + """ServiceAccount state object.""" + + root: dict[str, kubernetes_models.V1ServiceAccount] + + @classmethod + def from_api_client_namespaced( + cls, api_client: kubernetes.client.ApiClient, namespace: str + ) -> Self: + data = list_namespaced_object_helper( + kubernetes.client.CoreV1Api( + api_client + ).list_namespaced_service_account, + namespace, + ) + return cls.model_validate(data) + + def check_health(self) -> tuple[bool, str]: + """Check ServiceAccount health""" + return True, "" + + @pydantic.model_serializer + def serialize(self): + return {key: value.to_dict() for key, value in self.root.items()} diff --git a/acto/system_state/stateful_set.py b/acto/system_state/stateful_set.py new file mode 100644 index 0000000000..7af2410323 --- /dev/null +++ b/acto/system_state/stateful_set.py @@ -0,0 +1,71 @@ +"""StatefulSet state model.""" + +import kubernetes +import kubernetes.client.models as kubernetes_models +import pydantic +from typing_extensions import Self + +from acto.system_state.kubernetes_object import ( + KubernetesNamespacedDictObject, + list_namespaced_object_helper, +) + + +class StatefulSetState(KubernetesNamespacedDictObject): + """StatefulSet state object.""" + + root: dict[str, kubernetes_models.V1StatefulSet] + + @classmethod + def from_api_client_namespaced( + cls, api_client: kubernetes.client.ApiClient, namespace: str + ) -> Self: + data = list_namespaced_object_helper( + kubernetes.client.AppsV1Api( + api_client + ).list_namespaced_stateful_set, + namespace, + ) + return cls.model_validate(data) + + def check_health(self) -> tuple[bool, str]: + """Check if StatefulSet is healthy + + Returns: + tuple[bool, str]: (is_healthy, reason) + """ + + for name, stateful_set in self.root.items(): + if ( + stateful_set.status.observed_generation + != stateful_set.metadata.generation + ): + return False, f"StatefulSet[{name}] generation mismatch" + + if ( + stateful_set.status.current_revision + != stateful_set.status.update_revision + ): + return ( + False, + f"StatefulSet[{name}] revision mismatch" + + f"current[{stateful_set.status.current_revision}] " + + f"!= update[{stateful_set.status.update_revision}]", + ) + + if stateful_set.spec.replicas != stateful_set.status.ready_replicas: + return False, f"StatefulSet[{name}] replicas mismatch" + + if stateful_set.status.conditions is not None: + for condition in stateful_set.status.conditions: + if condition.type == "Ready" and condition.status != "True": + return ( + False, + f"StatefulSet[{name}] is not ready: {condition.message}", + ) + + return True, "" + + @pydantic.model_serializer + def serialize(self): + return {key: value.to_dict() for key, value in self.root.items()} diff --git a/acto/system_state/storage_class.py b/acto/system_state/storage_class.py new file mode 100644 index 0000000000..02b3cba3fb --- /dev/null +++ b/acto/system_state/storage_class.py @@ -0,0 +1,32 @@ +"""StorageClass state model""" + +import kubernetes +import kubernetes.client.models as kubernetes_models +import pydantic +from typing_extensions import Self + +from acto.system_state.kubernetes_object import ( + KubernetesDictObject, + list_object_helper, +) + + +class StorageClassState(KubernetesDictObject): + """StorageClass state object.""" + + root: dict[str, kubernetes_models.V1StorageClass] + + @classmethod + def from_api_client(cls, api_client: kubernetes.client.ApiClient) -> Self: + data = list_object_helper( + kubernetes.client.StorageV1Api(api_client).list_storage_class, + ) + return cls.model_validate(data) + + def check_health(self) -> tuple[bool, str]: + """Check health of StorageClass""" + return True, "" + + @pydantic.model_serializer + def serialize(self): + return {key: value.to_dict() for key, value in self.root.items()} diff --git a/test/integration_tests/test_kubernetes_system_state.py b/test/integration_tests/test_kubernetes_system_state.py new file mode 100644 index 0000000000..34707b1cc8 --- /dev/null +++ b/test/integration_tests/test_kubernetes_system_state.py @@ -0,0 +1,66 @@ +"""Integration tests for Kubernetes system state collection.""" +import os +import pathlib +import tempfile +import unittest + +from acto.common import kubernetes_client +from acto.kubernetes_engine import kind +from acto.system_state.kubernetes_system_state import KubernetesSystemState + +test_dir = pathlib.Path(__file__).parent.resolve() +test_data_dir = os.path.join(test_dir, "test_data") + + +class TestKubernetesSystemState(unittest.TestCase): + """test Kubernetes system state collection.""" + + def setUp(self): + config_path = os.path.join(os.path.expanduser("~"), ".kube/test-config") + name = "test-cluster" + num_nodes = 1 + version = "v1.26.0" + + cluster_instance = kind.Kind(acto_namespace=0) + + cluster_instance.configure_cluster(num_nodes, version) + print( + f"Creating cluster {name} with {num_nodes} nodes, version {version}, " + + f"configPath {config_path}" + ) + cluster_instance.create_cluster(name, config_path) + + self.kubeconfig = config_path + self.cluster_name = name + self.cluster_instance = cluster_instance + + def tearDown(self): + self.cluster_instance.delete_cluster(self.cluster_name, self.kubeconfig) + + def test_collect_and_serialization(self): + """Test collect and serialization of Kubernetes system state.""" + + api_client = kubernetes_client( + self.kubeconfig, + self.cluster_instance.get_context_name(self.cluster_name), + ) + + # check collection works + state = KubernetesSystemState.from_api_client(api_client, "kube-system") + assert "kindnet" in state.daemon_set + assert "kube-proxy" in state.daemon_set + assert "coredns" in state.deployment + assert "kube-dns" in state.service + assert "standard" in state.storage_class + assert "coredns" in state.config_map + assert "admin" in state.cluster_role + assert "cluster-admin" in state.cluster_role_binding + + # check serialization works + with tempfile.TemporaryDirectory() as tempdir: + filepath = os.path.join(tempdir, "system_state.json") + state.dump(filepath) + + +if __name__ == "__main__": + unittest.main()