diff --git a/acto/deploy.py b/acto/deploy.py
index 4c7dea33e2..eaf765314c 100644
--- a/acto/deploy.py
+++ b/acto/deploy.py
@@ -51,15 +51,15 @@ def undeploy(self, context: dict, apiclient):
             time.sleep(5)
         logger.info('Namespace %s deleted' % context['namespace'])
 
-    def deploy(self, context: dict, kubeconfig: str, context_name: str):
+    def deploy(self, kubeconfig: str, context_name: str, namespace: str):
         # XXX: context param is temporary, need to figure out why rabbitmq complains about namespace
         pass
 
-    def deploy_with_retry(self, context, kubeconfig: str, context_name: str, retry_count=3):
+    def deploy_with_retry(self, kubeconfig: str, context_name: str, namespace: str, retry_count=3):
         logger = get_thread_logger(with_prefix=False)
         while retry_count > 0:
             try:
-                return self.deploy(context, kubeconfig, context_name)
+                return self.deploy(kubeconfig, context_name, namespace)
             except Exception as e:
                 logger.warn(e)
                 logger.info(
@@ -68,7 +68,7 @@ def deploy_with_retry(self, context, kubeconfig: str, context_name: str, retry_c
             retry_count -= 1
         return False
 
-    def check_status(self, context: dict, kubeconfig: str, context_name: str):
+    def check_status(self, kubeconfig: str, context_name: str):
         '''
         We need to make sure operator to be ready before applying test cases, because Acto would
@@ -163,7 +163,7 @@ def check_status(self, context, context_name: str) -> bool:
 
 class Yaml(Deploy):
 
-    def deploy(self, context: dict, kubeconfig: str, context_name: str):
+    def deploy(self, kubeconfig: str, context_name: str, namespace: str):
         # TODO: We cannot specify namespace ACTO_NAMESPACE here.
         # rabbitMQ operator will report the error message
         '''
@@ -173,9 +173,6 @@ def deploy(self, context: dict, kubeconfig: str, context_name: str):
         logger = get_thread_logger(with_prefix=True)
         print_event('Deploying operator...')
 
-        namespace = utils.get_yaml_existing_namespace(
-            self.path) or CONST.ACTO_NAMESPACE
-        context['namespace'] = namespace
         ret = utils.create_namespace(
             kubernetes_client(kubeconfig, context_name), namespace)
         if ret == None:
@@ -183,14 +180,14 @@ def deploy(self, context: dict, kubeconfig: str, context_name: str):
         if self.init_yaml:
             kubectl(['apply', '--server-side', '-f', self.init_yaml],
                     kubeconfig=kubeconfig, context_name=context_name)
-        self.check_status(context, kubeconfig=kubeconfig,
+        self.check_status(kubeconfig=kubeconfig,
                           context_name=context_name)
-        kubectl(['apply', '--server-side', '-f', self.path, '-n', context['namespace']], kubeconfig=kubeconfig,
+        kubectl(['apply', '--server-side', '-f', self.path, '-n', namespace], kubeconfig=kubeconfig,
                 context_name=context_name)
-        self.check_status(context, kubeconfig=kubeconfig,
+        self.check_status(kubeconfig=kubeconfig,
                           context_name=context_name)
-        add_acto_label(kubernetes_client(kubeconfig, context_name), context)
-        self.check_status(context, kubeconfig=kubeconfig,
+        add_acto_label(kubernetes_client(kubeconfig, context_name), namespace)
+        self.check_status(kubeconfig=kubeconfig,
                           context_name=context_name)
 
         time.sleep(20)
@@ -201,18 +198,15 @@ def deploy(self, context: dict, kubeconfig: str, context_name: str):
 
 class Kustomize(Deploy):
 
-    def deploy(self, context, kubeconfig, context_name):
-        # TODO: We need to remove hardcoded namespace.
-        namespace = "cass-operator"
-        context['namespace'] = namespace
+    def deploy(self, kubeconfig, context_name, namespace):
         if self.init_yaml:
             kubectl(['apply', '--server-side', '-f', self.init_yaml],
                     kubeconfig=kubeconfig, context_name=context_name)
-        self.check_status(context, kubeconfig=kubeconfig,
+        self.check_status(kubeconfig=kubeconfig,
                           context_name=context_name)
-        kubectl(['apply', '--server-side', '-k', self.path, '-n', context['namespace']], kubeconfig=kubeconfig,
+        kubectl(['apply', '--server-side', '-k', self.path, '-n', namespace], kubeconfig=kubeconfig,
                 context_name=context_name)
-        self.check_status(context, kubeconfig=kubeconfig,
+        self.check_status(kubeconfig=kubeconfig,
                           context_name=context_name)
         return True
diff --git a/acto/engine.py b/acto/engine.py
index 88cc0106e8..268fc02e61 100644
--- a/acto/engine.py
+++ b/acto/engine.py
@@ -29,7 +29,7 @@
 from acto.serialization import ActoEncoder, ContextEncoder
 from acto.snapshot import Snapshot
 from acto.utils import (delete_operator_pod, process_crd,
-                        update_preload_images)
+                        update_preload_images, get_yaml_existing_namespace)
 from acto.lib.operator_config import OperatorConfig
 from acto.utils.thread_logger import (get_thread_logger,
                                       set_thread_logger_prefix)
@@ -246,8 +246,9 @@ def run(self, errors: List[RunResult], mode: str = InputModel.NORMAL):
             apiclient = kubernetes_client(self.kubeconfig, self.context_name)
             self.cluster.load_images(self.images_archive, self.cluster_name)
             trial_k8s_bootstrap_time = time.time()
-            deployed = self.deploy.deploy_with_retry(self.context, self.kubeconfig,
-                                                     self.context_name)
+            deployed = self.deploy.deploy_with_retry(self.kubeconfig,
+                                                     self.context_name,
+                                                     self.context['namespace'])
             if not deployed:
                 logger.info('Not deployed. Try again!')
                 continue
@@ -778,8 +779,11 @@ def __learn(self, context_file, helper_crd, analysis_only=False):
         while True:
             self.cluster.restart_cluster('learn', learn_kubeconfig)
-            deployed = self.deploy.deploy_with_retry(self.context, learn_kubeconfig,
-                                                     learn_context_name)
+            namespace = get_yaml_existing_namespace(self.deploy.path) or CONST.ACTO_NAMESPACE
+            self.context['namespace'] = namespace
+            deployed = self.deploy.deploy_with_retry(learn_kubeconfig,
+                                                     learn_context_name,
+                                                     namespace)
             if deployed:
                 break
         apiclient = kubernetes_client(learn_kubeconfig, learn_context_name)
@@ -787,8 +791,8 @@
         runner.run_without_collect(self.operator_config.seed_custom_resource)
         update_preload_images(self.context, self.cluster.get_node_list('learn'))
-        process_crd(self.context, apiclient, KubectlClient(learn_kubeconfig, learn_context_name),
-                    self.crd_name, helper_crd)
+        self.context['crd'] = process_crd(apiclient, KubectlClient(learn_kubeconfig, learn_context_name),
+                                          self.crd_name, helper_crd)
         self.cluster.delete_cluster('learn', learn_kubeconfig)
         run_end_time = time.time()
diff --git a/acto/post_process/post_diff_test.py b/acto/post_process/post_diff_test.py
index 327697606c..a620362edc 100644
--- a/acto/post_process/post_diff_test.py
+++ b/acto/post_process/post_diff_test.py
@@ -279,9 +279,10 @@ def run_cr(self, cr, trial, gen):
         self._cluster.restart_cluster(self._cluster_name, self._kubeconfig)
         self._cluster.load_images(self._images_archive, self._cluster_name)
         apiclient = kubernetes_client(self._kubeconfig, self._context_name)
-        deployed = self._deploy.deploy_with_retry(self._context, self._kubeconfig,
-                                                  self._context_name)
-        add_acto_label(apiclient, self._context)
+        deployed = self._deploy.deploy_with_retry(self._kubeconfig,
+                                                  self._context_name,
+                                                  self._context['namespace'])
+        add_acto_label(apiclient, self._context['namespace'])
         trial_dir = os.path.join(self._workdir, 'trial-%02d' % self._worker_id)
         os.makedirs(trial_dir, exist_ok=True)
         runner = Runner(self._context, trial_dir, self._kubeconfig, self._context_name)
@@ -328,8 +329,9 @@ def run(self):
         self._cluster.load_images(self._images_archive, self._cluster_name)
         apiclient = kubernetes_client(self._kubeconfig, self._context_name)
         after_k8s_bootstrap_time = time.time()
-        deployed = self._deploy.deploy_with_retry(self._context, self._kubeconfig,
-                                                  self._context_name)
+        deployed = self._deploy.deploy_with_retry(self._kubeconfig,
+                                                  self._context_name,
+                                                  self._context['namespace'])
         after_operator_deploy_time = time.time()
 
         trial_dir = os.path.join(self._workdir, 'trial-%02d' % self._worker_id)
@@ -369,8 +371,9 @@ def run(self):
         self._cluster.load_images(self._images_archive, self._cluster_name)
         apiclient = kubernetes_client(self._kubeconfig, self._context_name)
         after_k8s_bootstrap_time = time.time()
-        deployed = self._deploy.deploy_with_retry(self._context, self._kubeconfig,
-                                                  self._context_name)
+        deployed = self._deploy.deploy_with_retry(self._kubeconfig,
+                                                  self._context_name,
+                                                  self._context['namespace'])
         after_operator_deploy_time = time.time()
 
         runner = Runner(self._context, trial_dir, self._kubeconfig, self._context_name)
diff --git a/acto/runner/runner.py b/acto/runner/runner.py
index 86a5649a32..7aa75f312c 100644
--- a/acto/runner/runner.py
+++ b/acto/runner/runner.py
@@ -63,6 +63,7 @@ def __init__(self,
 
     def run(self, input: dict, generation: int) -> Tuple[Snapshot, bool]:
         '''Simply run the cmd and dumps system_state, delta, operator log, events and input files
         without checking. The function blocks until system converges.
+        TODO: move the serialization part to a separate function
 
         Args:
@@ -93,6 +94,7 @@ def run(self, input: dict, generation: int) -> Tuple[Snapshot, bool]:
             logger.error('STDERR: ' + cli_result.stderr)
             return Snapshot(input, self.collect_cli_result(cli_result), {}, []), True
         err = None
+
         try:
             err = self.wait_for_system_converge()
         except (KeyError, ValueError) as e:
diff --git a/acto/utils/preprocess.py b/acto/utils/preprocess.py
index 0e27fcbf1d..82aac65696 100644
--- a/acto/utils/preprocess.py
+++ b/acto/utils/preprocess.py
@@ -3,6 +3,7 @@
 from typing import List, Optional
 
 import kubernetes
+import kubernetes.client.models as k8s_models
 import yaml
 
 from acto.kubectl_client import KubectlClient
@@ -19,6 +20,7 @@ def update_preload_images(context: dict, worker_list):
     if not namespace:
         return
 
+    # block list when getting the operator specific images
     k8s_images = [
         'docker.io/kindest/kindnetd',
         'docker.io/rancher/local-path-provisioner',
@@ -39,6 +41,15 @@ def update_preload_images(context: dict, worker_list):
         'docker.io/rancher/mirrored-library-traefik',
         'docker.io/rancher/mirrored-metrics-server',
         'docker.io/rancher/mirrored-paus',
+
+        # new k8s images
+        'registry.k8s.io/etcd',
+        'registry.k8s.io/kube-controller-manager',
+        'registry.k8s.io/pause',
+        'registry.k8s.io/kube-proxy',
+        'registry.k8s.io/coredns/coredns',
+        'registry.k8s.io/kube-apiserver',
+        'registry.k8s.io/kube-scheduler',
     ]
 
     for worker in worker_list:
@@ -62,13 +73,21 @@ def update_preload_images(context: dict, worker_list):
                 context['preload_images'].add(image)
 
 
-def process_crd(context: dict,
-                apiclient: kubernetes.client.ApiClient,
+def process_crd(apiclient: kubernetes.client.ApiClient,
                 kubectl_client: KubectlClient,
                 crd_name: Optional[str] = None,
-                helper_crd: Optional[str] = None):
+                helper_crd: Optional[str] = None) -> dict:
     '''
     Get crd from k8s and set context['crd']
+    Args:
+        apiclient: k8s api client
+        kubectl_client: kubectl client
+        crd_name: name of the crd
+        helper_crd: helper crd file path
+
+    Returns:
+        crd_data: crd dict
+
     When there are more than one crd in the cluster, user should set crd_name
     '''
     logger = get_thread_logger(with_prefix=False)
@@ -76,9 +95,9 @@ def process_crd(context: dict,
     if helper_crd == None:
         apiextensionsV1Api = kubernetes.client.ApiextensionsV1Api(apiclient)
         crds: List[
-            kubernetes.client.models.
+            k8s_models.
             V1CustomResourceDefinition] = apiextensionsV1Api.list_custom_resource_definition().items
-        crd: Optional[kubernetes.client.models.V1CustomResourceDefinition] = None
+        crd: Optional[k8s_models.V1CustomResourceDefinition] = None
         if len(crds) == 0:
             logger.error('No crd is found')
             quit()
@@ -100,7 +119,7 @@
         crd_result = kubectl_client.kubectl(['get', 'crd', crd.metadata.name, "-o", "json"], True,
                                             True)
         crd_obj = json.loads(crd_result.stdout)
-        spec: kubernetes.client.models.V1CustomResourceDefinitionSpec = crd.spec
+        spec: k8s_models.V1CustomResourceDefinitionSpec = crd.spec
         crd_data = {
             'group': spec.group,
             'plural': spec.names.plural,
@@ -108,7 +127,7 @@
             'version': spec.versions[0].name,
             'body': crd_obj
         }
-        context['crd'] = crd_data
+        return crd_data
     else:
         with open(helper_crd, 'r') as helper_crd_f:
             helper_crd_doc = yaml.load(helper_crd_f, Loader=yaml.FullLoader)
@@ -119,17 +138,16 @@
                 ['name'],  # TODO: Handle multiple versions
             'body': helper_crd_doc
         }
-        context['crd'] = crd_data
-    logger.debug('CRD data: %s' % crd_data)
+    return crd_data
 
 
-def add_acto_label(apiclient: kubernetes.client.ApiClient, context: dict):
+def add_acto_label(apiclient: kubernetes.client.ApiClient, namespace: str):
     '''Add acto label to deployment, stateful_state and corresponding pods.
     '''
     appv1Api = kubernetes.client.AppsV1Api(apiclient)
-    operator_deployments = appv1Api.list_namespaced_deployment(context['namespace'],
+    operator_deployments = appv1Api.list_namespaced_deployment(namespace,
                                                                watch=False).items
-    operator_stateful_states = appv1Api.list_namespaced_stateful_set(context['namespace'],
+    operator_stateful_states = appv1Api.list_namespaced_stateful_set(namespace,
                                                                      watch=False).items
     for deployment in operator_deployments:
         patches = [{
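
Note for reviewers: the common thread in this patch is removing `Deploy`'s dependency on the shared `context` dict. Callers now resolve the namespace once, pass it explicitly into `deploy_with_retry()` and `add_acto_label()`, and store `process_crd()`'s return value themselves. Below is a minimal sketch of the new calling convention, mirroring what `__learn` in `acto/engine.py` now does. `bootstrap_operator` and `default_namespace` are hypothetical names for illustration (engine.py uses `CONST.ACTO_NAMESPACE` as the fallback), and the sketch assumes `add_acto_label` is importable from `acto.utils` alongside `get_yaml_existing_namespace` and `process_crd`, as the engine.py import change suggests.

```python
from acto.kubectl_client import KubectlClient
from acto.utils import add_acto_label, get_yaml_existing_namespace, process_crd


def bootstrap_operator(deploy, context: dict, apiclient,
                       kubeconfig: str, context_name: str,
                       default_namespace: str) -> bool:
    """Hypothetical caller showing the post-refactor wiring."""
    # The caller resolves the namespace once, up front; Deploy.deploy() no
    # longer reads or writes context['namespace'] itself.
    namespace = get_yaml_existing_namespace(deploy.path) or default_namespace
    context['namespace'] = namespace

    # deploy_with_retry() now takes the namespace explicitly.
    if not deploy.deploy_with_retry(kubeconfig, context_name, namespace):
        return False

    # add_acto_label() takes the bare namespace string instead of the dict.
    add_acto_label(apiclient, namespace)

    # process_crd() returns the CRD data; the caller decides where it lives.
    context['crd'] = process_crd(apiclient,
                                 KubectlClient(kubeconfig, context_name))
    return True
```

Keeping `context` as plain data owned by the engine is also what lets `Kustomize.deploy` drop its hardcoded `"cass-operator"` namespace: the caller can now supply any namespace, so the old TODO is resolved by construction.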