diff --git a/CONTRIBUTORS.rst b/CONTRIBUTORS.rst index 138fd769..4a3ea4f7 100644 --- a/CONTRIBUTORS.rst +++ b/CONTRIBUTORS.rst @@ -42,5 +42,6 @@ These are the contributors to pylxd according to the Github repository. fliiiix Felix simondeziel Simon Déziel (Canonical) sparkiegeek Adam Collard (Canonical) + mobergeron Marc Olivier Bergeron =============== ================================== diff --git a/doc/source/network-acls.rst b/doc/source/network-acls.rst new file mode 100644 index 00000000..e75a1033 --- /dev/null +++ b/doc/source/network-acls.rst @@ -0,0 +1,94 @@ +.. py:currentmodule:: pylxd.models + +Network ACLs +======== + +:class:`NetworkACL` objects show the current network ACLs available to LXD. Creation +and / or modification of network ACLs is possible only if 'network_acl' LXD API +extension is present. + + +Manager methods +--------------- + +Network ACLs can be queried through the following client manager +methods: + + + - :func:`~NetworkACL.all` - Retrieve all networks. + - :func:`~NetworkACL.exists` - See if a network ACL with a name exists. + Returns `bool`. + - :func:`~NetworkACL.get` - Get a specific network ACL, by its name. + - :func:`~NetworkACL.create` - Create a new network ACL. + The name of the network ACL is required. `description`, `egress`, `ingress` and `config` + are optional and the scope of their contents is documented in the LXD + documentation. + + +Network ACL attributes +------------------ + + - :attr:`~NetworkACL.name` - The name of the network ACL. + - :attr:`~NetworkACL.description` - The description of the network ACL. + - :attr:`~NetworkACL.egress` - The egress of the network ACL. + - :attr:`~NetworkACL.ingress` - The ingress of the network ACL. + - :attr:`~NetworkACL.used_by` - A list of containers using this network ACL. + - :attr:`~NetworkACL.config` - The configuration associated with the network ACL. + + +Network ACL methods +--------------- + + - :func:`~NetworkACL.rename` - Rename the network ACL. + - :func:`~NetworkACL.save` - Save the network ACL. This uses the PUT HTTP method and + not the PATCH. + - :func:`~NetworkACL.delete` - Deletes the network ACL. + +.. py:currentmodule:: pylxd.models + +Examples +-------- + +:class:`NetworkACL` operations follow the same manager-style as other +classes. Network ACLs are keyed on a unique name. + +.. code-block:: python + + >>> client.network_acls.exists('allow-external-ingress') + True + + >>> acl = client.network_acls.get('allow-external-ingress') + >>> acl + NetworkACL(config={}, description="Allowing external source for ingress", egress=[], ingress=[{"action": "allow", "description": "Allow external sources", "source": "@external", "state": "enabled"}], name="allow-external-ingress") + + >>> print(acl) + { + "name": "allow-external-ingress", + "description": "Allowing external source for ingress", + "egress": [], + "ingress": [ + { + "action": "allow", + "source": "@external", + "description": "Allow external sources", + "state": "enabled" + } + ], + "config": {}, + "used_by": [] + } + +The network ACL can then be modified and saved. + + >>> acl.ingress.append({"action":"allow","state":"enabled"}) + >>> acl.save() + +Or deleted + + >>> acl.delete() + +To create a new network ACL, use :func:`~NetworkACL.create` with a name, and optional +arguments: `description` and `egress` and `ingress` and `config`. + + >>> acl = client.network_acls.create(name="allow-external-ingress", description="Allowing external source for ingress", ingress=[{"action":"allow","description":"Allow external sources","source":"@external","state":"enabled"}]) + diff --git a/integration/test_networks.py b/integration/test_networks.py index 72518545..1336313b 100644 --- a/integration/test_networks.py +++ b/integration/test_networks.py @@ -121,3 +121,45 @@ def test_delete(self): self.assertRaises( exceptions.LXDAPIException, self.client.networks.get, self.network.name ) + +class TestNetworkACL(NetworkTestCase): + """Tests for `NetworkACL`.""" + + def setUp(self): + super().setUp() + name = self.create_network_acl() + self.acl = self.client.network_acls.get(name) + + def tearDown(self): + super().tearDown() + self.delete_network_acl(self.acl.name) + + def test_save(self): + """A network ACL is updated""" + self.acl.ingress.insert(0, {"action":"allow","state":"enabled"}) + self.acl.save() + + acl = self.client.network_acls.get(self.acl.name) + self.assertEqual({"action":"allow","state":"enabled"}, acl.ingress[0]) + + def test_rename(self): + """A network ACL is renamed""" + oldName = self.acl.name + name = "allow-new-name" + self.addCleanup(self.delete_network_acl, name) + + self.acl.rename(name) + acl = self.client.network_acls.get(name) + + self.assertEqual(name, acl.name) + self.assertRaises( + exceptions.LXDAPIException, self.client.network_acls.get, oldName + ) + + def test_delete(self): + """A network ACL is deleted""" + self.acl.delete() + + self.assertRaises( + exceptions.LXDAPIException, self.client.network_acls.get, self.acl.name + ) diff --git a/integration/testing.py b/integration/testing.py index 118d71e9..192fbd4c 100644 --- a/integration/testing.py +++ b/integration/testing.py @@ -157,6 +157,22 @@ def delete_network(self, name): except exceptions.NotFound: pass + def create_network_acl(self): + name = self.generate_object_name() + self.lxd.network_acls.post( + json={ + "name": name, + "config": {}, + } + ) + return name + + def delete_network_acl(self, name): + try: + self.lxd.network_acls[name].delete() + except exceptions.NotFound: + pass + def assertCommon(self, response): """Assert common LXD responses. diff --git a/pylxd/client.py b/pylxd/client.py index 968874d4..912c5fbc 100644 --- a/pylxd/client.py +++ b/pylxd/client.py @@ -119,7 +119,7 @@ def __getattr__(self, name): :rtype: _APINode """ # '-' can't be used in variable names - if name in ("storage_pools", "virtual_machines"): + if name in ("storage_pools", "virtual_machines", "network_acls"): name = name.replace("_", "-") return self.__class__( f"{self._api_endpoint}/{name}", @@ -435,6 +435,7 @@ def __init__( self.virtual_machines = managers.VirtualMachineManager(self) self.images = managers.ImageManager(self) self.networks = managers.NetworkManager(self) + self.network_acls = managers.NetworkACLManager(self) self.operations = managers.OperationManager(self) self.profiles = managers.ProfileManager(self) self.projects = managers.ProjectManager(self) diff --git a/pylxd/managers.py b/pylxd/managers.py index 2c0d6d75..d8882d02 100644 --- a/pylxd/managers.py +++ b/pylxd/managers.py @@ -53,6 +53,10 @@ class NetworkForwardManager(BaseManager): manager_for = "pylxd.models.NetworkForward" +class NetworkACLManager(BaseManager): + manager_for = "pylxd.models.NetworkACL" + + class OperationManager(BaseManager): manager_for = "pylxd.models.Operation" diff --git a/pylxd/models/__init__.py b/pylxd/models/__init__.py index d73b59f2..f015ea4f 100644 --- a/pylxd/models/__init__.py +++ b/pylxd/models/__init__.py @@ -3,7 +3,7 @@ from pylxd.models.container import Container from pylxd.models.image import Image from pylxd.models.instance import Instance, Snapshot -from pylxd.models.network import Network, NetworkForward +from pylxd.models.network import Network, NetworkACL, NetworkForward from pylxd.models.operation import Operation from pylxd.models.profile import Profile from pylxd.models.project import Project @@ -19,6 +19,7 @@ "Image", "Instance", "Network", + "NetworkACL", "NetworkForward", "Operation", "Profile", diff --git a/pylxd/models/network.py b/pylxd/models/network.py index 665e8561..b2f2fd14 100644 --- a/pylxd/models/network.py +++ b/pylxd/models/network.py @@ -57,6 +57,149 @@ def __repr__(self): return f"{self.__class__.__name__}({', '.join(sorted(attrs))})" +class NetworkACL(model.Model): + """Model representing a LXD network ACL.""" + + name = model.Attribute() + description = model.Attribute() + egress = model.Attribute() + ingress = model.Attribute() + config = model.Attribute() + used_by = model.Attribute(readonly=True) + _endpoint = "network-acls" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + @classmethod + def exists(cls, client, name): + """ + Determine whether network ACL with provided name exists. + + :param client: client instance + :type client: :class:`~pylxd.client.Client` + :param name: name of the network ACL + :type name: str + :returns: `True` if network ACL exists, `False` otherwise + :rtype: bool + """ + try: + client.network_acls.get(name) + return True + except cls.NotFound: + return False + + @classmethod + def get(cls, client, name): + """ + Get a network ACL by name. + + :param client: client instance + :type client: :class:`~pylxd.client.Client` + :param name: name of the network ACL + :type name: str + :returns: network ACL instance (if exists) + :rtype: :class:`NetworkACL` + :raises: :class:`~pylxd.exceptions.NotFound` if network ACL does not exist + """ + response = client.api.network_acls[name].get() + + return cls(client, **response.json()["metadata"]) + + @classmethod + def all(cls, client): + """ + Get all network ACLs. + + :param client: client instance + :type client: :class:`~pylxd.client.Client` + :rtype: list[:class:`NetworkACL`] + """ + response = client.api.network_acls.get() + + acls = [] + for url in response.json()["metadata"]: + name = url.split("/")[-1] + acls.append(cls(client, name=name)) + return acls + + @classmethod + def create( + cls, client, name, description=None, egress=None, ingress=None, config=None + ): + """ + Create a network ACL. + + :param client: client instance + :type client: :class:`~pylxd.client.Client` + :param name: name of the network ACL + :type name: str + :param description: description of the network ACL + :type description: str + :param egress: egress of the network ACL + :type egress: list + :param ingress: ingress of the network ACL + :type ingress: list + :param config: additional configuration + :type config: dict + """ + client.assert_has_api_extension("network_acl") + + acl = {"name": name} + if description is not None: + acl["description"] = description + if egress is not None: + acl["egress"] = egress + if ingress is not None: + acl["ingress"] = ingress + if config is not None: + acl["config"] = config + client.api.network_acls.post(json=acl) + return cls.get(client, name) + + def rename(self, new_name): + """ + Rename a network ACL. + + :param new_name: new name of the network ACL + :type new_name: str + :return: Renamed network ACL instance + :rtype: :class:`NetworkACL` + """ + self.client.assert_has_api_extension("network_acl") + self.client.api.network_acls[self.name].post(json={"name": new_name}) + return NetworkACL.get(self.client, new_name) + + def save(self, *args, **kwargs): + self.client.assert_has_api_extension("network_acl") + super().save(*args, **kwargs) + + def log(self): + """Get network acl log.""" + response = self.api.log.get() + pring(response.json()["metadata"]) + log = NetworkACLLog(response.json()["metadata"]) + return log + + @property + def api(self): + return self.client.api.network_acls[self.name] + + def __str__(self): + return json.dumps(self.marshall(skip_readonly=False), indent=2) + + def __repr__(self): + attrs = [] + for attribute, value in self.marshall().items(): + attrs.append(f"{attribute}={json.dumps(value, sort_keys=True)}") + + return f"{self.__class__.__name__}({', '.join(sorted(attrs))})" + + +class NetworkACLLog(model.AttributeDict): + """A simple object for representing a network state.""" + + class NetworkState(model.AttributeDict): """A simple object for representing a network state.""" diff --git a/pylxd/models/tests/test_network.py b/pylxd/models/tests/test_network.py index 9b0e7153..2af863c0 100644 --- a/pylxd/models/tests/test_network.py +++ b/pylxd/models/tests/test_network.py @@ -283,6 +283,252 @@ def test_repr(self): ) +class TestNetworkACL(testing.PyLXDTestCase): + """Tests for pylxd.models.NetworkACL.""" + + def test_get(self): + """A network ACL is fetched.""" + name = "allow-external-ingress" + an_network = models.NetworkACL.get(self.client, name) + + self.assertEqual(name, an_network.name) + + def test_get_not_found(self): + """LXDAPIException is raised on unknown network ACL.""" + + def not_found(_, context): + context.status_code = 404 + return json.dumps( + {"type": "error", "error": "Not found", "error_code": 404} + ) + + self.add_rule( + { + "text": not_found, + "method": "GET", + "url": r"^http://pylxd.test/1.0/network-acls/allow-external-ingress$", + } + ) + + self.assertRaises( + exceptions.LXDAPIException, + models.NetworkACL.get, + self.client, + "allow-external-ingress", + ) + + def test_get_error(self): + """LXDAPIException is raised on error.""" + + def error(_, context): + context.status_code = 500 + return json.dumps( + { + "type": "error", + "error": "Not found", + "error_code": 500, + } + ) + + self.add_rule( + { + "text": error, + "method": "GET", + "url": r"^http://pylxd.test/1.0/network-acls/allow-external-ingress$", + } + ) + + self.assertRaises( + exceptions.LXDAPIException, + models.NetworkACL.get, + self.client, + "allow-external-ingress", + ) + + def test_exists(self): + """True is returned if network exists.""" + name = "allow-external-ingress" + + self.assertTrue(models.NetworkACL.exists(self.client, name)) + + def test_not_exists(self): + """False is returned when network does not exist.""" + + def not_found(_, context): + context.status_code = 404 + return json.dumps( + {"type": "error", "error": "Not found", "error_code": 404} + ) + + self.add_rule( + { + "text": not_found, + "method": "GET", + "url": r"^http://pylxd.test/1.0/network-acls/allow-external-ingress$", + } + ) + + name = "allow-external-ingress" + + self.assertFalse(models.NetworkACL.exists(self.client, name)) + + def test_all(self): + acls = models.NetworkACL.all(self.client) + + self.assertEqual(2, len(acls)) + + def test_create_with_parameters(self): + with mock.patch.object(self.client, "assert_has_api_extension"): + acl = models.NetworkACL.create( + self.client, + name="allow-external-ingress1", + config={}, + egress=[], + ingress=[ + { + "action": "allow", + "description": "Allow external sources", + "source": "@external", + "state": "enabled", + } + ], + description="Network ACL description", + ) + + self.assertIsInstance(acl, models.NetworkACL) + self.assertEqual("allow-external-ingress1", acl.name) + self.assertEqual("Network ACL description", acl.description) + self.assertEqual([], acl.egress) + self.assertEqual( + [ + { + "action": "allow", + "description": "Allow external sources", + "source": "@external", + "state": "enabled", + } + ], + acl.ingress, + ) + + def test_create_default(self): + with mock.patch.object(self.client, "assert_has_api_extension"): + acl = models.NetworkACL.create(self.client, "allow-external-ingress1") + + self.assertIsInstance(acl, models.NetworkACL) + self.assertEqual("allow-external-ingress1", acl.name) + + def test_create_api_not_available(self): + # Note, by default with the tests, no 'network_acl' extension is available. + with self.assertRaises(LXDAPIExtensionNotAvailable): + models.NetworkACL.create( + self.client, + name="allow-external-ingress1", + config={}, + egress=[], + ingress=[], + description="Network ACL description", + ) + + def test_rename(self): + with mock.patch.object(self.client, "assert_has_api_extension"): + acl = models.NetworkACL.get(self.client, "allow-external-ingress") + renamed_acl = acl.rename("allow-external-ingress2") + + self.assertEqual("allow-external-ingress2", renamed_acl.name) + + def test_update(self): + """A network is updated.""" + with mock.patch.object(self.client, "assert_has_api_extension"): + network = models.NetworkACL.get(self.client, "allow-external-ingress") + network.ingress = [] + network.save() + self.assertEqual([], network.ingress) + + def test_fetch(self): + """A partial network ACL is synced.""" + acl = self.client.network_acls.all()[1] + + acl.sync() + + self.assertEqual("Network ACL description", acl.description) + + def test_fetch_not_found(self): + """LXDAPIException is raised on bogus network fetch.""" + + def not_found(_, context): + context.status_code = 404 + return json.dumps( + {"type": "error", "error": "Not found", "error_code": 404} + ) + + self.add_rule( + { + "text": not_found, + "method": "GET", + "url": r"^http://pylxd.test/1.0/network-acls/allow-external-ingress$", + } + ) + acl = models.NetworkACL(self.client, name="allow-external-ingress") + + self.assertRaises(exceptions.LXDAPIException, acl.sync) + + def test_fetch_error(self): + """LXDAPIException is raised on fetch error.""" + + def error(_, context): + context.status_code = 500 + return json.dumps( + {"type": "error", "error": "Not found", "error_code": 500} + ) + + self.add_rule( + { + "text": error, + "method": "GET", + "url": r"^http://pylxd.test/1.0/network-acls/allow-external-ingress$", + } + ) + acl = models.NetworkACL(self.client, name="allow-external-ingress") + + self.assertRaises(exceptions.LXDAPIException, acl.sync) + + def test_delete(self): + """A network ACL is deleted.""" + acl = models.NetworkACL(self.client, name="allow-external-ingress") + + acl.delete() + + def test_str(self): + """Network ACL is printed in JSON format.""" + acl = models.NetworkACL.get(self.client, "allow-external-ingress") + self.assertEqual( + json.loads(str(acl)), + { + "name": "allow-external-ingress", + "description": "Network ACL description", + "egress": [], + "ingress": [ + { + "action": "allow", + "source": "@external", + "description": "Allow external sources", + "state": "enabled", + } + ], + "config": {}, + "used_by": [], + }, + ) + + def test_repr(self): + acl = models.NetworkACL.get(self.client, "allow-external-ingress") + self.assertEqual( + repr(acl), + 'NetworkACL(config={}, description="Network ACL description", egress=[], ingress=[{"action": "allow", "description": "Allow external sources", "source": "@external", "state": "enabled"}], name="allow-external-ingress")', + ) + + class TestNetworkForward(testing.PyLXDTestCase): """Tests for pylxd.models.NetworkForward.""" diff --git a/pylxd/tests/mock_lxd.py b/pylxd/tests/mock_lxd.py index bdc56197..c541a079 100644 --- a/pylxd/tests/mock_lxd.py +++ b/pylxd/tests/mock_lxd.py @@ -95,6 +95,42 @@ def networks_DELETE(_, context): ) +def network_acls_GET(request, _): + name = request.path.split("/")[-1] + return json.dumps( + { + "type": "sync", + "metadata": { + "config": {}, + "name": name, + "description": "Network ACL description", + "egress": [], + "ingress": [ + { + "action": "allow", + "source": "@external", + "description": "Allow external sources", + "state": "enabled", + } + ], + "used_by": [], + }, + } + ) + + +def network_acls_POST(_, context): + context.status_code = 200 + return json.dumps({"type": "sync", "metadata": {}}) + + +def network_acls_DELETE(_, context): + context.status_code = 202 + return json.dumps( + {"type": "sync", "operation": "/1.0/operations/operation-abc?project=default"} + ) + + def profile_GET(request, context): name = request.path.split("/")[-1] return json.dumps( @@ -830,6 +866,55 @@ def snapshot_DELETE(request, context): "method": "DELETE", "url": r"^http://pylxd.test/1.0/networks/eth0$", }, + # Network ACLs + { + "json": { + "type": "sync", + "metadata": [ + "http://pylxd.test/1.0/network-acls/allow-external-ingress", + "http://pylxd.test/1.0/network-acls/allow-external-ingress1", + ], + }, + "method": "GET", + "url": r"^http://pylxd.test/1.0/network-acls$", + }, + { + "text": network_acls_POST, + "method": "POST", + "url": r"^http://pylxd.test/1.0/network-acls$", + }, + { + "text": network_acls_GET, + "method": "POST", + "url": r"^http://pylxd.test/1.0/network-acls/allow-external-ingress$", + }, + { + "json": { + "type": "sync", + "metadata": { + "name": "allow-external-ingress", + "type": "loopback", + "used_by": [], + }, + }, + "method": "GET", + "url": r"^http://pylxd.test/1.0/network-acls/allow-external-ingress$", + }, + { + "text": network_acls_GET, + "method": "GET", + "url": r"^http://pylxd.test/1.0/network-acls/allow-external-ingress(1|2)?$", + }, + { + "text": json.dumps({"type": "sync"}), + "method": "PUT", + "url": r"^http://pylxd.test/1.0/network-acls/allow-external-ingress$", + }, + { + "text": network_acls_DELETE, + "method": "DELETE", + "url": r"^http://pylxd.test/1.0/network-acls/allow-external-ingress$", + }, # Network forwards { "json": {