Skip to content

Commit

Permalink
Support to configure feature gates for K8s cluster
Browse files Browse the repository at this point in the history
Signed-off-by: Tyler Gu <[email protected]>
  • Loading branch information
tylergu committed Nov 2, 2023
1 parent 1780f5c commit 675bdcf
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 20 deletions.
18 changes: 11 additions & 7 deletions acto/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from copy import deepcopy
from types import FunctionType

import yaml
import jsonpatch
import yaml

from acto.checker.checker_set import CheckerSet
from acto.common import *
Expand All @@ -24,17 +24,18 @@
from acto.input.valuegenerator import ArrayGenerator
from acto.kubectl_client import KubectlClient
from acto.kubernetes_engine import base, k3d, kind
from acto.lib.operator_config import OperatorConfig
from acto.oracle_handle import OracleHandle
from acto.runner import Runner
from acto.serialization import ActoEncoder, ContextEncoder
from acto.snapshot import Snapshot
from acto.utils import (delete_operator_pod, process_crd,
update_preload_images, get_yaml_existing_namespace)
from acto.lib.operator_config import OperatorConfig
from acto.utils import (delete_operator_pod, get_yaml_existing_namespace,
process_crd, update_preload_images)
from acto.utils.thread_logger import (get_thread_logger,
set_thread_logger_prefix)
from ssa.analysis import analyze


def save_result(trial_dir: str, trial_result: RunResult, num_tests: int, trial_elapsed, time_breakdown):
logger = get_thread_logger(with_prefix=False)

Expand Down Expand Up @@ -307,7 +308,7 @@ def run_trial(self,
on_init(oracle_handle)

runner: Runner = self.runner_t(self.context, trial_dir, self.kubeconfig, self.context_name,
self.wait_time)
wait_time=self.wait_time)
checker: CheckerSet = self.checker_t(self.context, trial_dir, self.input_model, oracle_handle, self.custom_oracle)

curr_input = self.input_model.get_seed_input()
Expand Down Expand Up @@ -627,13 +628,15 @@ def __init__(self,
operator_config.deploy.init).new()

if cluster_runtime == "KIND":
cluster = kind.Kind(acto_namespace=acto_namespace)
cluster = kind.Kind(acto_namespace=acto_namespace,
feature_gates=operator_config.kubernetes_engine.feature_gates)
elif cluster_runtime == "K3D":
cluster = k3d.K3D()
else:
logger.warning(
f"Cluster Runtime {cluster_runtime} is not supported, defaulted to use kind")
cluster = kind.Kind(acto_namespace=acto_namespace)
cluster = kind.Kind(acto_namespace=acto_namespace,
feature_gates=operator_config.kubernetes_engine.feature_gates)

self.cluster = cluster
self.deploy = deploy
Expand Down Expand Up @@ -687,6 +690,7 @@ def __init__(self,
if not applied_custom_k8s_fields:
# default to use the known_schema module to automatically find the mapping
# from CRD to K8s schema
logger.info('Using known_schema to find the mapping from CRD to K8s schema')
tuples = find_all_matched_schemas_type(self.input_model.root_schema)
for tuple in tuples:
logger.debug(f'Found matched schema: {tuple[0].path} -> {tuple[1]}')
Expand Down
6 changes: 4 additions & 2 deletions acto/kubernetes_engine/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import subprocess
import time
from abc import ABC, abstractmethod
from typing import Callable, List
from typing import Callable, Dict, List

import kubernetes

Expand All @@ -15,12 +15,14 @@ class KubernetesEngine(ABC):

@abstractmethod
def __init__(self, acto_namespace: int,
posthooks: List[KubernetesEnginePostHookType] = None) -> None: ...
posthooks: List[KubernetesEnginePostHookType] = None,
feature_gates: Dict[str, bool] = None) -> None: ...
'''Constructor for KubernetesEngine
Args:
acto_namespace: the namespace of the acto
posthooks: a list of posthooks to be executed after the cluster is created
feature_gates: a list of feature gates to be enabled
'''

@abstractmethod
Expand Down
24 changes: 16 additions & 8 deletions acto/kubernetes_engine/kind.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import subprocess
import time
from typing import List
from typing import Dict, List

import kubernetes
import yaml
Expand All @@ -16,9 +16,11 @@
class Kind(base.KubernetesEngine):

