Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

E2E Tests Improvements #140

Open
wants to merge 16 commits into
base: stable/yoga-m3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions networking_nsxv3/tests/e2e/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
export OS_STDERR_CAPTURE=0

# E2E Test specific vars
export E2E_BB="nova" # Building block to use for E2E tests
export E2E_NETWORK_NAME="test-net-1" # Pre-existing Network used for the E2E Test Scenarios
export E2E_SERVER_NAME="os-test-vm-1" # Pre-existing VM (server) used for the E2E Test Scenarios
export E2E_CREATE_SERVER_NAME_PREFIX="os-e2e-test-" # Prefix for server names, UUID will be appended
Expand Down Expand Up @@ -59,10 +60,10 @@
- Add/Remove trunk to/from server [implemented]
- Provision server with trunk [implemented]
## Security Groups & Policies
- Create/Delete/Update Security Groups (Remote IP)
- Create/Delete/Update Security Groups (Remote Group)
- Add/Remove rules to Security Groups
- Add/Remove ports to/from Security Group
- Create/Delete/Update Security Groups (Remote IP) [implemented]
- Create/Delete/Update Security Groups (Remote Group) [implemented]
- Add/Remove rules to Security Groups [implemented]
- Add/Remove ports to/from Security Group [implemented]
## Address Groups
- Create/Delete/Update IPv4/IPv6 address groups [implemented]
- Create mixed IPv4/IPv6 members [implemented]
Expand Down
151 changes: 131 additions & 20 deletions networking_nsxv3/tests/e2e/base.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import eventlet
eventlet.monkey_patch()

from networking_nsxv3.common import config # noqa
from typing import List
from keystoneauth1 import identity
from keystoneauth1 import session
from networking_nsxv3.tests.e2e import neutron
from novaclient.v2.servers import Server
from novaclient.v2.servers import NetworkInterface
from novaclient.v2.client import Client as NovaClient
from novaclient import client as nova
from neutron.tests import base
Expand All @@ -15,9 +18,6 @@
from networking_nsxv3.plugins.ml2.drivers.nsxv3.agent.provider_nsx_policy import API
import uuid

from networking_nsxv3.common import config # noqa


LOG = logging.getLogger(__name__)


Expand Down Expand Up @@ -81,6 +81,7 @@ def setUpClass(cls):
cls.nsx_client = client_nsx.Client()
cls.nsx_client.version # This will force the client to login
cls.OS_PROJECT_ID = cls.auth.get_project_id(cls.sess)
cls.availability_zone = g("E2E_BB")

