Skip to content

Commit

Permalink
add integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kelkawi-a committed Jun 24, 2024
1 parent 205db6e commit 35d9c92
Show file tree
Hide file tree
Showing 10 changed files with 372 additions and 22 deletions.
16 changes: 16 additions & 0 deletions .github/workflows/integration_test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
name: Integration tests

on:
pull_request:

jobs:
integration-tests:
uses: canonical/operator-workflows/.github/workflows/integration_test.yaml@main
secrets: inherit
with:
channel: 1.28-strict/stable
modules: '["test_charm.py"]'
juju-channel: 3.1/stable
self-hosted-runner: true
self-hosted-runner-label: "xlarge"
microk8s-addons: "dns ingress rbac storage metallb:10.15.119.2-10.15.119.4 registry"
2 changes: 1 addition & 1 deletion charmcraft.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,4 @@ resources:
airbyte-webapp:
type: oci-image
description: OCI image for Airbyte web UI
upstream-source: airbyte/webapp:0.57.3
upstream-source: airbyte/webapp:0.60.0
4 changes: 4 additions & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

"""Tests module."""
16 changes: 16 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

"""Fixtures for jenkins-k8s charm tests."""

import pytest


def pytest_addoption(parser: pytest.Parser):
"""Parse additional pytest options.
Args:
parser: pytest command line parser.
"""
# The prebuilt charm file.
parser.addoption("--charm-file", action="append", default=[])
65 changes: 65 additions & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

"""Charm integration test config."""

import asyncio
import logging

import pytest_asyncio
from helpers import (
APP_NAME_AIRBYTE_SERVER,
APP_NAME_AIRBYTE_UI,
APP_NAME_TEMPORAL_ADMIN,
APP_NAME_TEMPORAL_SERVER,
METADATA,
create_default_namespace,
perform_airbyte_integrations,
perform_temporal_integrations,
)
from pytest_operator.plugin import OpsTest

logger = logging.getLogger(__name__)


@pytest_asyncio.fixture(name="deploy", scope="module")
async def deploy(ops_test: OpsTest):
"""Test the app is up and running."""
charm = await ops_test.build_charm(".")
resources = {"airbyte-webapp": METADATA["resources"]["airbyte-webapp"]["upstream-source"]}

asyncio.gather(
ops_test.model.deploy(charm, resources=resources, application_name=APP_NAME_AIRBYTE_UI),
ops_test.model.deploy(APP_NAME_AIRBYTE_SERVER, trust=True, channel="edge"),
ops_test.model.deploy(
APP_NAME_TEMPORAL_SERVER,
channel="edge",
config={"num-history-shards": 1},
),
ops_test.model.deploy(APP_NAME_TEMPORAL_ADMIN, channel="edge"),
ops_test.model.deploy("postgresql-k8s", channel="14/stable", trust=True),
ops_test.model.deploy("minio", channel="edge"),
ops_test.model.deploy("nginx-ingress-integrator", channel="edge", revision=103, trust=True),
)

async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(
apps=["postgresql-k8s", "minio"], status="active", raise_on_blocked=False, timeout=1200
)
await ops_test.model.wait_for_idle(
apps=[APP_NAME_TEMPORAL_SERVER, APP_NAME_TEMPORAL_ADMIN],
status="blocked",
raise_on_blocked=False,
timeout=600,
)
await ops_test.model.wait_for_idle(
apps=["nginx-ingress-integrator"],
status="waiting",
raise_on_blocked=False,
timeout=600,
)

await perform_temporal_integrations(ops_test)
await create_default_namespace(ops_test)

await perform_airbyte_integrations(ops_test)
166 changes: 166 additions & 0 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

# flake8: noqa

"""Charm integration test helpers."""

import logging
import socket
from datetime import timedelta
from pathlib import Path

import yaml
from pytest_operator.plugin import OpsTest
from temporal_client.activities import say_hello
from temporal_client.workflows import SayHello
from temporalio.client import Client
from temporalio.worker import Worker

logger = logging.getLogger(__name__)

METADATA = yaml.safe_load(Path("./charmcraft.yaml").read_text())
APP_NAME_AIRBYTE_UI = METADATA["name"]
APP_NAME_AIRBYTE_SERVER = "airbyte-k8s"
APP_NAME_TEMPORAL_SERVER = "temporal-k8s"
APP_NAME_TEMPORAL_ADMIN = "temporal-admin-k8s"
APP_NAME_TEMPORAL_UI = "temporal-ui-k8s"


def gen_patch_getaddrinfo(host: str, resolve_to: str): # noqa
"""Generate patched getaddrinfo function.
This function is used to generate a patched getaddrinfo function that will resolve to the
resolve_to address without having to actually register a host.
Args:
host: intended hostname of a given application.
resolve_to: destination address for host to resolve to.
Returns:
A patching function for getaddrinfo.
"""
original_getaddrinfo = socket.getaddrinfo

def patched_getaddrinfo(*args):
"""Patch getaddrinfo to point to desired ip address.
Args:
args: original arguments to getaddrinfo when creating network connection.
Returns:
Patched getaddrinfo function.
"""
if args[0] == host:
return original_getaddrinfo(resolve_to, *args[1:])
return original_getaddrinfo(*args)

