diff --git a/acapy_agent/admin/decorators/auth.py b/acapy_agent/admin/decorators/auth.py index 818f297d40..3d1f209893 100644 --- a/acapy_agent/admin/decorators/auth.py +++ b/acapy_agent/admin/decorators/auth.py @@ -1,6 +1,8 @@ """Authentication decorators for the admin API.""" import functools +import re +from typing import Optional, Pattern from aiohttp import web @@ -48,6 +50,8 @@ def tenant_authentication(handler): - check for a valid bearer token in the Autorization header if running in multi-tenant mode - check for a valid x-api-key header if running in single-tenant mode + - check if the base wallet has access to the requested path if running + in multi-tenant mode """ @functools.wraps(handler) @@ -61,11 +65,15 @@ async def tenant_auth(request): ) insecure_mode = bool(profile.settings.get("admin.admin_insecure_mode")) multitenant_enabled = profile.settings.get("multitenant.enabled") + base_wallet_allowed_route = _base_wallet_route_access( + profile.settings.get("multitenant.base_wallet_routes"), request.path + ) # CORS fix: allow OPTIONS method access to paths without a token if ( (multitenant_enabled and authorization_header) or (not multitenant_enabled and valid_key) + or (multitenant_enabled and valid_key and base_wallet_allowed_route) or insecure_mode or request.method == "OPTIONS" ): @@ -78,3 +86,25 @@ async def tenant_auth(request): ) return tenant_auth + + +def _base_wallet_route_access(additional_routes: str, request_path: str) -> bool: + """Check if request path matches additional routes.""" + additional_routes_pattern = _build_additional_routes_pattern(additional_routes) + return _matches_additional_routes(additional_routes_pattern, request_path) + + +def _build_additional_routes_pattern(pattern_string: str) -> Optional[Pattern]: + """Build pattern from space delimited list of paths.""" + # create array and add word boundary to avoid false positives + if pattern_string: + paths = pattern_string.split(" ") + return re.compile("^((?:)" + "|".join(paths) + ")$") + return None + + +def _matches_additional_routes(pattern: Pattern, path: str) -> bool: + """Matches request path to provided pattern.""" + if pattern and path: + return bool(pattern.match(path)) + return False diff --git a/acapy_agent/admin/server.py b/acapy_agent/admin/server.py index 84650ed11b..111d2f2a52 100644 --- a/acapy_agent/admin/server.py +++ b/acapy_agent/admin/server.py @@ -5,7 +5,7 @@ import re import warnings import weakref -from typing import Callable, Coroutine, Optional, Pattern, Sequence, cast +from typing import Callable, Coroutine, Optional import aiohttp_cors import jwt @@ -280,29 +280,6 @@ def __init__( self.websocket_queues = {} self.site = None self.multitenant_manager = context.inject_or(BaseMultitenantManager) - self._additional_route_pattern: Optional[Pattern] = None - - @property - def additional_routes_pattern(self) -> Optional[Pattern]: - """Pattern for configured additional routes to permit base wallet to access.""" - if self._additional_route_pattern: - return self._additional_route_pattern - - base_wallet_routes = self.context.settings.get("multitenant.base_wallet_routes") - base_wallet_routes = cast(Sequence[str], base_wallet_routes) - if base_wallet_routes: - self._additional_route_pattern = re.compile( - "^(?:" + "|".join(base_wallet_routes) + ")" - ) - return None - - def _matches_additional_routes(self, path: str) -> bool: - """Path matches additional_routes_pattern.""" - pattern = self.additional_routes_pattern - if pattern: - return bool(pattern.match(path)) - - return False async def make_application(self) -> web.Application: """Get the aiohttp application instance.""" diff --git a/acapy_agent/admin/tests/test_auth.py b/acapy_agent/admin/tests/test_auth.py index 1ebd0cd171..73680a1b00 100644 --- a/acapy_agent/admin/tests/test_auth.py +++ b/acapy_agent/admin/tests/test_auth.py @@ -133,3 +133,27 @@ async def test_multi_tenant_valid_auth_header(self): decor_func = tenant_authentication(self.decorated_handler) await decor_func(self.request) self.decorated_handler.assert_called_once_with(self.request) + + async def test_base_wallet_additional_route_allowed(self): + self.profile.settings["multitenant.base_wallet_routes"] = "/extra-route" + self.request = mock.MagicMock( + __getitem__=lambda _, k: self.request_dict[k], + headers={"x-api-key": "admin_api_key"}, + method="POST", + path="/extra-route", + ) + decor_func = tenant_authentication(self.decorated_handler) + await decor_func(self.request) + self.decorated_handler.assert_called_once_with(self.request) + + async def test_base_wallet_additional_route_denied(self): + self.profile.settings["multitenant.base_wallet_routes"] = "/extra-route" + self.request = mock.MagicMock( + __getitem__=lambda _, k: self.request_dict[k], + headers={"x-api-key": "admin_api_key"}, + method="POST", + path="/extra-route-wrong", + ) + decor_func = tenant_authentication(self.decorated_handler) + with self.assertRaises(web.HTTPUnauthorized): + await decor_func(self.request)