Skip to content

Commit

Permalink
Reduce module dependency on the context file
Browse files Browse the repository at this point in the history
  • Loading branch information
tylergu committed Sep 22, 2023
1 parent 07bad40 commit 0dd46e5
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 46 deletions.
34 changes: 14 additions & 20 deletions acto/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ def undeploy(self, context: dict, apiclient):
time.sleep(5)
logger.info('Namespace %s deleted' % context['namespace'])

def deploy(self, context: dict, kubeconfig: str, context_name: str):
def deploy(self, kubeconfig: str, context_name: str, namespace: str):
# XXX: context param is temporary, need to figure out why rabbitmq complains about namespace
pass

def deploy_with_retry(self, context, kubeconfig: str, context_name: str, retry_count=3):
def deploy_with_retry(self, kubeconfig: str, context_name: str, namespace: str, retry_count=3):
logger = get_thread_logger(with_prefix=False)
while retry_count > 0:
try:
return self.deploy(context, kubeconfig, context_name)
return self.deploy(kubeconfig, context_name, namespace)
except Exception as e:
logger.warn(e)
logger.info(
Expand All @@ -68,7 +68,7 @@ def deploy_with_retry(self, context, kubeconfig: str, context_name: str, retry_c
retry_count -= 1
return False

def check_status(self, context: dict, kubeconfig: str, context_name: str):
def check_status(self, kubeconfig: str, context_name: str):
'''
We need to make sure the operator is ready before applying test cases, because Acto would
Expand Down Expand Up @@ -163,7 +163,7 @@ def check_status(self, context, context_name: str) -> bool:

class Yaml(Deploy):

def deploy(self, context: dict, kubeconfig: str, context_name: str):
def deploy(self, kubeconfig: str, context_name: str, namespace: str):
# TODO: We cannot specify namespace ACTO_NAMESPACE here.
# rabbitMQ operator will report the error message
'''
Expand All @@ -173,24 +173,21 @@ def deploy(self, context: dict, kubeconfig: str, context_name: str):
logger = get_thread_logger(with_prefix=True)
print_event('Deploying operator...')

namespace = utils.get_yaml_existing_namespace(
self.path) or CONST.ACTO_NAMESPACE
context['namespace'] = namespace
ret = utils.create_namespace(
kubernetes_client(kubeconfig, context_name), namespace)
if ret == None:
logger.error('Failed to create namespace')
if self.init_yaml:
kubectl(['apply', '--server-side', '-f', self.init_yaml], kubeconfig=kubeconfig,
context_name=context_name)
self.check_status(context, kubeconfig=kubeconfig,
self.check_status(kubeconfig=kubeconfig,
context_name=context_name)
kubectl(['apply', '--server-side', '-f', self.path, '-n', context['namespace']], kubeconfig=kubeconfig,
kubectl(['apply', '--server-side', '-f', self.path, '-n', namespace], kubeconfig=kubeconfig,
context_name=context_name)
self.check_status(context, kubeconfig=kubeconfig,
self.check_status(kubeconfig=kubeconfig,
context_name=context_name)
add_acto_label(kubernetes_client(kubeconfig, context_name), context)
self.check_status(context, kubeconfig=kubeconfig,
add_acto_label(kubernetes_client(kubeconfig, context_name), namespace)
self.check_status(kubeconfig=kubeconfig,
context_name=context_name)
time.sleep(20)

Expand All @@ -201,18 +198,15 @@ def deploy(self, context: dict, kubeconfig: str, context_name: str):

class Kustomize(Deploy):

def deploy(self, context, kubeconfig, context_name):
# TODO: We need to remove hardcoded namespace.
namespace = "cass-operator"
context['namespace'] = namespace
def deploy(self, kubeconfig, context_name, namespace):
if self.init_yaml:
kubectl(['apply', '--server-side', '-f', self.init_yaml], kubeconfig=kubeconfig,
context_name=context_name)
self.check_status(context, kubeconfig=kubeconfig,
self.check_status(kubeconfig=kubeconfig,
context_name=context_name)
kubectl(['apply', '--server-side', '-k', self.path, '-n', context['namespace']], kubeconfig=kubeconfig,
kubectl(['apply', '--server-side', '-k', self.path, '-n', namespace], kubeconfig=kubeconfig,
context_name=context_name)
self.check_status(context, kubeconfig=kubeconfig,
self.check_status(kubeconfig=kubeconfig,
context_name=context_name)
return True

Expand Down
18 changes: 11 additions & 7 deletions acto/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from acto.serialization import ActoEncoder, ContextEncoder
from acto.snapshot import Snapshot
from acto.utils import (delete_operator_pod, process_crd,
update_preload_images)
update_preload_images, get_yaml_existing_namespace)
from acto.lib.operator_config import OperatorConfig
from acto.utils.thread_logger import (get_thread_logger,
set_thread_logger_prefix)
Expand Down Expand Up @@ -246,8 +246,9 @@ def run(self, errors: List[RunResult], mode: str = InputModel.NORMAL):
apiclient = kubernetes_client(self.kubeconfig, self.context_name)
self.cluster.load_images(self.images_archive, self.cluster_name)
trial_k8s_bootstrap_time = time.time()
deployed = self.deploy.deploy_with_retry(self.context, self.kubeconfig,
self.context_name)
deployed = self.deploy.deploy_with_retry(self.kubeconfig,
self.context_name,
self.context['namespace'])
if not deployed:
logger.info('Not deployed. Try again!')
continue
Expand Down Expand Up @@ -778,17 +779,20 @@ def __learn(self, context_file, helper_crd, analysis_only=False):

while True:
self.cluster.restart_cluster('learn', learn_kubeconfig)
deployed = self.deploy.deploy_with_retry(self.context, learn_kubeconfig,
learn_context_name)
namespace = get_yaml_existing_namespace(self.deploy.path) or CONST.ACTO_NAMESPACE
self.context['namespace'] = namespace
deployed = self.deploy.deploy_with_retry(learn_kubeconfig,
learn_context_name,
namespace)
if deployed:
break
apiclient = kubernetes_client(learn_kubeconfig, learn_context_name)
runner = Runner(self.context, 'learn', learn_kubeconfig, learn_context_name)
runner.run_without_collect(self.operator_config.seed_custom_resource)

update_preload_images(self.context, self.cluster.get_node_list('learn'))
process_crd(self.context, apiclient, KubectlClient(learn_kubeconfig, learn_context_name),
self.crd_name, helper_crd)
self.context['crd'] = process_crd(apiclient, KubectlClient(learn_kubeconfig, learn_context_name),
self.crd_name, helper_crd)
self.cluster.delete_cluster('learn', learn_kubeconfig)

run_end_time = time.time()
Expand Down
17 changes: 10 additions & 7 deletions acto/post_process/post_diff_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,10 @@ def run_cr(self, cr, trial, gen):
self._cluster.restart_cluster(self._cluster_name, self._kubeconfig)
self._cluster.load_images(self._images_archive, self._cluster_name)
apiclient = kubernetes_client(self._kubeconfig, self._context_name)
deployed = self._deploy.deploy_with_retry(self._context, self._kubeconfig,
self._context_name)
add_acto_label(apiclient, self._context)
deployed = self._deploy.deploy_with_retry(self._kubeconfig,
self._context_name,
self._context['namespace'])
add_acto_label(apiclient, self._context['namespace'])
trial_dir = os.path.join(self._workdir, 'trial-%02d' % self._worker_id)
os.makedirs(trial_dir, exist_ok=True)
runner = Runner(self._context, trial_dir, self._kubeconfig, self._context_name)
Expand Down Expand Up @@ -328,8 +329,9 @@ def run(self):
self._cluster.load_images(self._images_archive, self._cluster_name)
apiclient = kubernetes_client(self._kubeconfig, self._context_name)
after_k8s_bootstrap_time = time.time()
deployed = self._deploy.deploy_with_retry(self._context, self._kubeconfig,
self._context_name)
deployed = self._deploy.deploy_with_retry(self._kubeconfig,
self._context_name,
self._context['namespace'])
after_operator_deploy_time = time.time()

trial_dir = os.path.join(self._workdir, 'trial-%02d' % self._worker_id)
Expand Down Expand Up @@ -369,8 +371,9 @@ def run(self):
self._cluster.load_images(self._images_archive, self._cluster_name)
apiclient = kubernetes_client(self._kubeconfig, self._context_name)
after_k8s_bootstrap_time = time.time()
deployed = self._deploy.deploy_with_retry(self._context, self._kubeconfig,
self._context_name)
deployed = self._deploy.deploy_with_retry(self._kubeconfig,
self._context_name,
self._context['namespace'])
after_operator_deploy_time = time.time()
runner = Runner(self._context, trial_dir, self._kubeconfig, self._context_name)

Expand Down
2 changes: 2 additions & 0 deletions acto/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(self,
def run(self, input: dict, generation: int) -> Tuple[Snapshot, bool]:
'''Simply runs the cmd and dumps system_state, delta, operator log, events and input files without checking.
The function blocks until system converges.
TODO: move the serialization part to a separate function
Args:
Expand Down Expand Up @@ -93,6 +94,7 @@ def run(self, input: dict, generation: int) -> Tuple[Snapshot, bool]:
logger.error('STDERR: ' + cli_result.stderr)
return Snapshot(input, self.collect_cli_result(cli_result), {}, []), True
err = None

try:
err = self.wait_for_system_converge()
except (KeyError, ValueError) as e:
Expand Down
42 changes: 30 additions & 12 deletions acto/utils/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import List, Optional

import kubernetes
import kubernetes.client.models as k8s_models
import yaml

from acto.kubectl_client import KubectlClient
Expand All @@ -19,6 +20,7 @@ def update_preload_images(context: dict, worker_list):
if not namespace:
return

# block list when getting the operator specific images
k8s_images = [
'docker.io/kindest/kindnetd',
'docker.io/rancher/local-path-provisioner',
Expand All @@ -39,6 +41,15 @@ def update_preload_images(context: dict, worker_list):
'docker.io/rancher/mirrored-library-traefik',
'docker.io/rancher/mirrored-metrics-server',
'docker.io/rancher/mirrored-paus',

# new k8s images
'registry.k8s.io/etcd',
'registry.k8s.io/kube-controller-manager',
'registry.k8s.io/pause',
'registry.k8s.io/kube-proxy',
'registry.k8s.io/coredns/coredns',
'registry.k8s.io/kube-apiserver',
'registry.k8s.io/kube-scheduler',
]

for worker in worker_list:
Expand All @@ -62,23 +73,31 @@ def update_preload_images(context: dict, worker_list):
context['preload_images'].add(image)


def process_crd(context: dict,
apiclient: kubernetes.client.ApiClient,
def process_crd(apiclient: kubernetes.client.ApiClient,
kubectl_client: KubectlClient,
crd_name: Optional[str] = None,
helper_crd: Optional[str] = None):
helper_crd: Optional[str] = None) -> dict:
''' Get crd from k8s and return it as a dict (the caller stores it in context['crd'])
Args:
apiclient: k8s api client
kubectl_client: kubectl client
crd_name: name of the crd
helper_crd: helper crd file path
Returns:
crd_data: crd dict
When there is more than one crd in the cluster, the user should set crd_name
'''
logger = get_thread_logger(with_prefix=False)

if helper_crd == None:
apiextensionsV1Api = kubernetes.client.ApiextensionsV1Api(apiclient)
crds: List[
kubernetes.client.models.
k8s_models.
V1CustomResourceDefinition] = apiextensionsV1Api.list_custom_resource_definition().items
crd: Optional[kubernetes.client.models.V1CustomResourceDefinition] = None
crd: Optional[k8s_models.V1CustomResourceDefinition] = None
if len(crds) == 0:
logger.error('No crd is found')
quit()
Expand All @@ -100,15 +119,15 @@ def process_crd(context: dict,
crd_result = kubectl_client.kubectl(['get', 'crd', crd.metadata.name, "-o", "json"],
True, True)
crd_obj = json.loads(crd_result.stdout)
spec: kubernetes.client.models.V1CustomResourceDefinitionSpec = crd.spec
spec: k8s_models.V1CustomResourceDefinitionSpec = crd.spec
crd_data = {
'group': spec.group,
'plural': spec.names.plural,
# TODO: Handle multiple versions
'version': spec.versions[0].name,
'body': crd_obj
}
context['crd'] = crd_data
return crd_data
else:
with open(helper_crd, 'r') as helper_crd_f:
helper_crd_doc = yaml.load(helper_crd_f, Loader=yaml.FullLoader)
Expand All @@ -119,17 +138,16 @@ def process_crd(context: dict,
['name'], # TODO: Handle multiple versions
'body': helper_crd_doc
}
context['crd'] = crd_data
logger.debug('CRD data: %s' % crd_data)
return crd_data


def add_acto_label(apiclient: kubernetes.client.ApiClient, context: dict):
def add_acto_label(apiclient: kubernetes.client.ApiClient, namespace: str):
'''Add acto label to deployments, stateful sets and corresponding pods.
'''
appv1Api = kubernetes.client.AppsV1Api(apiclient)
operator_deployments = appv1Api.list_namespaced_deployment(context['namespace'],
operator_deployments = appv1Api.list_namespaced_deployment(namespace,
watch=False).items
operator_stateful_states = appv1Api.list_namespaced_stateful_set(context['namespace'],
operator_stateful_states = appv1Api.list_namespaced_stateful_set(namespace,
watch=False).items
for deployment in operator_deployments:
patches = [{
Expand Down

0 comments on commit 0dd46e5

Please sign in to comment.