return patched_getaddrinfo


async def run_sample_workflow(ops_test: OpsTest):
"""Connect a client and runs a basic Temporal workflow.
Args:
ops_test: PyTest object.
"""
url = await get_application_url(ops_test, application=APP_NAME_TEMPORAL_SERVER, port=7233)
logger.info("running workflow on app address: %s", url)

client = await Client.connect(url)

# Run a worker for the workflow
async with Worker(client, task_queue="my-task-queue", workflows=[SayHello], activities=[say_hello]):
name = "Jean-luc"
result = await client.execute_workflow(
SayHello.run, name, id="my-workflow-id", task_queue="my-task-queue", run_timeout=timedelta(seconds=20)
)
logger.info(f"result: {result}")
assert result == f"Hello, {name}!"


async def create_default_namespace(ops_test: OpsTest):
"""Creates default namespace on Temporal server using tctl.
Args:
ops_test: PyTest object.
"""
# Register default namespace from admin charm.
action = (
await ops_test.model.applications[APP_NAME_TEMPORAL_ADMIN]
.units[0]
.run_action("tctl", args="--ns default namespace register -rd 3")
)
result = (await action.wait()).results
logger.info(f"tctl result: {result}")
assert "result" in result and result["result"] == "command succeeded"


async def get_application_url(ops_test: OpsTest, application, port):
"""Return application URL from the model.
Args:
ops_test: PyTest object.
application: Name of the application.
port: Port number of the URL.
Returns:
Application URL of the form {address}:{port}
"""
status = await ops_test.model.get_status() # noqa: F821
address = status["applications"][application].public_address
return f"{address}:{port}"


async def get_unit_url(ops_test: OpsTest, application, unit, port, protocol="http"):
"""Return unit URL from the model.
Args:
ops_test: PyTest object.
application: Name of the application.
unit: Number of the unit.
port: Port number of the URL.
protocol: Transfer protocol (default: http).
Returns:
Unit URL of the form {protocol}://{address}:{port}
"""
status = await ops_test.model.get_status() # noqa: F821
address = status["applications"][application]["units"][f"{application}/{unit}"]["address"]
return f"{protocol}://{address}:{port}"


async def perform_temporal_integrations(ops_test: OpsTest):
"""Integrate Temporal charm with postgresql, admin and ui charms.
Args:
ops_test: PyTest object.
"""
await ops_test.model.integrate(f"{APP_NAME_TEMPORAL_SERVER}:db", "postgresql-k8s:database")
await ops_test.model.integrate(f"{APP_NAME_TEMPORAL_SERVER}:visibility", "postgresql-k8s:database")
await ops_test.model.integrate(f"{APP_NAME_TEMPORAL_SERVER}:admin", f"{APP_NAME_TEMPORAL_ADMIN}:admin")
await ops_test.model.wait_for_idle(
apps=[APP_NAME_TEMPORAL_SERVER], status="active", raise_on_blocked=False, timeout=180
)

assert ops_test.model.applications[APP_NAME_TEMPORAL_SERVER].units[0].workload_status == "active"


async def perform_airbyte_integrations(ops_test: OpsTest):
"""Perform Airbyte charm integrations.
Args:
ops_test: PyTest object.
"""
await ops_test.model.integrate(APP_NAME_AIRBYTE_SERVER, "postgresql-k8s")
await ops_test.model.integrate(APP_NAME_AIRBYTE_SERVER, "minio")
await ops_test.model.integrate(APP_NAME_AIRBYTE_SERVER, APP_NAME_AIRBYTE_UI)
await ops_test.model.integrate(APP_NAME_AIRBYTE_UI, "nginx-ingress-integrator")

await ops_test.model.wait_for_idle(
apps=[APP_NAME_AIRBYTE_SERVER, APP_NAME_AIRBYTE_UI, "nginx-ingress-integrator"],
status="active",
raise_on_blocked=False,
timeout=600,
)

assert ops_test.model.applications[APP_NAME_AIRBYTE_SERVER].units[0].workload_status == "active"
assert ops_test.model.applications[APP_NAME_AIRBYTE_UI].units[0].workload_status == "active"
20 changes: 20 additions & 0 deletions tests/integration/temporal_client/activities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.


"""Temporal client activity."""

from temporalio import activity


@activity.defn
async def say_hello(name: str) -> str:
"""Temporal activity.
Args:
name: used to run the dynamic activity.
Returns:
String in the form "Hello, {name}!
"""
return f"Hello, {name}!"
30 changes: 30 additions & 0 deletions tests/integration/temporal_client/workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.


"""Temporal client sample workflow."""

from datetime import timedelta

from temporalio import workflow

# Import our activity, passing it through the sandbox
with workflow.unsafe.imports_passed_through():
from .activities import say_hello


@workflow.defn
class SayHello:
"""Temporal workflow class."""

@workflow.run
async def run(self, name: str) -> str:
"""Workflow execution method.
Args:
name: used to run the dynamic activity.
Returns:
Workflow execution
"""
return await workflow.execute_activity(say_hello, name, schedule_to_close_timeout=timedelta(seconds=5))
Loading

0 comments on commit 35d9c92

Please sign in to comment.