Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change: Encode CPE match strings directly to file #61

Merged
merged 1 commit into from
Jan 10, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 26 additions & 11 deletions greenbone/scap/cpe_match/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
# SPDX-License-Identifier: GPL-3.0-or-later

import gzip
from array import array
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path
from typing import Sequence
from typing import Any, Optional, Sequence, TextIO

from pontos.nvd.models.cpe_match_string import CPEMatchString
from rich.console import Console
Expand Down Expand Up @@ -96,6 +95,21 @@ def add_match_strings(
for match_string in match_strings:
self.add_match_string(match_string)

def _encode_json(
self,
data: dict[str, Any],
out_file: TextIO,
validation_buffer: Optional[bytearray] = None,
*,
indent: int = 1,
):
encoder = JsonEncoder(indent=indent)

for chunk in encoder.iterencode(data):
out_file.write(chunk)
if validation_buffer is not None:
validation_buffer.extend(chunk.encode("utf-8"))

def write(self, file_name: str = "nvd_cpe_matches") -> None:
"""
Write the CPE data to JSON files with optional compression in the specified folder.
Expand All @@ -108,20 +122,21 @@ def write(self, file_name: str = "nvd_cpe_matches") -> None:
self._match_string_response.match_strings
)

encoder = JsonEncoder(indent=1)
char_array = array("b")
validation_buffer: Optional[bytearray] = None
if self.validate:
validation_buffer = bytearray()

response_dict = asdict(self._match_string_response)
convert_keys_to_camel(response_dict)
for chunk in encoder.iterencode(response_dict):
char_array.frombytes(chunk.encode("utf-8"))
json_data = char_array.tobytes()

self._validate_json(file_name, json_data)

if self._compress:
path = self._storage_path / f"{file_name}.json.gz"
path.write_bytes(gzip.compress(json_data))
with gzip.open(path, "wt", encoding="utf-8") as out_file:
self._encode_json(response_dict, out_file, validation_buffer)
else:
path = self._storage_path / f"{file_name}.json"
path.write_bytes(json_data)
with open(path, "wt", encoding="utf-8") as out_file:
self._encode_json(response_dict, out_file, validation_buffer)

if validation_buffer:
self._validate_json(file_name, validation_buffer.decode("utf-8"))
Loading