-
Notifications
You must be signed in to change notification settings - Fork 71
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
79feb31
commit 2ba7b72
Showing
16 changed files
with
1,942 additions
and
1,906 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,168 +0,0 @@ | ||
""" | ||
PKCS#11 Tests | ||
The following environment variables will influence the behaviour of test cases: | ||
- PKCS11_MODULE, mandatory, points to the library/DLL to use for testing | ||
- PKCS11_TOKEN_LABEL, mandatory, contains the token label | ||
- PKCS11_TOKEN_PIN, optional (default is None), contains the PIN/passphrase of the token | ||
- PKCS11_TOKEN_SO_PIN, optional (default is same as PKCS11_TOKEN_PIN), security officer PIN | ||
- OPENSSL_PATH, optional, path to openssl executable (i.e. the folder that contains it) | ||
""" | ||
|
||
import os | ||
import shutil | ||
import unittest | ||
from functools import wraps | ||
from warnings import warn | ||
|
||
import pkcs11 | ||
|
||
try: | ||
LIB = os.environ["PKCS11_MODULE"] | ||
except KeyError as ex: | ||
raise RuntimeError("Must define `PKCS11_MODULE' to run tests.") from ex | ||
|
||
|
||
try: | ||
TOKEN = os.environ["PKCS11_TOKEN_LABEL"] | ||
except KeyError as ex: | ||
raise RuntimeError("Must define `PKCS11_TOKEN_LABEL' to run tests.") from ex | ||
|
||
TOKEN_PIN = os.environ.get("PKCS11_TOKEN_PIN") # Can be None | ||
if TOKEN_PIN is None: | ||
warn("`PKCS11_TOKEN_PIN' env variable is unset.", stacklevel=2) | ||
|
||
TOKEN_SO_PIN = os.environ.get("PKCS11_TOKEN_SO_PIN") | ||
if TOKEN_SO_PIN is None: | ||
TOKEN_SO_PIN = TOKEN_PIN | ||
warn( | ||
"`PKCS11_TOKEN_SO_PIN' env variable is unset. Using value from `PKCS11_TOKEN_PIN'", | ||
stacklevel=2, | ||
) | ||
|
||
OPENSSL = shutil.which("openssl", path=os.environ.get("OPENSSL_PATH")) | ||
if OPENSSL is None: | ||
warn("Path to OpenSSL not found. Please adjust `PATH' or define `OPENSSL_PATH'", stacklevel=2) | ||
|
||
|
||
class TestCase(unittest.TestCase): | ||
"""Base test case, optionally creates a token and a session.""" | ||
|
||
with_token = True | ||
"""Creates a token for this test case.""" | ||
with_session = True | ||
"""Creates a session for this test case.""" | ||
|
||
@classmethod | ||
def setUpClass(cls): | ||
super().setUpClass() | ||
cls.lib = lib = pkcs11.lib(LIB) | ||
|
||
if cls.with_token or cls.with_session: | ||
cls.token = lib.get_token(token_label=TOKEN) | ||
|
||
def setUp(self): | ||
super().setUp() | ||
|
||
if self.with_session: | ||
self.session = self.token.open(user_pin=TOKEN_PIN) | ||
|
||
def tearDown(self): | ||
if self.with_session: | ||
self.session.close() | ||
|
||
super().tearDown() | ||
|
||
|
||
def requires(*mechanisms): | ||
""" | ||
Decorates a function or class as requiring mechanisms, else they are | ||
skipped. | ||
""" | ||
|
||
def check_requirements(self): | ||
"""Determine what, if any, required mechanisms are unavailable.""" | ||
unavailable = set(mechanisms) - self.token.slot.get_mechanisms() | ||
|
||
if unavailable: | ||
raise unittest.SkipTest("Requires %s" % ", ".join(map(str, unavailable))) | ||
|
||
def inner(func): | ||
@wraps(func) | ||
def wrapper(self, *args, **kwargs): | ||
check_requirements(self) | ||
|
||
return func(self, *args, **kwargs) | ||
|
||
return wrapper | ||
|
||
return inner | ||
|
||
|
||
def xfail(condition): | ||
"""Mark a test that's expected to fail for a given condition.""" | ||
|
||
def inner(func): | ||
if condition: | ||
return unittest.expectedFailure(func) | ||
|
||
else: | ||
return func | ||
|
||
return inner | ||
|
||
|
||
class Is: | ||
""" | ||
Test what device we're using. | ||
""" | ||
|
||
# trick: str.endswith() can accept tuples, | ||
# see https://stackoverflow.com/questions/18351951/check-if-string-ends-with-one-of-the-strings-from-a-list | ||
softhsm2 = LIB.lower().endswith( | ||
("libsofthsm2.so", "libsofthsm2.dylib", "softhsm2.dll", "softhsm2-x64.dll") | ||
) | ||
nfast = LIB.lower().endswith(("libcknfast.so", "cknfast.dll")) | ||
opencryptoki = LIB.endswith("libopencryptoki.so") | ||
travis = os.environ.get("TRAVIS") == "true" | ||
|
||
|
||
class Avail: | ||
""" | ||
Test if a resource is available | ||
""" | ||
|
||
# openssl is searched across the exec path. Optionally, OPENSSL_PATH env variable can be defined | ||
# in case there is no direct path to it (i.e. PATH does not point to it) | ||
openssl = OPENSSL is not None | ||
|
||
|
||
class Only: | ||
""" | ||
Limit tests to given conditions | ||
""" | ||
|
||
softhsm2 = unittest.skipUnless(Is.softhsm2, "SoftHSMv2 only") | ||
openssl = unittest.skipUnless(Avail.openssl, "openssl not found in the path") | ||
|
||
|
||
class Not: | ||
""" | ||
Ignore tests for given devices | ||
""" | ||
|
||
softhsm2 = unittest.skipIf(Is.softhsm2, "Not supported by SoftHSMv2") | ||
nfast = unittest.skipIf(Is.nfast, "Not supported by nFast") | ||
opencryptoki = unittest.skipIf(Is.opencryptoki, "Not supported by OpenCryptoki") | ||
|
||
|
||
class FIXME: | ||
""" | ||
Tests is broken on this platform. | ||
""" | ||
|
||
softhsm2 = xfail(Is.softhsm2) | ||
nfast = xfail(Is.nfast) | ||
opencryptoki = xfail(Is.opencryptoki) | ||
travis = xfail(Is.travis) | ||
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
import os | ||
import secrets | ||
import shutil | ||
import string | ||
import subprocess | ||
from pathlib import Path | ||
from typing import Iterator | ||
from unittest import mock | ||
from warnings import warn | ||
|
||
import pytest | ||
from _pytest.fixtures import SubRequest | ||
|
||
import pkcs11 | ||
|
||
ALLOWED_RANDOM_CHARS = string.ascii_letters + string.digits | ||
LIB_PATH = os.environ.get("PKCS11_MODULE", "/usr/lib/softhsm/libsofthsm2.so") | ||
|
||
# trick: str.endswith() can accept tuples, | ||
# see https://stackoverflow.com/questions/18351951/check-if-string-ends-with-one-of-the-strings-from-a-list | ||
IS_SOFTHSM = LIB_PATH.lower().endswith( | ||
("libsofthsm2.so", "libsofthsm2.dylib", "softhsm2.dll", "softhsm2-x64.dll") | ||
) | ||
IS_NFAST = LIB_PATH.lower().endswith(("libcknfast.so", "cknfast.dll")) | ||
IS_OPENCRYPTOKI = LIB_PATH.endswith("libopencryptoki.so") | ||
|
||
OPENSSL = shutil.which("openssl", path=os.environ.get("OPENSSL_PATH")) | ||
if OPENSSL is None: | ||
warn("Path to OpenSSL not found. Please adjust `PATH' or define `OPENSSL_PATH'", stacklevel=2) | ||
|
||
|
||
def pytest_collection_modifyitems(items) -> None: | ||
for item in items: | ||
markers = [marker.name for marker in item.iter_markers()] | ||
if "xfail_nfast" in markers and IS_NFAST: | ||
item.add_marker( | ||
pytest.mark.xfail(IS_NFAST, reason="Expected failure with nFast.", strict=True) | ||
) | ||
if "xfail_softhsm" in markers and IS_SOFTHSM: | ||
item.add_marker( | ||
pytest.mark.xfail( | ||
IS_SOFTHSM, reason="Expected failure with SoftHSMvs.", strict=True | ||
) | ||
) | ||
if "xfail_opencryptoki" in markers: | ||
item.add_marker( | ||
pytest.mark.xfail( | ||
IS_OPENCRYPTOKI, reason="Expected failure with OpenCryptoki.", strict=True | ||
) | ||
) | ||
|
||
|
||
def get_random_string(length): | ||
return "".join(secrets.choice(ALLOWED_RANDOM_CHARS) for i in range(length)) | ||
|
||
|
||
@pytest.fixture(scope="session") | ||
def lib(): | ||
return pkcs11.lib(LIB_PATH) | ||
|
||
|
||
@pytest.fixture | ||
def softhsm_setup(tmp_path: Path) -> Iterator[Path]: # pragma: hsm | ||
"""Fixture to set up a unique SoftHSM2 configuration.""" | ||
softhsm_dir = tmp_path / "softhsm" | ||
token_dir = softhsm_dir / "tokens" | ||
token_dir.mkdir(exist_ok=True, parents=True) | ||
|
||
softhsm2_conf = tmp_path / "softhsm2.conf" | ||
print("# SoftHSMv2 conf:", softhsm2_conf) | ||
|
||
with open(softhsm2_conf, "w", encoding="utf-8") as stream: | ||
stream.write(f"""# SoftHSM v2 configuration file | ||
directories.tokendir = {token_dir} | ||
objectstore.backend = file | ||
# ERROR, WARNING, INFO, DEBUG | ||
log.level = DEBUG | ||
# If CKF_REMOVABLE_DEVICE flag should be set | ||
slots.removable = false | ||
# Enable and disable PKCS#11 mechanisms using slots.mechanisms. | ||
slots.mechanisms = ALL | ||
# If the library should reset the state on fork | ||
library.reset_on_fork = false""") | ||
|
||
with mock.patch.dict(os.environ, {"SOFTHSM2_CONF": str(softhsm2_conf)}): | ||
yield softhsm_dir | ||
|
||
|
||
@pytest.fixture | ||
def so_pin() -> str: | ||
return get_random_string(12) | ||
|
||
|
||
@pytest.fixture | ||
def pin() -> str: | ||
return get_random_string(12) | ||
|
||
|
||
@pytest.fixture | ||
def softhsm_token(request: "SubRequest", lib, so_pin: str, pin: str) -> pkcs11.Token: | ||
"""Get a unique token for the current test.""" | ||
request.getfixturevalue("softhsm_setup") | ||
token = get_random_string(8) | ||
|
||
args = ( | ||
"softhsm2-util", | ||
"--init-token", | ||
"--free", | ||
"--label", | ||
token, | ||
"--so-pin", | ||
so_pin, | ||
"--pin", | ||
pin, | ||
) | ||
print("+", " ".join(args)) | ||
subprocess.run(args, check=True) | ||
|
||
# Reinitialize library if already loaded (tokens are only seen after (re-)initialization). | ||
lib.reinitialize() | ||
|
||
return lib.get_token(token_label=token) | ||
|
||
|
||
@pytest.fixture | ||
def softhsm_session(softhsm_token: pkcs11.Token, pin: str) -> Iterator[pkcs11.Session]: | ||
session = softhsm_token.open(user_pin=pin) | ||
yield session | ||
session.close() | ||
|
||
|
||
@pytest.fixture | ||
def token(softhsm_token: pkcs11.Token) -> pkcs11.Token: | ||
return softhsm_token | ||
|
||
|
||
@pytest.fixture | ||
def session( | ||
request: "SubRequest", softhsm_session: pkcs11.Session, softhsm_token: pkcs11.Token | ||
) -> pkcs11.Session: | ||
# Skip test if session does not support required mechanisms | ||
requirements = [mark.args[0] for mark in request.node.iter_markers(name="requires")] | ||
if requirements: | ||
unavailable = set(requirements) - softhsm_token.slot.get_mechanisms() | ||
|
||
if unavailable: | ||
pytest.skip("Requires %s" % ", ".join(map(str, unavailable))) | ||
|
||
return softhsm_session |
Oops, something went wrong.