diff --git a/acto/engine.py b/acto/engine.py index 268fc02e61..d9f91605ec 100644 --- a/acto/engine.py +++ b/acto/engine.py @@ -6,8 +6,8 @@ from copy import deepcopy from types import FunctionType -import yaml import jsonpatch +import yaml from acto.checker.checker_set import CheckerSet from acto.common import * @@ -24,17 +24,18 @@ from acto.input.valuegenerator import ArrayGenerator from acto.kubectl_client import KubectlClient from acto.kubernetes_engine import base, k3d, kind +from acto.lib.operator_config import OperatorConfig from acto.oracle_handle import OracleHandle from acto.runner import Runner from acto.serialization import ActoEncoder, ContextEncoder from acto.snapshot import Snapshot -from acto.utils import (delete_operator_pod, process_crd, - update_preload_images, get_yaml_existing_namespace) -from acto.lib.operator_config import OperatorConfig +from acto.utils import (delete_operator_pod, get_yaml_existing_namespace, + process_crd, update_preload_images) from acto.utils.thread_logger import (get_thread_logger, set_thread_logger_prefix) from ssa.analysis import analyze + def save_result(trial_dir: str, trial_result: RunResult, num_tests: int, trial_elapsed, time_breakdown): logger = get_thread_logger(with_prefix=False) @@ -307,7 +308,7 @@ def run_trial(self, on_init(oracle_handle) runner: Runner = self.runner_t(self.context, trial_dir, self.kubeconfig, self.context_name, - self.wait_time) + wait_time=self.wait_time) checker: CheckerSet = self.checker_t(self.context, trial_dir, self.input_model, oracle_handle, self.custom_oracle) curr_input = self.input_model.get_seed_input() @@ -627,13 +628,15 @@ def __init__(self, operator_config.deploy.init).new() if cluster_runtime == "KIND": - cluster = kind.Kind(acto_namespace=acto_namespace) + cluster = kind.Kind(acto_namespace=acto_namespace, + feature_gates=operator_config.kubernetes_engine.feature_gates) elif cluster_runtime == "K3D": cluster = k3d.K3D() else: logger.warning( f"Cluster Runtime {cluster_runtime} is not supported, defaulted to use kind") - cluster = kind.Kind(acto_namespace=acto_namespace) + cluster = kind.Kind(acto_namespace=acto_namespace, + feature_gates=operator_config.kubernetes_engine.feature_gates) self.cluster = cluster self.deploy = deploy @@ -687,6 +690,7 @@ def __init__(self, if not applied_custom_k8s_fields: # default to use the known_schema module to automatically find the mapping # from CRD to K8s schema + logger.info('Using known_schema to find the mapping from CRD to K8s schema') tuples = find_all_matched_schemas_type(self.input_model.root_schema) for tuple in tuples: logger.debug(f'Found matched schema: {tuple[0].path} -> {tuple[1]}') diff --git a/acto/kubernetes_engine/base.py b/acto/kubernetes_engine/base.py index e56d06d167..066d3b477c 100644 --- a/acto/kubernetes_engine/base.py +++ b/acto/kubernetes_engine/base.py @@ -1,7 +1,7 @@ import subprocess import time from abc import ABC, abstractmethod -from typing import Callable, List +from typing import Callable, Dict, List import kubernetes @@ -15,12 +15,14 @@ class KubernetesEngine(ABC): @abstractmethod def __init__(self, acto_namespace: int, - posthooks: List[KubernetesEnginePostHookType] = None) -> None: ... + posthooks: List[KubernetesEnginePostHookType] = None, + feature_gates: Dict[str, bool] = None) -> None: ... '''Constructor for KubernetesEngine Args: acto_namespace: the namespace of the acto posthooks: a list of posthooks to be executed after the cluster is created + feature_gates: a list of feature gates to be enabled ''' @abstractmethod diff --git a/acto/kubernetes_engine/kind.py b/acto/kubernetes_engine/kind.py index c2647cbb2c..05dd4922be 100644 --- a/acto/kubernetes_engine/kind.py +++ b/acto/kubernetes_engine/kind.py @@ -2,7 +2,7 @@ import os import subprocess import time -from typing import List +from typing import Dict, List import kubernetes import yaml @@ -16,9 +16,11 @@ class Kind(base.KubernetesEngine): def __init__( - self, acto_namespace: int, posthooks: List[base.KubernetesEnginePostHookType] = None): - self.config_path = os.path.join(CONST.CLUSTER_CONFIG_FOLDER, f'KIND-{acto_namespace}.yaml') - self.posthooks = posthooks + self, acto_namespace: int, posthooks: List[base.KubernetesEnginePostHookType] = None, + feature_gates: Dict[str, bool] = None): + self._config_path = os.path.join(CONST.CLUSTER_CONFIG_FOLDER, f'KIND-{acto_namespace}.yaml') + self._posthooks = posthooks + self._feature_gates = feature_gates def configure_cluster(self, num_nodes: int, version: str): '''Create config file for kind''' @@ -45,12 +47,17 @@ def configure_cluster(self, num_nodes: int, version: str): }] }) + if self._feature_gates: + config_dict['featureGates'] = {} + for key, value in self._feature_gates.items(): + config_dict['featureGates'][key] = value + try: os.mkdir(CONST.CLUSTER_CONFIG_FOLDER) except FileExistsError: pass - with open(self.config_path, 'w') as config_file: + with open(self._config_path, 'w') as config_file: yaml.dump(config_dict, config_file) self._k8s_version = version @@ -82,12 +89,13 @@ def create_cluster(self, name: str, kubeconfig: str): else: raise Exception('Missing kubeconfig for kind create') - cmd.extend(['--config', self.config_path]) + cmd.extend(['--config', self._config_path]) cmd.extend(['--image', f"kindest/node:{self._k8s_version}"]) p = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) while p.returncode != 0: + # TODO: retry for three times logging.error('Failed to create kind cluster, retrying') self.delete_cluster(name, kubeconfig) time.sleep(5) @@ -103,8 +111,8 @@ def create_cluster(self, name: str, kubeconfig: str): logging.debug(f.read()) raise e - if self.posthooks: - for posthook in self.posthooks: + if self._posthooks: + for posthook in self._posthooks: posthook(apiclient=apiclient) def load_images(self, images_archive_path: str, name: str): diff --git a/acto/lib/operator_config.py b/acto/lib/operator_config.py index 84c14d02f6..a6400edb21 100644 --- a/acto/lib/operator_config.py +++ b/acto/lib/operator_config.py @@ -1,5 +1,5 @@ from enum import Enum -from typing import List, Optional +from typing import Dict, List, Optional from pydantic import BaseModel, Field @@ -31,6 +31,11 @@ class AnalysisConfig(BaseModel, extra='forbid'): description='The relative path of the main package for the operator') +class KubernetesEngineConfig(BaseModel, extra='forbid'): + feature_gates: Dict[str, bool] = Field( + description='Path to the feature gates file', default=None) + + class OperatorConfig(BaseModel, extra='forbid'): """Configuration for porting operators to Acto""" deploy: DeployConfig @@ -50,6 +55,9 @@ class OperatorConfig(BaseModel, extra='forbid'): diff_ignore_fields: List[str] = Field(default_factory=list) kubernetes_version: str = Field( default='v1.22.9', description='Kubernetes version') + kubernetes_engine: KubernetesEngineConfig = Field( + default=KubernetesEngineConfig(), description='Configuration for the Kubernetes engine' + ) monkey_patch: Optional[str] = Field( default=None, description='Path to the monkey patch file') diff --git a/acto/post_process/post_diff_test.py b/acto/post_process/post_diff_test.py index a620362edc..cd5451566d 100644 --- a/acto/post_process/post_diff_test.py +++ b/acto/post_process/post_diff_test.py @@ -423,7 +423,8 @@ def __init__(self, testrun_dir: str, config: OperatorConfig, ignore_invalid: boo def post_process(self, workdir: str, num_workers: int = 1): if not os.path.exists(workdir): os.mkdir(workdir) - cluster = kind.Kind(acto_namespace=self.acto_namespace) + cluster = kind.Kind(acto_namespace=self.acto_namespace, + feature_gates=self.config.kubernetes_engine.feature_gates) cluster.configure_cluster(self.config.num_nodes, self.config.kubernetes_version) deploy = Deploy(DeployMethod.YAML, self.config.deploy.file, self.config.deploy.init).new() # Build an archive to be preloaded @@ -479,7 +480,8 @@ def check_diff_test_result(self, workqueue: multiprocessing.Queue, workdir: str, generation = 0 # for additional runner additional_runner_dir = os.path.join(workdir, f'additional-runner-{worker_id}') - cluster = kind.Kind(acto_namespace=self.acto_namespace) + cluster = kind.Kind(acto_namespace=self.acto_namespace, + feature_gates=self.config.kubernetes_engine.feature_gates) cluster.configure_cluster(self.config.num_nodes, self.config.kubernetes_version) deploy = Deploy(DeployMethod.YAML, self.config.deploy.file, self.config.deploy.init).new()