Skip to content

Commit

Permalink
make more async
Browse files Browse the repository at this point in the history
Signed-off-by: Sylvain Hellegouarch <[email protected]>
  • Loading branch information
Lawouach committed Apr 21, 2024
1 parent cd169ba commit 803c43b
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 19 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

[Unreleased]: https://github.com/chaostoolkit-incubator/kubernetes-crd/compare/0.12.2...HEAD

### Changed

* Try to make code a bit more async

## [0.12.2][] - 2024-04-19

[0.12.2]: https://github.com/chaostoolkit-incubator/kubernetes-crd/compare/0.12.1...0.12.2
Expand Down
67 changes: 48 additions & 19 deletions controller.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import hashlib
import logging
from typing import Any, Dict, List, Union
from typing import Any, Callable, Dict, List, Union

import kopf
from kopf._cogs.structs import bodies
Expand Down Expand Up @@ -119,6 +120,10 @@ async def delete_chaos_experiment( # noqa: C901
###############################################################################
# Internals
###############################################################################
async def run_async(f: Callable, *args, **kwargs) -> Any:
return await asyncio.to_thread(f, *args, **kwargs)


def set_ns(resource: Union[Dict[str, Any], List[Dict[str, Any]]], ns: str):
"""
Set the namespace on the resource(s)
Expand Down Expand Up @@ -470,8 +475,10 @@ async def create_experiment_env_config_map(
created = False

try:
cm = v1.read_namespaced_config_map(
namespace=namespace, name="chaostoolkit-env"
cm = await run_async(
v1.read_namespaced_config_map,
namespace=namespace,
name="chaostoolkit-env",
)
logger.info("Reusing existing default 'chaostoolkit-env' configmap")
except ApiException:
Expand Down Expand Up @@ -502,7 +509,9 @@ async def delete_experiment_env_config_map(
name = f"{name}-{name_suffix}"
logger.info("Deleting '{name}' configmap")
try:
return v1.delete_namespaced_config_map(name=name, namespace=namespace)
return await run_async(
v1.delete_namespaced_config_map, name=name, namespace=namespace
)
except ApiException:
logger.error(
f"Failed to delete experiment configmap '{name}'", exc_info=True
Expand All @@ -515,8 +524,10 @@ async def get_config_map(
cm_pod_spec_name = spec.get("template", {}).get(
"name", "chaostoolkit-resources-templates"
)
cm = v1.read_namespaced_config_map(
namespace=namespace, name=cm_pod_spec_name
cm = await run_async(
v1.read_namespaced_config_map,
namespace=namespace,
name=cm_pod_spec_name,
)
return cm

Expand All @@ -534,7 +545,7 @@ async def create_ns(
tpl["metadata"]["name"] = ns_name
logger.debug(f"Creating namespace with template:\n{tpl}")
try:
r = api.create_namespace(body=tpl)
r = await run_async(api.create_namespace, body=tpl)
return ns_name, r
except ApiException as e:
if e.status == 409:
Expand Down Expand Up @@ -564,7 +575,9 @@ async def create_sa(
set_ns(tpl, ns)
logger.debug(f"Creating service account with template:\n{tpl}")
try:
return api.create_namespaced_service_account(body=tpl, namespace=ns)
return await run_async(
api.create_namespaced_service_account, body=tpl, namespace=ns
)
except ApiException as e:
if e.status == 409:
logger.info(f"Service account '{sa_name}' already exists.")
Expand All @@ -589,8 +602,10 @@ async def delete_sa(
sa_name = f"{sa_name}-{name_suffix}"
logger.debug(f"Deleting service account: {sa_name}")
try:
return api.delete_namespaced_service_account(
name=sa_name, namespace=ns
return await run_async(
api.delete_namespaced_service_account,
name=sa_name,
namespace=ns,
)
except ApiException:
logger.error(
Expand All @@ -616,7 +631,9 @@ async def create_role(

logger.debug(f"Creating role with template:\n{tpl}")
try:
return api.create_namespaced_role(body=tpl, namespace=ns)
return await run_async(
api.create_namespaced_role, body=tpl, namespace=ns
)
except ApiException as e:
if e.status == 409:
logger.info(f"Role '{role_name}' already exists.")
Expand All @@ -639,7 +656,9 @@ async def delete_role(
role_name = f"{role_name}-{name_suffix}"
logger.debug(f"Deleting role with template: {role_name}")
try:
return api.delete_namespaced_role(name=role_name, namespace=ns)
return await run_async(
api.delete_namespaced_role, name=role_name, namespace=ns
)
except ApiException:
logger.error(f"Failed to delete role '{role_name}'", exc_info=True)

Expand Down Expand Up @@ -678,7 +697,9 @@ async def create_role_binding(
set_ns(tpl, ns)
logger.debug(f"Creating role binding with template:\n{tpl}")
try:
return api.create_namespaced_role_binding(body=tpl, namespace=ns)
return await run_async(
api.create_namespaced_role_binding, body=tpl, namespace=ns
)
except ApiException as e:
if e.status == 409:
logger.info(
Expand Down Expand Up @@ -745,8 +766,10 @@ async def delete_role_binding(
role_binding_name = f"{role_binding_name}-{name_suffix}"
logger.debug(f"Deleting role binding: {role_binding_name}")
try:
return api.delete_namespaced_role_binding(
name=role_binding_name, namespace=ns
return await run_async(
api.delete_namespaced_role_binding,
name=role_binding_name,
namespace=ns,
)
except ApiException:
logger.error(
Expand Down Expand Up @@ -878,7 +901,7 @@ async def create_pod(

if apply:
logger.debug(f"Creating pod with template:\n{tpl}")
pod = api.create_namespaced_pod(body=tpl, namespace=ns)
pod = await run_async(api.create_namespaced_pod, body=tpl, namespace=ns)
logger.info(f"Pod {pod.metadata.name} created in ns '{ns}'")
return pod

Expand All @@ -905,7 +928,9 @@ async def delete_pod(
pod_name = f"{pod_name}-{name_suffix}"
logger.debug(f"Deleting pod: {pod_name}")
try:
return api.delete_namespaced_pod(name=pod_name, namespace=ns)
return await run_async(
api.delete_namespaced_pod, name=pod_name, namespace=ns
)
except ApiException:
logger.error(f"Failed to delete pod '{pod_name}'", exc_info=True)

Expand Down Expand Up @@ -938,7 +963,9 @@ async def create_cron_job(
)

logger.debug(f"Creating cron job with template:\n{tpl}")
cron = api.create_namespaced_cron_job(body=tpl, namespace=ns)
cron = await run_async(
api.create_namespaced_cron_job, body=tpl, namespace=ns
)
logger.info(
f"Cron Job '{cron.metadata.name}' scheduled with "
f"pattern '{schedule}' in ns '{ns}'"
Expand All @@ -960,6 +987,8 @@ async def delete_cron_job(
cron_job_name = f"{cron_job_name}-{name_suffix}"
logger.debug(f"Deleting cron job: {cron_job_name}")
try:
return api.delete_namespaced_cron_job(name=cron_job_name, namespace=ns)
return await run_async(
api.delete_namespaced_cron_job, name=cron_job_name, namespace=ns
)
except ApiException:
logger.error(f"Failed to cron job '{cron_job_name}'", exc_info=True)

0 comments on commit 803c43b

Please sign in to comment.