def setUp(self):
super().setUp()
Expand Down Expand Up @@ -110,6 +111,7 @@ def create_test_server(self, name, image_name, flavor_name, network_id=None, sec
min_count=1,
max_count=1,
security_groups=security_groups,
availability_zone=self.availability_zone,
nics=[{'net-id': network_id}] if len(nic_ports) < 1 else nic_ports,
block_device_mapping_v2=[{
"uuid": image.id,
Expand Down Expand Up @@ -154,22 +156,79 @@ def get_nsx_sg_by_os_id(self, os_sg_id):
return resp.json()
return None

@RetryDecorator.RetryIfResultIsNone(max_retries=5, sleep_duration=5)
def get_nsx_sg_effective_members(self, sg_id) -> list:
res = self.nsx_client.get(path=API.SEARCH_DSL, params={
"query": "resource_type:SegmentPort",
"dsl": sg_id,
"page_size": 100,
"data_source": "INTENT",
"exclude_internal_types": True
})
@RetryDecorator.RetryIfResultIsNone(max_retries=30, sleep_duration=30)
def get_nsx_rules(self, os_sg_id, desired_count=None):
res = self.nsx_client.get(API.RULES.format(os_sg_id))
if res.ok:
j = res.json()
if j['result_count'] == 0:
return None
if desired_count is not None:
if j['result_count'] != desired_count:
return None
return j['results']
return None

def get_nsx_rules_no_retry(self, os_sg_id):
res = self.nsx_client.get(API.RULES.format(os_sg_id))
if res.ok:
j = res.json()
if j['result_count'] == 0:
return []
return j['results']
return []

@RetryDecorator.RetryIfResultIsNone(max_retries=30, sleep_duration=60)
def get_nsx_sg_effective_members(self, os_sg_id, ensure_port_names: set = None) -> list:
"""
Retrieves the effective members of an NSX security group.
Retries the request up to 30 times with a 60-second delay between retries.
If the `ensure_port_names` parameter is provided, the method will ensure that all port names are present in the response before returning.

Args:
os_sg_id (str): The ID of the OpenStack security group.
ensure_port_names (set, optional): A set of port names to ensure are present in the response.

Returns:
list: A list of effective members of the NSX security group, or None if the request fails or no members are found.
"""
res = self.nsx_client.get(API.GROUP.format(os_sg_id) + "/members/segment-ports", {"page_size": 1000})
if res.ok:
j = res.json()
if j['result_count'] == 0:
return None
if ensure_port_names is not None:
# Ensure all ports are in the response
port_names = set([p['display_name'] for p in j['results']])
if not set(ensure_port_names).issubset(port_names):
return None
return j['results']
return None

def get_sg_members_no_retry(self, os_sg_id) -> list:
res = self.nsx_client.get(API.GROUP.format(os_sg_id) + "/members/segment-ports", {"page_size": 1000})
if res.ok:
j = res.json()
if j['result_count'] == 0:
return []
return j['results']
return []

def get_os_default_security_group(self):
# Get the default security group
lsg = self.neutron_client.list_security_groups(project_id=self.OS_PROJECT_ID)
default_sg = [sg for sg in lsg['security_groups'] if sg['name'] == 'default'][0]

# Assert that the default security group exists and has active member ports
self.assertIsNotNone(default_sg, "Default security group must exist")
sg_ports = self.neutron_client.list_ports(security_groups=[default_sg['id']])
self.assertGreater(len(sg_ports), 0, "Default security group must have at least one port")
self.assertGreater(len(sg_ports['ports']), 0, "Default security group must have at least one port")
self.assertTrue(any([p['status'] == 'ACTIVE' and p['admin_state_up'] for p in sg_ports['ports']]),
"Default security group must have at least one active port")

return default_sg

def set_test_network(self, net_name: str):
""" Set the test network (self.test_network) to the network with the name provided.
"""
Expand All @@ -181,13 +240,23 @@ def set_test_network(self, net_name: str):
def set_test_server(self, server_name: str):
""" Set the test server (self.test_server) to the server with the name provided.
"""
servers = self.nova_client.servers.list()
servers = self.nova_client.servers.list(search_opts={"availability_zone": self.availability_zone})
self.test_server: Server = next((s for s in servers if s.name == server_name), None)
self.assertIsNotNone(self.test_server, f"Server '{server_name}' not found.")
self.assertIsNotNone(self.test_server, f"Server '{server_name}' not found in AZ '{self.availability_zone}'.")

def create_test_ports(self):
""" Create ports on the test network (self.test_network) and store their IDs (self.test_ports).
Also add cleanup for deletion.
"""
Creates test ports for the given test network.

This method iterates over the list of test ports (self.test_ports) and creates a port for each one.
The created ports are associated with the test network specified by `self.test_network['id']`.
The name of each port is set based on the corresponding `self.port['name']` value.

Returns:
None

Raises:
Any exceptions raised by the `neutron_client.create_port` method.
"""
for port in self.test_ports:
result = self.neutron_client.create_port({
Expand Down Expand Up @@ -216,13 +285,55 @@ def create_new_port(self) -> dict:
self.addCleanup(self.neutron_client.delete_port, new_port['id'])
return new_port

def assert_server_nsx_ports_sgs(self, ports: list):
def attach_test_ports_to_test_server(self):
"""
Attach test ports to the test server.

This method attaches the test ports (self.test_ports) to the test server (elf.test_server),
using the `nova_client.servers.interface_attach` method.
It also adds a cleanup step to detach the ports using `nova_client.servers.interface_detach` method.

Args:
None

Returns:
None
"""
for port in self.test_ports:
self.nova_client.servers.interface_attach(
server=self.test_server.id,
port_id=port['id'],
net_id=None,
fixed_ip=None
)
self.addCleanup(self.nova_client.servers.interface_detach, server=self.test_server.id, port_id=port['id'])

def assert_os_ports_nsx_sg_membership(self, ports: List[NetworkInterface]):
LOG.info(f"""Verifying NSX Security Group membership for {len(ports)} ports:
({', '.join([p.id for p in ports])})
This may take a while as the test will try to wait for NSX to sync with OpenStack...""")
desired_results = []
actual_results = []

for port in ports:
port_sgs = self.neutron_client.show_port(port.id)['port']['security_groups']
desired_results.append({"port_id": port.id, "sgs": port_sgs})
actual_results.append({"port_id": port.id, "sgs": []})
# For each SG get the Group from NSX and its members
for sg_id in port_sgs:
nsx_ports_for_sg = self.get_nsx_sg_effective_members(sg_id)
self.assertIsNotNone(nsx_ports_for_sg, f"Security Group {sg_id} not found in NSX.")
nsx_ports_for_sg = self.get_nsx_sg_effective_members(sg_id, ensure_port_names={port.id})
self.assertIsNotNone(nsx_ports_for_sg, f"""Security Group {sg_id} not found in NSX or the port {port.id} is not a member.
Desired results: {desired_results[-1]}
NSX Ports in SG {sg_id}: {[p['display_name'] for p in self.get_sg_members_no_retry(sg_id)]}""")
# Assert the port is a member of the SG
nsx_port = next((p for p in nsx_ports_for_sg if p['display_name'] == port.id), None)
self.assertIsNotNone(nsx_port, f"Port {port.id} not found in Security Group {sg_id} in NSX.")
if nsx_port:
actual_results[-1]['sgs'].append(sg_id)

# order 'sgs' sublists for comparison
for r in actual_results:
r['sgs'].sort()
for r in desired_results:
r['sgs'].sort()

self.assertListEqual(desired_results, actual_results, "Security Group membership does not match.")
77 changes: 36 additions & 41 deletions networking_nsxv3/tests/e2e/test_address_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,46 @@
eventlet.monkey_patch()

from networking_nsxv3.common import config # noqa
from oslo_log import log as logging
import uuid
from networking_nsxv3.tests.e2e import base
from networking_nsxv3.plugins.ml2.drivers.nsxv3.agent.provider_nsx_policy import API

from networking_nsxv3.tests.e2e import base
from typing import List
from novaclient.v2.servers import NetworkInterface
import uuid
import os
from oslo_log import log as logging

LOG = logging.getLogger(__name__)


class TestAddressGroups(base.E2ETestCase):

def __init__(self, *args, **kwds):
super().__init__(*args, **kwds)
self.test_network1_name = os.environ.get("E2E_NETWORK_NAME", None)

def setUp(self):
super().setUp()
# self.skipTest("Skipping test temporarily.")

self.new_addr_grp_rules = []
self.new_grp_ids = []
self.new_sg_ids = []
self.existing_updated_ports = []
self.assertGreater(len(self.nova_client.servers.list()), 0, "At least one server should exist!")
self.def_os_sg = self._get_os_default_sg()
self.new_server = None
self.def_os_sg = self.get_os_default_security_group()

if not self.test_network1_name:
self.fail("E2E_NETWORK_NAME is not set. Please set it to the name of the network to use for testing.")
self.set_test_network(self.test_network1_name)

def tearDown(self):
super().tearDown()
# Clean up & Assert cleanup
LOG.info("Tearing down test case...")
addr_grp_ids = [ag.get("security_group_rule", {}).get("id") for ag in self.new_addr_grp_rules]
self._revert_updated_ports()
self.doCleanups()
self._clean_neutron_sg_rules()
self._clean_addr_groups()
self._clean_sec_groups()
addr_grp_ids = [ag.get("security_group_rule", {}).get("id") for ag in self.new_addr_grp_rules]
self._assert_nsx_cleanup(rule_ids=addr_grp_ids)

def test_create_ipv4_address_groups(self):
Expand Down Expand Up @@ -158,9 +167,10 @@ def test_update_address_groups(self):

def test_address_group_in_multiple_security_groups(self):
LOG.info("Testing address group in multiple security groups...")

# Check that there are active ports
ports = self._get_assert_active_ports()

self._create_server()
ports: List[NetworkInterface] = self.new_server.interface_list()
self.assertTrue(ports, "Server should have at least one port.")

unique_addr_grp_name = str(uuid.uuid4())
new_addr_grp = self.neutron_client.create_address_group(body={
Expand All @@ -187,12 +197,12 @@ def test_address_group_in_multiple_security_groups(self):
})

# Attach the new SGs to an active port
self.existing_updated_ports.append(ports[0])
self.neutron_client.update_port(ports[0]['id'], body={
self.neutron_client.update_port(ports[0].id, body={
"port": {
"security_groups": [new_sg_1['security_group']['id'], new_sg_2['security_group']['id']]
}
})
self.addCleanup(self.neutron_client.delete_port, ports[0].id)

# Verify that the security groups were created
self.assertTrue(new_sg_1 and new_sg_1.get('security_group', {}).get('id'))
Expand Down Expand Up @@ -338,13 +348,6 @@ def _clean_addr_groups(self):
for ag in self.neutron_client.list_address_groups()['address_groups']])
self.new_grp_ids = []

def _revert_updated_ports(self):
# Revert back the updated ports
if self.existing_updated_ports and len(self.existing_updated_ports) > 0:
for port in self.existing_updated_ports:
self.neutron_client.update_port(
port.get("id"), {"port": {"security_groups": port.get("security_groups")}})

def _clean_neutron_sg_rules(self):
if len(self.new_addr_grp_rules) > 0:
for new_addr_grp_rule in self.new_addr_grp_rules:
Expand All @@ -361,21 +364,6 @@ def _clean_sec_groups(self):
for sg in self.neutron_client.list_security_groups()['security_groups']])
self.new_sg_ids = []

def _get_os_default_sg(self):
# Get the default security group
lsg = self.neutron_client.list_security_groups(project_id=self.OS_PROJECT_ID)
default_sg = [sg for sg in lsg['security_groups'] if sg['name'] == 'default'][0]

# Assert that the default security group exists and has active member ports
self.assertIsNotNone(default_sg, "Default security group must exist")
sg_ports = self.neutron_client.list_ports(security_groups=[default_sg['id']])
self.assertGreater(len(sg_ports), 0, "Default security group must have at least one port")
self.assertGreater(len(sg_ports['ports']), 0, "Default security group must have at least one port")
self.assertTrue(any([p['status'] == 'ACTIVE' and p['admin_state_up'] for p in sg_ports['ports']]),
"Default security group must have at least one active port")

return default_sg

def _get_assert_new_grp_id(self, unique_addr_grp_name, new_addr_grp):
new_grp_id = None
if new_addr_grp:
Expand All @@ -390,11 +378,18 @@ def _get_assert_new_grp_id(self, unique_addr_grp_name, new_addr_grp):

return new_grp_id

def _get_assert_active_ports(self):
ports = self.neutron_client.list_ports(
device_owner="compute:nova", admin_state_up="True", status="ACTIVE").get('ports', [])
self.assertTrue(ports and len(ports) > 0, "No active ports found")
return ports
def _create_server(self):
img_name = os.environ.get("E2E_CREATE_SERVER_IMAGE_NAME", "cirros-0.3.2-i386-disk")
flvr_name = os.environ.get("E2E_CREATE_SERVER_FLAVOR_NAME", "m1.nano")
srv_name = os.environ.get("E2E_CREATE_SERVER_NAME_PREFIX", "os-e2e-test-") + str(uuid.uuid4())
sg_id = self.def_os_sg['id']
net = self.test_network

LOG.info(
f"Creating a server '{srv_name}' on network '{net['name']}' with image '{img_name}', flavor '{flvr_name}' and security group ID '{sg_id}'.")
self.new_server = self.create_test_server(srv_name, img_name, flvr_name, net['id'], security_groups=[sg_id])
self.assertIsNotNone(self.new_server, "Server should be created successfully.")
self.addCleanup(self.nova_client.servers.delete, self.new_server.id)

def _assert_and_append_new_grp_rules(self, new_addr_grp_rules):
for rule in new_addr_grp_rules:
Expand Down
Loading
Loading