From 2fb631bb92abc8a90fddd51302f481222b63c98b Mon Sep 17 00:00:00 2001 From: Erik Moravec Date: Thu, 11 May 2023 00:20:43 +0200 Subject: [PATCH 01/11] merged metadata layer to page --- config/example.config.py | 6 + sec_certs_page/common/bindings.py | 529 ++++++++++++++++++ sec_certs_page/static/base.css | 14 + .../templates/cc/entry/index.html.jinja2 | 9 +- .../templates/common/entry.html.jinja2 | 43 +- .../templates/fips/entry/index.html.jinja2 | 9 +- 6 files changed, 607 insertions(+), 3 deletions(-) create mode 100644 sec_certs_page/common/bindings.py diff --git a/config/example.config.py b/config/example.config.py index dd74b31e..4954260a 100644 --- a/config/example.config.py +++ b/config/example.config.py @@ -90,3 +90,9 @@ # Whether to anonymize the site (for review). ANONYMOUS = False ANONYMOUS_GIT = "" + +# List of URLs to fetch additional metadata binding files from. +BINDING_URLS = [] + +# Secret key to validate jwt tokens of downloaded binding files with. +BINDINGS_SECRET_KEY = "" \ No newline at end of file diff --git a/sec_certs_page/common/bindings.py b/sec_certs_page/common/bindings.py new file mode 100644 index 00000000..da8d9593 --- /dev/null +++ b/sec_certs_page/common/bindings.py @@ -0,0 +1,529 @@ +import logging +import os +import json +import datetime +import asyncio +import hashlib +import shutil +import base64 +from urllib.parse import urlparse, unquote +import urllib +from typing import Tuple, Union, Any +import requests +import jwt +import click +from flask import current_app +from pymongo.collection import Collection + + +from .. 
import mongo + +logger = logging.getLogger(__name__) +Document = Any + +class Binding(): + """ A binding data class + """ + + def __init__(self, cert_id: str, header_name: str): + """Binding object constructor + + Args: + cert_id (str): certificate id + header_name (str): header name + """ + self.cert_id: Union[str, int] = cert_id if not cert_id.isdigit() else int( + cert_id) + self.header_name: str = header_name + self.header_data: list[object] = [] + + def match_name(self, name: str) -> bool: + """ Function to match a string to the header_name + + Args: + name (str): Match candidate + + Returns: + bool: Whether the name matches to the binding obj header_name + """ + return name == urllib.parse.quote( + self.header_name) or name == self.header_name + + def update_headers_if_match(self, name: str, header: list[object]) -> None: + """ Update header_data if this objects header_name matches with given name + + Args: + name (str): Match candidate + header (list[object]): New header data + """ + if self.match_name(name): + self.header_data = header + for h in self.header_data: + if "metadata_preview" in h and verify_url(h["metadata_preview"]): + click.echo(f"Converting image to base64") + h["metadata_preview"] = get_base64_preview(h["metadata_preview"]) + + def __str__(self) -> str: + return self.__repr__() + + def __repr__(self) -> str: + return f"Binding {self.cert_id} to {self.header_name}" + + def __getitem__(self, key: str) -> Union[str, int, list[object]]: + return { + "cert_id": self.cert_id, + "header_name": self.header_name, + "header_data": self.header_data + }[key] + +def get_base64_preview(url: str) -> str: + """ Function to get the base64 encoded representation of an image at a given URL + + Args: + url (str): Preview pic location + + Returns: + str: Base64 encoded preview image + """ + response = urllib.request.urlopen(url) + file_content = response.read() + base64_content = base64.b64encode(file_content) + base64_string = base64_content.decode('utf-8') + 
return base64_string + +def url_to_local_path(url: str) -> str: + """ Transorms local URL to local path + + Args: + url (str): URL to transform + + Returns: + str: Transformed URL + """ + parsed_url = urlparse(url) + path = unquote(parsed_url.path.strip("/")) + return path + + +def github_url_to_api(url: str) -> str: + """ Transform the standard public github URL to a github API URL + + Args: + url (str): URL to transform + + Returns: + str: Transformed API URL + """ + parsed_url = urlparse(url) + api_url = f"https://api.github.com/repos{parsed_url.path}" + if parsed_url.query: + api_url += f"?{parsed_url.query}" + return api_url + + +def get_filename_from_url(url: str) -> str: + """ Parse out the filename from a given url + + Args: + url (str): URL to parse + + Returns: + str: Parsed out filename + """ + path_components = url.split("/") + filename = path_components[-1] + if not filename: + filename = path_components[-2] + return os.path.basename(filename) + + +def verify_timestamp(timestamp: str) -> bool: + """ Verifies timestamp validity + + Args: + timestamp (str): timestamp to validate + + Returns: + bool: Whether the given timestamp is valid + """ + try: + datetime.datetime.fromisoformat(timestamp) + return True + except ValueError: + return False + + +def verify_url(url: str) -> bool: + """ Verifies URL validity + + Args: + url (str): URL to validate + + Returns: + bool: Whether the given URL is valid + """ + try: + result = urlparse(url) + return all([result.scheme, result.netloc]) + except ValueError: + return False + + +def verify_jwt(data: dict[str, Union[str, object]]) -> bool: + """ Verifies the given object jwt validity + + Args: + data (dict[str, Union[str, object]]): object to validate + + Returns: + bool: Whether the JWT in the data object is valid + """ + jwt_token = data['JWT'] + del data["JWT"] + encoded_jwt = jwt.encode(data, key=current_app.config["BINDINGS_SECRET_KEY"], algorithm="HS256") + if not jwt_token: + click.echo("JWT token is 
missing") + return False + if encoded_jwt != jwt_token: + return False + return True + + +async def download_file(file_url: str, file_path: str, sha256: str = "", verbose: bool = False) -> None: + """ Download a file from the given URL + + Args: + file_url (str): URL of the file to download + file_path (str): Path to where to download the file + sha256 (str, optional): Expected hash of the downloaded file. Defaults to "". + verbose (bool, optional): Whether to click.echo out extra information. Defaults to False. + """ + if verbose: + click.echo(f"Downloading {file_url}...") + response = requests.get(file_url, timeout=1000) + if sha256: + file_sha256 = hashlib.sha256(response.content).digest().hex() + if file_sha256 != sha256: + click.echo( + f"Metadata SHA256 verification failed for file {file_path.split('/')[-1]}") + try: + with open(file_path, "wb") as f: + f.write(response.content) + except: + click.echo(f"Cant open url from file {file_url}") + if verbose: + click.echo(f"Downloaded {file_url}.") + + +async def download_binding_files(url: str, output_dir: str, file_ext: str = ".json", verbose: bool = False) -> None: + """ Download all binding files from a given url. + + Args: + url (str): URL to download binding files from + output_dir (str): Path to write binding files to + file_ext (str): Optional file extension of binding files to be downloaded + verbose (bool, optional): Whether to click.echo out extra information. Defaults to False. 
+ """ + if verbose: + click.echo(f"Downloading bindings files from {url}") + + if url.startswith("https://github.com"): + tasks = [] + repo_url = url.split("/tree")[0] + dir_path = url.split("/main")[1] + api_url = github_url_to_api(repo_url) + "/contents" + dir_path + + response = requests.get(api_url) + data = {} + if response.status_code == 200: + try: + data = response.json() + except json.decoder.JSONDecodeError as e: + click.echo(f"Error decoding JSON: {e}") + click.echo(f"Response content: {response.text.splitlines()[:5]}") + return + else: + click.echo( + f"Request failed with status code {response.status_code} - {response.reason}" + ) + files = [ + (file["name"], file["download_url"]) + for file in data + if file["name"].endswith(file_ext) + ] + + for file_name, file_url in files: + tasks.append(asyncio.create_task( + download_file( + file_url, + os.path.join(output_dir, file_name), + verbose=verbose))) + + await asyncio.gather(*tasks) + if verbose: + click.echo( + f"Downloaded {len(tasks)} {file_ext} files from {url} to {output_dir}.") + + elif os.path.isdir(url_to_local_path(url)): + path = url_to_local_path(url) + for file_name in os.listdir(path): + if verbose: + click.echo(f"copying local file: {file_name}") + if file_name.endswith(f".{file_ext}"): + shutil.copy(os.path.join(path, file_name), os.path.join(output_dir, file_name)) + if verbose: + click.echo( + f"Copied {len(os.listdir(path))} {file_ext} files from {path} to {output_dir}.") + else: + click.echo(f"Error: Unsupported URL format of: {url}") + + +def download_all_bindings_sync( + urls: list[str], + output_dir: str, + verbose: bool = False) -> None: + """ Download binding files from all given URLs + + Args: + urls (list[str]): list of URLs to download from + output_dir (str): Path to write the binding files + verbose (bool, optional): Whether to click.echo out extra information. Defaults to False. 
+ """ + os.makedirs(output_dir, exist_ok=True) + + for url in urls: + asyncio.run( + download_binding_files( + url, + output_dir, + verbose=verbose)) + if verbose: + click.echo(f"Download completed for {url}.\n") + + +async def process_binding_files(bindings_dir: str, download_dir: str, download_headers: bool = True, verbose: bool = True) -> list[Binding]: + """ Processing of all binding files in given directory + + Args: + bindings_dir (str): Directory where binding files are located + download_dir (str): Directory where binding files are to be downloaded + download_headers (bool, optional): Whether to download the headers files. Defaults to True. + verbose (bool, optional): _description_. Defaults to True. + + Returns: + list[Binding]: list of binding Objects identified in the binding files + """ + os.makedirs(download_dir, exist_ok=True) + tasks = [] + bindings = [] + for filename in os.listdir(bindings_dir): + if filename.endswith(".json"): + if verbose: + click.echo(f"\nProcessing binding file: {filename}") + filepath = os.path.join(bindings_dir, filename) + with open(filepath, "r", encoding="utf8") as f: + data = json.load(f) + if not verify_jwt(data): + click.echo(f"JWT verification failed processing binding file {filename}") + continue + for item in data["data"]: + timestamp = item["timestamp"] + if not verify_timestamp(timestamp): + click.echo(f"Invalid timestamp for file: {filename}") + continue + url = item["metadata_header_url"] + header_file_name = get_filename_from_url( + urllib.parse.unquote(url)) + if not verify_url(url): + click.echo(f"Invalid metadata_header_url for file: {url}") + continue + if not download_headers: + continue + + for cert_id in item["certificate_ids"]: + bindings.append(Binding(cert_id, header_file_name)) + if os.path.exists( + url_to_local_path(url)) and download_headers: + click.echo(f"Copying header file from: {url_to_local_path(url)}") + shutil.copy(url_to_local_path(url), os.path.join( + download_dir, header_file_name + )) 
+ continue + tasks.append(asyncio.create_task( + download_file(url, os.path.join( + download_dir, header_file_name + ), verbose=verbose))) + + click.echo(f"Identified {len(bindings)} bindings") + click.echo(f'\nDownloading {len(tasks)} header files') + await asyncio.gather(*tasks) + return bindings + + +async def process_header_files(headers_dir: str, download_dir: str, bindings: list[Binding], download_metadata: bool = True, verbose: bool = False) -> None: + """Function to process all JSON header files in a given directory + + Args: + headers_dir (str): Path to the directory containing the JSON header files + download_dir (str): Where to download the header files to + bindings (list[Binding]): Bindings objects to be enriched with header_data + download_metadata (bool, optional): Whether to download the metadata files. Defaults to True. + verbose (bool, optional): Whether to click.echo out extra information. Defaults to False. + """ + if verbose: + click.echo(f"Processing header files in {headers_dir}") + os.makedirs(download_dir, exist_ok=True) + tasks = [] + for file_name in os.listdir(headers_dir): + if file_name.endswith(".json"): + file_path: str = os.path.join(headers_dir, file_name) + with open(file_path, "r", encoding="utf8") as f: + if verbose: + click.echo(f"Processing {file_name}") + json_obj = json.load(f) + if not verify_jwt(json_obj): + click.echo(f"JWT verification failed for header file {file_name}") + continue + for data_obj in json_obj["data"]: + if not verify_timestamp(data_obj["timestamp"]): + click.echo(f"Invalid timestamp in file {file_name}") + continue + if not verify_url(data_obj["metadata_url"]): + click.echo(f"Invalid URL in file {file_name}") + continue + download_destination = os.path.join( + download_dir, data_obj["metadata_url"].split("/")[-1]) + if download_metadata: + tasks.append( + asyncio.create_task( + download_file( + data_obj["metadata_url"], + download_destination, + data_obj["metadata_sha256"], verbose))) + for binding in 
bindings: + binding.update_headers_if_match( + file_name, json_obj["data"]) + + click.echo(f"Downloading {len(tasks)} metadata files") + await asyncio.gather(*tasks) + + +def get_instance_paths() -> Tuple[str, str, str]: + """ Get instance paths for bindings, headers and metadata files + + Returns: + Tuple[str, str, str]: The three bindings, headers and metadata files directories paths + """ + instance_path = current_app.instance_path + return ( + os.path.join(instance_path, "bindings"), + os.path.join(instance_path, "headers"), + os.path.join(instance_path, "metadata") + ) + + +def process_and_download_all( + download_bindings: bool = True, + download_headers: bool = True, + download_metadata: bool = True, + verbose: bool = False) -> list[Binding]: + """ Encapsulates the function calls to download all bindings, process them and to process all header files + + Args: + download_bindings (bool, optional): Whether to download binding files. Defaults to True. + download_headers (bool, optional): Whether to download header files. Defaults to True. + download_metadata (bool, optional): Whether to download metadata files. Defaults to True. + verbose (bool, optional): Whether to click.echo out extra information. Defaults to False. 
+ + Returns: + list[Binding]: _description_ + """ + bindings_path, headers_path, metadata_path = get_instance_paths() + binding_urls = current_app.config["BINDING_URLS"] + if download_bindings: + download_all_bindings_sync( + binding_urls, bindings_path, verbose=verbose) + bindings = asyncio.run(process_binding_files( + bindings_path, headers_path, download_headers, verbose=verbose)) + asyncio.run( + process_header_files( + headers_path, + metadata_path, + bindings, + download_metadata, + verbose=verbose)) + return bindings + + +def purge_headers_data(collection: Collection[Document]) -> None: + """ Purges all existing headers_data from the MongoDB collection + + Args: + collection (Collection[Document]): MongoDB collection to purge headers from + """ + collection.update_many( + {'metadata_headers': {'$exists': True}}, + {'$unset': {'metadata_headers': ''}} + ) + + +def update_one( + collection: Collection[Document], + binding: Binding, + verbose: bool = False) -> None: + """ Updates a record in mongoDB with new header_data + + Args: + collection (Collection[Document]): MongoDB collection to update the records in + binding (Binding): An object representing the binding - cert_id and header_data + verbose (bool, optional): Whether to click.echo out extra information. Defaults to False. 
+ """ + cert_id = binding["cert_id"] if "NIST" not in binding["cert_id"] else int(binding["cert_id"].strip("NIST-")) + cert = collection.find_one({"cert_id": cert_id}) + if cert is not None: + if verbose: + click.echo( + f'Found Cert {cert_id}, updating it with {len(binding["header_data"])} new headers.') + if "metadata_headers" not in cert: + if verbose: + click.echo("Creating metadata headers") + collection.update_one({"cert_id": cert_id}, { + "$set": {"metadata_headers": [binding["header_data"]]}}) + else: + if verbose: + click.echo("Pushing to metadata headers") + collection.update_one({"cert_id": cert_id}, {"$push": { + "metadata_headers": binding["header_data"]}}) + + +def update_bindings( + download_bindings: bool = True, + download_headers: bool = True, + download_metadata: bool = False, + purge: bool = True, verbose: bool = False) -> None: + """ Function to be used as a task in seccerts encapsulating all necessary function calls + + Args: + download_bindings (bool, optional): Whether to download binding files. Defaults to True. + download_headers (bool, optional): Whether to download header files. Defaults to True. + download_metadata (bool, optional): Whether to download metadata files. Defaults to True. + purge (bool, optional): Whether to purge the data before update. Defaults to True. + verbose (bool, optional): Whether to click.echo out extra information. Defaults to False. 
+ """ + bindings = process_and_download_all( + download_bindings, + download_headers, + download_metadata, + verbose=verbose) + click.echo( + "\033[32m" + + "Finished processing bindings, updating mongoDB collections now" + + "\033[0m") + cc = mongo.db.get_collection("cc") + fips = mongo.db.get_collection("fips") + if purge: + purge_headers_data(fips) + + for binding in bindings: + update_one(cc, binding, verbose=verbose) + update_one(fips, binding, verbose=verbose) \ No newline at end of file diff --git a/sec_certs_page/static/base.css b/sec_certs_page/static/base.css index cef0046f..3d49f9c9 100644 --- a/sec_certs_page/static/base.css +++ b/sec_certs_page/static/base.css @@ -228,4 +228,18 @@ body { .callout-dark { background-color: var(--bs-dark-bg-subtle); border-left: 0.25rem solid var(--bs-dark-border-subtle); +} + +.metadata-header-table { + border-collapse: collapse; + table-layout: fixed; +} + +.metadata-header-table-row { + border: 1px solid black; +} + +.metadata-header-table-cell { + border: 1px solid black; + word-break: break-word; } \ No newline at end of file diff --git a/sec_certs_page/templates/cc/entry/index.html.jinja2 b/sec_certs_page/templates/cc/entry/index.html.jinja2 index ac5a0668..a45d1b62 100644 --- a/sec_certs_page/templates/cc/entry/index.html.jinja2 +++ b/sec_certs_page/templates/cc/entry/index.html.jinja2 @@ -2,7 +2,7 @@ {% set crumbs = True %} {% set search = True %} {% from "cc/utils.html.jinja2" import cc_keywords, security_keywords, render_status, render_sars, render_sfrs, render_category, render_eal, render_frontpage %} -{% from "common/entry.html.jinja2" import cryptography_keywords, device_keywords, security_keywords, other_keywords, render_local_files, render_pdf_meta, render_cves, render_references %} +{% from "common/entry.html.jinja2" import cryptography_keywords, device_keywords, security_keywords, other_keywords, render_local_files, render_pdf_meta, render_cves, render_referencess, metadata_binding_card %} {% block title 
%} {{ cert["name"]|default("", true) + " | seccerts.org" }} @@ -446,6 +446,13 @@ {% endif %} </div> + {% if cert["metadata_headers"] %} + <h3 class="mt-3">Additional Metadata</h3> + {% for header in cert["metadata_headers"] %} + {{ metadata_binding_card("cc", loop.index, header) }} + {% endfor %} + {% endif %} + <div class="mt-5 mb-5"> <h2 class="anchor" id="updates">Updates <span style="font-size: 0.5em; vertical-align: middle" tabindex="0" class="badge bg-secondary" diff --git a/sec_certs_page/templates/common/entry.html.jinja2 b/sec_certs_page/templates/common/entry.html.jinja2 index 5d2c18bf..42d49642 100644 --- a/sec_certs_page/templates/common/entry.html.jinja2 +++ b/sec_certs_page/templates/common/entry.html.jinja2 @@ -251,4 +251,45 @@ {% macro other_keywords(keyword_scan, doc_prefix, hidden=[], map_funcs={}) -%} {{ keywords_card(keyword_scan, doc_prefix, "other", "Other", {"Standards": "standard_id", "Technical reports": "technical_report_id"}, hidden, map_funcs) }} -{%- endmacro %} \ No newline at end of file +{%- endmacro %} + +{% macro metadata_binding_card(doc_prefix, index, data) %} + {% for measurement in data %} + {% set header_label = measurement["measurement_tool"] + " data by \"" + measurement["measurement_author"] + "\" on " + measurement["timestamp"] %} + <div class="card mt-3 mb-3"> + <div class="card-header" id="{{ doc_prefix }}-{{ index }}-{{ loop.index }}-head"> + <h4 class="mb-0"> + <button class="btn btn-link" data-bs-toggle="collapse" + data-bs-target="#{{ doc_prefix }}-{{ index }}-{{ loop.index }}-card" aria-expanded="false" + aria-controls="{{ doc_prefix }}-{{ index }}-{{ loop.index }}-card"> + {{ header_label }} + <i class="fas fa-chevron-down"></i> + <i class="fas fa-chevron-up"></i> + </button> + </h4> + </div> + + <div id="{{ doc_prefix }}-{{ index }}-{{ loop.index }}-card" class="collapse" + aria-labelledby="{{ doc_prefix }}-{{ index }}-{{ loop.index }}-head"> + <div class="card-body text-dark"> + <table 
class="metadata-header-table"> + {% for key, value in measurement.items() %} + <tr class="metadata-header-table-row mt-1"> + <td class="metadata-header-table-cell" style="width: 20%">{{ key }}</td> + {% if key == 'metadata_url' %} + <td class="metadata-header-table-cell"><a href="{{ value }}" class="link">{{ value }}</a></td> + {% elif key == 'metadata_preview' %} + <td class="metadata-header-table-cell"> + <img src='data:image/jpeg;base64, {{ value }}' style='max-width:100%'> + </td> + {% else %} + <td class="metadata-header-table-cell">{{ value }}</td> + {% endif %} + </tr> + {% endfor %} + </table> + </div> + </div> + </div> + {% endfor %} +{% endmacro %} \ No newline at end of file diff --git a/sec_certs_page/templates/fips/entry/index.html.jinja2 b/sec_certs_page/templates/fips/entry/index.html.jinja2 index 2e1eb635..4919e7dd 100644 --- a/sec_certs_page/templates/fips/entry/index.html.jinja2 +++ b/sec_certs_page/templates/fips/entry/index.html.jinja2 @@ -2,7 +2,7 @@ {% set crumbs = True %} {% set search = True %} {% from "fips/utils.html.jinja2" import render_status, cryptography_keywords, device_keywords, security_keywords, other_keywords, render_type %} -{% from "common/entry.html.jinja2" import render_local_files, render_pdf_meta, render_cves, render_references %} +{% from "common/entry.html.jinja2" import render_local_files, render_pdf_meta, render_cves, render_references, metadata_binding_card %} {% block title %} <title> {{ cert["web_data"]["module_name"]|default("", true) + " | seccerts.org" }} @@ -336,6 +336,13 @@ {% endif %} </div> + {% if cert["metadata_headers"] %} + <h3 class="mt-3">Additional Metadata</h3> + {% for header in cert["metadata_headers"] %} + {{ metadata_binding_card("fips", loop.index, header) }} + {% endfor %} + {% endif %} + <div class="mt-5 mb-5"> <h2 class="anchor" id="updates">Updates <span style="font-size: 0.5em; vertical-align: middle" tabindex="0" class="badge bg-secondary" From c94992d59078933cd4e6c2623b8b0c8124bab0fc Mon 
Sep 17 00:00:00 2001 From: Erik Moravec <xmoravec@fi.muni.cz> Date: Thu, 11 May 2023 00:22:13 +0200 Subject: [PATCH 02/11] added pyjwt to requirements --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 3e912bef..0fdd9778 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -74,6 +74,7 @@ "click", "requests", "feedgen", + "pyjwt", ] [project.optional-dependencies] From 0e4878d65c6854a6028c71dfa99bc3249e6eb788 Mon Sep 17 00:00:00 2001 From: Erik Moravec <xmoravec@fi.muni.cz> Date: Thu, 11 May 2023 03:14:58 +0200 Subject: [PATCH 03/11] unification of get requests and hotlink image instead of base64 conversion --- sec_certs_page/common/bindings.py | 95 ++++++++----------- .../templates/common/entry.html.jinja2 | 4 +- 2 files changed, 40 insertions(+), 59 deletions(-) diff --git a/sec_certs_page/common/bindings.py b/sec_certs_page/common/bindings.py index da8d9593..a6b3b671 100644 --- a/sec_certs_page/common/bindings.py +++ b/sec_certs_page/common/bindings.py @@ -1,12 +1,9 @@ -import logging import os import json import datetime import asyncio import hashlib import shutil -import base64 -from urllib.parse import urlparse, unquote import urllib from typing import Tuple, Union, Any import requests @@ -18,9 +15,9 @@ from .. 
import mongo -logger = logging.getLogger(__name__) Document = Any + class Binding(): """ A binding data class """ @@ -58,10 +55,6 @@ def update_headers_if_match(self, name: str, header: list[object]) -> None: """ if self.match_name(name): self.header_data = header - for h in self.header_data: - if "metadata_preview" in h and verify_url(h["metadata_preview"]): - click.echo(f"Converting image to base64") - h["metadata_preview"] = get_base64_preview(h["metadata_preview"]) def __str__(self) -> str: return self.__repr__() @@ -76,20 +69,6 @@ def __getitem__(self, key: str) -> Union[str, int, list[object]]: "header_data": self.header_data }[key] -def get_base64_preview(url: str) -> str: - """ Function to get the base64 encoded representation of an image at a given URL - - Args: - url (str): Preview pic location - - Returns: - str: Base64 encoded preview image - """ - response = urllib.request.urlopen(url) - file_content = response.read() - base64_content = base64.b64encode(file_content) - base64_string = base64_content.decode('utf-8') - return base64_string def url_to_local_path(url: str) -> str: """ Transorms local URL to local path @@ -100,8 +79,8 @@ def url_to_local_path(url: str) -> str: Returns: str: Transformed URL """ - parsed_url = urlparse(url) - path = unquote(parsed_url.path.strip("/")) + parsed_url = urllib.parse.urlparse(url) + path = urllib.parse.unquote(parsed_url.path.strip("/")) return path @@ -114,7 +93,7 @@ def github_url_to_api(url: str) -> str: Returns: str: Transformed API URL """ - parsed_url = urlparse(url) + parsed_url = urllib.parse.urlparse(url) api_url = f"https://api.github.com/repos{parsed_url.path}" if parsed_url.query: api_url += f"?{parsed_url.query}" @@ -163,7 +142,7 @@ def verify_url(url: str) -> bool: bool: Whether the given URL is valid """ try: - result = urlparse(url) + result = urllib.parse.urlparse(url) return all([result.scheme, result.netloc]) except ValueError: return False @@ -180,7 +159,8 @@ def verify_jwt(data: dict[str, 
Union[str, object]]) -> bool: """ jwt_token = data['JWT'] del data["JWT"] - encoded_jwt = jwt.encode(data, key=current_app.config["BINDINGS_SECRET_KEY"], algorithm="HS256") + encoded_jwt = jwt.encode( + data, key=current_app.config["BINDINGS_SECRET_KEY"], algorithm="HS256") if not jwt_token: click.echo("JWT token is missing") return False @@ -240,7 +220,8 @@ async def download_binding_files(url: str, output_dir: str, file_ext: str = ".js data = response.json() except json.decoder.JSONDecodeError as e: click.echo(f"Error decoding JSON: {e}") - click.echo(f"Response content: {response.text.splitlines()[:5]}") + click.echo( + f"Response content: {response.text.splitlines()[:5]}") return else: click.echo( @@ -270,7 +251,8 @@ async def download_binding_files(url: str, output_dir: str, file_ext: str = ".js if verbose: click.echo(f"copying local file: {file_name}") if file_name.endswith(f".{file_ext}"): - shutil.copy(os.path.join(path, file_name), os.path.join(output_dir, file_name)) + shutil.copy(os.path.join(path, file_name), + os.path.join(output_dir, file_name)) if verbose: click.echo( f"Copied {len(os.listdir(path))} {file_ext} files from {path} to {output_dir}.") @@ -324,7 +306,8 @@ async def process_binding_files(bindings_dir: str, download_dir: str, download_h with open(filepath, "r", encoding="utf8") as f: data = json.load(f) if not verify_jwt(data): - click.echo(f"JWT verification failed processing binding file {filename}") + click.echo( + f"JWT verification failed processing binding file {filename}") continue for item in data["data"]: timestamp = item["timestamp"] @@ -335,7 +318,8 @@ async def process_binding_files(bindings_dir: str, download_dir: str, download_h header_file_name = get_filename_from_url( urllib.parse.unquote(url)) if not verify_url(url): - click.echo(f"Invalid metadata_header_url for file: {url}") + click.echo( + f"Invalid metadata_header_url for file: {url}") continue if not download_headers: continue @@ -344,15 +328,16 @@ async def 
process_binding_files(bindings_dir: str, download_dir: str, download_h bindings.append(Binding(cert_id, header_file_name)) if os.path.exists( url_to_local_path(url)) and download_headers: - click.echo(f"Copying header file from: {url_to_local_path(url)}") + click.echo( + f"Copying header file from: {url_to_local_path(url)}") shutil.copy(url_to_local_path(url), os.path.join( - download_dir, header_file_name - )) + download_dir, header_file_name + )) continue tasks.append(asyncio.create_task( download_file(url, os.path.join( - download_dir, header_file_name - ), verbose=verbose))) + download_dir, header_file_name + ), verbose=verbose))) click.echo(f"Identified {len(bindings)} bindings") click.echo(f'\nDownloading {len(tasks)} header files') @@ -382,7 +367,8 @@ async def process_header_files(headers_dir: str, download_dir: str, bindings: li click.echo(f"Processing {file_name}") json_obj = json.load(f) if not verify_jwt(json_obj): - click.echo(f"JWT verification failed for header file {file_name}") + click.echo( + f"JWT verification failed for header file {file_name}") continue for data_obj in json_obj["data"]: if not verify_timestamp(data_obj["timestamp"]): @@ -461,10 +447,8 @@ def purge_headers_data(collection: Collection[Document]) -> None: Args: collection (Collection[Document]): MongoDB collection to purge headers from """ - collection.update_many( - {'metadata_headers': {'$exists': True}}, - {'$unset': {'metadata_headers': ''}} - ) + result = collection.delete_many({}) + print(f"Deleted {result} metadata bindings") def update_one( @@ -478,22 +462,21 @@ def update_one( binding (Binding): An object representing the binding - cert_id and header_data verbose (bool, optional): Whether to click.echo out extra information. Defaults to False. 
""" - cert_id = binding["cert_id"] if "NIST" not in binding["cert_id"] else int(binding["cert_id"].strip("NIST-")) + cert_id = binding["cert_id"] if "NIST" not in binding["cert_id"] else int( + binding["cert_id"].strip("NIST-")) cert = collection.find_one({"cert_id": cert_id}) if cert is not None: if verbose: click.echo( f'Found Cert {cert_id}, updating it with {len(binding["header_data"])} new headers.') - if "metadata_headers" not in cert: - if verbose: - click.echo("Creating metadata headers") - collection.update_one({"cert_id": cert_id}, { - "$set": {"metadata_headers": [binding["header_data"]]}}) - else: - if verbose: - click.echo("Pushing to metadata headers") - collection.update_one({"cert_id": cert_id}, {"$push": { - "metadata_headers": binding["header_data"]}}) + click.echo("Pushing to metadata headers") + collection.update_one({"cert_id": cert_id}, {"$push": { + "metadata_headers": binding["header_data"]}}) + else: + if verbose: + click.echo("Inserting new metadata binding") + collection.insert_one( + {"cert_id": cert_id, "metadata_headers": binding["metadata_header"]}) def update_bindings( @@ -519,11 +502,9 @@ def update_bindings( "\033[32m" + "Finished processing bindings, updating mongoDB collections now" + "\033[0m") - cc = mongo.db.get_collection("cc") - fips = mongo.db.get_collection("fips") + collection = mongo.db["metadata_bindings"] if purge: - purge_headers_data(fips) + purge_headers_data(collection) for binding in bindings: - update_one(cc, binding, verbose=verbose) - update_one(fips, binding, verbose=verbose) \ No newline at end of file + update_one(collection, binding, verbose=verbose) diff --git a/sec_certs_page/templates/common/entry.html.jinja2 b/sec_certs_page/templates/common/entry.html.jinja2 index 42d49642..dbdbb769 100644 --- a/sec_certs_page/templates/common/entry.html.jinja2 +++ b/sec_certs_page/templates/common/entry.html.jinja2 @@ -277,10 +277,10 @@ <tr class="metadata-header-table-row mt-1"> <td class="metadata-header-table-cell" 
style="width: 20%">{{ key }}</td> {% if key == 'metadata_url' %} - <td class="metadata-header-table-cell"><a href="{{ value }}" class="link">{{ value }}</a></td> + <td class="metadata-header-table-cell"><a href="{{ value }}" class="link" target="_blank" rel="noopener">{{ value }}</a></td> {% elif key == 'metadata_preview' %} <td class="metadata-header-table-cell"> - <img src='data:image/jpeg;base64, {{ value }}' style='max-width:100%'> + <img src='{{ value }}' style='max-width:100%'> </td> {% else %} <td class="metadata-header-table-cell">{{ value }}</td> From 848755f56153450348a2ba2ea0bc61c5b52c5909 Mon Sep 17 00:00:00 2001 From: Erik Moravec <xmoravec@fi.muni.cz> Date: Thu, 11 May 2023 03:15:44 +0200 Subject: [PATCH 04/11] added metadata headers data from another collection to data given cert entry pages --- sec_certs_page/cc/views.py | 4 ++++ sec_certs_page/fips/views.py | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/sec_certs_page/cc/views.py b/sec_certs_page/cc/views.py index 2b9e6d32..ad96242c 100644 --- a/sec_certs_page/cc/views.py +++ b/sec_certs_page/cc/views.py @@ -299,6 +299,10 @@ def entry(hashid): cpes = list(map(load, mongo.db.cpe.find({"_id": {"$in": list(doc["heuristics"]["cpe_matches"])}}))) else: cpes = [] + with sentry_sdk.start_span(op="mongo", description="Find metadata bindings"): + found = mongo.db.metadata_bindings.find_one({"cert_id" : doc["cert_id"]}) + if found: + doc["metadata_headers"] = found["metadata_headers"] with sentry_sdk.start_span(op="files", description="Find local files"): local_files = entry_download_files(hashid, current_app.config["DATASET_PATH_CC_DIR"]) with sentry_sdk.start_span(op="network", description="Find network"): diff --git a/sec_certs_page/fips/views.py b/sec_certs_page/fips/views.py index 50e6cd69..99b7389b 100644 --- a/sec_certs_page/fips/views.py +++ b/sec_certs_page/fips/views.py @@ -379,6 +379,10 @@ def entry(hashid): cpes = list(map(load, mongo.db.cpe.find({"_id": {"$in": 
list(doc["heuristics"]["cpe_matches"])}}))) else: cpes = [] + with sentry_sdk.start_span(op="mongo", description="Find metadata bindings"): + found = mongo.db.metadata_bindings.find_one({"cert_id" : doc["cert_id"]}) + if found: + doc["metadata_headers"] = found["metadata_headers"] with sentry_sdk.start_span(op="files", description="Find local files"): local_files = entry_download_files( hashid, current_app.config["DATASET_PATH_FIPS_DIR"], documents=("target",) From b9c3afd21b9246a665cb0d4197afba588e7db08b Mon Sep 17 00:00:00 2001 From: Erik Moravec <xmoravec@fi.muni.cz> Date: Thu, 11 May 2023 03:24:17 +0200 Subject: [PATCH 05/11] fixed metadata headers key --- sec_certs_page/common/bindings.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sec_certs_page/common/bindings.py b/sec_certs_page/common/bindings.py index a6b3b671..962b402e 100644 --- a/sec_certs_page/common/bindings.py +++ b/sec_certs_page/common/bindings.py @@ -476,7 +476,7 @@ def update_one( if verbose: click.echo("Inserting new metadata binding") collection.insert_one( - {"cert_id": cert_id, "metadata_headers": binding["metadata_header"]}) + {"cert_id": cert_id, "metadata_headers": binding["header_data"]}) def update_bindings( From d6e44aa28856f176bfc4e3776f61012cad816ea7 Mon Sep 17 00:00:00 2001 From: Erik Moravec <xmoravec@fi.muni.cz> Date: Thu, 11 May 2023 03:33:18 +0200 Subject: [PATCH 06/11] fixed data to views --- sec_certs_page/cc/views.py | 7 ++++--- sec_certs_page/common/bindings.py | 2 +- sec_certs_page/fips/views.py | 7 ++++--- sec_certs_page/templates/cc/entry/index.html.jinja2 | 4 ++-- sec_certs_page/templates/fips/entry/index.html.jinja2 | 4 ++-- 5 files changed, 13 insertions(+), 11 deletions(-) diff --git a/sec_certs_page/cc/views.py b/sec_certs_page/cc/views.py index ad96242c..b2630f59 100644 --- a/sec_certs_page/cc/views.py +++ b/sec_certs_page/cc/views.py @@ -300,9 +300,9 @@ def entry(hashid): else: cpes = [] with sentry_sdk.start_span(op="mongo", description="Find 
metadata bindings"): - found = mongo.db.metadata_bindings.find_one({"cert_id" : doc["cert_id"]}) - if found: - doc["metadata_headers"] = found["metadata_headers"] + additional_metadata = mongo.db.metadata_bindings.find_one({"cert_id" : doc["cert_id"]}) + if not additional_metadata: + additional_metadata = [] with sentry_sdk.start_span(op="files", description="Find local files"): local_files = entry_download_files(hashid, current_app.config["DATASET_PATH_CC_DIR"]) with sentry_sdk.start_span(op="network", description="Find network"): @@ -321,6 +321,7 @@ def entry(hashid): local_files=local_files, json=StorageFormat(raw_doc).to_json_mapping(), network=cert_network, + additional_metadata=additional_metadata, ) else: return abort(404) diff --git a/sec_certs_page/common/bindings.py b/sec_certs_page/common/bindings.py index 962b402e..bfa1cc08 100644 --- a/sec_certs_page/common/bindings.py +++ b/sec_certs_page/common/bindings.py @@ -448,7 +448,7 @@ def purge_headers_data(collection: Collection[Document]) -> None: collection (Collection[Document]): MongoDB collection to purge headers from """ result = collection.delete_many({}) - print(f"Deleted {result} metadata bindings") + print(f"Deleted {result.deleted_count} metadata bindings") def update_one( diff --git a/sec_certs_page/fips/views.py b/sec_certs_page/fips/views.py index 99b7389b..86856261 100644 --- a/sec_certs_page/fips/views.py +++ b/sec_certs_page/fips/views.py @@ -380,9 +380,9 @@ def entry(hashid): else: cpes = [] with sentry_sdk.start_span(op="mongo", description="Find metadata bindings"): - found = mongo.db.metadata_bindings.find_one({"cert_id" : doc["cert_id"]}) - if found: - doc["metadata_headers"] = found["metadata_headers"] + additional_metadata = mongo.db.metadata_bindings.find_one({"cert_id" : doc["cert_id"]}) + if not additional_metadata: + additional_metadata = [] with sentry_sdk.start_span(op="files", description="Find local files"): local_files = entry_download_files( hashid, 
current_app.config["DATASET_PATH_FIPS_DIR"], documents=("target",) @@ -399,6 +399,7 @@ def entry(hashid): local_files=local_files, json=StorageFormat(raw_doc).to_json_mapping(), policy_link=constants.FIPS_SP_URL.format(doc["cert_id"]), + additional_metadata=additional_metadata, ) else: return abort(404) diff --git a/sec_certs_page/templates/cc/entry/index.html.jinja2 b/sec_certs_page/templates/cc/entry/index.html.jinja2 index a45d1b62..ba3ffc68 100644 --- a/sec_certs_page/templates/cc/entry/index.html.jinja2 +++ b/sec_certs_page/templates/cc/entry/index.html.jinja2 @@ -446,9 +446,9 @@ {% endif %} </div> - {% if cert["metadata_headers"] %} + {% if additional_metadata %} <h3 class="mt-3">Additional Metadata</h3> - {% for header in cert["metadata_headers"] %} + {% for header in additional_metadata %} {{ metadata_binding_card("cc", loop.index, header) }} {% endfor %} {% endif %} diff --git a/sec_certs_page/templates/fips/entry/index.html.jinja2 b/sec_certs_page/templates/fips/entry/index.html.jinja2 index 4919e7dd..6fe1311c 100644 --- a/sec_certs_page/templates/fips/entry/index.html.jinja2 +++ b/sec_certs_page/templates/fips/entry/index.html.jinja2 @@ -336,9 +336,9 @@ {% endif %} </div> - {% if cert["metadata_headers"] %} + {% if additional_metadata %} <h3 class="mt-3">Additional Metadata</h3> - {% for header in cert["metadata_headers"] %} + {% for header in additional_metadata %} {{ metadata_binding_card("fips", loop.index, header) }} {% endfor %} {% endif %} From 657a46fa8cef101963e55771cea4b116656152bb Mon Sep 17 00:00:00 2001 From: Erik Moravec <xmoravec@fi.muni.cz> Date: Thu, 11 May 2023 03:39:03 +0200 Subject: [PATCH 07/11] fixed additional metadata access --- sec_certs_page/templates/cc/entry/index.html.jinja2 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sec_certs_page/templates/cc/entry/index.html.jinja2 b/sec_certs_page/templates/cc/entry/index.html.jinja2 index ba3ffc68..5a6e740c 100644 --- 
a/sec_certs_page/templates/cc/entry/index.html.jinja2 +++ b/sec_certs_page/templates/cc/entry/index.html.jinja2 @@ -448,7 +448,7 @@ {% if additional_metadata %} <h3 class="mt-3">Additional Metadata</h3> - {% for header in additional_metadata %} + {% for header in additional_metadata["metadata_headers"] %} {{ metadata_binding_card("cc", loop.index, header) }} {% endfor %} {% endif %} From 3a5bc40da4751b8a32c1ff4e5c42994a76d4801e Mon Sep 17 00:00:00 2001 From: Erik Moravec <xmoravec@fi.muni.cz> Date: Thu, 11 May 2023 03:40:48 +0200 Subject: [PATCH 08/11] fixed metadata access --- sec_certs_page/templates/fips/entry/index.html.jinja2 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sec_certs_page/templates/fips/entry/index.html.jinja2 b/sec_certs_page/templates/fips/entry/index.html.jinja2 index 6fe1311c..b4ae555d 100644 --- a/sec_certs_page/templates/fips/entry/index.html.jinja2 +++ b/sec_certs_page/templates/fips/entry/index.html.jinja2 @@ -338,7 +338,7 @@ {% if additional_metadata %} <h3 class="mt-3">Additional Metadata</h3> - {% for header in additional_metadata %} + {% for header in additional_metadata["metadata_headers"] %} {{ metadata_binding_card("fips", loop.index, header) }} {% endfor %} {% endif %} From 82939705e268aa446ffe0e8017841e0f28fa0d6a Mon Sep 17 00:00:00 2001 From: xmoravec <xmoravec@mail.muni.com> Date: Thu, 11 May 2023 03:48:41 +0200 Subject: [PATCH 09/11] additional metadata render fix --- sec_certs_page/templates/cc/entry/index.html.jinja2 | 4 +--- sec_certs_page/templates/common/entry.html.jinja2 | 2 +- sec_certs_page/templates/fips/entry/index.html.jinja2 | 4 +--- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/sec_certs_page/templates/cc/entry/index.html.jinja2 b/sec_certs_page/templates/cc/entry/index.html.jinja2 index 5a6e740c..f9e2b922 100644 --- a/sec_certs_page/templates/cc/entry/index.html.jinja2 +++ b/sec_certs_page/templates/cc/entry/index.html.jinja2 @@ -448,9 +448,7 @@ {% if additional_metadata %} <h3 
class="mt-3">Additional Metadata</h3> - {% for header in additional_metadata["metadata_headers"] %} - {{ metadata_binding_card("cc", loop.index, header) }} - {% endfor %} + {{ metadata_binding_card("cc", additional_metadata["metadata_headers"]) }} {% endif %} <div class="mt-5 mb-5"> diff --git a/sec_certs_page/templates/common/entry.html.jinja2 b/sec_certs_page/templates/common/entry.html.jinja2 index dbdbb769..a19440eb 100644 --- a/sec_certs_page/templates/common/entry.html.jinja2 +++ b/sec_certs_page/templates/common/entry.html.jinja2 @@ -253,7 +253,7 @@ "Technical reports": "technical_report_id"}, hidden, map_funcs) }} {%- endmacro %} -{% macro metadata_binding_card(doc_prefix, index, data) %} +{% macro metadata_binding_card(doc_prefix, data) %} {% for measurement in data %} {% set header_label = measurement["measurement_tool"] + " data by \"" + measurement["measurement_author"] + "\" on " + measurement["timestamp"] %} <div class="card mt-3 mb-3"> diff --git a/sec_certs_page/templates/fips/entry/index.html.jinja2 b/sec_certs_page/templates/fips/entry/index.html.jinja2 index b4ae555d..2701812b 100644 --- a/sec_certs_page/templates/fips/entry/index.html.jinja2 +++ b/sec_certs_page/templates/fips/entry/index.html.jinja2 @@ -338,9 +338,7 @@ {% if additional_metadata %} <h3 class="mt-3">Additional Metadata</h3> - {% for header in additional_metadata["metadata_headers"] %} - {{ metadata_binding_card("fips", loop.index, header) }} - {% endfor %} + {{ metadata_binding_card("fips", additional_metadata["metadata_headers"]) }} {% endif %} <div class="mt-5 mb-5"> From 9e22260f54744f74459475b23d64c9c7590810f3 Mon Sep 17 00:00:00 2001 From: Erik Moravec <xmoravec@fi.muni.cz> Date: Fri, 12 May 2023 00:15:16 +0200 Subject: [PATCH 10/11] fixed a typo in import --- sec_certs_page/templates/cc/entry/index.html.jinja2 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sec_certs_page/templates/cc/entry/index.html.jinja2 
b/sec_certs_page/templates/cc/entry/index.html.jinja2 index f9e2b922..fbcf0cc8 100644 --- a/sec_certs_page/templates/cc/entry/index.html.jinja2 +++ b/sec_certs_page/templates/cc/entry/index.html.jinja2 @@ -2,7 +2,7 @@ {% set crumbs = True %} {% set search = True %} {% from "cc/utils.html.jinja2" import cc_keywords, security_keywords, render_status, render_sars, render_sfrs, render_category, render_eal, render_frontpage %} -{% from "common/entry.html.jinja2" import cryptography_keywords, device_keywords, security_keywords, other_keywords, render_local_files, render_pdf_meta, render_cves, render_referencess, metadata_binding_card %} +{% from "common/entry.html.jinja2" import cryptography_keywords, device_keywords, security_keywords, other_keywords, render_local_files, render_pdf_meta, render_cves, render_references, metadata_binding_card %} {% block title %} <title> {{ cert["name"]|default("", true) + " | seccerts.org" }} From ec245b550e79cc29f11d214ed42ae902564e5271 Mon Sep 17 00:00:00 2001 From: Erik Moravec <468981@mail.muni.cz> Date: Fri, 16 Jun 2023 23:24:02 +0200 Subject: [PATCH 11/11] updated jwt to asymmetric algorithm --- config/example.config.py | 2 +- sec_certs_page/common/bindings.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/config/example.config.py b/config/example.config.py index 4954260a..39e61d38 100644 --- a/config/example.config.py +++ b/config/example.config.py @@ -95,4 +95,4 @@ BINDING_URLS = [] # Secret key to validate jwt tokens of downloaded binding files with. 
-BINDINGS_SECRET_KEY = "" \ No newline at end of file +BINDINGS_PUBLIC_KEY = "" \ No newline at end of file diff --git a/sec_certs_page/common/bindings.py b/sec_certs_page/common/bindings.py index bfa1cc08..b2d7ca27 100644 --- a/sec_certs_page/common/bindings.py +++ b/sec_certs_page/common/bindings.py @@ -160,7 +160,7 @@ def verify_jwt(data: dict[str, Union[str, object]]) -> bool: jwt_token = data['JWT'] del data["JWT"] encoded_jwt = jwt.encode( - data, key=current_app.config["BINDINGS_SECRET_KEY"], algorithm="HS256") + data, key=current_app.config["BINDINGS_PUBLIC_KEY"], algorithm="RS256") if not jwt_token: click.echo("JWT token is missing") return False