def __init__(
self, acto_namespace: int, posthooks: List[base.KubernetesEnginePostHookType] = None):
self.config_path = os.path.join(CONST.CLUSTER_CONFIG_FOLDER, f'KIND-{acto_namespace}.yaml')
self.posthooks = posthooks
self, acto_namespace: int, posthooks: List[base.KubernetesEnginePostHookType] = None,
feature_gates: Dict[str, bool] = None):
self._config_path = os.path.join(CONST.CLUSTER_CONFIG_FOLDER, f'KIND-{acto_namespace}.yaml')
self._posthooks = posthooks
self._feature_gates = feature_gates

def configure_cluster(self, num_nodes: int, version: str):
'''Create config file for kind'''
Expand All @@ -45,12 +47,17 @@ def configure_cluster(self, num_nodes: int, version: str):
}]
})

if self._feature_gates:
config_dict['featureGates'] = {}
for key, value in self._feature_gates.items():
config_dict['featureGates'][key] = value

try:
os.mkdir(CONST.CLUSTER_CONFIG_FOLDER)
except FileExistsError:
pass

with open(self.config_path, 'w') as config_file:
with open(self._config_path, 'w') as config_file:
yaml.dump(config_dict, config_file)

self._k8s_version = version
Expand Down Expand Up @@ -82,12 +89,13 @@ def create_cluster(self, name: str, kubeconfig: str):
else:
raise Exception('Missing kubeconfig for kind create')

cmd.extend(['--config', self.config_path])
cmd.extend(['--config', self._config_path])

cmd.extend(['--image', f"kindest/node:{self._k8s_version}"])

p = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
while p.returncode != 0:
# TODO: retry for three times
logging.error('Failed to create kind cluster, retrying')
self.delete_cluster(name, kubeconfig)
time.sleep(5)
Expand All @@ -103,8 +111,8 @@ def create_cluster(self, name: str, kubeconfig: str):
logging.debug(f.read())
raise e

if self.posthooks:
for posthook in self.posthooks:
if self._posthooks:
for posthook in self._posthooks:
posthook(apiclient=apiclient)

def load_images(self, images_archive_path: str, name: str):
Expand Down
10 changes: 9 additions & 1 deletion acto/lib/operator_config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from enum import Enum
from typing import List, Optional
from typing import Dict, List, Optional

from pydantic import BaseModel, Field

Expand Down Expand Up @@ -31,6 +31,11 @@ class AnalysisConfig(BaseModel, extra='forbid'):
description='The relative path of the main package for the operator')


class KubernetesEngineConfig(BaseModel, extra='forbid'):
feature_gates: Dict[str, bool] = Field(
description='Path to the feature gates file', default=None)


class OperatorConfig(BaseModel, extra='forbid'):
"""Configuration for porting operators to Acto"""
deploy: DeployConfig
Expand All @@ -50,6 +55,9 @@ class OperatorConfig(BaseModel, extra='forbid'):
diff_ignore_fields: List[str] = Field(default_factory=list)
kubernetes_version: str = Field(
default='v1.22.9', description='Kubernetes version')
kubernetes_engine: KubernetesEngineConfig = Field(
default=KubernetesEngineConfig(), description='Configuration for the Kubernetes engine'
)

monkey_patch: Optional[str] = Field(
default=None, description='Path to the monkey patch file')
Expand Down
6 changes: 4 additions & 2 deletions acto/post_process/post_diff_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,8 @@ def __init__(self, testrun_dir: str, config: OperatorConfig, ignore_invalid: boo
def post_process(self, workdir: str, num_workers: int = 1):
if not os.path.exists(workdir):
os.mkdir(workdir)
cluster = kind.Kind(acto_namespace=self.acto_namespace)
cluster = kind.Kind(acto_namespace=self.acto_namespace,
feature_gates=self.config.kubernetes_engine.feature_gates)
cluster.configure_cluster(self.config.num_nodes, self.config.kubernetes_version)
deploy = Deploy(DeployMethod.YAML, self.config.deploy.file, self.config.deploy.init).new()
# Build an archive to be preloaded
Expand Down Expand Up @@ -479,7 +480,8 @@ def check_diff_test_result(self, workqueue: multiprocessing.Queue, workdir: str,

generation = 0 # for additional runner
additional_runner_dir = os.path.join(workdir, f'additional-runner-{worker_id}')
cluster = kind.Kind(acto_namespace=self.acto_namespace)
cluster = kind.Kind(acto_namespace=self.acto_namespace,
feature_gates=self.config.kubernetes_engine.feature_gates)
cluster.configure_cluster(self.config.num_nodes, self.config.kubernetes_version)

deploy = Deploy(DeployMethod.YAML, self.config.deploy.file, self.config.deploy.init).new()
Expand Down

0 comments on commit 675bdcf

Please sign in to comment.