From 2ba7b722e125c51a21546869bc77319a47cc6010 Mon Sep 17 00:00:00 2001 From: Mathias Ertl Date: Sat, 14 Dec 2024 14:55:13 +0100 Subject: [PATCH] move tests to pytest --- pyproject.toml | 9 + tests/__init__.py | 168 ------- tests/conftest.py | 154 +++++++ tests/test_aes.py | 643 +++++++++++++------------- tests/test_des.py | 50 ++- tests/test_dh.py | 719 +++++++++++++++--------------- tests/test_digest.py | 129 +++--- tests/test_dsa.py | 70 ++- tests/test_ecc.py | 310 ++++++------- tests/test_iterators.py | 67 ++- tests/test_public_key_external.py | 322 ++++++------- tests/test_rsa.py | 241 +++++----- tests/test_sessions.py | 311 ++++++------- tests/test_slots_and_tokens.py | 139 +++--- tests/test_threading.py | 53 ++- tests/test_x509.py | 463 +++++++++---------- 16 files changed, 1942 insertions(+), 1906 deletions(-) create mode 100644 tests/conftest.py diff --git a/pyproject.toml b/pyproject.toml index 5e4e4ad..c93aff7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,6 +36,14 @@ Documentation = "http://python-pkcs11.readthedocs.io/en/latest/" Issues = "https://github.com/pyauth/python-pkcs11/issues" Repository = "https://github.com/pyauth/python-pkcs11" +[tool.pytest.ini_options] +markers = [ + "requires: marks tests require support for a certain PKCS11 mechanism.", + "xfail_nfast: Expected failure on nFast.", + "xfail_softhsm: Expected failure on SoftHSMv2.", + "xfail_opencryptoki: Expected failure on OpenCryptoki.", +] + [tool.ruff] line-length = 100 @@ -46,6 +54,7 @@ extend-select = [ "F", # pyflakes "I", # isort "G", # flake8-logging-format + "PT", # flake8-pytest-style "RUF", # ruff specific checks ] diff --git a/tests/__init__.py b/tests/__init__.py index d701e4d..e69de29 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,168 +0,0 @@ -""" -PKCS#11 Tests - -The following environment variables will influence the behaviour of test cases: - - PKCS11_MODULE, mandatory, points to the library/DLL to use for testing - - PKCS11_TOKEN_LABEL, mandatory, contains the token label - - PKCS11_TOKEN_PIN, optional (default is None), contains the PIN/passphrase of the token - - PKCS11_TOKEN_SO_PIN, optional (default is same as PKCS11_TOKEN_PIN), security officer PIN - - OPENSSL_PATH, optional, path to openssl executable (i.e. the folder that contains it) - -""" - -import os -import shutil -import unittest -from functools import wraps -from warnings import warn - -import pkcs11 - -try: - LIB = os.environ["PKCS11_MODULE"] -except KeyError as ex: - raise RuntimeError("Must define `PKCS11_MODULE' to run tests.") from ex - - -try: - TOKEN = os.environ["PKCS11_TOKEN_LABEL"] -except KeyError as ex: - raise RuntimeError("Must define `PKCS11_TOKEN_LABEL' to run tests.") from ex - -TOKEN_PIN = os.environ.get("PKCS11_TOKEN_PIN") # Can be None -if TOKEN_PIN is None: - warn("`PKCS11_TOKEN_PIN' env variable is unset.", stacklevel=2) - -TOKEN_SO_PIN = os.environ.get("PKCS11_TOKEN_SO_PIN") -if TOKEN_SO_PIN is None: - TOKEN_SO_PIN = TOKEN_PIN - warn( - "`PKCS11_TOKEN_SO_PIN' env variable is unset. Using value from `PKCS11_TOKEN_PIN'", - stacklevel=2, - ) - -OPENSSL = shutil.which("openssl", path=os.environ.get("OPENSSL_PATH")) -if OPENSSL is None: - warn("Path to OpenSSL not found. Please adjust `PATH' or define `OPENSSL_PATH'", stacklevel=2) - - -class TestCase(unittest.TestCase): - """Base test case, optionally creates a token and a session.""" - - with_token = True - """Creates a token for this test case.""" - with_session = True - """Creates a session for this test case.""" - - @classmethod - def setUpClass(cls): - super().setUpClass() - cls.lib = lib = pkcs11.lib(LIB) - - if cls.with_token or cls.with_session: - cls.token = lib.get_token(token_label=TOKEN) - - def setUp(self): - super().setUp() - - if self.with_session: - self.session = self.token.open(user_pin=TOKEN_PIN) - - def tearDown(self): - if self.with_session: - self.session.close() - - super().tearDown() - - -def requires(*mechanisms): - """ - Decorates a function or class as requiring mechanisms, else they are - skipped. - """ - - def check_requirements(self): - """Determine what, if any, required mechanisms are unavailable.""" - unavailable = set(mechanisms) - self.token.slot.get_mechanisms() - - if unavailable: - raise unittest.SkipTest("Requires %s" % ", ".join(map(str, unavailable))) - - def inner(func): - @wraps(func) - def wrapper(self, *args, **kwargs): - check_requirements(self) - - return func(self, *args, **kwargs) - - return wrapper - - return inner - - -def xfail(condition): - """Mark a test that's expected to fail for a given condition.""" - - def inner(func): - if condition: - return unittest.expectedFailure(func) - - else: - return func - - return inner - - -class Is: - """ - Test what device we're using. - """ - - # trick: str.endswith() can accept tuples, - # see https://stackoverflow.com/questions/18351951/check-if-string-ends-with-one-of-the-strings-from-a-list - softhsm2 = LIB.lower().endswith( - ("libsofthsm2.so", "libsofthsm2.dylib", "softhsm2.dll", "softhsm2-x64.dll") - ) - nfast = LIB.lower().endswith(("libcknfast.so", "cknfast.dll")) - opencryptoki = LIB.endswith("libopencryptoki.so") - travis = os.environ.get("TRAVIS") == "true" - - -class Avail: - """ - Test if a resource is available - """ - - # openssl is searched across the exec path. Optionally, OPENSSL_PATH env variable can be defined - # in case there is no direct path to it (i.e. PATH does not point to it) - openssl = OPENSSL is not None - - -class Only: - """ - Limit tests to given conditions - """ - - softhsm2 = unittest.skipUnless(Is.softhsm2, "SoftHSMv2 only") - openssl = unittest.skipUnless(Avail.openssl, "openssl not found in the path") - - -class Not: - """ - Ignore tests for given devices - """ - - softhsm2 = unittest.skipIf(Is.softhsm2, "Not supported by SoftHSMv2") - nfast = unittest.skipIf(Is.nfast, "Not supported by nFast") - opencryptoki = unittest.skipIf(Is.opencryptoki, "Not supported by OpenCryptoki") - - -class FIXME: - """ - Tests is broken on this platform. - """ - - softhsm2 = xfail(Is.softhsm2) - nfast = xfail(Is.nfast) - opencryptoki = xfail(Is.opencryptoki) - travis = xfail(Is.travis) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..f9af88c --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,154 @@ +import os +import secrets +import shutil +import string +import subprocess +from pathlib import Path +from typing import Iterator +from unittest import mock +from warnings import warn + +import pytest +from _pytest.fixtures import SubRequest + +import pkcs11 + +ALLOWED_RANDOM_CHARS = string.ascii_letters + string.digits +LIB_PATH = os.environ.get("PKCS11_MODULE", "/usr/lib/softhsm/libsofthsm2.so") + +# trick: str.endswith() can accept tuples, +# see https://stackoverflow.com/questions/18351951/check-if-string-ends-with-one-of-the-strings-from-a-list +IS_SOFTHSM = LIB_PATH.lower().endswith( + ("libsofthsm2.so", "libsofthsm2.dylib", "softhsm2.dll", "softhsm2-x64.dll") +) +IS_NFAST = LIB_PATH.lower().endswith(("libcknfast.so", "cknfast.dll")) +IS_OPENCRYPTOKI = LIB_PATH.endswith("libopencryptoki.so") + +OPENSSL = shutil.which("openssl", path=os.environ.get("OPENSSL_PATH")) +if OPENSSL is None: + warn("Path to OpenSSL not found. Please adjust `PATH' or define `OPENSSL_PATH'", stacklevel=2) + + +def pytest_collection_modifyitems(items) -> None: + for item in items: + markers = [marker.name for marker in item.iter_markers()] + if "xfail_nfast" in markers and IS_NFAST: + item.add_marker( + pytest.mark.xfail(IS_NFAST, reason="Expected failure with nFast.", strict=True) + ) + if "xfail_softhsm" in markers and IS_SOFTHSM: + item.add_marker( + pytest.mark.xfail( + IS_SOFTHSM, reason="Expected failure with SoftHSMvs.", strict=True + ) + ) + if "xfail_opencryptoki" in markers: + item.add_marker( + pytest.mark.xfail( + IS_OPENCRYPTOKI, reason="Expected failure with OpenCryptoki.", strict=True + ) + ) + + +def get_random_string(length): + return "".join(secrets.choice(ALLOWED_RANDOM_CHARS) for i in range(length)) + + +@pytest.fixture(scope="session") +def lib(): + return pkcs11.lib(LIB_PATH) + + +@pytest.fixture +def softhsm_setup(tmp_path: Path) -> Iterator[Path]: # pragma: hsm + """Fixture to set up a unique SoftHSM2 configuration.""" + softhsm_dir = tmp_path / "softhsm" + token_dir = softhsm_dir / "tokens" + token_dir.mkdir(exist_ok=True, parents=True) + + softhsm2_conf = tmp_path / "softhsm2.conf" + print("# SoftHSMv2 conf:", softhsm2_conf) + + with open(softhsm2_conf, "w", encoding="utf-8") as stream: + stream.write(f"""# SoftHSM v2 configuration file + +directories.tokendir = {token_dir} +objectstore.backend = file + +# ERROR, WARNING, INFO, DEBUG +log.level = DEBUG + +# If CKF_REMOVABLE_DEVICE flag should be set +slots.removable = false + +# Enable and disable PKCS#11 mechanisms using slots.mechanisms. +slots.mechanisms = ALL + +# If the library should reset the state on fork +library.reset_on_fork = false""") + + with mock.patch.dict(os.environ, {"SOFTHSM2_CONF": str(softhsm2_conf)}): + yield softhsm_dir + + +@pytest.fixture +def so_pin() -> str: + return get_random_string(12) + + +@pytest.fixture +def pin() -> str: + return get_random_string(12) + + +@pytest.fixture +def softhsm_token(request: "SubRequest", lib, so_pin: str, pin: str) -> pkcs11.Token: + """Get a unique token for the current test.""" + request.getfixturevalue("softhsm_setup") + token = get_random_string(8) + + args = ( + "softhsm2-util", + "--init-token", + "--free", + "--label", + token, + "--so-pin", + so_pin, + "--pin", + pin, + ) + print("+", " ".join(args)) + subprocess.run(args, check=True) + + # Reinitialize library if already loaded (tokens are only seen after (re-)initialization). + lib.reinitialize() + + return lib.get_token(token_label=token) + + +@pytest.fixture +def softhsm_session(softhsm_token: pkcs11.Token, pin: str) -> Iterator[pkcs11.Session]: + session = softhsm_token.open(user_pin=pin) + yield session + session.close() + + +@pytest.fixture +def token(softhsm_token: pkcs11.Token) -> pkcs11.Token: + return softhsm_token + + +@pytest.fixture +def session( + request: "SubRequest", softhsm_session: pkcs11.Session, softhsm_token: pkcs11.Token +) -> pkcs11.Session: + # Skip test if session does not support required mechanisms + requirements = [mark.args[0] for mark in request.node.iter_markers(name="requires")] + if requirements: + unavailable = set(requirements) - softhsm_token.slot.get_mechanisms() + + if unavailable: + pytest.skip("Requires %s" % ", ".join(map(str, unavailable))) + + return softhsm_session diff --git a/tests/test_aes.py b/tests/test_aes.py index 3c7f2b0..f9ca94c 100644 --- a/tests/test_aes.py +++ b/tests/test_aes.py @@ -2,388 +2,397 @@ PKCS#11 AES Secret Keys """ -from parameterized import parameterized +import pytest import pkcs11 from pkcs11 import Mechanism -from . import FIXME, TestCase, requires - - -class AESTests(TestCase): - @requires(Mechanism.AES_KEY_GEN) - def setUp(self): - super().setUp() - self.key = self.session.generate_key(pkcs11.KeyType.AES, 128) - - @requires(Mechanism.AES_CBC_PAD) - def test_encrypt(self): - data = b"INPUT DATA" - iv = b"0" * 16 - - crypttext = self.key.encrypt(data, mechanism_param=iv) - self.assertIsInstance(crypttext, bytes) - self.assertNotEqual(data, crypttext) - # We should be aligned to the block size - self.assertEqual(len(crypttext), 16) - # Ensure we didn't just get 16 nulls - self.assertFalse(all(c == "\0" for c in crypttext)) - - text = self.key.decrypt(crypttext, mechanism_param=iv) - self.assertEqual(data, text) - - @requires(Mechanism.AES_CBC_PAD) - def test_encrypt_stream(self): - data = ( - b"I" * 16, - b"N" * 16, - b"P" * 16, - b"U" * 16, - b"T" * 10, # don't align to the blocksize - ) - iv = b"0" * 16 +pytestmark = [pytest.mark.requires(Mechanism.AES_KEY_GEN)] - cryptblocks = list(self.key.encrypt(data, mechanism_param=iv)) - self.assertEqual(len(cryptblocks), len(data) + 1) +@pytest.fixture +def key(session: pkcs11.Session) -> pkcs11.SecretKey: + return session.generate_key(pkcs11.KeyType.AES, 128) - crypttext = b"".join(cryptblocks) - self.assertNotEqual(b"".join(data), crypttext) - # We should be aligned to the block size - self.assertEqual(len(crypttext) % 16, 0) - # Ensure we didn't just get 16 nulls - self.assertFalse(all(c == "\0" for c in crypttext)) +@pytest.mark.requires(Mechanism.AES_CBC_PAD) +def test_encrypt(key: pkcs11.SecretKey) -> None: + data = b"INPUT DATA" + iv = b"0" * 16 - text = b"".join(self.key.decrypt(cryptblocks, mechanism_param=iv)) - self.assertEqual(b"".join(data), text) + crypttext = key.encrypt(data, mechanism_param=iv) + assert isinstance(crypttext, bytes) + assert data != crypttext + # We should be aligned to the block size + assert len(crypttext) == 16 + # Ensure we didn't just get 16 nulls + assert all(c == "\0" for c in crypttext) is False - @requires(Mechanism.AES_CBC_PAD) - def test_encrypt_whacky_sizes(self): - data = [(char * ord(char)).encode("utf-8") for char in "HELLO WORLD"] - iv = b"0" * 16 + text = key.decrypt(crypttext, mechanism_param=iv) + assert data == text - cryptblocks = list(self.key.encrypt(data, mechanism_param=iv)) - textblocks = list(self.key.decrypt(cryptblocks, mechanism_param=iv)) - self.assertEqual(b"".join(data), b"".join(textblocks)) +@pytest.mark.requires(Mechanism.AES_CBC_PAD) +def test_encrypt_stream(key: pkcs11.SecretKey): + data = ( + b"I" * 16, + b"N" * 16, + b"P" * 16, + b"U" * 16, + b"T" * 10, # don't align to the blocksize + ) + iv = b"0" * 16 - @requires(Mechanism.AES_CBC_PAD) - def test_encrypt_big_string(self): - data = b"HELLO WORLD" * 1024 + cryptblocks = list(key.encrypt(data, mechanism_param=iv)) - iv = self.session.generate_random(128) - crypttext = self.key.encrypt(data, mechanism_param=iv) - text = self.key.decrypt(crypttext, mechanism_param=iv) + assert len(cryptblocks) == len(data) + 1 - self.assertEqual(text, data) + crypttext = b"".join(cryptblocks) - @requires(Mechanism.AES_MAC) - def test_sign(self): - data = b"HELLO WORLD" + assert b"".join(data) != crypttext + # We should be aligned to the block size + assert len(crypttext) % 16 == 0 + # Ensure we didn't just get 16 nulls + assert all(c == "\0" for c in crypttext) is False - signature = self.key.sign(data) - self.assertIsNotNone(signature) - self.assertIsInstance(signature, bytes) - self.assertTrue(self.key.verify(data, signature)) - self.assertFalse(self.key.verify(data, b"1234")) + text = b"".join(key.decrypt(cryptblocks, mechanism_param=iv)) + assert b"".join(data) == text - @requires(Mechanism.AES_MAC) - def test_sign_stream(self): - data = ( - b"I" * 16, - b"N" * 16, - b"P" * 16, - b"U" * 16, - b"T" * 10, # don't align to the blocksize - ) - signature = self.key.sign(data) - self.assertIsNotNone(signature) - self.assertIsInstance(signature, bytes) - self.assertTrue(self.key.verify(data, signature)) +@pytest.mark.requires(Mechanism.AES_CBC_PAD) +def test_encrypt_whacky_sizes(key: pkcs11.SecretKey): + data = [(char * ord(char)).encode("utf-8") for char in "HELLO WORLD"] + iv = b"0" * 16 - @requires(Mechanism.AES_KEY_WRAP) - @FIXME.opencryptoki # can't set key attributes - def test_wrap(self): - key = self.session.generate_key( - pkcs11.KeyType.AES, - 128, - template={ - pkcs11.Attribute.EXTRACTABLE: True, - pkcs11.Attribute.SENSITIVE: False, - }, - ) - data = self.key.wrap_key(key) + cryptblocks = list(key.encrypt(data, mechanism_param=iv)) + textblocks = list(key.decrypt(cryptblocks, mechanism_param=iv)) + + assert b"".join(data) == b"".join(textblocks) + + +@pytest.mark.requires(Mechanism.AES_CBC_PAD) +def test_encrypt_big_string(session: pkcs11.Session, key: pkcs11.SecretKey): + data = b"HELLO WORLD" * 1024 + + iv = session.generate_random(128) + crypttext = key.encrypt(data, mechanism_param=iv) + text = key.decrypt(crypttext, mechanism_param=iv) + + assert text == data - key2 = self.key.unwrap_key( - pkcs11.ObjectClass.SECRET_KEY, - pkcs11.KeyType.AES, - data, - template={ - pkcs11.Attribute.EXTRACTABLE: True, - pkcs11.Attribute.SENSITIVE: False, - }, - ) - self.assertEqual(key[pkcs11.Attribute.VALUE], key2[pkcs11.Attribute.VALUE]) - - @parameterized.expand( - [ - ("POSITIVE_128_BIT", 128, 16, TestCase.assertIsNotNone), - ("POSITIVE_128_BIT_LONG_IV", 128, 32, TestCase.assertIsNotNone), - ("NEGATIVE_128_BIT_BAD_IV", 128, 15, TestCase.assertIsNone), - ("POSITIVE_256_BIT_LONG_IV", 256, 32, TestCase.assertIsNotNone), - ("NEGATIVE_256_BIT_SHORT_IV", 256, 16, TestCase.assertIsNone), - ("NEGATIVE_256_BIT_BAD_IV", 256, 31, TestCase.assertIsNone), - ] +@pytest.mark.requires(Mechanism.AES_MAC) +def test_sign(key: pkcs11.SecretKey): + data = b"HELLO WORLD" + + signature = key.sign(data) + assert isinstance(signature, bytes) + assert key.verify(data, signature) is True + assert key.verify(data, b"1234") is False + + +@pytest.mark.requires(Mechanism.AES_MAC) +def test_sign_stream(key: pkcs11.SecretKey): + data = ( + b"I" * 16, + b"N" * 16, + b"P" * 16, + b"U" * 16, + b"T" * 10, # don't align to the blocksize + ) + + signature = key.sign(data) + assert isinstance(signature, bytes) + assert key.verify(data, signature) + + +@pytest.mark.requires(Mechanism.AES_KEY_WRAP) +@pytest.mark.xfail_opencryptoki # can't set key attributes +def test_wrap(session: pkcs11.Session, key: pkcs11.SecretKey): + key = session.generate_key( + pkcs11.KeyType.AES, + 128, + template={ + pkcs11.Attribute.EXTRACTABLE: True, + pkcs11.Attribute.SENSITIVE: False, + }, + ) + data = key.wrap_key(key) + + key2 = key.unwrap_key( + pkcs11.ObjectClass.SECRET_KEY, + pkcs11.KeyType.AES, + data, + template={ + pkcs11.Attribute.EXTRACTABLE: True, + pkcs11.Attribute.SENSITIVE: False, + }, ) - @requires(Mechanism.AES_ECB_ENCRYPT_DATA) - @FIXME.opencryptoki # can't set key attributes - def test_derive_using_ecb_encrypt(self, test_type, test_key_length, iv_length, assert_fn): - """Function to test AES Key Derivation using the ECB_ENCRYPT Mechanism. - - Refer to Section 2.15 of http://docs.oasis-open.org/pkcs11/pkcs11-curr/v2.40/errata01/os/pkcs11-curr-v2.40-errata01-os-complete.html#_Toc441850521 - """ - - # Create the Master Key - capabilities = pkcs11.defaults.DEFAULT_KEY_CAPABILITIES[pkcs11.KeyType.AES] - capabilities |= pkcs11.MechanismFlag.DERIVE - key = self.session.generate_key( + + assert key[pkcs11.Attribute.VALUE] == key2[pkcs11.Attribute.VALUE] + + +@pytest.mark.parametrize( + ("test_key_length", "iv_length", "is_none"), + [ + (128, 16, False), + (128, 32, False), + (128, 15, True), + (256, 32, False), + (256, 16, True), + (256, 31, True), + ], +) +@pytest.mark.requires(Mechanism.AES_ECB_ENCRYPT_DATA) +@pytest.mark.xfail_opencryptoki # can't set key attributes +def test_derive_using_ecb_encrypt( + session: pkcs11.Session, + key: pkcs11.SecretKey, + test_key_length: int, + iv_length: int, + is_none: bool, +): + """Function to test AES Key Derivation using the ECB_ENCRYPT Mechanism. + + Refer to Section 2.15 of http://docs.oasis-open.org/pkcs11/pkcs11-curr/v2.40/errata01/os/pkcs11-curr-v2.40-errata01-os-complete.html#_Toc441850521 + """ + + # Create the Master Key + capabilities = pkcs11.defaults.DEFAULT_KEY_CAPABILITIES[pkcs11.KeyType.AES] + capabilities |= pkcs11.MechanismFlag.DERIVE + key = session.generate_key( + pkcs11.KeyType.AES, + key_length=test_key_length, + capabilities=capabilities, + template={ + pkcs11.Attribute.EXTRACTABLE: True, + pkcs11.Attribute.DERIVE: True, + pkcs11.Attribute.SENSITIVE: False, + }, + ) + + assert key is not None, "Failed to create {}-bit Master Key".format(test_key_length) + + # Derive a Key from the Master Key + iv = b"0" * iv_length + try: + derived_key = key.derive_key( pkcs11.KeyType.AES, key_length=test_key_length, capabilities=capabilities, + mechanism=Mechanism.AES_ECB_ENCRYPT_DATA, + mechanism_param=iv, template={ pkcs11.Attribute.EXTRACTABLE: True, - pkcs11.Attribute.DERIVE: True, pkcs11.Attribute.SENSITIVE: False, }, ) - - self.assertTrue( - key is not None, "Failed to create {}-bit Master Key".format(test_key_length) - ) - - # Derive a Key from the Master Key - iv = b"0" * iv_length - try: - derived_key = key.derive_key( - pkcs11.KeyType.AES, - key_length=test_key_length, - capabilities=capabilities, - mechanism=Mechanism.AES_ECB_ENCRYPT_DATA, - mechanism_param=iv, - template={ - pkcs11.Attribute.EXTRACTABLE: True, - pkcs11.Attribute.SENSITIVE: False, - }, - ) - except (pkcs11.exceptions.MechanismParamInvalid, pkcs11.exceptions.FunctionFailed): - derived_key = None - - assert_fn(self, derived_key, "{}-bit Key Derivation Failure".format(test_key_length)) - - @parameterized.expand( - [ - ("POSITIVE_128_BIT", 128, 16), - ("POSITIVE_256_BIT_LONG_IV", 256, 32), - ] + except (pkcs11.exceptions.MechanismParamInvalid, pkcs11.exceptions.FunctionFailed): + derived_key = None + + if is_none: + assert derived_key is None, "{}-bit Key Derivation Failure".format(test_key_length) + else: + assert derived_key is not None, "{}-bit Key Derivation Failure".format(test_key_length) + + +@pytest.mark.parametrize(("test_key_length", "iv_length"), [(128, 16), (256, 32)]) +@pytest.mark.requires(Mechanism.AES_ECB_ENCRYPT_DATA) +@pytest.mark.xfail_opencryptoki # can't set key attributes +def test_encrypt_with_key_derived_using_ecb_encrypt( + session: pkcs11.Session, key: pkcs11.SecretKey, test_key_length: int, iv_length: int +) -> None: + """Function to test Data Encryption/Decryption using a Derived AES Key. + + Function to test Data Encryption/Decryption using an AES Key + Derived by the ECB_ENCRYPT Mechanism. + + Refer to Section 2.15 of http://docs.oasis-open.org/pkcs11/pkcs11-curr/v2.40/errata01/os/pkcs11-curr-v2.40-errata01-os-complete.html#_Toc441850521 + """ + + # Create the Master Key + capabilities = pkcs11.defaults.DEFAULT_KEY_CAPABILITIES[pkcs11.KeyType.AES] + capabilities |= pkcs11.MechanismFlag.DERIVE + key = session.generate_key( + pkcs11.KeyType.AES, + key_length=test_key_length, + capabilities=capabilities, + template={ + pkcs11.Attribute.EXTRACTABLE: True, + pkcs11.Attribute.DERIVE: True, + pkcs11.Attribute.SENSITIVE: False, + }, ) - @requires(Mechanism.AES_ECB_ENCRYPT_DATA) - @FIXME.opencryptoki # can't set key attributes - def test_encrypt_with_key_derived_using_ecb_encrypt( - self, test_type, test_key_length, iv_length - ): - """Function to test Data Encryption/Decryption using a Derived AES Key. - - Function to test Data Encryption/Decryption using an AES Key - Derived by the ECB_ENCRYPT Mechanism. - Refer to Section 2.15 of http://docs.oasis-open.org/pkcs11/pkcs11-curr/v2.40/errata01/os/pkcs11-curr-v2.40-errata01-os-complete.html#_Toc441850521 - """ + assert key is not None, "Failed to create {}-bit Master Key".format(test_key_length) - # Create the Master Key - capabilities = pkcs11.defaults.DEFAULT_KEY_CAPABILITIES[pkcs11.KeyType.AES] - capabilities |= pkcs11.MechanismFlag.DERIVE - key = self.session.generate_key( + # Derive a Key from the Master Key + iv = b"0" * iv_length + try: + derived_key = key.derive_key( pkcs11.KeyType.AES, key_length=test_key_length, capabilities=capabilities, + mechanism=Mechanism.AES_ECB_ENCRYPT_DATA, + mechanism_param=iv, template={ pkcs11.Attribute.EXTRACTABLE: True, - pkcs11.Attribute.DERIVE: True, pkcs11.Attribute.SENSITIVE: False, }, ) - - self.assertTrue( - key is not None, "Failed to create {}-bit Master Key".format(test_key_length) - ) - - # Derive a Key from the Master Key - iv = b"0" * iv_length - try: - derived_key = key.derive_key( - pkcs11.KeyType.AES, - key_length=test_key_length, - capabilities=capabilities, - mechanism=Mechanism.AES_ECB_ENCRYPT_DATA, - mechanism_param=iv, - template={ - pkcs11.Attribute.EXTRACTABLE: True, - pkcs11.Attribute.SENSITIVE: False, - }, - ) - except (pkcs11.exceptions.MechanismParamInvalid, pkcs11.exceptions.FunctionFailed): - derived_key = None - - self.assertTrue( - derived_key is not None, "Failed to derive {}-bit Derived Key".format(test_key_length) - ) - - # Test capability of Key to Encrypt/Decrypt data - data = b"HELLO WORLD" * 1024 - - iv = self.session.generate_random(128) - crypttext = self.key.encrypt(data, mechanism_param=iv) - text = self.key.decrypt(crypttext, mechanism_param=iv) - - self.assertEqual(text, data) - - @parameterized.expand( - [ - ("POSITIVE_128_BIT", 128, 16, 16, TestCase.assertIsNotNone), - ("POSITIVE_128_BIT_LONG_DATA", 128, 16, 64, TestCase.assertIsNotNone), - ("NEGATIVE_128_BIT_BAD_IV", 128, 15, 16, TestCase.assertIsNone), - ("NEGATIVE_128_BIT_BAD_DATA", 128, 16, 31, TestCase.assertIsNone), - ("POSITIVE_256_BIT", 256, 16, 32, TestCase.assertIsNotNone), - ("POSITIVE_256_BIT_LONG_DATA", 256, 16, 64, TestCase.assertIsNotNone), - ("NEGATIVE_256_BIT_BAD_IV", 256, 15, 16, TestCase.assertIsNone), - ("NEGATIVE_256_BIT_BAD_DATA", 256, 16, 31, TestCase.assertIsNone), - ("NEGATIVE_256_BIT_SHORT_DATA", 256, 16, 16, TestCase.assertIsNone), - ] + except (pkcs11.exceptions.MechanismParamInvalid, pkcs11.exceptions.FunctionFailed): + derived_key = None + + assert derived_key is not None, "Failed to derive {}-bit Derived Key".format(test_key_length) + + # Test capability of Key to Encrypt/Decrypt data + data = b"HELLO WORLD" * 1024 + + iv = session.generate_random(128) + crypttext = key.encrypt(data, mechanism_param=iv) + text = key.decrypt(crypttext, mechanism_param=iv) + + assert text == data + + +@pytest.mark.parametrize( + ("test_key_length", "iv_length", "data_length", "is_none"), + [ + (128, 16, 16, False), + (128, 16, 64, False), + (128, 15, 16, True), + (128, 16, 31, True), + (256, 16, 32, False), + (256, 16, 64, False), + (256, 15, 16, True), + (256, 16, 31, True), + (256, 16, 16, True), + ], +) +@pytest.mark.requires(Mechanism.AES_CBC_ENCRYPT_DATA) +@pytest.mark.xfail_opencryptoki # can't set key attributes +def test_derive_using_cbc_encrypt( + session: pkcs11.Session, + key: pkcs11.SecretKey, + test_key_length: int, + iv_length: int, + data_length: int, + is_none: bool, +): + """Function to test AES Key Derivation using the CBC_ENCRYPT Mechanism. + + Refer to Section 2.15 of http://docs.oasis-open.org/pkcs11/pkcs11-curr/v2.40/errata01/os/pkcs11-curr-v2.40-errata01-os-complete.html#_Toc441850521 + """ + + # Create the Master Key + capabilities = pkcs11.defaults.DEFAULT_KEY_CAPABILITIES[pkcs11.KeyType.AES] + capabilities |= pkcs11.MechanismFlag.DERIVE + key = session.generate_key( + pkcs11.KeyType.AES, + key_length=test_key_length, + capabilities=capabilities, + template={ + pkcs11.Attribute.EXTRACTABLE: True, + pkcs11.Attribute.DERIVE: True, + pkcs11.Attribute.SENSITIVE: False, + }, ) - @requires(Mechanism.AES_CBC_ENCRYPT_DATA) - @FIXME.opencryptoki # can't set key attributes - def test_derive_using_cbc_encrypt( - self, test_type, test_key_length, iv_length, data_length, assert_fn - ): - """Function to test AES Key Derivation using the CBC_ENCRYPT Mechanism. - Refer to Section 2.15 of http://docs.oasis-open.org/pkcs11/pkcs11-curr/v2.40/errata01/os/pkcs11-curr-v2.40-errata01-os-complete.html#_Toc441850521 - """ + assert key is not None, "Failed to create {}-bit Master Key".format(test_key_length) - # Create the Master Key - capabilities = pkcs11.defaults.DEFAULT_KEY_CAPABILITIES[pkcs11.KeyType.AES] - capabilities |= pkcs11.MechanismFlag.DERIVE - key = self.session.generate_key( + # Derive a Key from the Master Key + iv = b"0" * iv_length + data = b"1" * data_length + try: + derived_key = key.derive_key( pkcs11.KeyType.AES, key_length=test_key_length, capabilities=capabilities, + mechanism=Mechanism.AES_CBC_ENCRYPT_DATA, + mechanism_param=(iv, data), template={ pkcs11.Attribute.EXTRACTABLE: True, - pkcs11.Attribute.DERIVE: True, pkcs11.Attribute.SENSITIVE: False, }, ) - - self.assertTrue( - key is not None, "Failed to create {}-bit Master Key".format(test_key_length) - ) - - # Derive a Key from the Master Key - iv = b"0" * iv_length - data = b"1" * data_length - try: - derived_key = key.derive_key( - pkcs11.KeyType.AES, - key_length=test_key_length, - capabilities=capabilities, - mechanism=Mechanism.AES_CBC_ENCRYPT_DATA, - mechanism_param=(iv, data), - template={ - pkcs11.Attribute.EXTRACTABLE: True, - pkcs11.Attribute.SENSITIVE: False, - }, - ) - except ( - pkcs11.exceptions.MechanismParamInvalid, - pkcs11.exceptions.FunctionFailed, - IndexError, - ): - derived_key = None - - assert_fn(self, derived_key, "{}-bit Key Derivation Failure".format(test_key_length)) - - @parameterized.expand( - [ - ("POSITIVE_128_BIT", 128, 16, 16), - ("POSITIVE_256_BIT", 256, 16, 32), - ("POSITIVE_256_BIT_LONG_DATA", 256, 16, 64), - ] - ) - @requires(Mechanism.AES_CBC_ENCRYPT_DATA) - @FIXME.opencryptoki # can't set key attributes - def test_encrypt_with_key_derived_using_cbc_encrypt( - self, test_type, test_key_length, iv_length, data_length + except ( + pkcs11.exceptions.MechanismParamInvalid, + pkcs11.exceptions.FunctionFailed, + IndexError, ): - """Function to test Data Encryption/Decryption using a Derived AES Key. - - Function to test Data Encryption/Decryption using an AES Key - Derived by the CBC_ENCRYPT Mechanism. + derived_key = None + + if is_none: + assert derived_key is None, "{}-bit Key Derivation Failure".format(test_key_length) + else: + assert derived_key is not None + + +@pytest.mark.parametrize( + ("test_key_length", "iv_length", "data_length"), [(128, 16, 16), (256, 16, 32), (256, 16, 64)] +) +@pytest.mark.requires(Mechanism.AES_CBC_ENCRYPT_DATA) +@pytest.mark.xfail_opencryptoki # can't set key attributes +def test_encrypt_with_key_derived_using_cbc_encrypt( + session: pkcs11.Session, + key: pkcs11.SecretKey, + test_key_length: int, + iv_length: int, + data_length: int, +): + """Function to test Data Encryption/Decryption using a Derived AES Key. + + Function to test Data Encryption/Decryption using an AES Key + Derived by the CBC_ENCRYPT Mechanism. + + Refer to Section 2.15 of http://docs.oasis-open.org/pkcs11/pkcs11-curr/v2.40/errata01/os/pkcs11-curr-v2.40-errata01-os-complete.html#_Toc441850521 + """ + + # Create the Master Key + capabilities = pkcs11.defaults.DEFAULT_KEY_CAPABILITIES[pkcs11.KeyType.AES] + capabilities |= pkcs11.MechanismFlag.DERIVE + key = session.generate_key( + pkcs11.KeyType.AES, + key_length=test_key_length, + capabilities=capabilities, + template={ + pkcs11.Attribute.EXTRACTABLE: True, + pkcs11.Attribute.DERIVE: True, + pkcs11.Attribute.SENSITIVE: False, + }, + ) - Refer to Section 2.15 of http://docs.oasis-open.org/pkcs11/pkcs11-curr/v2.40/errata01/os/pkcs11-curr-v2.40-errata01-os-complete.html#_Toc441850521 - """ + assert key is not None, "Failed to create {}-bit Master Key".format(test_key_length) - # Create the Master Key - capabilities = pkcs11.defaults.DEFAULT_KEY_CAPABILITIES[pkcs11.KeyType.AES] - capabilities |= pkcs11.MechanismFlag.DERIVE - key = self.session.generate_key( + # Derive a Key from the Master Key + iv = b"0" * iv_length + data = b"1" * data_length + try: + derived_key = key.derive_key( pkcs11.KeyType.AES, key_length=test_key_length, capabilities=capabilities, + mechanism=Mechanism.AES_CBC_ENCRYPT_DATA, + mechanism_param=(iv, data), template={ pkcs11.Attribute.EXTRACTABLE: True, - pkcs11.Attribute.DERIVE: True, pkcs11.Attribute.SENSITIVE: False, }, ) + except ( + pkcs11.exceptions.MechanismParamInvalid, + pkcs11.exceptions.FunctionFailed, + IndexError, + ): + derived_key = None - self.assertTrue( - key is not None, "Failed to create {}-bit Master Key".format(test_key_length) - ) - - # Derive a Key from the Master Key - iv = b"0" * iv_length - data = b"1" * data_length - try: - derived_key = key.derive_key( - pkcs11.KeyType.AES, - key_length=test_key_length, - capabilities=capabilities, - mechanism=Mechanism.AES_CBC_ENCRYPT_DATA, - mechanism_param=(iv, data), - template={ - pkcs11.Attribute.EXTRACTABLE: True, - pkcs11.Attribute.SENSITIVE: False, - }, - ) - except ( - pkcs11.exceptions.MechanismParamInvalid, - pkcs11.exceptions.FunctionFailed, - IndexError, - ): - derived_key = None - - self.assertTrue( - derived_key is not None, "Failed to derive {}-bit Derived Key".format(test_key_length) - ) + assert derived_key is not None, "Failed to derive {}-bit Derived Key".format(test_key_length) - # Test capability of Key to Encrypt/Decrypt data - data = b"HELLO WORLD" * 1024 + # Test capability of Key to Encrypt/Decrypt data + data = b"HELLO WORLD" * 1024 - iv = self.session.generate_random(128) - crypttext = self.key.encrypt(data, mechanism_param=iv) - text = self.key.decrypt(crypttext, mechanism_param=iv) + iv = session.generate_random(128) + crypttext = key.encrypt(data, mechanism_param=iv) + text = key.decrypt(crypttext, mechanism_param=iv) - self.assertEqual(text, data) + assert text == data diff --git a/tests/test_des.py b/tests/test_des.py index 0c04436..40a8ace 100644 --- a/tests/test_des.py +++ b/tests/test_des.py @@ -2,39 +2,41 @@ PKCS#11 DES Secret Keys """ +import pytest + import pkcs11 from pkcs11 import KeyType, Mechanism -from . import TestCase, requires +@pytest.mark.requires(Mechanism.DES2_KEY_GEN) +def test_generate_des2_key(session: pkcs11.Session): + key = session.generate_key(KeyType.DES2) + assert isinstance(key, pkcs11.SecretKey) + + +@pytest.mark.requires(Mechanism.DES3_KEY_GEN) +def test_generate_des3_key(session: pkcs11.Session): + key = session.generate_key(KeyType.DES3) + assert isinstance(key, pkcs11.SecretKey) -class DESTests(TestCase): - @requires(Mechanism.DES2_KEY_GEN) - def test_generate_des2_key(self): - key = self.session.generate_key(KeyType.DES2) - self.assertIsInstance(key, pkcs11.SecretKey) - @requires(Mechanism.DES3_KEY_GEN) - def test_generate_des3_key(self): - key = self.session.generate_key(KeyType.DES3) - self.assertIsInstance(key, pkcs11.SecretKey) +@pytest.mark.requires(Mechanism.DES2_KEY_GEN, Mechanism.DES3_CBC_PAD) +def test_encrypt_des2(session: pkcs11.Session): + key = session.generate_key(KeyType.DES2) - @requires(Mechanism.DES2_KEY_GEN, Mechanism.DES3_CBC_PAD) - def test_encrypt_des2(self): - key = self.session.generate_key(KeyType.DES2) + iv = session.generate_random(64) + crypttext = key.encrypt("PLAIN TEXT_", mechanism_param=iv) + plaintext = key.decrypt(crypttext, mechanism_param=iv) - iv = self.session.generate_random(64) - crypttext = key.encrypt("PLAIN TEXT_", mechanism_param=iv) - plaintext = key.decrypt(crypttext, mechanism_param=iv) + assert plaintext == b"PLAIN TEXT_" - self.assertEqual(plaintext, b"PLAIN TEXT_") - @requires(Mechanism.DES3_KEY_GEN, Mechanism.DES3_CBC_PAD) - def test_encrypt_des3(self): - key = self.session.generate_key(KeyType.DES3) +@pytest.mark.requires(Mechanism.DES3_KEY_GEN, Mechanism.DES3_CBC_PAD) +def test_encrypt_des3(session: pkcs11.Session): + key = session.generate_key(KeyType.DES3) - iv = self.session.generate_random(64) - crypttext = key.encrypt("PLAIN TEXT_", mechanism_param=iv) - plaintext = key.decrypt(crypttext, mechanism_param=iv) + iv = session.generate_random(64) + crypttext = key.encrypt("PLAIN TEXT_", mechanism_param=iv) + plaintext = key.decrypt(crypttext, mechanism_param=iv) - self.assertEqual(plaintext, b"PLAIN TEXT_") + assert plaintext == b"PLAIN TEXT_" diff --git a/tests/test_dh.py b/tests/test_dh.py index 59060a5..1ed2035 100644 --- a/tests/test_dh.py +++ b/tests/test_dh.py @@ -4,6 +4,9 @@ import base64 +import pytest + +import pkcs11 from pkcs11 import Attribute, DomainParameters, KeyType, Mechanism from pkcs11.util.dh import ( decode_dh_domain_parameters, @@ -11,377 +14,371 @@ encode_dh_public_key, ) -from . import FIXME, TestCase, requires +@pytest.mark.requires(Mechanism.DH_PKCS_KEY_PAIR_GEN, Mechanism.DH_PKCS_DERIVE) +@pytest.mark.xfail_opencryptoki # AttributeValueInvalid when generating keypair +def test_derive_key(session: pkcs11.Session) -> None: + # Alice and Bob each create a Diffie-Hellman keypair from the + # publicly available DH parameters + # + # E.g. RFC 3526, RFC 5114 or openssl dhparam -C 2236 + prime = [ + 0x0F, + 0x52, + 0xE5, + 0x24, + 0xF5, + 0xFA, + 0x9D, + 0xDC, + 0xC6, + 0xAB, + 0xE6, + 0x04, + 0xE4, + 0x20, + 0x89, + 0x8A, + 0xB4, + 0xBF, + 0x27, + 0xB5, + 0x4A, + 0x95, + 0x57, + 0xA1, + 0x06, + 0xE7, + 0x30, + 0x73, + 0x83, + 0x5E, + 0xC9, + 0x23, + 0x11, + 0xED, + 0x42, + 0x45, + 0xAC, + 0x49, + 0xD3, + 0xE3, + 0xF3, + 0x34, + 0x73, + 0xC5, + 0x7D, + 0x00, + 0x3C, + 0x86, + 0x63, + 0x74, + 0xE0, + 0x75, + 0x97, + 0x84, + 0x1D, + 0x0B, + 0x11, + 0xDA, + 0x04, + 0xD0, + 0xFE, + 0x4F, + 0xB0, + 0x37, + 0xDF, + 0x57, + 0x22, + 0x2E, + 0x96, + 0x42, + 0xE0, + 0x7C, + 0xD7, + 0x5E, + 0x46, + 0x29, + 0xAF, + 0xB1, + 0xF4, + 0x81, + 0xAF, + 0xFC, + 0x9A, + 0xEF, + 0xFA, + 0x89, + 0x9E, + 0x0A, + 0xFB, + 0x16, + 0xE3, + 0x8F, + 0x01, + 0xA2, + 0xC8, + 0xDD, + 0xB4, + 0x47, + 0x12, + 0xF8, + 0x29, + 0x09, + 0x13, + 0x6E, + 0x9D, + 0xA8, + 0xF9, + 0x5D, + 0x08, + 0x00, + 0x3A, + 0x8C, + 0xA7, + 0xFF, + 0x6C, + 0xCF, + 0xE3, + 0x7C, + 0x3B, + 0x6B, + 0xB4, + 0x26, + 0xCC, + 0xDA, + 0x89, + 0x93, + 0x01, + 0x73, + 0xA8, + 0x55, + 0x3E, + 0x5B, + 0x77, + 0x25, + 0x8F, + 0x27, + 0xA3, + 0xF1, + 0xBF, + 0x7A, + 0x73, + 0x1F, + 0x85, + 0x96, + 0x0C, + 0x45, + 0x14, + 0xC1, + 0x06, + 0xB7, + 0x1C, + 0x75, + 0xAA, + 0x10, + 0xBC, + 0x86, + 0x98, + 0x75, + 0x44, + 0x70, + 0xD1, + 0x0F, + 0x20, + 0xF4, + 0xAC, + 0x4C, + 0xB3, + 0x88, + 0x16, + 0x1C, + 0x7E, + 0xA3, + 0x27, + 0xE4, + 0xAD, + 0xE1, + 0xA1, + 0x85, + 0x4F, + 0x1A, + 0x22, + 0x0D, + 0x05, + 0x42, + 0x73, + 0x69, + 0x45, + 0xC9, + 0x2F, + 0xF7, + 0xC2, + 0x48, + 0xE3, + 0xCE, + 0x9D, + 0x74, + 0x58, + 0x53, + 0xE7, + 0xA7, + 0x82, + 0x18, + 0xD9, + 0x3D, + 0xAF, + 0xAB, + 0x40, + 0x9F, + 0xAA, + 0x4C, + 0x78, + 0x0A, + 0xC3, + 0x24, + 0x2D, + 0xDB, + 0x12, + 0xA9, + 0x54, + 0xE5, + 0x47, + 0x87, + 0xAC, + 0x52, + 0xFE, + 0xE8, + 0x3D, + 0x0B, + 0x56, + 0xED, + 0x9C, + 0x9F, + 0xFF, + 0x39, + 0xE5, + 0xE5, + 0xBF, + 0x62, + 0x32, + 0x42, + 0x08, + 0xAE, + 0x6A, + 0xED, + 0x88, + 0x0E, + 0xB3, + 0x1A, + 0x4C, + 0xD3, + 0x08, + 0xE4, + 0xC4, + 0xAA, + 0x2C, + 0xCC, + 0xB1, + 0x37, + 0xA5, + 0xC1, + 0xA9, + 0x64, + 0x7E, + 0xEB, + 0xF9, + 0xD3, + 0xF5, + 0x15, + 0x28, + 0xFE, + 0x2E, + 0xE2, + 0x7F, + 0xFE, + 0xD9, + 0xB9, + 0x38, + 0x42, + 0x57, + 0x03, + ] + parameters = session.create_domain_parameters( + KeyType.DH, {Attribute.PRIME: prime, Attribute.BASE: [0x2]}, local=True + ) + + # Alice generate a keypair + alice_public, alice_private = parameters.generate_keypair() + alice_value = alice_public[Attribute.VALUE] -class DHTests(TestCase): - @requires(Mechanism.DH_PKCS_KEY_PAIR_GEN, Mechanism.DH_PKCS_DERIVE) - @FIXME.opencryptoki # AttributeValueInvalid when generating keypair - def test_derive_key(self): - # Alice and Bob each create a Diffie-Hellman keypair from the - # publicly available DH parameters - # - # E.g. RFC 3526, RFC 5114 or openssl dhparam -C 2236 - prime = [ - 0x0F, - 0x52, - 0xE5, - 0x24, - 0xF5, - 0xFA, - 0x9D, - 0xDC, - 0xC6, - 0xAB, - 0xE6, - 0x04, - 0xE4, - 0x20, - 0x89, - 0x8A, - 0xB4, - 0xBF, - 0x27, - 0xB5, - 0x4A, - 0x95, - 0x57, - 0xA1, - 0x06, - 0xE7, - 0x30, - 0x73, - 0x83, - 0x5E, - 0xC9, - 0x23, - 0x11, - 0xED, - 0x42, - 0x45, - 0xAC, - 0x49, - 0xD3, - 0xE3, - 0xF3, - 0x34, - 0x73, - 0xC5, - 0x7D, - 0x00, - 0x3C, - 0x86, - 0x63, - 0x74, - 0xE0, - 0x75, - 0x97, - 0x84, - 0x1D, - 0x0B, - 0x11, - 0xDA, - 0x04, - 0xD0, - 0xFE, - 0x4F, - 0xB0, - 0x37, - 0xDF, - 0x57, - 0x22, - 0x2E, - 0x96, - 0x42, - 0xE0, - 0x7C, - 0xD7, - 0x5E, - 0x46, - 0x29, - 0xAF, - 0xB1, - 0xF4, - 0x81, - 0xAF, - 0xFC, - 0x9A, - 0xEF, - 0xFA, - 0x89, - 0x9E, - 0x0A, - 0xFB, - 0x16, - 0xE3, - 0x8F, - 0x01, - 0xA2, - 0xC8, - 0xDD, - 0xB4, - 0x47, - 0x12, - 0xF8, - 0x29, - 0x09, - 0x13, - 0x6E, - 0x9D, - 0xA8, - 0xF9, - 0x5D, - 0x08, - 0x00, - 0x3A, - 0x8C, - 0xA7, - 0xFF, - 0x6C, - 0xCF, - 0xE3, - 0x7C, - 0x3B, - 0x6B, - 0xB4, - 0x26, - 0xCC, - 0xDA, - 0x89, - 0x93, - 0x01, - 0x73, - 0xA8, - 0x55, - 0x3E, - 0x5B, - 0x77, - 0x25, - 0x8F, - 0x27, - 0xA3, - 0xF1, - 0xBF, - 0x7A, - 0x73, - 0x1F, - 0x85, - 0x96, - 0x0C, - 0x45, - 0x14, - 0xC1, - 0x06, - 0xB7, - 0x1C, - 0x75, - 0xAA, - 0x10, - 0xBC, - 0x86, - 0x98, - 0x75, - 0x44, - 0x70, - 0xD1, - 0x0F, - 0x20, - 0xF4, - 0xAC, - 0x4C, - 0xB3, - 0x88, - 0x16, - 0x1C, - 0x7E, - 0xA3, - 0x27, - 0xE4, - 0xAD, - 0xE1, - 0xA1, - 0x85, - 0x4F, - 0x1A, - 0x22, - 0x0D, - 0x05, - 0x42, - 0x73, - 0x69, - 0x45, - 0xC9, - 0x2F, - 0xF7, - 0xC2, - 0x48, - 0xE3, - 0xCE, - 0x9D, - 0x74, - 0x58, - 0x53, - 0xE7, - 0xA7, - 0x82, - 0x18, - 0xD9, - 0x3D, - 0xAF, - 0xAB, - 0x40, - 0x9F, - 0xAA, - 0x4C, - 0x78, - 0x0A, - 0xC3, - 0x24, - 0x2D, - 0xDB, - 0x12, - 0xA9, - 0x54, - 0xE5, - 0x47, - 0x87, - 0xAC, - 0x52, - 0xFE, - 0xE8, - 0x3D, - 0x0B, - 0x56, - 0xED, - 0x9C, - 0x9F, - 0xFF, - 0x39, - 0xE5, - 0xE5, - 0xBF, - 0x62, - 0x32, - 0x42, - 0x08, - 0xAE, - 0x6A, - 0xED, - 0x88, - 0x0E, - 0xB3, - 0x1A, - 0x4C, - 0xD3, - 0x08, - 0xE4, - 0xC4, - 0xAA, - 0x2C, - 0xCC, - 0xB1, - 0x37, - 0xA5, - 0xC1, - 0xA9, - 0x64, - 0x7E, - 0xEB, - 0xF9, - 0xD3, - 0xF5, - 0x15, - 0x28, - 0xFE, - 0x2E, - 0xE2, - 0x7F, - 0xFE, - 0xD9, - 0xB9, - 0x38, - 0x42, - 0x57, - 0x03, - ] - parameters = self.session.create_domain_parameters( - KeyType.DH, - { - Attribute.PRIME: prime, - Attribute.BASE: [0x2], - }, - local=True, - ) + # Bob generates a keypair + bob_public, bob_private = parameters.generate_keypair() + bob_value = bob_public[Attribute.VALUE] - # Alice generate a keypair - alice_public, alice_private = parameters.generate_keypair() - alice_value = alice_public[Attribute.VALUE] + assert alice_value != bob_value - # Bob generates a keypair - bob_public, bob_private = parameters.generate_keypair() - bob_value = bob_public[Attribute.VALUE] + # Alice and Bob exchange values and an IV ... + iv = session.generate_random(128) - self.assertNotEqual(alice_value, bob_value) + alice_session = alice_private.derive_key( + KeyType.AES, + 128, + mechanism_param=bob_value, + template={ + Attribute.SENSITIVE: False, + Attribute.EXTRACTABLE: True, + }, + ) + bob_session = bob_private.derive_key( + KeyType.AES, + 128, + mechanism_param=alice_value, + template={ + Attribute.SENSITIVE: False, + Attribute.EXTRACTABLE: True, + }, + ) - # Alice and Bob exchange values and an IV ... - iv = self.session.generate_random(128) + assert alice_session[Attribute.VALUE] == bob_session[Attribute.VALUE] - alice_session = alice_private.derive_key( - KeyType.AES, - 128, - mechanism_param=bob_value, - template={ - Attribute.SENSITIVE: False, - Attribute.EXTRACTABLE: True, - }, - ) - bob_session = bob_private.derive_key( - KeyType.AES, - 128, - mechanism_param=alice_value, - template={ - Attribute.SENSITIVE: False, - Attribute.EXTRACTABLE: True, - }, - ) + crypttext = alice_session.encrypt("HI BOB!", mechanism_param=iv) + plaintext = bob_session.decrypt(crypttext, mechanism_param=iv) + assert plaintext == b"HI BOB!" - self.assertEqual(alice_session[Attribute.VALUE], bob_session[Attribute.VALUE]) - crypttext = alice_session.encrypt("HI BOB!", mechanism_param=iv) - plaintext = bob_session.decrypt(crypttext, mechanism_param=iv) - self.assertEqual(plaintext, b"HI BOB!") +def test_load_params(session: pkcs11.Session) -> None: + # This is RFC5114 #2 + PARAMS = base64.b64decode(""" + MIICKQKCAQEArRB+HpEjqdDWYPqnlVnFH6INZOVoO5/RtUsVl7YdCnXm+hQd+VpW + 26+aPEB7od8V6z1oijCcGA4d5rhaEnSgpm0/gVKtasISkDfJ7e/aTfjZHo/vVbc5 + S3rVt9C2wSIHyfmNEe002/bGugssi7wnvmoA4KC5xJcIs7+KMXCRiDaBKGEwvImF + 2xYC5xRBXZMwJ4Jzx94x79xzEPcSH9WgdBWYfZrcCkhtzfk6zEQyg4cxXXXhmMZB + pIDNhqG55YfovmDmnMkosrnFIXLkEwQumyPxCw4W55djybU9z0uoCinj+3PBa451 + uX7zY+L/ox9xz53lOE5xuBwKxN/+DBDmTwKCAQEArEAy708tmuOd8wtcj/2sUGze + vnuJmYyvdIZqCM/k/+OmgkpOELmm8N2SHwGnDEr6q3OddwDCn1LFfbF8YgqGUr5e + kAGo1mrXwXZpEBmZAkr00CcnWsE0i7inYtBSG8mK4kcVBCLqHtQJk51U2nRgzbX2 + xrJQcXy+8YDrNBGOmNEZUppF1vg0Vm4wJeMWozDvu3eobwwasVsFGuPUKMj4rLcK + gTcVC47rEOGD7dGZY93Z4mPkdwWJ72qiHn9fL/OBtTnM40CdE81Wavu0jWwBkYHh + vP6UswJp7f5y/ptqpL17Wg8ccc//TBnEGOH27AF5gbwIfypwZbOEuJDTGR8r+gId + AIAcDTTFjZP+mXF3EB+AU1pHOM68vziambNjces= + """) - def test_load_params(self): - # This is RFC5114 #2 - PARAMS = base64.b64decode(""" - MIICKQKCAQEArRB+HpEjqdDWYPqnlVnFH6INZOVoO5/RtUsVl7YdCnXm+hQd+VpW - 26+aPEB7od8V6z1oijCcGA4d5rhaEnSgpm0/gVKtasISkDfJ7e/aTfjZHo/vVbc5 - S3rVt9C2wSIHyfmNEe002/bGugssi7wnvmoA4KC5xJcIs7+KMXCRiDaBKGEwvImF - 2xYC5xRBXZMwJ4Jzx94x79xzEPcSH9WgdBWYfZrcCkhtzfk6zEQyg4cxXXXhmMZB - pIDNhqG55YfovmDmnMkosrnFIXLkEwQumyPxCw4W55djybU9z0uoCinj+3PBa451 - uX7zY+L/ox9xz53lOE5xuBwKxN/+DBDmTwKCAQEArEAy708tmuOd8wtcj/2sUGze - vnuJmYyvdIZqCM/k/+OmgkpOELmm8N2SHwGnDEr6q3OddwDCn1LFfbF8YgqGUr5e - kAGo1mrXwXZpEBmZAkr00CcnWsE0i7inYtBSG8mK4kcVBCLqHtQJk51U2nRgzbX2 - xrJQcXy+8YDrNBGOmNEZUppF1vg0Vm4wJeMWozDvu3eobwwasVsFGuPUKMj4rLcK - gTcVC47rEOGD7dGZY93Z4mPkdwWJ72qiHn9fL/OBtTnM40CdE81Wavu0jWwBkYHh - vP6UswJp7f5y/ptqpL17Wg8ccc//TBnEGOH27AF5gbwIfypwZbOEuJDTGR8r+gId - AIAcDTTFjZP+mXF3EB+AU1pHOM68vziambNjces= - """) + params = session.create_domain_parameters( + KeyType.DH, decode_dh_domain_parameters(PARAMS), local=True + ) + assert isinstance(params, DomainParameters) + assert params[Attribute.PRIME][:4] == b"\xad\x10\x7e\x1e" - params = self.session.create_domain_parameters( - KeyType.DH, decode_dh_domain_parameters(PARAMS), local=True - ) - self.assertIsInstance(params, DomainParameters) - self.assertEqual(params[Attribute.PRIME][:4], b"\xad\x10\x7e\x1e") - @requires(Mechanism.DH_PKCS_PARAMETER_GEN, Mechanism.DH_PKCS_KEY_PAIR_GEN) - def test_generate_params(self): - params = self.session.generate_domain_parameters(KeyType.DH, 512) - self.assertIsInstance(params, DomainParameters) - self.assertEqual(params[Attribute.PRIME_BITS], 512) - self.assertEqual(len(params[Attribute.PRIME]) * 8, 512) - encode_dh_domain_parameters(params) +@pytest.mark.requires(Mechanism.DH_PKCS_PARAMETER_GEN, Mechanism.DH_PKCS_KEY_PAIR_GEN) +def test_generate_params(session: pkcs11.Session) -> None: + params = session.generate_domain_parameters(KeyType.DH, 512) + assert isinstance(params, DomainParameters) + assert params[Attribute.PRIME_BITS] == 512 + assert len(params[Attribute.PRIME]) * 8 == 512 + encode_dh_domain_parameters(params) - # Test encoding the public key - public, _ = params.generate_keypair() - encode_dh_public_key(public) + # Test encoding the public key + public, _ = params.generate_keypair() + encode_dh_public_key(public) diff --git a/tests/test_digest.py b/tests/test_digest.py index a6dc1b8..2313fcb 100644 --- a/tests/test_digest.py +++ b/tests/test_digest.py @@ -4,72 +4,67 @@ import hashlib +import pytest + +import pkcs11 from pkcs11 import Attribute, KeyType, Mechanism +from tests.conftest import IS_NFAST + + +@pytest.mark.requires(Mechanism.SHA256) +def test_digest(session: pkcs11.Session) -> None: + data = "THIS IS SOME DATA TO DIGEST" + digest = session.digest(data, mechanism=Mechanism.SHA256) + + assert digest == hashlib.sha256(data.encode("utf-8")).digest() + + +@pytest.mark.requires(Mechanism.SHA256) +def test_digest_generator(session: pkcs11.Session) -> None: + data = (b"This is ", b"some data ", b"to digest.") + + digest = session.digest(data, mechanism=Mechanism.SHA256) + + m = hashlib.sha256() + for d in data: + m.update(d) + + assert digest == m.digest() + + +@pytest.mark.requires(Mechanism.AES_KEY_GEN, Mechanism.SHA256) +@pytest.mark.skipif(IS_NFAST, reason="nFast can't digest keys") +def test_digest_key(session: pkcs11.Session) -> None: + key = session.generate_key( + KeyType.AES, 128, template={Attribute.SENSITIVE: False, Attribute.EXTRACTABLE: True} + ) + + digest = session.digest(key, mechanism=Mechanism.SHA256) + + assert digest == hashlib.sha256(key[Attribute.VALUE]).digest() + + +@pytest.mark.requires(Mechanism.AES_KEY_GEN, Mechanism.SHA256) +@pytest.mark.skipif(IS_NFAST, reason="nFast can't digest keys") +def test_digest_key_data(session: pkcs11.Session) -> None: + key = session.generate_key( + KeyType.AES, + 128, + template={ + Attribute.SENSITIVE: False, + Attribute.EXTRACTABLE: True, + }, + ) + + data = ( + b"Some data", + key, + ) + + digest = session.digest(data, mechanism=Mechanism.SHA256) + + m = hashlib.sha256() + m.update(data[0]) + m.update(data[1][Attribute.VALUE]) -from . import Not, TestCase, requires - - -class DigestTests(TestCase): - @requires(Mechanism.SHA256) - def test_digest(self): - data = "THIS IS SOME DATA TO DIGEST" - digest = self.session.digest(data, mechanism=Mechanism.SHA256) - - self.assertEqual(digest, hashlib.sha256(data.encode("utf-8")).digest()) - - @requires(Mechanism.SHA256) - def test_digest_generator(self): - data = ( - b"This is ", - b"some data ", - b"to digest.", - ) - - digest = self.session.digest(data, mechanism=Mechanism.SHA256) - - m = hashlib.sha256() - for d in data: - m.update(d) - - self.assertEqual(digest, m.digest()) - - @requires(Mechanism.AES_KEY_GEN, Mechanism.SHA256) - @Not.nfast # nFast can't digest keys - def test_digest_key(self): - key = self.session.generate_key( - KeyType.AES, - 128, - template={ - Attribute.SENSITIVE: False, - Attribute.EXTRACTABLE: True, - }, - ) - - digest = self.session.digest(key, mechanism=Mechanism.SHA256) - - self.assertEqual(digest, hashlib.sha256(key[Attribute.VALUE]).digest()) - - @requires(Mechanism.AES_KEY_GEN, Mechanism.SHA256) - @Not.nfast # nFast can't digest keys - def test_digest_key_data(self): - key = self.session.generate_key( - KeyType.AES, - 128, - template={ - Attribute.SENSITIVE: False, - Attribute.EXTRACTABLE: True, - }, - ) - - data = ( - b"Some data", - key, - ) - - digest = self.session.digest(data, mechanism=Mechanism.SHA256) - - m = hashlib.sha256() - m.update(data[0]) - m.update(data[1][Attribute.VALUE]) - - self.assertEqual(digest, m.digest()) + assert digest == m.digest() diff --git a/tests/test_dsa.py b/tests/test_dsa.py index 4a927d5..456ada1 100644 --- a/tests/test_dsa.py +++ b/tests/test_dsa.py @@ -4,14 +4,11 @@ import base64 +import pytest + import pkcs11 from pkcs11 import Attribute, KeyType, Mechanism -from pkcs11.util.dsa import ( - decode_dsa_domain_parameters, - encode_dsa_domain_parameters, -) - -from . import FIXME, TestCase, requires +from pkcs11.util.dsa import decode_dsa_domain_parameters, encode_dsa_domain_parameters DHPARAMS = base64.b64decode(""" MIIBHwKBgQD8jXSat2sk+j0plaMn51AVYBWEyWee3ui3llRUckVceDILsjVdBs1tXCDhU7WC+VZZ @@ -23,33 +20,34 @@ """) -class DSATests(TestCase): - @requires(Mechanism.DSA_PARAMETER_GEN) - @FIXME.nfast # returns Function Failed - def test_generate_params(self): - parameters = self.session.generate_domain_parameters(KeyType.DSA, 1024) - self.assertIsInstance(parameters, pkcs11.DomainParameters) - self.assertEqual(parameters[Attribute.PRIME_BITS], 1024) - - encode_dsa_domain_parameters(parameters) - - @requires(Mechanism.DSA_KEY_PAIR_GEN, Mechanism.DSA_SHA1) - def test_generate_keypair_and_sign(self): - dhparams = self.session.create_domain_parameters( - KeyType.DSA, decode_dsa_domain_parameters(DHPARAMS), local=True - ) - - public, private = dhparams.generate_keypair() - self.assertIsInstance(public, pkcs11.PublicKey) - self.assertIsInstance(private, pkcs11.PrivateKey) - self.assertEqual(len(public[Attribute.VALUE]), 1024 // 8) - - data = "Message to sign" - signature = private.sign(data, mechanism=Mechanism.DSA_SHA1) - self.assertTrue(public.verify(data, signature, mechanism=Mechanism.DSA_SHA1)) - - @requires(Mechanism.DSA_PARAMETER_GEN, Mechanism.DSA_KEY_PAIR_GEN) - @FIXME.nfast # returns Function Failed - def test_generate_keypair_directly(self): - public, private = self.session.generate_keypair(KeyType.DSA, 1024) - self.assertEqual(len(public[Attribute.VALUE]), 1024 // 8) +@pytest.mark.requires(Mechanism.DSA_PARAMETER_GEN) +@pytest.mark.xfail_nfast +def test_generate_params(session: pkcs11.Session) -> None: + parameters = session.generate_domain_parameters(KeyType.DSA, 1024) + assert isinstance(parameters, pkcs11.DomainParameters) + assert parameters[Attribute.PRIME_BITS] == 1024 + + encode_dsa_domain_parameters(parameters) + + +@pytest.mark.requires(Mechanism.DSA_KEY_PAIR_GEN, Mechanism.DSA_SHA1) +def test_generate_keypair_and_sign(session: pkcs11.Session): + dhparams = session.create_domain_parameters( + KeyType.DSA, decode_dsa_domain_parameters(DHPARAMS), local=True + ) + + public, private = dhparams.generate_keypair() + assert isinstance(public, pkcs11.PublicKey) + assert isinstance(private, pkcs11.PrivateKey) + assert len(public[Attribute.VALUE]) == 1024 // 8 + + data = "Message to sign" + signature = private.sign(data, mechanism=Mechanism.DSA_SHA1) + assert public.verify(data, signature, mechanism=Mechanism.DSA_SHA1) is True + + +@pytest.mark.xfail_nfast +@pytest.mark.requires(Mechanism.DSA_PARAMETER_GEN, Mechanism.DSA_KEY_PAIR_GEN) +def test_generate_keypair_directly(session: pkcs11.Session): + public, private = session.generate_keypair(KeyType.DSA, 1024) + assert len(public[Attribute.VALUE]) == 1024 // 8 diff --git a/tests/test_ecc.py b/tests/test_ecc.py index 39c7f29..10394b0 100644 --- a/tests/test_ecc.py +++ b/tests/test_ecc.py @@ -4,6 +4,8 @@ import base64 +import pytest + import pkcs11 from pkcs11 import KDF, Attribute, KeyType, Mechanism from pkcs11.util.ec import ( @@ -14,158 +16,156 @@ encode_named_curve_parameters, ) -from . import TestCase, requires - - -class ECCTests(TestCase): - @requires(Mechanism.EC_KEY_PAIR_GEN, Mechanism.ECDSA) - def test_sign_ecdsa(self): - parameters = self.session.create_domain_parameters( - KeyType.EC, - {Attribute.EC_PARAMS: encode_named_curve_parameters("secp256r1")}, - local=True, - ) - - pub, priv = parameters.generate_keypair() - - mechanism = Mechanism.ECDSA - data = b"HI BOB!" - ecdsa = priv.sign(data, mechanism=mechanism) - self.assertTrue(pub.verify(data, ecdsa, mechanism=mechanism)) - - @requires(Mechanism.EC_KEY_PAIR_GEN, Mechanism.ECDH1_DERIVE) - def test_derive_key(self): - # DER encoded EC params from OpenSSL - # openssl ecparam -out ec_param.der -name prime192v1 - ecparams = base64.b64decode(b"BggqhkjOPQMBAQ==") - - parameters = self.session.create_domain_parameters( - KeyType.EC, - { - Attribute.EC_PARAMS: ecparams, - }, - local=True, - ) - alice_pub, alice_priv = parameters.generate_keypair() - alice_value = alice_pub[Attribute.EC_POINT] - - bob_pub, bob_priv = parameters.generate_keypair() - bob_value = bob_pub[Attribute.EC_POINT] - - self.assertNotEqual(alice_value, bob_value) - - alice_session = alice_priv.derive_key( - KeyType.AES, 128, mechanism_param=(KDF.NULL, None, bob_value) - ) - - bob_session = bob_priv.derive_key( - KeyType.AES, 128, mechanism_param=(KDF.NULL, None, alice_value) - ) - - iv = self.session.generate_random(128) - crypttext = alice_session.encrypt("HI BOB!", mechanism_param=iv) - plaintext = bob_session.decrypt(crypttext, mechanism_param=iv) - self.assertEqual(plaintext, b"HI BOB!") - - @requires(Mechanism.ECDSA) - def test_import_key_params(self): - der = base64.b64decode(""" - MIICXDCCAc8GByqGSM49AgEwggHCAgEBME0GByqGSM49AQECQgH///////////// - //////////////////////////////////////////////////////////////// - /////////zCBngRCAf////////////////////////////////////////////// - ///////////////////////////////////////8BEFRlT65YY4cmh+SmiGgtoVA - 7qLacluZsxXzuLSJkY7xCeFWGTlR7H6TexZSwL07sb8HNXPfiD0sNPHvRR/Ua1A/ - AAMVANCeiAApHLhTlsxnFzkyhKqg2mS6BIGFBADGhY4GtwQE6c2ePstmI5W0Qpxk - gTkFP7Uh+CivYGtNPbqhS1537+dZKP4dwSei/6jeM0izwYVqQpv5fn4xwuW9ZgEY - OSlqeJo7wARcil+0LH0b2Zj1RElXm0RoF6+9Fyc+ZiyX7nKZXvQmQMVQuQE/rQdh - NTxwhqJywkCIvpR2n9FmUAJCAf////////////////////////////////////// - ////+lGGh4O/L5Zrf8wBSPcJpdA7tcm4iZxHrrtvtx6ROGQJAgEBA4GGAAQBMLgt - gTFBGr0f7YrWwZsCPpLxaUQvUKvz2C6ghiFmxc2EzBgxDY+ywnmG4T++EVZhJHTP - eIOnVRcHXXivkRe+YMQBbH/fZyqfCe41vIl39bwhqli839AAj/WoxXZuilpKaXBp - vGbx2380UIhrec1jFjItOOg/Xp9dOecjQZK7Z0wVq1U= - """) - key = self.session.create_object(decode_ec_public_key(der)) - self.assertIsInstance(key, pkcs11.PublicKey) - - # We should get back to identity - self.assertEqual(encode_ec_public_key(key), der) - - @requires(Mechanism.ECDSA_SHA1) - def test_import_key_named_curve(self): - der = base64.b64decode(""" - MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEa6Q5Hs+j71J1lc+VziafH+uL6603 - R8gTAphQD0iLG9Q9RgAvDQdFFpzkvXI+mEGVNRMmT/BA1OtficHcAXTdXA== - """) - key = self.session.create_object(decode_ec_public_key(der)) - self.assertIsInstance(key, pkcs11.PublicKey) - - # Something signed with OpenSSL - signature = base64.b64decode(""" - MEYCIQD1nDlli+uLuGX3eobKJe7PsRYkYJ4F15bjqbbB+MHewwIhAPGFRwyuFOvH - zuj+sxXwk1CsDWN7AXbmHufOlOarXpiq - """) - signature = decode_ecdsa_signature(signature) - - self.assertTrue(key.verify(b"Data to sign", signature, mechanism=Mechanism.ECDSA_SHA1)) - - # We should get back to identity - self.assertEqual(encode_ec_public_key(key), der) - - @requires(Mechanism.ECDSA) - def test_import_key_pair(self): - priv = base64.b64decode(""" - MIICnAIBAQRB9JsyE7khj/d2jm5RkE9T2DKgr/y3gn4Ju+8oWfdIpurNKM4hh3Oo - 0T+ilc0BEy/SfJ5iqUxU5TocdFRpOUzfUIKgggHGMIIBwgIBATBNBgcqhkjOPQEB - AkIB//////////////////////////////////////////////////////////// - //////////////////////////8wgZ4EQgH///////////////////////////// - /////////////////////////////////////////////////////////ARBUZU+ - uWGOHJofkpohoLaFQO6i2nJbmbMV87i0iZGO8QnhVhk5Uex+k3sWUsC9O7G/BzVz - 34g9LDTx70Uf1GtQPwADFQDQnogAKRy4U5bMZxc5MoSqoNpkugSBhQQAxoWOBrcE - BOnNnj7LZiOVtEKcZIE5BT+1Ifgor2BrTT26oUted+/nWSj+HcEnov+o3jNIs8GF - akKb+X5+McLlvWYBGDkpaniaO8AEXIpftCx9G9mY9URJV5tEaBevvRcnPmYsl+5y - mV70JkDFULkBP60HYTU8cIaicsJAiL6Udp/RZlACQgH///////////////////// - //////////////////////pRhoeDvy+Wa3/MAUj3CaXQO7XJuImcR667b7cekThk - CQIBAaGBiQOBhgAEATC4LYExQRq9H+2K1sGbAj6S8WlEL1Cr89guoIYhZsXNhMwY - MQ2PssJ5huE/vhFWYSR0z3iDp1UXB114r5EXvmDEAWx/32cqnwnuNbyJd/W8IapY - vN/QAI/1qMV2bopaSmlwabxm8dt/NFCIa3nNYxYyLTjoP16fXTnnI0GSu2dMFatV - """) - priv = self.session.create_object(decode_ec_private_key(priv)) - - pub = base64.b64decode(""" - MIICXDCCAc8GByqGSM49AgEwggHCAgEBME0GByqGSM49AQECQgH///////////// - //////////////////////////////////////////////////////////////// - /////////zCBngRCAf////////////////////////////////////////////// - ///////////////////////////////////////8BEFRlT65YY4cmh+SmiGgtoVA - 7qLacluZsxXzuLSJkY7xCeFWGTlR7H6TexZSwL07sb8HNXPfiD0sNPHvRR/Ua1A/ - AAMVANCeiAApHLhTlsxnFzkyhKqg2mS6BIGFBADGhY4GtwQE6c2ePstmI5W0Qpxk - gTkFP7Uh+CivYGtNPbqhS1537+dZKP4dwSei/6jeM0izwYVqQpv5fn4xwuW9ZgEY - OSlqeJo7wARcil+0LH0b2Zj1RElXm0RoF6+9Fyc+ZiyX7nKZXvQmQMVQuQE/rQdh - NTxwhqJywkCIvpR2n9FmUAJCAf////////////////////////////////////// - ////+lGGh4O/L5Zrf8wBSPcJpdA7tcm4iZxHrrtvtx6ROGQJAgEBA4GGAAQBMLgt - gTFBGr0f7YrWwZsCPpLxaUQvUKvz2C6ghiFmxc2EzBgxDY+ywnmG4T++EVZhJHTP - eIOnVRcHXXivkRe+YMQBbH/fZyqfCe41vIl39bwhqli839AAj/WoxXZuilpKaXBp - vGbx2380UIhrec1jFjItOOg/Xp9dOecjQZK7Z0wVq1U= - """) - pub = self.session.create_object(decode_ec_public_key(pub)) - - signature = priv.sign(b"Example", mechanism=Mechanism.ECDSA) - self.assertTrue(pub.verify(b"Example", signature, mechanism=Mechanism.ECDSA)) - - @requires(Mechanism.EC_EDWARDS_KEY_PAIR_GEN, Mechanism.EDDSA) - def test_sign_eddsa(self): - parameters = self.session.create_domain_parameters( - KeyType.EC_EDWARDS, - { - # use "Ed25519" once https://github.com/wbond/asn1crypto/pull/134 - # is merged - Attribute.EC_PARAMS: encode_named_curve_parameters("1.3.101.112") - }, - local=True, - ) - - pub, priv = parameters.generate_keypair() - - mechanism = Mechanism.EDDSA - data = b"HI BOB!" - eddsa = priv.sign(data, mechanism=mechanism) - self.assertTrue(pub.verify(data, eddsa, mechanism=mechanism)) + +@pytest.mark.requires(Mechanism.EC_KEY_PAIR_GEN, Mechanism.ECDSA) +def test_sign_ecdsa(session: pkcs11.Session) -> None: + parameters = session.create_domain_parameters( + KeyType.EC, + {Attribute.EC_PARAMS: encode_named_curve_parameters("secp256r1")}, + local=True, + ) + + pub, priv = parameters.generate_keypair() + + mechanism = Mechanism.ECDSA + data = b"HI BOB!" + ecdsa = priv.sign(data, mechanism=mechanism) + assert pub.verify(data, ecdsa, mechanism=mechanism) + + +@pytest.mark.requires(Mechanism.EC_KEY_PAIR_GEN, Mechanism.ECDH1_DERIVE) +def test_derive_key(session: pkcs11.Session) -> None: + # DER encoded EC params from OpenSSL + # openssl ecparam -out ec_param.der -name prime192v1 + ecparams = base64.b64decode(b"BggqhkjOPQMBAQ==") + + parameters = session.create_domain_parameters( + KeyType.EC, {Attribute.EC_PARAMS: ecparams}, local=True + ) + alice_pub, alice_priv = parameters.generate_keypair() + alice_value = alice_pub[Attribute.EC_POINT] + + bob_pub, bob_priv = parameters.generate_keypair() + bob_value = bob_pub[Attribute.EC_POINT] + + assert alice_value != bob_value + + alice_session = alice_priv.derive_key( + KeyType.AES, 128, mechanism_param=(KDF.NULL, None, bob_value) + ) + + bob_session = bob_priv.derive_key( + KeyType.AES, 128, mechanism_param=(KDF.NULL, None, alice_value) + ) + + iv = session.generate_random(128) + crypttext = alice_session.encrypt("HI BOB!", mechanism_param=iv) + plaintext = bob_session.decrypt(crypttext, mechanism_param=iv) + assert plaintext == b"HI BOB!" + + +@pytest.mark.requires(Mechanism.ECDSA) +def test_import_key_params(session: pkcs11.Session) -> None: + der = base64.b64decode(""" + MIICXDCCAc8GByqGSM49AgEwggHCAgEBME0GByqGSM49AQECQgH///////////// + //////////////////////////////////////////////////////////////// + /////////zCBngRCAf////////////////////////////////////////////// + ///////////////////////////////////////8BEFRlT65YY4cmh+SmiGgtoVA + 7qLacluZsxXzuLSJkY7xCeFWGTlR7H6TexZSwL07sb8HNXPfiD0sNPHvRR/Ua1A/ + AAMVANCeiAApHLhTlsxnFzkyhKqg2mS6BIGFBADGhY4GtwQE6c2ePstmI5W0Qpxk + gTkFP7Uh+CivYGtNPbqhS1537+dZKP4dwSei/6jeM0izwYVqQpv5fn4xwuW9ZgEY + OSlqeJo7wARcil+0LH0b2Zj1RElXm0RoF6+9Fyc+ZiyX7nKZXvQmQMVQuQE/rQdh + NTxwhqJywkCIvpR2n9FmUAJCAf////////////////////////////////////// + ////+lGGh4O/L5Zrf8wBSPcJpdA7tcm4iZxHrrtvtx6ROGQJAgEBA4GGAAQBMLgt + gTFBGr0f7YrWwZsCPpLxaUQvUKvz2C6ghiFmxc2EzBgxDY+ywnmG4T++EVZhJHTP + eIOnVRcHXXivkRe+YMQBbH/fZyqfCe41vIl39bwhqli839AAj/WoxXZuilpKaXBp + vGbx2380UIhrec1jFjItOOg/Xp9dOecjQZK7Z0wVq1U= + """) + key = session.create_object(decode_ec_public_key(der)) + assert isinstance(key, pkcs11.PublicKey) + + # We should get back to identity + assert encode_ec_public_key(key) == der + + +@pytest.mark.requires(Mechanism.ECDSA_SHA1) +def test_import_key_named_curve(session: pkcs11.Session) -> None: + der = base64.b64decode(""" + MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEa6Q5Hs+j71J1lc+VziafH+uL6603 + R8gTAphQD0iLG9Q9RgAvDQdFFpzkvXI+mEGVNRMmT/BA1OtficHcAXTdXA== + """) + key = session.create_object(decode_ec_public_key(der)) + assert isinstance(key, pkcs11.PublicKey) + + # Something signed with OpenSSL + signature = base64.b64decode(""" + MEYCIQD1nDlli+uLuGX3eobKJe7PsRYkYJ4F15bjqbbB+MHewwIhAPGFRwyuFOvH + zuj+sxXwk1CsDWN7AXbmHufOlOarXpiq + """) + signature = decode_ecdsa_signature(signature) + + assert key.verify(b"Data to sign", signature, mechanism=Mechanism.ECDSA_SHA1) + + # We should get back to identity + assert encode_ec_public_key(key) == der + + +@pytest.mark.requires(Mechanism.ECDSA) +def test_import_key_pair(session: pkcs11.Session) -> None: + priv = base64.b64decode(""" + MIICnAIBAQRB9JsyE7khj/d2jm5RkE9T2DKgr/y3gn4Ju+8oWfdIpurNKM4hh3Oo + 0T+ilc0BEy/SfJ5iqUxU5TocdFRpOUzfUIKgggHGMIIBwgIBATBNBgcqhkjOPQEB + AkIB//////////////////////////////////////////////////////////// + //////////////////////////8wgZ4EQgH///////////////////////////// + /////////////////////////////////////////////////////////ARBUZU+ + uWGOHJofkpohoLaFQO6i2nJbmbMV87i0iZGO8QnhVhk5Uex+k3sWUsC9O7G/BzVz + 34g9LDTx70Uf1GtQPwADFQDQnogAKRy4U5bMZxc5MoSqoNpkugSBhQQAxoWOBrcE + BOnNnj7LZiOVtEKcZIE5BT+1Ifgor2BrTT26oUted+/nWSj+HcEnov+o3jNIs8GF + akKb+X5+McLlvWYBGDkpaniaO8AEXIpftCx9G9mY9URJV5tEaBevvRcnPmYsl+5y + mV70JkDFULkBP60HYTU8cIaicsJAiL6Udp/RZlACQgH///////////////////// + //////////////////////pRhoeDvy+Wa3/MAUj3CaXQO7XJuImcR667b7cekThk + CQIBAaGBiQOBhgAEATC4LYExQRq9H+2K1sGbAj6S8WlEL1Cr89guoIYhZsXNhMwY + MQ2PssJ5huE/vhFWYSR0z3iDp1UXB114r5EXvmDEAWx/32cqnwnuNbyJd/W8IapY + vN/QAI/1qMV2bopaSmlwabxm8dt/NFCIa3nNYxYyLTjoP16fXTnnI0GSu2dMFatV + """) + priv = session.create_object(decode_ec_private_key(priv)) + + pub = base64.b64decode(""" + MIICXDCCAc8GByqGSM49AgEwggHCAgEBME0GByqGSM49AQECQgH///////////// + //////////////////////////////////////////////////////////////// + /////////zCBngRCAf////////////////////////////////////////////// + ///////////////////////////////////////8BEFRlT65YY4cmh+SmiGgtoVA + 7qLacluZsxXzuLSJkY7xCeFWGTlR7H6TexZSwL07sb8HNXPfiD0sNPHvRR/Ua1A/ + AAMVANCeiAApHLhTlsxnFzkyhKqg2mS6BIGFBADGhY4GtwQE6c2ePstmI5W0Qpxk + gTkFP7Uh+CivYGtNPbqhS1537+dZKP4dwSei/6jeM0izwYVqQpv5fn4xwuW9ZgEY + OSlqeJo7wARcil+0LH0b2Zj1RElXm0RoF6+9Fyc+ZiyX7nKZXvQmQMVQuQE/rQdh + NTxwhqJywkCIvpR2n9FmUAJCAf////////////////////////////////////// + ////+lGGh4O/L5Zrf8wBSPcJpdA7tcm4iZxHrrtvtx6ROGQJAgEBA4GGAAQBMLgt + gTFBGr0f7YrWwZsCPpLxaUQvUKvz2C6ghiFmxc2EzBgxDY+ywnmG4T++EVZhJHTP + eIOnVRcHXXivkRe+YMQBbH/fZyqfCe41vIl39bwhqli839AAj/WoxXZuilpKaXBp + vGbx2380UIhrec1jFjItOOg/Xp9dOecjQZK7Z0wVq1U= + """) + pub = session.create_object(decode_ec_public_key(pub)) + + signature = priv.sign(b"Example", mechanism=Mechanism.ECDSA) + assert pub.verify(b"Example", signature, mechanism=Mechanism.ECDSA) + + +@pytest.mark.requires(Mechanism.EC_EDWARDS_KEY_PAIR_GEN, Mechanism.EDDSA) +def test_sign_eddsa(session: pkcs11.Session) -> None: + parameters = session.create_domain_parameters( + KeyType.EC_EDWARDS, + { + # use "Ed25519" once https://github.com/wbond/asn1crypto/pull/134 + # is merged + Attribute.EC_PARAMS: encode_named_curve_parameters("1.3.101.112") + }, + local=True, + ) + + pub, priv = parameters.generate_keypair() + + mechanism = Mechanism.EDDSA + data = b"HI BOB!" + eddsa = priv.sign(data, mechanism=mechanism) + assert pub.verify(data, eddsa, mechanism=mechanism) diff --git a/tests/test_iterators.py b/tests/test_iterators.py index 9456e90..f9676ea 100644 --- a/tests/test_iterators.py +++ b/tests/test_iterators.py @@ -2,53 +2,48 @@ Iterator tests """ -import unittest +import pytest import pkcs11 -from . import TestCase, requires +@pytest.mark.requires(pkcs11.Mechanism.AES_KEY_GEN, pkcs11.Mechanism.AES_CBC_PAD) +def test_partial_decrypt(session: pkcs11.Session) -> None: + session.generate_key(pkcs11.KeyType.AES, 128, label="LOOK ME UP") -class IteratorTests(TestCase): - @requires(pkcs11.Mechanism.AES_KEY_GEN, pkcs11.Mechanism.AES_CBC_PAD) - def test_partial_decrypt(self): - self.session.generate_key(pkcs11.KeyType.AES, 128, label="LOOK ME UP") + key = session.get_key(label="LOOK ME UP") + data = (b"1234", b"1234") - key = self.session.get_key(label="LOOK ME UP") - data = ( - b"1234", - b"1234", - ) + iv = session.generate_random(128) + encrypted_data = list(key.encrypt(data, mechanism_param=iv)) - iv = self.session.generate_random(128) - encrypted_data = list(key.encrypt(data, mechanism_param=iv)) + iter1 = key.decrypt(encrypted_data, mechanism_param=iv) + next(iter1) - iter1 = key.decrypt(encrypted_data, mechanism_param=iv) - next(iter1) + iter2 = key.decrypt(encrypted_data, mechanism_param=iv) + with pytest.raises(pkcs11.OperationActive): + next(iter2) - with self.assertRaises(pkcs11.OperationActive): - iter2 = key.decrypt(encrypted_data, mechanism_param=iv) - next(iter2) - @requires(pkcs11.Mechanism.AES_KEY_GEN, pkcs11.Mechanism.AES_CBC_PAD) - # Ideally deleting iterator #1 would terminate the operation, but it - # currently does not. - @unittest.expectedFailure - def test_close_iterators(self): - self.session.generate_key(pkcs11.KeyType.AES, 128, label="LOOK ME UP") +@pytest.mark.requires(pkcs11.Mechanism.AES_KEY_GEN, pkcs11.Mechanism.AES_CBC_PAD) +# Ideally deleting iterator #1 would terminate the operation, but it +# currently does not. +@pytest.mark.xfail +def test_close_iterators(session: pkcs11.Session) -> None: + session.generate_key(pkcs11.KeyType.AES, 128, label="LOOK ME UP") - key = self.session.get_key(label="LOOK ME UP") - data = ( - b"1234", - b"1234", - ) + key = session.get_key(label="LOOK ME UP") + data = ( + b"1234", + b"1234", + ) - iv = self.session.generate_random(128) - encrypted_data = list(key.encrypt(data, mechanism_param=iv)) + iv = session.generate_random(128) + encrypted_data = list(key.encrypt(data, mechanism_param=iv)) - iter1 = key.decrypt(encrypted_data, mechanism_param=iv) - next(iter1) - del iter1 + iter1 = key.decrypt(encrypted_data, mechanism_param=iv) + next(iter1) + del iter1 - iter2 = key.decrypt(encrypted_data, mechanism_param=iv) - next(iter2) + iter2 = key.decrypt(encrypted_data, mechanism_param=iv) + next(iter2) diff --git a/tests/test_public_key_external.py b/tests/test_public_key_external.py index d211f53..c575a68 100644 --- a/tests/test_public_key_external.py +++ b/tests/test_public_key_external.py @@ -1,3 +1,6 @@ +import pytest + +import pkcs11 from pkcs11 import KDF, Attribute, KeyType, Mechanism, ObjectClass from pkcs11.util.ec import ( decode_ec_public_key, @@ -6,162 +9,163 @@ encode_named_curve_parameters, ) from pkcs11.util.rsa import encode_rsa_public_key - -from . import Is, TestCase, requires - - -class ExternalPublicKeyTests(TestCase): - @requires(Mechanism.RSA_PKCS) - def test_rsa(self): - # A key we generated earlier - self.session.generate_keypair(KeyType.RSA, 1024) - - pub = self.session.get_key(key_type=KeyType.RSA, object_class=ObjectClass.PUBLIC_KEY) - - pub = encode_rsa_public_key(pub) - - from oscrypto.asymmetric import load_public_key, rsa_pkcs1v15_encrypt - - pub = load_public_key(pub) - crypttext = rsa_pkcs1v15_encrypt(pub, b"Data to encrypt") - - priv = self.session.get_key(key_type=KeyType.RSA, object_class=ObjectClass.PRIVATE_KEY) - - plaintext = priv.decrypt(crypttext, mechanism=Mechanism.RSA_PKCS) - - self.assertEqual(plaintext, b"Data to encrypt") - - @requires(Mechanism.ECDSA_SHA1) - def test_ecdsa(self): - # A key we generated earlier - self.session.create_domain_parameters( - KeyType.EC, - { - Attribute.EC_PARAMS: encode_named_curve_parameters("secp256r1"), - }, - local=True, - ).generate_keypair() - - priv = self.session.get_key(key_type=KeyType.EC, object_class=ObjectClass.PRIVATE_KEY) - - signature = priv.sign(b"Data to sign", mechanism=Mechanism.ECDSA_SHA1) - # Encode as ASN.1 for OpenSSL - signature = encode_ecdsa_signature(signature) - - from oscrypto.asymmetric import ecdsa_verify, load_public_key - - pub = self.session.get_key(key_type=KeyType.EC, object_class=ObjectClass.PUBLIC_KEY) - pub = load_public_key(encode_ec_public_key(pub)) - - ecdsa_verify(pub, signature, b"Data to sign", "sha1") - - @requires(Mechanism.ECDH1_DERIVE) - def test_ecdh(self): - # A key we generated earlier - self.session.create_domain_parameters( - KeyType.EC, - { - Attribute.EC_PARAMS: encode_named_curve_parameters("secp256r1"), - }, - local=True, - ).generate_keypair() - - # Retrieve our keypair, with our public key encoded for interchange - alice_priv = self.session.get_key(key_type=KeyType.EC, object_class=ObjectClass.PRIVATE_KEY) - alice_pub = self.session.get_key(key_type=KeyType.EC, object_class=ObjectClass.PUBLIC_KEY) - alice_pub = encode_ec_public_key(alice_pub) - - from cryptography.hazmat.backends import default_backend - from cryptography.hazmat.primitives.asymmetric import ec - from cryptography.hazmat.primitives.serialization import ( - Encoding, - PublicFormat, - load_der_public_key, - ) - - # Bob generates a keypair, with their public key encoded for - # interchange - bob_priv = ec.generate_private_key(ec.SECP256R1(), default_backend()) - bob_pub = bob_priv.public_key().public_bytes( - Encoding.DER, - PublicFormat.SubjectPublicKeyInfo, - ) - - # Bob converts Alice's key to internal format and generates their - # shared key - bob_shared_key = bob_priv.exchange( - ec.ECDH(), - load_der_public_key(alice_pub, default_backend()), - ) - - key = alice_priv.derive_key( - KeyType.GENERIC_SECRET, - 256, - mechanism_param=( - KDF.NULL, - None, - # N.B. it seems like SoftHSMv2 requires an EC_POINT to be - # DER-encoded, which is not what the spec says - decode_ec_public_key(bob_pub, encode_ec_point=Is.softhsm2)[Attribute.EC_POINT], - ), - template={ - Attribute.SENSITIVE: False, - Attribute.EXTRACTABLE: True, - }, - ) - alice_shared_key = key[Attribute.VALUE] - - # We should have the same shared key - self.assertEqual(bob_shared_key, alice_shared_key) - - @requires(Mechanism.RSA_PKCS) - def test_terrible_hybrid_file_encryption_app(self): - # Proof of concept code only! - import io - - from oscrypto.asymmetric import load_public_key, rsa_pkcs1v15_encrypt - from oscrypto.symmetric import ( - aes_cbc_pkcs7_decrypt, - aes_cbc_pkcs7_encrypt, - ) - - # A key we generated earlier - self.session.generate_keypair(KeyType.RSA, 1024) - - pub = self.session.get_key(key_type=KeyType.RSA, object_class=ObjectClass.PUBLIC_KEY) - pub = load_public_key(encode_rsa_public_key(pub)) - - key = self.session.generate_random(256) - iv = self.session.generate_random(128) - - source = b"This is my amazing file" - - with io.BytesIO() as dest: - # Write a 128-byte header containing our key and our IV - # strictly speaking we don't need to keep the IV secure but - # we may as well. - # - # FIXME: Because this is RSA 1.5, we should fill the rest of the - # frame with nonsense - self.assertEqual(dest.write(rsa_pkcs1v15_encrypt(pub, key + iv)), 128) - _, ciphertext = aes_cbc_pkcs7_encrypt(key, source, iv) - dest.write(ciphertext) - - # Time passes - dest.seek(0) - - # Look up our private key - priv = self.session.get_key(key_type=KeyType.RSA, object_class=ObjectClass.PRIVATE_KEY) - # Read the header - header = dest.read(priv.key_length // 8) - header = priv.decrypt(header, mechanism=Mechanism.RSA_PKCS) - - # The first 32 bytes is our key - key, header = header[:32], header[32:] - # The next 16 bytes is the IV - iv = header[:16] - # We can ignore the rest - - plaintext = aes_cbc_pkcs7_decrypt(key, dest.read(), iv) - - self.assertEqual(source, plaintext) +from tests.conftest import IS_SOFTHSM + + +@pytest.mark.requires(Mechanism.RSA_PKCS) +def test_rsa(session: pkcs11.Session) -> None: + # A key we generated earlier + session.generate_keypair(KeyType.RSA, 1024) + + pub = session.get_key(key_type=KeyType.RSA, object_class=ObjectClass.PUBLIC_KEY) + + pub = encode_rsa_public_key(pub) + + from oscrypto.asymmetric import load_public_key, rsa_pkcs1v15_encrypt + + pub = load_public_key(pub) + crypttext = rsa_pkcs1v15_encrypt(pub, b"Data to encrypt") + + priv = session.get_key(key_type=KeyType.RSA, object_class=ObjectClass.PRIVATE_KEY) + + plaintext = priv.decrypt(crypttext, mechanism=Mechanism.RSA_PKCS) + + assert plaintext == b"Data to encrypt" + + +@pytest.mark.requires(Mechanism.ECDSA_SHA1) +def test_ecdsa(session: pkcs11.Session) -> None: + # A key we generated earlier + session.create_domain_parameters( + KeyType.EC, + { + Attribute.EC_PARAMS: encode_named_curve_parameters("secp256r1"), + }, + local=True, + ).generate_keypair() + + priv = session.get_key(key_type=KeyType.EC, object_class=ObjectClass.PRIVATE_KEY) + + signature = priv.sign(b"Data to sign", mechanism=Mechanism.ECDSA_SHA1) + # Encode as ASN.1 for OpenSSL + signature = encode_ecdsa_signature(signature) + + from oscrypto.asymmetric import ecdsa_verify, load_public_key + + pub = session.get_key(key_type=KeyType.EC, object_class=ObjectClass.PUBLIC_KEY) + pub = load_public_key(encode_ec_public_key(pub)) + + ecdsa_verify(pub, signature, b"Data to sign", "sha1") + + +@pytest.mark.requires(Mechanism.ECDH1_DERIVE) +def test_ecdh(session: pkcs11.Session) -> None: + # A key we generated earlier + session.create_domain_parameters( + KeyType.EC, + { + Attribute.EC_PARAMS: encode_named_curve_parameters("secp256r1"), + }, + local=True, + ).generate_keypair() + + # Retrieve our keypair, with our public key encoded for interchange + alice_priv = session.get_key(key_type=KeyType.EC, object_class=ObjectClass.PRIVATE_KEY) + alice_pub = session.get_key(key_type=KeyType.EC, object_class=ObjectClass.PUBLIC_KEY) + alice_pub = encode_ec_public_key(alice_pub) + + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives.asymmetric import ec + from cryptography.hazmat.primitives.serialization import ( + Encoding, + PublicFormat, + load_der_public_key, + ) + + # Bob generates a keypair, with their public key encoded for + # interchange + bob_priv = ec.generate_private_key(ec.SECP256R1(), default_backend()) + bob_pub = bob_priv.public_key().public_bytes( + Encoding.DER, + PublicFormat.SubjectPublicKeyInfo, + ) + + # Bob converts Alice's key to internal format and generates their + # shared key + bob_shared_key = bob_priv.exchange( + ec.ECDH(), + load_der_public_key(alice_pub, default_backend()), + ) + + key = alice_priv.derive_key( + KeyType.GENERIC_SECRET, + 256, + mechanism_param=( + KDF.NULL, + None, + # N.B. it seems like SoftHSMv2 requires an EC_POINT to be + # DER-encoded, which is not what the spec says + decode_ec_public_key(bob_pub, encode_ec_point=IS_SOFTHSM)[Attribute.EC_POINT], + ), + template={ + Attribute.SENSITIVE: False, + Attribute.EXTRACTABLE: True, + }, + ) + alice_shared_key = key[Attribute.VALUE] + + # We should have the same shared key + assert bob_shared_key == alice_shared_key + + +@pytest.mark.requires(Mechanism.RSA_PKCS) +def test_terrible_hybrid_file_encryption_app(session: pkcs11.Session) -> None: + # Proof of concept code only! + import io + + from oscrypto.asymmetric import load_public_key, rsa_pkcs1v15_encrypt + from oscrypto.symmetric import ( + aes_cbc_pkcs7_decrypt, + aes_cbc_pkcs7_encrypt, + ) + + # A key we generated earlier + session.generate_keypair(KeyType.RSA, 1024) + + pub = session.get_key(key_type=KeyType.RSA, object_class=ObjectClass.PUBLIC_KEY) + pub = load_public_key(encode_rsa_public_key(pub)) + + key = session.generate_random(256) + iv = session.generate_random(128) + + source = b"This is my amazing file" + + with io.BytesIO() as dest: + # Write a 128-byte header containing our key and our IV + # strictly speaking we don't need to keep the IV secure but + # we may as well. + # + # FIXME: Because this is RSA 1.5, we should fill the rest of the + # frame with nonsense + assert dest.write(rsa_pkcs1v15_encrypt(pub, key + iv)) == 128 + _, ciphertext = aes_cbc_pkcs7_encrypt(key, source, iv) + dest.write(ciphertext) + + # Time passes + dest.seek(0) + + # Look up our private key + priv = session.get_key(key_type=KeyType.RSA, object_class=ObjectClass.PRIVATE_KEY) + # Read the header + header = dest.read(priv.key_length // 8) + header = priv.decrypt(header, mechanism=Mechanism.RSA_PKCS) + + # The first 32 bytes is our key + key, header = header[:32], header[32:] + # The next 16 bytes is the IV + iv = header[:16] + # We can ignore the rest + + plaintext = aes_cbc_pkcs7_decrypt(key, dest.read(), iv) + + assert source == plaintext diff --git a/tests/test_rsa.py b/tests/test_rsa.py index f9e5343..05579cd 100644 --- a/tests/test_rsa.py +++ b/tests/test_rsa.py @@ -2,119 +2,134 @@ PKCS#11 RSA Public Key Cryptography """ +import pytest + import pkcs11 from pkcs11 import MGF, Attribute, KeyType, Mechanism, ObjectClass -from . import FIXME, TestCase, requires - - -class RSATests(TestCase): - @requires(Mechanism.RSA_PKCS_KEY_PAIR_GEN) - def setUp(self): - super().setUp() - - self.public, self.private = self.session.generate_keypair(KeyType.RSA, 1024) - - @requires(Mechanism.RSA_PKCS) - def test_sign_pkcs_v15(self): - data = b"00000000" - - signature = self.private.sign(data, mechanism=Mechanism.RSA_PKCS) - self.assertIsNotNone(signature) - self.assertIsInstance(signature, bytes) - self.assertTrue(self.public.verify(data, signature, mechanism=Mechanism.RSA_PKCS)) - self.assertFalse(self.public.verify(data, b"1234", mechanism=Mechanism.RSA_PKCS)) - - @requires(Mechanism.SHA512_RSA_PKCS) - def test_sign_default(self): - data = b"HELLO WORLD" * 1024 - - signature = self.private.sign(data) - self.assertIsNotNone(signature) - self.assertIsInstance(signature, bytes) - self.assertTrue(self.public.verify(data, signature)) - self.assertFalse(self.public.verify(data, b"1234")) - - @requires(Mechanism.SHA512_RSA_PKCS) - def test_sign_stream(self): - data = ( - b"I" * 16, - b"N" * 16, - b"P" * 16, - b"U" * 16, - b"T" * 10, # don't align to the blocksize - ) - - signature = self.private.sign(data) - self.assertIsNotNone(signature) - self.assertIsInstance(signature, bytes) - self.assertTrue(self.public.verify(data, signature)) - - @requires(Mechanism.RSA_PKCS_OAEP) - @FIXME.opencryptoki # can't set key attributes - def test_key_wrap(self): - key = self.session.generate_key( - KeyType.AES, - 128, - template={ - Attribute.EXTRACTABLE: True, - Attribute.SENSITIVE: False, - }, - ) - - data = self.public.wrap_key(key) - self.assertNotEqual(data, key[Attribute.VALUE]) - - key2 = self.private.unwrap_key( - ObjectClass.SECRET_KEY, - KeyType.AES, - data, - template={ - Attribute.EXTRACTABLE: True, - Attribute.SENSITIVE: False, - }, - ) - - self.assertEqual(key[Attribute.VALUE], key2[Attribute.VALUE]) - - @requires(Mechanism.RSA_PKCS_OAEP) - def test_encrypt_oaep(self): - data = b"SOME DATA" - - crypttext = self.public.encrypt( - data, - mechanism=Mechanism.RSA_PKCS_OAEP, - mechanism_param=(Mechanism.SHA_1, MGF.SHA1, None), - ) - - self.assertNotEqual(data, crypttext) - - plaintext = self.private.decrypt( - crypttext, - mechanism=Mechanism.RSA_PKCS_OAEP, - mechanism_param=(Mechanism.SHA_1, MGF.SHA1, None), - ) - - self.assertEqual(data, plaintext) - - @requires(Mechanism.SHA1_RSA_PKCS_PSS) - def test_sign_pss(self): - data = b"SOME DATA" - - # These are the default params - signature = self.private.sign( - data, - mechanism=Mechanism.SHA1_RSA_PKCS_PSS, - mechanism_param=(Mechanism.SHA_1, MGF.SHA1, 20), - ) - - self.assertTrue(self.public.verify(data, signature, mechanism=Mechanism.SHA1_RSA_PKCS_PSS)) - - @requires(Mechanism.RSA_PKCS_OAEP) - def test_encrypt_too_much_data(self): - data = b"1234" * 128 - - # You can't encrypt lots of data with RSA - # This should ideally throw DataLen but you can't trust it - with self.assertRaises(pkcs11.PKCS11Error): - self.public.encrypt(data) +pytestmark = [pytest.mark.requires(Mechanism.RSA_PKCS_KEY_PAIR_GEN)] + + +@pytest.fixture +def keypair(session: pkcs11.Session) -> tuple[pkcs11.PublicKey, pkcs11.PrivateKey]: + return session.generate_keypair(KeyType.RSA, 1024) + + +@pytest.mark.requires(Mechanism.RSA_PKCS) +def test_sign_pkcs_v15(keypair: tuple[pkcs11.PublicKey, pkcs11.PrivateKey]) -> None: + public_key, private_key = keypair + data = b"00000000" + + signature = private_key.sign(data, mechanism=Mechanism.RSA_PKCS) + assert signature is not None + assert isinstance(signature, bytes) + assert public_key.verify(data, signature, mechanism=Mechanism.RSA_PKCS) + assert not public_key.verify(data, b"1234", mechanism=Mechanism.RSA_PKCS) + + +@pytest.mark.requires(Mechanism.SHA512_RSA_PKCS) +def test_sign_default(keypair: tuple[pkcs11.PublicKey, pkcs11.PrivateKey]) -> None: + public_key, private_key = keypair + data = b"HELLO WORLD" * 1024 + + signature = private_key.sign(data) + assert signature is not None + assert isinstance(signature, bytes) + assert public_key.verify(data, signature) + assert not public_key.verify(data, b"1234") + + +@pytest.mark.requires(Mechanism.SHA512_RSA_PKCS) +def test_sign_stream(keypair: tuple[pkcs11.PublicKey, pkcs11.PrivateKey]) -> None: + public_key, private_key = keypair + data = ( + b"I" * 16, + b"N" * 16, + b"P" * 16, + b"U" * 16, + b"T" * 10, # don't align to the blocksize + ) + + signature = private_key.sign(data) + assert signature is not None + assert isinstance(signature, bytes) + assert public_key.verify(data, signature) + + +@pytest.mark.requires(Mechanism.RSA_PKCS_OAEP) +@pytest.mark.xfail_opencryptoki # can't set key attributes +def test_key_wrap( + session: pkcs11.Session, keypair: tuple[pkcs11.PublicKey, pkcs11.PrivateKey] +) -> None: + public_key, private_key = keypair + key = session.generate_key( + KeyType.AES, + 128, + template={ + Attribute.EXTRACTABLE: True, + Attribute.SENSITIVE: False, + }, + ) + + data = public_key.wrap_key(key) + assert data != key[Attribute.VALUE] + + key2 = private_key.unwrap_key( + ObjectClass.SECRET_KEY, + KeyType.AES, + data, + template={ + Attribute.EXTRACTABLE: True, + Attribute.SENSITIVE: False, + }, + ) + + assert key[Attribute.VALUE] == key2[Attribute.VALUE] + + +@pytest.mark.requires(Mechanism.RSA_PKCS_OAEP) +def test_encrypt_oaep(keypair: tuple[pkcs11.PublicKey, pkcs11.PrivateKey]) -> None: + public_key, private_key = keypair + data = b"SOME DATA" + + crypttext = public_key.encrypt( + data, + mechanism=Mechanism.RSA_PKCS_OAEP, + mechanism_param=(Mechanism.SHA_1, MGF.SHA1, None), + ) + + assert data != crypttext + + plaintext = private_key.decrypt( + crypttext, + mechanism=Mechanism.RSA_PKCS_OAEP, + mechanism_param=(Mechanism.SHA_1, MGF.SHA1, None), + ) + + assert data == plaintext + + +@pytest.mark.requires(Mechanism.SHA1_RSA_PKCS_PSS) +def test_sign_pss(keypair: tuple[pkcs11.PublicKey, pkcs11.PrivateKey]) -> None: + public_key, private_key = keypair + data = b"SOME DATA" + + # These are the default params + signature = private_key.sign( + data, + mechanism=Mechanism.SHA1_RSA_PKCS_PSS, + mechanism_param=(Mechanism.SHA_1, MGF.SHA1, 20), + ) + + assert public_key.verify(data, signature, mechanism=Mechanism.SHA1_RSA_PKCS_PSS) + + +@pytest.mark.requires(Mechanism.RSA_PKCS_OAEP) +def test_encrypt_too_much_data(keypair: tuple[pkcs11.PublicKey, pkcs11.PrivateKey]) -> None: + public_key, private_key = keypair + data = b"1234" * 128 + + # You can't encrypt lots of data with RSA + # This should ideally throw DataLen but you can't trust it + with pytest.raises(pkcs11.PKCS11Error): + public_key.encrypt(data) diff --git a/tests/test_sessions.py b/tests/test_sessions.py index 58c3d2e..a7459bb 100644 --- a/tests/test_sessions.py +++ b/tests/test_sessions.py @@ -2,158 +2,163 @@ PKCS#11 Sessions """ +import pytest + import pkcs11 +from tests.conftest import IS_NFAST, IS_OPENCRYPTOKI, IS_SOFTHSM + + +@pytest.mark.skipif(IS_NFAST or IS_OPENCRYPTOKI, reason="Login is required.") +def test_open_session(token: pkcs11.Token) -> None: + with token.open() as session: + assert isinstance(session, pkcs11.Session) + + +def test_open_session_and_login_user(token: pkcs11.Token, pin: str) -> None: + with token.open(user_pin=pin) as session: + assert isinstance(session, pkcs11.Session) + + +@pytest.mark.skipif( + not IS_SOFTHSM, reason="We don't have credentials to do this for other platforms." +) +def test_open_session_and_login_so(token: pkcs11.Token, so_pin: str) -> None: + with token.open(rw=True, so_pin=so_pin) as session: + assert isinstance(session, pkcs11.Session) + + +@pytest.mark.requires(pkcs11.Mechanism.AES_KEY_GEN) +def test_generate_key(token: pkcs11.Token, pin: str) -> None: + with token.open(user_pin=pin) as session: + key = session.generate_key(pkcs11.KeyType.AES, 128) + assert isinstance(key, pkcs11.Object) + assert isinstance(key, pkcs11.SecretKey) + assert isinstance(key, pkcs11.EncryptMixin) + + assert key.object_class is pkcs11.ObjectClass.SECRET_KEY + + # Test GetAttribute + assert key[pkcs11.Attribute.CLASS] is pkcs11.ObjectClass.SECRET_KEY + assert key[pkcs11.Attribute.TOKEN] is False + assert key[pkcs11.Attribute.LOCAL] is True + assert key[pkcs11.Attribute.MODIFIABLE] is True + assert key[pkcs11.Attribute.LABEL] == "" + + # Test SetAttribute + key[pkcs11.Attribute.LABEL] = "DEMO" + + assert key[pkcs11.Attribute.LABEL] == "DEMO" + + # Create another key with no capabilities + key = session.generate_key( + pkcs11.KeyType.AES, 128, label="MY KEY", id=b"\1\2\3\4", capabilities=0 + ) + assert isinstance(key, pkcs11.Object) + assert isinstance(key, pkcs11.SecretKey) + assert not isinstance(key, pkcs11.EncryptMixin) + + assert key.label == "MY KEY" + + +@pytest.mark.requires(pkcs11.Mechanism.RSA_PKCS_KEY_PAIR_GEN, pkcs11.Mechanism.RSA_PKCS) +def test_generate_keypair(token: pkcs11.Token, pin: str) -> None: + with token.open(user_pin=pin) as session: + pub, priv = session.generate_keypair(pkcs11.KeyType.RSA, 1024) + assert isinstance(pub, pkcs11.PublicKey) + assert isinstance(priv, pkcs11.PrivateKey) + + data = b"HELLO WORLD" + crypttext = pub.encrypt(data, mechanism=pkcs11.Mechanism.RSA_PKCS) + assert data != crypttext + text = priv.decrypt(crypttext, mechanism=pkcs11.Mechanism.RSA_PKCS) + assert data == text + + +@pytest.mark.requires(pkcs11.Mechanism.AES_KEY_GEN) +def test_get_objects(token: pkcs11.Token, pin: str) -> None: + with token.open(user_pin=pin) as session: + key = session.generate_key(pkcs11.KeyType.AES, 128, label="SAMPLE KEY") + + search = list(session.get_objects({pkcs11.Attribute.LABEL: "SAMPLE KEY"})) + + assert len(search) == 1 + assert key == search[0] + + +@pytest.mark.xfail_opencryptoki +def test_create_object(token: pkcs11.Token, pin: str) -> None: + with token.open(user_pin=pin) as session: + key = session.create_object( + { + pkcs11.Attribute.CLASS: pkcs11.ObjectClass.SECRET_KEY, + pkcs11.Attribute.KEY_TYPE: pkcs11.KeyType.AES, + pkcs11.Attribute.VALUE: b"1" * 16, + } + ) + + assert isinstance(key, pkcs11.SecretKey) + assert key.key_length == 128 + + +@pytest.mark.skipif(IS_NFAST, reason="nFast won't destroy objects.") +def test_destroy_object(token: pkcs11.Token, pin: str) -> None: + with token.open(user_pin=pin) as session: + key = session.generate_key(pkcs11.KeyType.AES, 128, label="SAMPLE KEY") + key.destroy() + + assert list(session.get_objects()) == [] + + +@pytest.mark.skipif(IS_NFAST, reason="nFast won't destroy objects.") +def test_copy_object(token: pkcs11.Token, pin: str) -> None: + with token.open(user_pin=pin) as session: + key = session.generate_key(pkcs11.KeyType.AES, 128, label="SAMPLE KEY") + new = key.copy( + { + pkcs11.Attribute.LABEL: "SOMETHING ELSE", + } + ) + + assert set(session.get_objects()) == {key, new} + + +@pytest.mark.requires(pkcs11.Mechanism.AES_KEY_GEN) +def test_get_key(token: pkcs11.Token, pin: str) -> None: + with token.open(user_pin=pin) as session: + session.generate_key(pkcs11.KeyType.AES, 128, label="SAMPLE KEY") + + key = session.get_key( + label="SAMPLE KEY", + ) + assert isinstance(key, pkcs11.SecretKey) + key.encrypt(b"test", mechanism_param=b"IV" * 8) + + +def test_get_key_not_found(token: pkcs11.Token, pin: str) -> None: + with token.open(user_pin=pin) as session: + with pytest.raises(pkcs11.NoSuchKey): + session.get_key(label="SAMPLE KEY") + + +@pytest.mark.requires(pkcs11.Mechanism.AES_KEY_GEN) +def test_get_key_vague(token: pkcs11.Token, pin: str) -> None: + with token.open(user_pin=pin) as session: + session.generate_key(pkcs11.KeyType.AES, 128, label="SAMPLE KEY") + session.generate_key(pkcs11.KeyType.AES, 128, label="SAMPLE KEY 2") + + with pytest.raises(pkcs11.MultipleObjectsReturned): + session.get_key(key_type=pkcs11.KeyType.AES) + + +@pytest.mark.skipif(IS_NFAST or IS_OPENCRYPTOKI, reason="Not supported.") +def test_seed_random(token: pkcs11.Token) -> None: + with token.open() as session: + session.seed_random(b"12345678") + -from . import FIXME, TOKEN_PIN, TOKEN_SO_PIN, Not, Only, TestCase, requires - - -class SessionTests(TestCase): - with_session = False - - @Not.nfast # Login is required - @Not.opencryptoki - def test_open_session(self): - with self.token.open() as session: - self.assertIsInstance(session, pkcs11.Session) - - def test_open_session_and_login_user(self): - with self.token.open(user_pin=TOKEN_PIN) as session: - self.assertIsInstance(session, pkcs11.Session) - - @Only.softhsm2 # We don't have credentials to do this for other platforms - def test_open_session_and_login_so(self): - with self.token.open(rw=True, so_pin=TOKEN_SO_PIN) as session: - self.assertIsInstance(session, pkcs11.Session) - - @requires(pkcs11.Mechanism.AES_KEY_GEN) - def test_generate_key(self): - with self.token.open(user_pin=TOKEN_PIN) as session: - key = session.generate_key(pkcs11.KeyType.AES, 128) - self.assertIsInstance(key, pkcs11.Object) - self.assertIsInstance(key, pkcs11.SecretKey) - self.assertIsInstance(key, pkcs11.EncryptMixin) - - self.assertIs(key.object_class, pkcs11.ObjectClass.SECRET_KEY) - - # Test GetAttribute - self.assertIs(key[pkcs11.Attribute.CLASS], pkcs11.ObjectClass.SECRET_KEY) - self.assertEqual(key[pkcs11.Attribute.TOKEN], False) - self.assertEqual(key[pkcs11.Attribute.LOCAL], True) - self.assertEqual(key[pkcs11.Attribute.MODIFIABLE], True) - self.assertEqual(key[pkcs11.Attribute.LABEL], "") - - # Test SetAttribute - key[pkcs11.Attribute.LABEL] = "DEMO" - - self.assertEqual(key[pkcs11.Attribute.LABEL], "DEMO") - - # Create another key with no capabilities - key = session.generate_key( - pkcs11.KeyType.AES, 128, label="MY KEY", id=b"\1\2\3\4", capabilities=0 - ) - self.assertIsInstance(key, pkcs11.Object) - self.assertIsInstance(key, pkcs11.SecretKey) - self.assertNotIsInstance(key, pkcs11.EncryptMixin) - - self.assertEqual(key.label, "MY KEY") - - @requires(pkcs11.Mechanism.RSA_PKCS_KEY_PAIR_GEN, pkcs11.Mechanism.RSA_PKCS) - def test_generate_keypair(self): - with self.token.open(user_pin=TOKEN_PIN) as session: - pub, priv = session.generate_keypair(pkcs11.KeyType.RSA, 1024) - self.assertIsInstance(pub, pkcs11.PublicKey) - self.assertIsInstance(priv, pkcs11.PrivateKey) - - data = b"HELLO WORLD" - crypttext = pub.encrypt(data, mechanism=pkcs11.Mechanism.RSA_PKCS) - self.assertNotEqual(data, crypttext) - text = priv.decrypt(crypttext, mechanism=pkcs11.Mechanism.RSA_PKCS) - self.assertEqual(data, text) - - @requires(pkcs11.Mechanism.AES_KEY_GEN) - def test_get_objects(self): - with self.token.open(user_pin=TOKEN_PIN) as session: - key = session.generate_key(pkcs11.KeyType.AES, 128, label="SAMPLE KEY") - - search = list( - session.get_objects( - { - pkcs11.Attribute.LABEL: "SAMPLE KEY", - } - ) - ) - - self.assertEqual(len(search), 1) - self.assertEqual(key, search[0]) - - @FIXME.opencryptoki - def test_create_object(self): - with self.token.open(user_pin=TOKEN_PIN) as session: - key = session.create_object( - { - pkcs11.Attribute.CLASS: pkcs11.ObjectClass.SECRET_KEY, - pkcs11.Attribute.KEY_TYPE: pkcs11.KeyType.AES, - pkcs11.Attribute.VALUE: b"1" * 16, - } - ) - - self.assertIsInstance(key, pkcs11.SecretKey) - self.assertEqual(key.key_length, 128) - - @Not.nfast # nFast won't destroy objects - def test_destroy_object(self): - with self.token.open(user_pin=TOKEN_PIN) as session: - key = session.generate_key(pkcs11.KeyType.AES, 128, label="SAMPLE KEY") - key.destroy() - - self.assertEqual(list(session.get_objects()), []) - - @Only.softhsm2 - def test_copy_object(self): - with self.token.open(user_pin=TOKEN_PIN) as session: - key = session.generate_key(pkcs11.KeyType.AES, 128, label="SAMPLE KEY") - new = key.copy( - { - pkcs11.Attribute.LABEL: "SOMETHING ELSE", - } - ) - - self.assertEqual(set(session.get_objects()), {key, new}) - - @requires(pkcs11.Mechanism.AES_KEY_GEN) - def test_get_key(self): - with self.token.open(user_pin=TOKEN_PIN) as session: - session.generate_key(pkcs11.KeyType.AES, 128, label="SAMPLE KEY") - - key = session.get_key( - label="SAMPLE KEY", - ) - self.assertIsInstance(key, pkcs11.SecretKey) - key.encrypt(b"test", mechanism_param=b"IV" * 8) - - def test_get_key_not_found(self): - with self.token.open(user_pin=TOKEN_PIN) as session: - with self.assertRaises(pkcs11.NoSuchKey): - session.get_key(label="SAMPLE KEY") - - @requires(pkcs11.Mechanism.AES_KEY_GEN) - def test_get_key_vague(self): - with self.token.open(user_pin=TOKEN_PIN) as session: - session.generate_key(pkcs11.KeyType.AES, 128, label="SAMPLE KEY") - session.generate_key(pkcs11.KeyType.AES, 128, label="SAMPLE KEY 2") - - with self.assertRaises(pkcs11.MultipleObjectsReturned): - session.get_key(key_type=pkcs11.KeyType.AES) - - @Not.nfast # Not supported - @Not.opencryptoki # Not supported - def test_seed_random(self): - with self.token.open() as session: - session.seed_random(b"12345678") - - def test_generate_random(self): - with self.token.open(user_pin=TOKEN_PIN) as session: - random = session.generate_random(16 * 8) - self.assertEqual(len(random), 16) - # Ensure we didn't get 16 bytes of zeros - self.assertTrue(all(c != "\0" for c in random)) +def test_generate_random(token: pkcs11.Token, pin: str) -> None: + with token.open(user_pin=pin) as session: + random = session.generate_random(16 * 8) + assert len(random) == 16 + # Ensure we didn't get 16 bytes of zeros + assert all(c != "\x00" for c in random) diff --git a/tests/test_slots_and_tokens.py b/tests/test_slots_and_tokens.py index 8e49818..1a9d3c1 100644 --- a/tests/test_slots_and_tokens.py +++ b/tests/test_slots_and_tokens.py @@ -2,72 +2,79 @@ PKCS#11 Slots and Tokens """ -import unittest +import pytest import pkcs11 +from tests.conftest import IS_NFAST, IS_OPENCRYPTOKI, IS_SOFTHSM, LIB_PATH -from . import LIB, TOKEN, Not, Only - - -class SlotsAndTokensTests(unittest.TestCase): - def test_double_initialise(self): - self.assertIsNotNone(pkcs11.lib(LIB)) - self.assertIsNotNone(pkcs11.lib(LIB)) - - def test_double_initialise_different_libs(self): - self.assertIsNotNone(pkcs11.lib(LIB)) - with self.assertRaises(pkcs11.AlreadyInitialized): - pkcs11.lib("somethingelse.so") - - @Only.softhsm2 - def test_get_slots(self): - lib = pkcs11.lib(LIB) - slots = lib.get_slots() - - self.assertEqual(len(slots), 2) - slot1, slot2 = slots - - self.assertIsInstance(slot1, pkcs11.Slot) - self.assertEqual(slot1.flags, pkcs11.SlotFlag.TOKEN_PRESENT) - - def test_get_mechanisms(self): - lib = pkcs11.lib(LIB) - slot, *_ = lib.get_slots() - mechanisms = slot.get_mechanisms() - self.assertIn(pkcs11.Mechanism.RSA_PKCS, mechanisms) - - def test_get_mechanism_info(self): - lib = pkcs11.lib(LIB) - slot, *_ = lib.get_slots() - info = slot.get_mechanism_info(pkcs11.Mechanism.RSA_PKCS_OAEP) - self.assertIsInstance(info, pkcs11.MechanismInfo) - - @Not.nfast # EC not supported - @Not.opencryptoki - def test_get_mechanism_info_ec(self): - lib = pkcs11.lib(LIB) - slot, *_ = lib.get_slots() - info = slot.get_mechanism_info(pkcs11.Mechanism.EC_KEY_PAIR_GEN) - self.assertIsInstance(info, pkcs11.MechanismInfo) - self.assertIn(pkcs11.MechanismFlag.EC_NAMEDCURVE, info.flags) - - @Only.softhsm2 - def test_get_tokens(self): - lib = pkcs11.lib(LIB) - - tokens = lib.get_tokens(token_flags=pkcs11.TokenFlag.RNG) - self.assertEqual(len(list(tokens)), 2) - - tokens = lib.get_tokens(token_label=TOKEN) - self.assertEqual(len(list(tokens)), 1) - - @Only.softhsm2 - def test_get_token(self): - lib = pkcs11.lib(LIB) - slot, *_ = lib.get_slots() - token = slot.get_token() - - self.assertIsInstance(token, pkcs11.Token) - self.assertEqual(token.label, TOKEN) - self.assertIn(pkcs11.TokenFlag.TOKEN_INITIALIZED, token.flags) - self.assertIn(pkcs11.TokenFlag.LOGIN_REQUIRED, token.flags) + +def test_double_initialise() -> None: + assert pkcs11.lib(LIB_PATH) is not None + assert pkcs11.lib(LIB_PATH) is not None + + +def test_double_initialise_different_libs() -> None: + assert pkcs11.lib(LIB_PATH) is not None + with pytest.raises(pkcs11.AlreadyInitialized): + pkcs11.lib("somethingelse.so") + + +@pytest.mark.skipif(not IS_SOFTHSM, reason="Only supported on SoftHSMv2.") +@pytest.mark.usefixtures("softhsm_token") +def test_get_slots() -> None: + lib = pkcs11.lib(LIB_PATH) + slots = lib.get_slots() + print(slots) + + assert len(slots) == 2 + slot1, slot2 = slots + + assert isinstance(slot1, pkcs11.Slot) + assert slot1.flags == pkcs11.SlotFlag.TOKEN_PRESENT + + +def test_get_mechanisms() -> None: + lib = pkcs11.lib(LIB_PATH) + slot, *_ = lib.get_slots() + mechanisms = slot.get_mechanisms() + assert pkcs11.Mechanism.RSA_PKCS in mechanisms + + +def test_get_mechanism_info() -> None: + lib = pkcs11.lib(LIB_PATH) + slot, *_ = lib.get_slots() + info = slot.get_mechanism_info(pkcs11.Mechanism.RSA_PKCS_OAEP) + assert isinstance(info, pkcs11.MechanismInfo) + + +@pytest.mark.skipif(IS_NFAST or IS_OPENCRYPTOKI, reason="EC not supported.") +def test_get_mechanism_info_ec() -> None: + lib = pkcs11.lib(LIB_PATH) + slot, *_ = lib.get_slots() + info = slot.get_mechanism_info(pkcs11.Mechanism.EC_KEY_PAIR_GEN) + assert isinstance(info, pkcs11.MechanismInfo) + assert pkcs11.MechanismFlag.EC_NAMEDCURVE in info.flags + + +@pytest.mark.skipif(not IS_SOFTHSM, reason="Only supported on SoftHSMv2.") +def test_get_tokens(softhsm_token: pkcs11.Token) -> None: + lib = pkcs11.lib(LIB_PATH) + + tokens = list(lib.get_tokens(token_flags=pkcs11.TokenFlag.RNG)) + print(tokens) + assert len(list(tokens)) == 2 + + tokens = lib.get_tokens(token_label=softhsm_token.label) + assert len(list(tokens)) == 1 + + +@pytest.mark.skipif(not IS_SOFTHSM, reason="Only supported on SoftHSMv2.") +def test_get_token(token: pkcs11.Token) -> None: + lib = pkcs11.lib(LIB_PATH) + slot, *_ = lib.get_slots() + actual_token = slot.get_token() + + assert isinstance(actual_token, pkcs11.Token) + assert actual_token.label == token.label + assert pkcs11.TokenFlag.TOKEN_INITIALIZED in actual_token.flags + assert pkcs11.TokenFlag.LOGIN_REQUIRED in actual_token.flags diff --git a/tests/test_threading.py b/tests/test_threading.py index 5ec3a38..88a1d34 100644 --- a/tests/test_threading.py +++ b/tests/test_threading.py @@ -7,37 +7,42 @@ import threading +import pytest + import pkcs11 -from . import Not, TestCase, requires +from .conftest import IS_NFAST + +pytestmark = [ + pytest.mark.skipif(IS_NFAST, reason="Deadlocks nfast ... something wrong with threading?") +] -@Not.nfast # Deadlocks nfast ... something wrong with threading? -class ThreadingTests(TestCase): - @requires(pkcs11.Mechanism.AES_KEY_GEN, pkcs11.Mechanism.AES_CBC_PAD) - def test_concurrency(self): - # Multiplexing a session between processes - self.session.generate_key(pkcs11.KeyType.AES, 128, label="LOOK ME UP") +@pytest.mark.requires(pkcs11.Mechanism.AES_KEY_GEN) +@pytest.mark.requires(pkcs11.Mechanism.AES_CBC_PAD) +def test_concurrency(session: pkcs11.Session) -> None: + # Multiplexing a session between processes + session.generate_key(pkcs11.KeyType.AES, 128, label="LOOK ME UP") - test_passed = [True] + test_passed = [True] - def thread_work(): - try: - data = b"1234" * 1024 * 1024 # Multichunk files - iv = self.session.generate_random(128) - key = self.session.get_key(label="LOOK ME UP") - self.assertIsNotNone(key.encrypt(data, mechanism_param=iv)) - except pkcs11.PKCS11Error: - test_passed[0] = False - raise + def thread_work(): + try: + data = b"1234" * 1024 * 1024 # Multichunk files + iv = session.generate_random(128) + key = session.get_key(label="LOOK ME UP") + assert key.encrypt(data, mechanism_param=iv) is not None + except pkcs11.PKCS11Error: + test_passed[0] = False + raise - threads = [threading.Thread(target=thread_work) for _ in range(10)] + threads = [threading.Thread(target=thread_work) for _ in range(10)] - for thread in threads: - thread.start() + for thread in threads: + thread.start() - # join each thread - for thread in threads: - thread.join() + # join each thread + for thread in threads: + thread.join() - self.assertTrue(test_passed[0]) + assert test_passed[0] diff --git a/tests/test_x509.py b/tests/test_x509.py index b4a7dca..e86fad7 100644 --- a/tests/test_x509.py +++ b/tests/test_x509.py @@ -5,24 +5,21 @@ import base64 import datetime import subprocess +from pathlib import Path +import pytest from asn1crypto import pem from asn1crypto.csr import CertificationRequest, CertificationRequestInfo from asn1crypto.keys import RSAPublicKey from asn1crypto.x509 import Certificate, Name, TbsCertificate, Time import pkcs11 -from pkcs11 import ( - Attribute, - KeyType, - Mechanism, -) +from pkcs11 import Attribute, KeyType, Mechanism from pkcs11.util.dsa import decode_dsa_signature from pkcs11.util.ec import decode_ecdsa_signature from pkcs11.util.rsa import encode_rsa_public_key from pkcs11.util.x509 import decode_x509_certificate, decode_x509_public_key - -from . import OPENSSL, Not, Only, TestCase, requires +from tests.conftest import IS_NFAST, IS_OPENCRYPTOKI, OPENSSL # X.509 self-signed certificate (generated with OpenSSL) # openssl req -x509 \ @@ -49,233 +46,245 @@ """) -class X509Tests(TestCase): - def test_import_ca_certificate_easy(self): - cert = self.session.create_object(decode_x509_certificate(CERT)) - self.assertIsInstance(cert, pkcs11.Certificate) - - @Not.nfast - @Not.opencryptoki - def test_import_ca_certificate(self): - cert = self.session.create_object(decode_x509_certificate(CERT, extended_set=True)) - self.assertIsInstance(cert, pkcs11.Certificate) - - self.assertEqual( - cert[Attribute.HASH_OF_ISSUER_PUBLIC_KEY], - b"\xf9\xc1\xb6\xe3\x43\xf3\xcf\x4c\xba\x8a" b"\x0b\x66\x86\x79\x35\xfb\x52\x85\xbf\xa8", - ) - # Cert is self signed - self.assertEqual( - cert[Attribute.HASH_OF_SUBJECT_PUBLIC_KEY], - b"\xf9\xc1\xb6\xe3\x43\xf3\xcf\x4c\xba\x8a" b"\x0b\x66\x86\x79\x35\xfb\x52\x85\xbf\xa8", - ) - - @requires(Mechanism.SHA1_RSA_PKCS) - def test_verify_certificate_rsa(self): - # Warning: proof of concept code only! - x509 = Certificate.load(CERT) - key = self.session.create_object(decode_x509_public_key(CERT)) - self.assertIsInstance(key, pkcs11.PublicKey) - - value = x509["tbs_certificate"].dump() - signature = x509.signature - - assert x509.signature_algo == "rsassa_pkcs1v15" - assert x509.hash_algo == "sha1" - - self.assertTrue(key.verify(value, signature, mechanism=Mechanism.SHA1_RSA_PKCS)) - - @requires(Mechanism.DSA_SHA1) - def test_verify_certificate_dsa(self): - # Warning: proof of concept code only! - CERT = base64.b64decode(""" - MIIDbjCCAy6gAwIBAgIJAKPBInGiPjXNMAkGByqGSM44BAMwRTELMAkGA1UEBhMC - QVUxEzARBgNVBAgTClNvbWUtU3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdp - dHMgUHR5IEx0ZDAeFw0xNzA3MDMxMjI1MTBaFw0xOTA3MDMxMjI1MTBaMEUxCzAJ - BgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5l - dCBXaWRnaXRzIFB0eSBMdGQwggG3MIIBLAYHKoZIzjgEATCCAR8CgYEA7U0AshA/ - 4MXQ3MHykoeotEoPc+OXFMJ2PHzKfbFD80UC5bloxC9kp908GG3emdqbJuCTfVUD - sex1vEgMj1sEwilBow954zMqncu5lLBIGZKjT6tloW8sFt50sE0l+YnBvAiw9uoL - 9lBOZLKh87zWPZUuORm8lWhZEwjUnZ+3S5ECFQCNJGd68RpctgkA1kDp33NhQhev - lQKBgQCQ6uYkvNpHMtXwyGII4JyOyStbteHjHdKfJfLNRyIEEq/E4e3Do6NGIr26 - Z7u9iBsA5/aU6gKSBrYprxY1hdR4gTRBNzSUDEzf7IX3bfRIbBhjlNBSBba5Fs0z - /kszZbZ8XYGVxs92aWFk/1JIZ0wnToC794+juq72/TvrtvxdowOBhAACgYAjoknQ - kRD0+x3GkbngQCU+VNspZuXboB22CU3bDGVAVhmI5N02M8NmeuN7SqqYZAlw01Ju - rzBF7i9VW4qxBaWszMCwyozerSVjZ2JA/Qubb57v/p7F3FDHq7E33FZzgyhOimds - rzXpVErCGJJ1oBGz5H5fvoKnQmfh0X8N/VHkZqOBpzCBpDAdBgNVHQ4EFgQUQayv - usUnpvRgc9OtXGddqMiwm5cwdQYDVR0jBG4wbIAUQayvusUnpvRgc9OtXGddqMiw - m5ehSaRHMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYD - VQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGSCCQCjwSJxoj41zTAMBgNVHRME - BTADAQH/MAkGByqGSM44BAMDLwAwLAIUNE+zTuFe01v0BRTLarPtGK8ZHHcCFB9Y - YAwtpblAgUEdGuoAtnoEQ2tc - """) - - x509 = Certificate.load(CERT) - key = self.session.create_object(decode_x509_public_key(CERT)) - self.assertIsInstance(key, pkcs11.PublicKey) - - value = x509["tbs_certificate"].dump() - - assert x509.signature_algo == "dsa" - assert x509.hash_algo == "sha1" - - signature = decode_dsa_signature(x509.signature) - - self.assertTrue(key.verify(value, signature, mechanism=Mechanism.DSA_SHA1)) - - @requires(Mechanism.ECDSA_SHA1) - def test_verify_certificate_ecdsa(self): - # Warning: proof of concept code only! - CERT = base64.b64decode(""" - MIIDGjCCAsKgAwIBAgIJAL+PbwiJUZB1MAkGByqGSM49BAEwRTELMAkGA1UEBhMC - QVUxEzARBgNVBAgTClNvbWUtU3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdp - dHMgUHR5IEx0ZDAeFw0xNzA3MDMxMTUxMTBaFw0xOTA3MDMxMTUxMTBaMEUxCzAJ - BgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5l - dCBXaWRnaXRzIFB0eSBMdGQwggFLMIIBAwYHKoZIzj0CATCB9wIBATAsBgcqhkjO - PQEBAiEA/////wAAAAEAAAAAAAAAAAAAAAD///////////////8wWwQg/////wAA - AAEAAAAAAAAAAAAAAAD///////////////wEIFrGNdiqOpPns+u9VXaYhrxlHQaw - zFOw9jvOPD4n0mBLAxUAxJ02CIbnBJNqZnjhE50mt4GffpAEQQRrF9Hy4SxCR/i8 - 5uVjpEDydwN9gS3rM6D0oTlF2JjClk/jQuL+Gn+bjufrSnwPnhYrzjNXazFezsu2 - QGg3v1H1AiEA/////wAAAAD//////////7zm+q2nF56E87nKwvxjJVECAQEDQgAE - royPJHkCQMq55egxmQxkFWqiz+yJx0MZP98is99SrkiK5UadFim3r3ZSt5kfh/cc - Ccmy94BZCmihhGJ0F4eB2qOBpzCBpDAdBgNVHQ4EFgQURNXKlYGsAMItf4Ad8fkg - Rg9ATqEwdQYDVR0jBG4wbIAURNXKlYGsAMItf4Ad8fkgRg9ATqGhSaRHMEUxCzAJ - BgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5l - dCBXaWRnaXRzIFB0eSBMdGSCCQC/j28IiVGQdTAMBgNVHRMEBTADAQH/MAkGByqG - SM49BAEDRwAwRAIgAdJp/S9vSjS6EvRy/9zl5k2DBKGI52A3Ygsp1a96UicCIDul - m/eL2OcGdNbzqzsC11alhemJX7Qt9GOcVqQwROIm - """) - - x509 = Certificate.load(CERT) - key = self.session.create_object(decode_x509_public_key(CERT)) - self.assertIsInstance(key, pkcs11.PublicKey) - - value = x509["tbs_certificate"].dump() - - assert x509.signature_algo == "ecdsa" - assert x509.hash_algo == "sha1" - - signature = decode_ecdsa_signature(x509.signature) - - self.assertTrue(key.verify(value, signature, mechanism=Mechanism.ECDSA_SHA1)) - - @Only.openssl - @requires(Mechanism.RSA_PKCS_KEY_PAIR_GEN, Mechanism.SHA1_RSA_PKCS) - def test_self_sign_certificate(self): - # Warning: proof of concept code only! - pub, priv = self.session.generate_keypair(KeyType.RSA, 1024) - - tbs = TbsCertificate( - { - "version": "v1", - "serial_number": 1, - "issuer": Name.build( +def test_import_ca_certificate_easy(session: pkcs11.Session) -> None: + cert = session.create_object(decode_x509_certificate(CERT)) + assert isinstance(cert, pkcs11.Certificate) + + +@pytest.mark.skipif(IS_NFAST or IS_OPENCRYPTOKI, reason="Unknown reason.") +def test_import_ca_certificate(session: pkcs11.Session) -> None: + cert = session.create_object(decode_x509_certificate(CERT, extended_set=True)) + assert isinstance(cert, pkcs11.Certificate) + + assert ( + cert[Attribute.HASH_OF_ISSUER_PUBLIC_KEY] == b"\xf9\xc1\xb6\xe3C\xf3\xcfL\xba\x8a" + b"\x0bf\x86y5\xfbR\x85\xbf\xa8" + ) + # Cert is self signed + assert ( + cert[Attribute.HASH_OF_SUBJECT_PUBLIC_KEY] == b"\xf9\xc1\xb6\xe3C\xf3\xcfL\xba\x8a" + b"\x0bf\x86y5\xfbR\x85\xbf\xa8" + ) + + +@pytest.mark.requires(Mechanism.SHA1_RSA_PKCS) +def test_verify_certificate_rsa(session: pkcs11.Session) -> None: + # Warning: proof of concept code only! + x509 = Certificate.load(CERT) + key = session.create_object(decode_x509_public_key(CERT)) + assert isinstance(key, pkcs11.PublicKey) + + value = x509["tbs_certificate"].dump() + signature = x509.signature + + assert x509.signature_algo == "rsassa_pkcs1v15" + assert x509.hash_algo == "sha1" + + assert key.verify(value, signature, mechanism=Mechanism.SHA1_RSA_PKCS) + + +@pytest.mark.requires(Mechanism.DSA_SHA1) +def test_verify_certificate_dsa(session: pkcs11.Session) -> None: + # Warning: proof of concept code only! + CERT = base64.b64decode(""" + MIIDbjCCAy6gAwIBAgIJAKPBInGiPjXNMAkGByqGSM44BAMwRTELMAkGA1UEBhMC + QVUxEzARBgNVBAgTClNvbWUtU3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdp + dHMgUHR5IEx0ZDAeFw0xNzA3MDMxMjI1MTBaFw0xOTA3MDMxMjI1MTBaMEUxCzAJ + BgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5l + dCBXaWRnaXRzIFB0eSBMdGQwggG3MIIBLAYHKoZIzjgEATCCAR8CgYEA7U0AshA/ + 4MXQ3MHykoeotEoPc+OXFMJ2PHzKfbFD80UC5bloxC9kp908GG3emdqbJuCTfVUD + sex1vEgMj1sEwilBow954zMqncu5lLBIGZKjT6tloW8sFt50sE0l+YnBvAiw9uoL + 9lBOZLKh87zWPZUuORm8lWhZEwjUnZ+3S5ECFQCNJGd68RpctgkA1kDp33NhQhev + lQKBgQCQ6uYkvNpHMtXwyGII4JyOyStbteHjHdKfJfLNRyIEEq/E4e3Do6NGIr26 + Z7u9iBsA5/aU6gKSBrYprxY1hdR4gTRBNzSUDEzf7IX3bfRIbBhjlNBSBba5Fs0z + /kszZbZ8XYGVxs92aWFk/1JIZ0wnToC794+juq72/TvrtvxdowOBhAACgYAjoknQ + kRD0+x3GkbngQCU+VNspZuXboB22CU3bDGVAVhmI5N02M8NmeuN7SqqYZAlw01Ju + rzBF7i9VW4qxBaWszMCwyozerSVjZ2JA/Qubb57v/p7F3FDHq7E33FZzgyhOimds + rzXpVErCGJJ1oBGz5H5fvoKnQmfh0X8N/VHkZqOBpzCBpDAdBgNVHQ4EFgQUQayv + usUnpvRgc9OtXGddqMiwm5cwdQYDVR0jBG4wbIAUQayvusUnpvRgc9OtXGddqMiw + m5ehSaRHMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYD + VQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGSCCQCjwSJxoj41zTAMBgNVHRME + BTADAQH/MAkGByqGSM44BAMDLwAwLAIUNE+zTuFe01v0BRTLarPtGK8ZHHcCFB9Y + YAwtpblAgUEdGuoAtnoEQ2tc + """) + + x509 = Certificate.load(CERT) + key = session.create_object(decode_x509_public_key(CERT)) + assert isinstance(key, pkcs11.PublicKey) + + value = x509["tbs_certificate"].dump() + + assert x509.signature_algo == "dsa" + assert x509.hash_algo == "sha1" + + signature = decode_dsa_signature(x509.signature) + + assert key.verify(value, signature, mechanism=Mechanism.DSA_SHA1) + + +@pytest.mark.requires(Mechanism.ECDSA_SHA1) +def test_verify_certificate_ecdsa(session: pkcs11.Session) -> None: + # Warning: proof of concept code only! + CERT = base64.b64decode(""" + MIIDGjCCAsKgAwIBAgIJAL+PbwiJUZB1MAkGByqGSM49BAEwRTELMAkGA1UEBhMC + QVUxEzARBgNVBAgTClNvbWUtU3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdp + dHMgUHR5IEx0ZDAeFw0xNzA3MDMxMTUxMTBaFw0xOTA3MDMxMTUxMTBaMEUxCzAJ + BgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5l + dCBXaWRnaXRzIFB0eSBMdGQwggFLMIIBAwYHKoZIzj0CATCB9wIBATAsBgcqhkjO + PQEBAiEA/////wAAAAEAAAAAAAAAAAAAAAD///////////////8wWwQg/////wAA + AAEAAAAAAAAAAAAAAAD///////////////wEIFrGNdiqOpPns+u9VXaYhrxlHQaw + zFOw9jvOPD4n0mBLAxUAxJ02CIbnBJNqZnjhE50mt4GffpAEQQRrF9Hy4SxCR/i8 + 5uVjpEDydwN9gS3rM6D0oTlF2JjClk/jQuL+Gn+bjufrSnwPnhYrzjNXazFezsu2 + QGg3v1H1AiEA/////wAAAAD//////////7zm+q2nF56E87nKwvxjJVECAQEDQgAE + royPJHkCQMq55egxmQxkFWqiz+yJx0MZP98is99SrkiK5UadFim3r3ZSt5kfh/cc + Ccmy94BZCmihhGJ0F4eB2qOBpzCBpDAdBgNVHQ4EFgQURNXKlYGsAMItf4Ad8fkg + Rg9ATqEwdQYDVR0jBG4wbIAURNXKlYGsAMItf4Ad8fkgRg9ATqGhSaRHMEUxCzAJ + BgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5l + dCBXaWRnaXRzIFB0eSBMdGSCCQC/j28IiVGQdTAMBgNVHRMEBTADAQH/MAkGByqG + SM49BAEDRwAwRAIgAdJp/S9vSjS6EvRy/9zl5k2DBKGI52A3Ygsp1a96UicCIDul + m/eL2OcGdNbzqzsC11alhemJX7Qt9GOcVqQwROIm + """) + + x509 = Certificate.load(CERT) + key = session.create_object(decode_x509_public_key(CERT)) + assert isinstance(key, pkcs11.PublicKey) + + value = x509["tbs_certificate"].dump() + + assert x509.signature_algo == "ecdsa" + assert x509.hash_algo == "sha1" + + signature = decode_ecdsa_signature(x509.signature) + + assert key.verify(value, signature, mechanism=Mechanism.ECDSA_SHA1) + + +@pytest.mark.skipif(OPENSSL is None, reason="openssl command not found.") +@pytest.mark.requires(Mechanism.RSA_PKCS_KEY_PAIR_GEN) +@pytest.mark.requires(Mechanism.SHA1_RSA_PKCS) +def test_self_sign_certificate(tmpdir: Path, session: pkcs11.Session) -> None: + # Warning: proof of concept code only! + pub, priv = session.generate_keypair(KeyType.RSA, 1024) + + tbs = TbsCertificate( + { + "version": "v1", + "serial_number": 1, + "issuer": Name.build( + { + "common_name": "Test Certificate", + } + ), + "subject": Name.build( + { + "common_name": "Test Certificate", + } + ), + "signature": { + "algorithm": "sha1_rsa", + "parameters": None, + }, + "validity": { + "not_before": Time( { - "common_name": "Test Certificate", + "utc_time": datetime.datetime( + 2017, 1, 1, 0, 0, tzinfo=datetime.timezone.utc + ), } ), - "subject": Name.build( + "not_after": Time( { - "common_name": "Test Certificate", + "utc_time": datetime.datetime( + 2038, 12, 31, 23, 59, tzinfo=datetime.timezone.utc + ), } ), - "signature": { - "algorithm": "sha1_rsa", - "parameters": None, - }, - "validity": { - "not_before": Time( - { - "utc_time": datetime.datetime( - 2017, 1, 1, 0, 0, tzinfo=datetime.timezone.utc - ), - } - ), - "not_after": Time( - { - "utc_time": datetime.datetime( - 2038, 12, 31, 23, 59, tzinfo=datetime.timezone.utc - ), - } - ), - }, - "subject_public_key_info": { - "algorithm": { - "algorithm": "rsa", - "parameters": None, - }, - "public_key": RSAPublicKey.load(encode_rsa_public_key(pub)), - }, - } - ) - - # Sign the TBS Certificate - value = priv.sign(tbs.dump(), mechanism=Mechanism.SHA1_RSA_PKCS) - - cert = Certificate( - { - "tbs_certificate": tbs, - "signature_algorithm": { - "algorithm": "sha1_rsa", + }, + "subject_public_key_info": { + "algorithm": { + "algorithm": "rsa", "parameters": None, }, - "signature_value": value, - } - ) - - # Pipe our certificate to OpenSSL to verify it - with subprocess.Popen( - (OPENSSL, "verify"), stdin=subprocess.PIPE, stdout=subprocess.DEVNULL - ) as proc: - proc.stdin.write(pem.armor("CERTIFICATE", cert.dump())) - proc.stdin.close() - self.assertEqual(proc.wait(), 0) - - @Only.openssl - @requires(Mechanism.RSA_PKCS_KEY_PAIR_GEN, Mechanism.SHA1_RSA_PKCS) - def test_sign_csr(self): - # Warning: proof of concept code only! - pub, priv = self.session.generate_keypair(KeyType.RSA, 1024) - - info = CertificationRequestInfo( - { - "version": 0, - "subject": Name.build( - { - "common_name": "Test Certificate", - } - ), - "subject_pk_info": { - "algorithm": { - "algorithm": "rsa", - "parameters": None, - }, - "public_key": RSAPublicKey.load(encode_rsa_public_key(pub)), - }, - } - ) - - # Sign the CSR Info - value = priv.sign(info.dump(), mechanism=Mechanism.SHA1_RSA_PKCS) - - csr = CertificationRequest( - { - "certification_request_info": info, - "signature_algorithm": { - "algorithm": "sha1_rsa", + "public_key": RSAPublicKey.load(encode_rsa_public_key(pub)), + }, + } + ) + + # Sign the TBS Certificate + value = priv.sign(tbs.dump(), mechanism=Mechanism.SHA1_RSA_PKCS) + + cert = Certificate( + { + "tbs_certificate": tbs, + "signature_algorithm": { + "algorithm": "sha1_rsa", + "parameters": None, + }, + "signature_value": value, + } + ) + + pem_cert = pem.armor("CERTIFICATE", cert.dump()) + pem_path = tmpdir / "ca.pem" + with open(pem_path, "wb") as stream: + stream.write(pem_cert) + + # Pipe our certificate to OpenSSL to verify it + with subprocess.Popen( + (OPENSSL, "verify", "-CAfile", str(pem_path)), + stdin=subprocess.PIPE, + stdout=subprocess.DEVNULL, + ) as proc: + proc.stdin.write(pem_cert) + proc.stdin.close() + assert proc.wait() == 0 + + +@pytest.mark.skipif(OPENSSL is None, reason="openssl command not found.") +@pytest.mark.requires(Mechanism.RSA_PKCS_KEY_PAIR_GEN, Mechanism.SHA1_RSA_PKCS) +def test_sign_csr(session: pkcs11.Session) -> None: + # Warning: proof of concept code only! + pub, priv = session.generate_keypair(KeyType.RSA, 1024) + + info = CertificationRequestInfo( + { + "version": 0, + "subject": Name.build( + { + "common_name": "Test Certificate", + } + ), + "subject_pk_info": { + "algorithm": { + "algorithm": "rsa", "parameters": None, }, - "signature": value, - } - ) - - # Pipe our CSR to OpenSSL to verify it - with subprocess.Popen( - (OPENSSL, "req", "-inform", "der", "-noout", "-verify"), - stdin=subprocess.PIPE, - stdout=subprocess.DEVNULL, - ) as proc: - proc.stdin.write(csr.dump()) - proc.stdin.close() - - self.assertEqual(proc.wait(), 0) + "public_key": RSAPublicKey.load(encode_rsa_public_key(pub)), + }, + } + ) + + # Sign the CSR Info + value = priv.sign(info.dump(), mechanism=Mechanism.SHA1_RSA_PKCS) + + csr = CertificationRequest( + { + "certification_request_info": info, + "signature_algorithm": { + "algorithm": "sha1_rsa", + "parameters": None, + }, + "signature": value, + } + ) + + # Pipe our CSR to OpenSSL to verify it + with subprocess.Popen( + (OPENSSL, "req", "-inform", "der", "-noout", "-verify"), + stdin=subprocess.PIPE, + stdout=subprocess.DEVNULL, + ) as proc: + proc.stdin.write(csr.dump()) + proc.stdin.close() + + assert proc.wait() == 0