Skip to content

Commit

Permalink
Prepare system state interface and collection (#313)
Browse files Browse the repository at this point in the history
* Prepare system state interface and collection

Signed-off-by: Tyler Gu <[email protected]>

* Fix system state test

Signed-off-by: Tyler Gu <[email protected]>

* Fix temporary file creation in test

Signed-off-by: Tyler Gu <[email protected]>

---------

Signed-off-by: Tyler Gu <[email protected]>
  • Loading branch information
tylergu authored Jan 29, 2024
1 parent e48cae8 commit 28ea844
Show file tree
Hide file tree
Showing 26 changed files with 1,515 additions and 0 deletions.
58 changes: 58 additions & 0 deletions acto/cli/collect_system_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import argparse
import logging
import os

from acto.common import kubernetes_client
from acto.system_state.kubernetes_system_state import KubernetesSystemState


def main():
    """Collect the system state of a namespace, dump it and check its health.

    Parses the command-line options, creates an API client from the given
    kubeconfig path and context name, collects the system state under the
    requested namespace, dumps it to the ``--output`` path and logs an error
    when the health check reports an unhealthy state.
    """
    # The root logger defaults to WARNING and has no handler, so without this
    # the INFO message emitted after the dump would never be shown.
    # basicConfig does nothing if the root logger already has handlers.
    logging.basicConfig(level=logging.INFO)

    parser = argparse.ArgumentParser(
        description="Collect the system state of a Kubernetes cluster under a namespace"
        " and dump it to a file. Check the health of the system state."
    )
    parser.add_argument(
        "--output",
        required=False,
        default="system_state.json",
        help="Path to dump the system state to",
    )
    parser.add_argument(
        "--kubeconfig",
        required=False,
        # The default is evaluated eagerly, so os.environ["HOME"] would raise
        # KeyError when HOME is unset even if --kubeconfig is supplied.
        # expanduser("~") resolves the home directory without raising.
        default=os.path.join(os.path.expanduser("~"), ".kube", "config"),
        help="Path to the kubeconfig file",
    )
    parser.add_argument(
        "--kubecontext",
        required=False,
        default="kind-kind",
        help="Name of the Kubernetes context to use",
    )
    parser.add_argument(
        "--namespace",
        required=False,
        default="default",
        help="Namespace to collect the system state under",
    )
    args = parser.parse_args()

    api_client = kubernetes_client(args.kubeconfig, args.kubecontext)

    system_state = KubernetesSystemState.from_api_client(
        api_client, args.namespace
    )
    system_state.dump(args.output)
    logging.info("System state dumped to %s", args.output)

    health_status = system_state.check_health()
    # Kept as an identity check against False so that any non-False result
    # of is_healthy() is still treated exactly as before.
    if health_status.is_healthy() is False:
        logging.error(
            "System state is not healthy with errors: \n%s",
            str(health_status),
        )


if __name__ == "__main__":
    main()
Empty file added acto/system_state/__init__.py
Empty file.
34 changes: 34 additions & 0 deletions acto/system_state/cluster_role.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""ClusterRole state model"""

import kubernetes
import kubernetes.client.models as kubernetes_models
import pydantic
from typing_extensions import Self

from acto.system_state.kubernetes_object import (
KubernetesDictObject,
list_object_helper,
)


class ClusterRoleState(KubernetesDictObject):
    """State model whose ``root`` maps string keys to V1ClusterRole objects."""

    root: dict[str, kubernetes_models.V1ClusterRole]

    @classmethod
    def from_api_client(cls, api_client: kubernetes.client.ApiClient) -> Self:
        """Build the state from the ClusterRoles listed through ``api_client``.

        The ``list_cluster_role`` call of the RBAC authorization API is handed
        to ``list_object_helper`` and the result is validated into this model.
        """
        rbac_api = kubernetes.client.RbacAuthorizationV1Api(api_client)
        cluster_roles = list_object_helper(rbac_api.list_cluster_role)
        return cls.model_validate(cluster_roles)

    def check_health(self) -> tuple[bool, str]:
        """Report ClusterRole health; always ``(True, "")``."""
        is_healthy, reason = True, ""
        return is_healthy, reason

    @pydantic.model_serializer
    def serialize(self):
        """Serialize every stored object through its ``to_dict()`` method."""
        serialized = {}
        for key, cluster_role in self.root.items():
            serialized[key] = cluster_role.to_dict()
        return serialized
35 changes: 35 additions & 0 deletions acto/system_state/cluster_role_binding.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""ClusterRoleBinding state model"""

import kubernetes
import kubernetes.client.models as kubernetes_models
import pydantic
from kubernetes.client.api_client import ApiClient
from typing_extensions import Self

from acto.system_state.kubernetes_object import (
KubernetesDictObject,
list_object_helper,
)


class ClusterRoleBindingState(KubernetesDictObject):
    """State model whose ``root`` maps string keys to V1ClusterRoleBinding."""

    root: dict[str, kubernetes_models.V1ClusterRoleBinding]

    @classmethod
    def from_api_client(cls, api_client: ApiClient) -> Self:
        """Build the state from the ClusterRoleBindings seen by ``api_client``.

        The ``list_cluster_role_binding`` call of the RBAC authorization API
        is handed to ``list_object_helper`` and the result is validated into
        this model.
        """
        rbac_api = kubernetes.client.RbacAuthorizationV1Api(api_client)
        bindings = list_object_helper(rbac_api.list_cluster_role_binding)
        return cls.model_validate(bindings)

    def check_health(self) -> tuple[bool, str]:
        """Report ClusterRoleBinding health; always ``(True, "")``."""
        is_healthy, reason = True, ""
        return is_healthy, reason

    @pydantic.model_serializer
    def serialize(self):
        """Serialize every stored object through its ``to_dict()`` method."""
        serialized = {}
        for key, binding in self.root.items():
            serialized[key] = binding.to_dict()
        return serialized
35 changes: 35 additions & 0 deletions acto/system_state/config_map.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""ConfigMap state model"""

import kubernetes
import kubernetes.client.models as kubernetes_models
import pydantic
from typing_extensions import Self

from acto.system_state.kubernetes_object import (
KubernetesNamespacedDictObject,
list_namespaced_object_helper,
)


class ConfigMapState(KubernetesNamespacedDictObject):
    """State model whose ``root`` maps string keys to V1ConfigMap objects."""

    root: dict[str, kubernetes_models.V1ConfigMap]

    @classmethod
    def from_api_client_namespaced(
        cls, api_client: kubernetes.client.ApiClient, namespace: str
    ) -> Self:
        """Build the state from the ConfigMaps listed under ``namespace``.

        The ``list_namespaced_config_map`` call of the core API is handed to
        ``list_namespaced_object_helper`` together with the namespace, and the
        result is validated into this model.
        """
        core_api = kubernetes.client.CoreV1Api(api_client)
        config_maps = list_namespaced_object_helper(
            core_api.list_namespaced_config_map, namespace
        )
        return cls.model_validate(config_maps)

    def check_health(self) -> tuple[bool, str]:
        """Report ConfigMap health; always ``(True, "")``."""
        is_healthy, reason = True, ""
        return is_healthy, reason

    @pydantic.model_serializer
    def serialize(self):
        """Serialize every stored object through its ``to_dict()`` method."""
        serialized = {}
        for key, config_map in self.root.items():
            serialized[key] = config_map.to_dict()
        return serialized
35 changes: 35 additions & 0 deletions acto/system_state/cron_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""CronJob state model"""

import kubernetes
import kubernetes.client.models as kubernetes_models
import pydantic
from typing_extensions import Self

from acto.system_state.kubernetes_object import (
KubernetesNamespacedDictObject,
list_namespaced_object_helper,
)


class CronJobState(KubernetesNamespacedDictObject):
    """State model whose ``root`` maps string keys to V1CronJob objects."""

    root: dict[str, kubernetes_models.V1CronJob]

    @classmethod
    def from_api_client_namespaced(
        cls, api_client: kubernetes.client.ApiClient, namespace: str
    ) -> Self:
        """Build the state from the CronJobs listed under ``namespace``.

        The ``list_namespaced_cron_job`` call of the batch API is handed to
        ``list_namespaced_object_helper`` together with the namespace, and the
        result is validated into this model.
        """
        batch_api = kubernetes.client.BatchV1Api(api_client)
        cron_jobs = list_namespaced_object_helper(
            batch_api.list_namespaced_cron_job, namespace
        )
        return cls.model_validate(cron_jobs)

    def check_health(self) -> tuple[bool, str]:
        """Report CronJob health; always ``(True, "")``."""
        is_healthy, reason = True, ""
        return is_healthy, reason

    @pydantic.model_serializer
    def serialize(self):
        """Serialize every stored object through its ``to_dict()`` method."""
        serialized = {}
        for key, cron_job in self.root.items():
            serialized[key] = cron_job.to_dict()
        return serialized
68 changes: 68 additions & 0 deletions acto/system_state/daemon_set.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""DaemonSet state model"""
import kubernetes
import kubernetes.client.models as kubernetes_models
import pydantic
from typing_extensions import Self

from acto.system_state.kubernetes_object import (
KubernetesNamespacedDictObject,
list_namespaced_object_helper,
)


class DaemonSetState(KubernetesNamespacedDictObject):
    """State model whose ``root`` maps string keys to V1DaemonSet objects."""

    root: dict[str, kubernetes_models.V1DaemonSet]

    @classmethod
    def from_api_client_namespaced(
        cls, api_client: kubernetes.client.ApiClient, namespace: str
    ) -> Self:
        """Build the state from the DaemonSets listed under ``namespace``.

        The ``list_namespaced_daemon_set`` call of the apps API is handed to
        ``list_namespaced_object_helper`` together with the namespace, and the
        result is validated into this model.
        """
        apps_api = kubernetes.client.AppsV1Api(api_client)
        daemon_sets = list_namespaced_object_helper(
            apps_api.list_namespaced_daemon_set, namespace
        )
        return cls.model_validate(daemon_sets)

    def check_health(self) -> tuple[bool, str]:
        """Return the first health problem found among the DaemonSets.

        A DaemonSet is reported unhealthy when its observed generation differs
        from its metadata generation, when the desired number of scheduled
        pods differs from the number of ready pods, or when it carries a
        "Progressing" condition whose status is not "True".

        Returns:
            tuple[bool, str]: (is_healthy, reason); the reason is empty when
            every DaemonSet passes all checks.
        """
        for name, daemon_set in self.root.items():
            status = daemon_set.status

            if status.observed_generation != daemon_set.metadata.generation:
                return False, f"DaemonSet[{name}] generation mismatch"

            desired = status.desired_number_scheduled
            ready = status.number_ready
            if desired != ready:
                return (
                    False,
                    f"DaemonSet[{name}] replicas mismatch, "
                    f"desired[{desired}] != ready[{ready}]",
                )

            # First "Progressing" condition that is not "True", if any;
            # a missing condition list is treated like an empty one.
            stalled = next(
                (
                    condition
                    for condition in status.conditions or ()
                    if condition.type == "Progressing"
                    and condition.status != "True"
                ),
                None,
            )
            if stalled is not None:
                return (
                    False,
                    f"DaemonSet[{name}] is not progressing: {stalled.message}",
                )

        return True, ""

    @pydantic.model_serializer
    def serialize(self):
        """Serialize every stored object through its ``to_dict()`` method."""
        serialized = {}
        for key, daemon_set in self.root.items():
            serialized[key] = daemon_set.to_dict()
        return serialized
74 changes: 74 additions & 0 deletions acto/system_state/deployment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Deployment state model"""
import kubernetes
import kubernetes.client.models as kubernetes_models
import pydantic
from typing_extensions import Self

from acto.system_state.kubernetes_object import (
KubernetesNamespacedDictObject,
list_namespaced_object_helper,
)


class DeploymentState(KubernetesNamespacedDictObject):
    """Deployment state model; ``root`` maps string keys to V1Deployment."""

    root: dict[str, kubernetes_models.V1Deployment]

    @classmethod
    def from_api_client_namespaced(
        cls, api_client: kubernetes.client.ApiClient, namespace: str
    ) -> Self:
        """Build the state from the Deployments listed under ``namespace``.

        The ``list_namespaced_deployment`` call of the apps API is handed to
        ``list_namespaced_object_helper`` together with the namespace, and the
        result is validated into this model.
        """
        data = list_namespaced_object_helper(
            kubernetes.client.AppsV1Api(api_client).list_namespaced_deployment,
            namespace,
        )
        return cls.model_validate(data)

    def check_health(self) -> tuple[bool, str]:
        """Check if deployment is healthy

        A Deployment is reported unhealthy when its observed generation
        differs from its metadata generation, when the ready replica count
        differs from the desired or the current replica count, when an
        "Available" or "Progressing" condition is not "True", or when some
        replicas are unavailable. The first problem found is returned.

        Returns:
            tuple[bool, str]: (is_healthy, reason)
        """

        for name, deployment in self.root.items():
            status = deployment.status

            if status.observed_generation != deployment.metadata.generation:
                return False, f"Deployment[{name}] generation mismatch"

            # The replica counters in the status are optional and come back
            # as None when they are zero. Normalise them to 0 so that a
            # Deployment scaled down to zero replicas is not flagged as a
            # mismatch by comparing 0 against None.
            ready_replicas = status.ready_replicas or 0
            current_replicas = status.replicas or 0
            unavailable_replicas = status.unavailable_replicas or 0

            if deployment.spec.replicas != ready_replicas:
                return False, f"Deployment[{name}] replicas mismatch"

            if status.conditions is not None:
                for condition in status.conditions:
                    if (
                        condition.type == "Available"
                        and condition.status != "True"
                    ):
                        return False, f"Deployment[{name}] is not available"
                    if (
                        condition.type == "Progressing"
                        and condition.status != "True"
                    ):
                        return False, f"Deployment[{name}] is not progressing"

            if current_replicas != ready_replicas:
                return False, f"Deployment[{name}] replicas mismatch"

            if unavailable_replicas != 0:
                return (
                    False,
                    f"[{name}] [{unavailable_replicas}] pods are unavailable",
                )

        return True, ""

    @pydantic.model_serializer
    def serialize(self):
        """Serialize every stored object through its ``to_dict()`` method."""
        return {key: value.to_dict() for key, value in self.root.items()}
35 changes: 35 additions & 0 deletions acto/system_state/endpoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Endpoints state model"""

import kubernetes
import kubernetes.client.models as kubernetes_models
import pydantic
from typing_extensions import Self

from acto.system_state.kubernetes_object import (
KubernetesNamespacedDictObject,
list_namespaced_object_helper,
)


class EndpointsState(KubernetesNamespacedDictObject):
    """State model whose ``root`` maps string keys to V1Endpoints objects."""

    root: dict[str, kubernetes_models.V1Endpoints]

    @classmethod
    def from_api_client_namespaced(
        cls, api_client: kubernetes.client.ApiClient, namespace: str
    ) -> Self:
        """Build the state from the Endpoints listed under ``namespace``.

        The ``list_namespaced_endpoints`` call of the core API is handed to
        ``list_namespaced_object_helper`` together with the namespace, and the
        result is validated into this model.
        """
        core_api = kubernetes.client.CoreV1Api(api_client)
        endpoints = list_namespaced_object_helper(
            core_api.list_namespaced_endpoints, namespace
        )
        return cls.model_validate(endpoints)

    def check_health(self) -> tuple[bool, str]:
        """Report Endpoints health; always ``(True, "")``."""
        is_healthy, reason = True, ""
        return is_healthy, reason

    @pydantic.model_serializer
    def serialize(self):
        """Serialize every stored object through its ``to_dict()`` method."""
        serialized = {}
        for key, endpoint in self.root.items():
            serialized[key] = endpoint.to_dict()
        return serialized
Loading

0 comments on commit 28ea844

Please sign in to comment.