forked from openwallet-foundation/acapy
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: DaevMithran <[email protected]>
- Loading branch information
1 parent
f499871
commit d442922
Showing
10 changed files
with
433 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
"""DID Indy Registry.""" | ||
|
||
import logging | ||
import re | ||
from typing import Optional, Pattern, Sequence | ||
|
||
from ....config.injection_context import InjectionContext | ||
from ....core.profile import Profile | ||
from ...base import BaseAnonCredsRegistrar, BaseAnonCredsResolver | ||
from ...models.anoncreds_cred_def import CredDef, CredDefResult, GetCredDefResult | ||
from ...models.anoncreds_revocation import ( | ||
GetRevListResult, | ||
GetRevRegDefResult, | ||
RevList, | ||
RevListResult, | ||
RevRegDef, | ||
RevRegDefResult, | ||
) | ||
from ...models.anoncreds_schema import AnonCredsSchema, GetSchemaResult, SchemaResult | ||
|
||
LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
class DIDCheqdRegistry(BaseAnonCredsResolver, BaseAnonCredsRegistrar): | ||
"""DIDCheqdRegistry.""" | ||
|
||
def __init__(self): | ||
"""Initialize an instance. | ||
Args: | ||
None | ||
""" | ||
self._supported_identifiers_regex = re.compile(r"^did:cheqd:.*$") | ||
|
||
@property | ||
def supported_identifiers_regex(self) -> Pattern: | ||
"""Supported Identifiers regex.""" | ||
return self._supported_identifiers_regex | ||
# TODO: fix regex (too general) | ||
|
||
async def setup(self, context: InjectionContext): | ||
"""Setup.""" | ||
print("Successfully registered DIDCheqdRegistry") | ||
|
||
async def get_schema(self, profile: Profile, schema_id: str) -> GetSchemaResult: | ||
"""Get a schema from the registry.""" | ||
raise NotImplementedError() | ||
|
||
async def register_schema( | ||
self, | ||
profile: Profile, | ||
schema: AnonCredsSchema, | ||
options: Optional[dict] = None, | ||
) -> SchemaResult: | ||
"""Register a schema on the registry.""" | ||
raise NotImplementedError() | ||
|
||
async def get_credential_definition( | ||
self, profile: Profile, credential_definition_id: str | ||
) -> GetCredDefResult: | ||
"""Get a credential definition from the registry.""" | ||
raise NotImplementedError() | ||
|
||
async def register_credential_definition( | ||
self, | ||
profile: Profile, | ||
schema: GetSchemaResult, | ||
credential_definition: CredDef, | ||
options: Optional[dict] = None, | ||
) -> CredDefResult: | ||
"""Register a credential definition on the registry.""" | ||
raise NotImplementedError() | ||
|
||
async def get_revocation_registry_definition( | ||
self, profile: Profile, revocation_registry_id: str | ||
) -> GetRevRegDefResult: | ||
"""Get a revocation registry definition from the registry.""" | ||
raise NotImplementedError() | ||
|
||
async def register_revocation_registry_definition( | ||
self, | ||
profile: Profile, | ||
revocation_registry_definition: RevRegDef, | ||
options: Optional[dict] = None, | ||
) -> RevRegDefResult: | ||
"""Register a revocation registry definition on the registry.""" | ||
raise NotImplementedError() | ||
|
||
async def get_revocation_list( | ||
self, | ||
profile: Profile, | ||
revocation_registry_id: str, | ||
timestamp_from: Optional[int] = 0, | ||
timestamp_to: Optional[int] = None, | ||
) -> GetRevListResult: | ||
"""Get a revocation list from the registry.""" | ||
raise NotImplementedError() | ||
|
||
async def register_revocation_list( | ||
self, | ||
profile: Profile, | ||
rev_reg_def: RevRegDef, | ||
rev_list: RevList, | ||
options: Optional[dict] = None, | ||
) -> RevListResult: | ||
"""Register a revocation list on the registry.""" | ||
raise NotImplementedError() | ||
|
||
async def update_revocation_list( | ||
self, | ||
profile: Profile, | ||
rev_reg_def: RevRegDef, | ||
prev_list: RevList, | ||
curr_list: RevList, | ||
revoked: Sequence[int], | ||
options: Optional[dict] = None, | ||
) -> RevListResult: | ||
"""Update a revocation list on the registry.""" | ||
raise NotImplementedError() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
"""Routes for DID Cheqd Registry.""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
"""DID manager for Cheqd.""" | ||
|
||
from aries_askar import AskarError, Key | ||
|
||
from .registrar import DidCheqdRegistrar | ||
from ...core.profile import Profile | ||
from ...wallet.askar import CATEGORY_DID | ||
from ...wallet.crypto import validate_seed | ||
from ...wallet.did_method import CHEQD, DIDMethods | ||
from ...wallet.did_parameters_validation import DIDParametersValidation | ||
from ...wallet.error import WalletError | ||
from ...wallet.key_type import ED25519, KeyType, KeyTypes | ||
from ...wallet.util import bytes_to_b58, b64_to_bytes, bytes_to_b64 | ||
|
||
|
||
class DidCheqdManager: | ||
"""DID manager for Cheqd.""" | ||
|
||
registrar: DidCheqdRegistrar | ||
|
||
def __init__(self, profile: Profile) -> None: | ||
"""Initialize the DID manager.""" | ||
self.profile = profile | ||
self.registrar = DidCheqdRegistrar() | ||
|
||
async def _get_key_type(self, key_type: str) -> KeyType: | ||
async with self.profile.session() as session: | ||
key_types = session.inject(KeyTypes) | ||
return key_types.from_key_type(key_type) or ED25519 | ||
|
||
def _create_key_pair(self, options: dict, key_type: KeyType) -> Key: | ||
seed = options.get("seed") | ||
if seed and not self.profile.settings.get("wallet.allow_insecure_seed"): | ||
raise WalletError("Insecure seed is not allowed") | ||
|
||
if seed: | ||
seed = validate_seed(seed) | ||
return Key.from_secret_bytes(key_type, seed) | ||
return Key.generate(key_type) | ||
|
||
async def register(self, options: dict) -> dict: | ||
"""Register a DID Cheqd.""" | ||
options = options or {} | ||
|
||
key_type = await self._get_key_type(options.get("key_type") or ED25519) | ||
did_validation = DIDParametersValidation(self.profile.inject(DIDMethods)) | ||
did_validation.validate_key_type(CHEQD, key_type) | ||
|
||
key_pair = self._create_key_pair(options, key_type.key_type) | ||
verkey_bytes = key_pair.get_public_bytes() | ||
verkey = bytes_to_b58(verkey_bytes) | ||
|
||
public_key_hex = verkey_bytes.hex() | ||
network = options.get("network") or "testnet" | ||
# generate payload | ||
did_document = await self.registrar.generate_did_doc(network, public_key_hex) | ||
did: str = did_document.get("id") or "" | ||
# request create did | ||
create_request_res = await self.registrar.create( | ||
{"didDocument": did_document, "network": network} | ||
) | ||
|
||
if create_request_res.get("state") == "action": | ||
job_id: str = create_request_res.get("jobId") | ||
sign_req: dict = create_request_res.get("signingRequest")[0] | ||
kid: str = sign_req.get("kid") | ||
payload_to_sign: str = sign_req.get("serializedPayload") | ||
# publish did | ||
publish_did_res = await self.registrar.create( | ||
{ | ||
"jobId": job_id, | ||
"network": network, | ||
"secret": { | ||
"signing_response": [ | ||
{ | ||
"kid": kid, | ||
"signature": bytes_to_b64( | ||
key_pair.key.sign_message( | ||
b64_to_bytes(payload_to_sign) | ||
) | ||
), | ||
} | ||
], | ||
}, | ||
} | ||
) | ||
if publish_did_res.get("state") != "finished": | ||
raise WalletError("Error registering DID") | ||
else: | ||
raise WalletError("Error registering DID") | ||
|
||
async with self.profile.session() as session: | ||
try: | ||
await session.handle.insert_key(verkey, key_pair) | ||
await session.handle.insert( | ||
CATEGORY_DID, | ||
did, | ||
value_json={ | ||
"did": did, | ||
"method": CHEQD.method_name, | ||
"verkey": verkey, | ||
"verkey_type": ED25519.key_type, | ||
"metadata": {}, | ||
}, | ||
tags={ | ||
"method": CHEQD.method_name, | ||
"verkey": verkey, | ||
"verkey_type": ED25519.key_type, | ||
}, | ||
) | ||
except AskarError as err: | ||
raise WalletError(f"Error registering DID: {err}") from err | ||
|
||
return { | ||
"did": did, | ||
"verkey": verkey, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
"""DID Registrar for Cheqd.""" | ||
|
||
from aiohttp import ClientSession | ||
|
||
|
||
class DidCheqdRegistrar: | ||
"""DID Registrar for Cheqd.""" | ||
|
||
DID_REGISTRAR_BASE_URL = "https://did-registrar.cheqd.net/1.0/" | ||
|
||
async def generate_did_doc(self, network: str, public_key_hex: str) -> dict | None: | ||
"""Generates a did_document with the provided params.""" | ||
async with ClientSession() as session: | ||
try: | ||
async with session.get( | ||
self.DID_REGISTRAR_BASE_URL + "did-document", | ||
params={ | ||
"verificationMethod": "Ed25519VerificationKey2020", | ||
"methodSpecificIdAlgo": "uuid", | ||
"network": network, | ||
"publicKeyHex": public_key_hex, | ||
}, | ||
) as response: | ||
if response.status == 200: | ||
return await response.json() | ||
finally: | ||
return None | ||
|
||
async def create(self, options: dict) -> dict | None: | ||
"""Request Create and Publish a DID Document.""" | ||
async with ClientSession() as session: | ||
try: | ||
async with session.post( | ||
self.DID_REGISTRAR_BASE_URL + "create", json=options | ||
) as response: | ||
if response.status == 200 or response.status == 201: | ||
return await response.json() | ||
finally: | ||
return None | ||
|
||
# async def update(self, options: dict) -> dict: | ||
# | ||
# async def deactivate(self, options: dict) -> dict: | ||
# | ||
# async def create_resource(self, options:dict) -> dict |
Oops, something went wrong.