diff --git a/greenbone/scap/cpe_match/json.py b/greenbone/scap/cpe_match/json.py index 37fedbc..9802e08 100644 --- a/greenbone/scap/cpe_match/json.py +++ b/greenbone/scap/cpe_match/json.py @@ -6,8 +6,9 @@ from array import array from dataclasses import asdict, dataclass from datetime import datetime +from io import TextIOWrapper from pathlib import Path -from typing import Sequence +from typing import Sequence, Any, Optional from pontos.nvd.models.cpe_match_string import CPEMatchString from rich.console import Console @@ -96,6 +97,22 @@ def add_match_strings( for match_string in match_strings: self.add_match_string(match_string) + def _encode_json( + self, + data: dict[str, Any], + out_file: TextIOWrapper, + validation_buffer: Optional[bytearray] = None, + *, + indent: int = 1, + ): + encoder = JsonEncoder(indent = indent) + + for chunk in encoder.iterencode(data): + out_file.write(chunk) + if validation_buffer is not None: + validation_buffer.extend(chunk.encode("utf-8")) + + def write(self, file_name: str = "nvd_cpe_matches") -> None: """ Write the CPE data to JSON files with optional compression in the specified folder. @@ -108,20 +125,21 @@ def write(self, file_name: str = "nvd_cpe_matches") -> None: self._match_string_response.match_strings ) - encoder = JsonEncoder(indent=1) - char_array = array("b") + validation_buffer: Optional[bytearray] = None + if self.validate: + validation_buffer = bytearray() response_dict = asdict(self._match_string_response) convert_keys_to_camel(response_dict) - for chunk in encoder.iterencode(response_dict): - char_array.frombytes(chunk.encode("utf-8")) - json_data = char_array.tobytes() - - self._validate_json(file_name, json_data) if self._compress: path = self._storage_path / f"{file_name}.json.gz" - path.write_bytes(gzip.compress(json_data)) + with gzip.open(path, "wt", encoding="utf-8") as out_file: + self._encode_json(response_dict, out_file, validation_buffer) else: path = self._storage_path / f"{file_name}.json" - path.write_bytes(json_data) + with open(path, "wt", encoding="utf-8") as out_file: + self._encode_json(response_dict, out_file, validation_buffer) + + if self.validate: + self._validate_json(file_name, validation_buffer)