From 1f1942f4e4acc5844490acbb7a845a575ba87f2e Mon Sep 17 00:00:00 2001 From: Jaspar Stach Date: Tue, 20 Feb 2024 14:14:56 +0100 Subject: [PATCH] Make the script even worse ... --- scripts/check-gmp.gmp.py | 152 ++++++++++++++++++++------------------- 1 file changed, 79 insertions(+), 73 deletions(-) diff --git a/scripts/check-gmp.gmp.py b/scripts/check-gmp.gmp.py index f787fac8..849f3d77 100644 --- a/scripts/check-gmp.gmp.py +++ b/scripts/check-gmp.gmp.py @@ -31,6 +31,7 @@ from decimal import Decimal from pathlib import Path from typing import Any +from xml.etree import ElementTree from gvm.protocols.gmp import Gmp from lxml import etree @@ -102,21 +103,18 @@ def __init__(self, path: str) -> None: path (string): Path to the database. """ self.cursor = None - self.con_db = None - self.db = Path(path) + self.db = None + self.db_path = Path(path) self.pid = os.getpid() # Try to read file with information about cached reports # First check whether the file exist or not try: - exist = self.db.is_file() - logger.debug("DB file exist?: %s ", exist) - - if not exist: - if not self.db.parent.is_dir(): - self.db.parent.mkdir(parents=True, exist_ok=True) - else: - self.db.touch() + if not self.db_path.is_file(): + logger.debug("DB is not existing. Creating ...") + # create if not existing + self.db_path.parent.mkdir(parents=True, exist_ok=True) + self.db_path.touch(exist_ok=True) # Connect to db self.connect_db() @@ -126,16 +124,14 @@ def __init__(self, path: str) -> None: host text, scan_end text, params_used text, - report text - )""" + report text)""" ) self.cursor.execute( """CREATE TABLE Instance( created_at text, pid integer, - pending integer default 0 - )""" + pending integer default 0)""" ) logger.debug("Tables created") @@ -144,7 +140,7 @@ def __init__(self, path: str) -> None: except PermissionError: raise ScriptError( - f"The selected temporary database file {self.db} or the" + f"The selected temporary database file {self.db_path} or the" " parent dir has not the correct permissions." ) @@ -159,24 +155,16 @@ def connect_db(self) -> None: Simply connect to the database at location """ try: - logger.debug("connect db: %s", self.db) - self.con_db = sqlite3.connect(str(self.db)) - self.cursor: sqlite3.Cursor = self.con_db.cursor() + logger.debug("Connect to DB: %s", self.db_path) + self.db = sqlite3.connect(str(self.db_path)) + self.cursor: sqlite3.Cursor = self.db.cursor() logger.debug(sqlite3.sqlite_version) except Exception as e: # pylint: disable=broad-except logger.debug(e) def close_db(self) -> None: """Close database""" - self.con_db.close() - - def set_host(self, host: str) -> None: - """Sets the host variable - - Arguments: - host (string): Given ip or hostname of target. - """ - self.host = host + self.db.close() def is_old_report(self, last_scan_end, params_used) -> bool: """Decide whether the current report is old or not @@ -266,14 +254,14 @@ def add_report(self, scan_end, params_used, report) -> None: ) # Save the changes - self.con_db.commit() + self.db.commit() def delete_report(self) -> None: """Delete report from database""" self.cursor.execute("DELETE FROM Report WHERE host=?", (self.host,)) # Save the changes - self.con_db.commit() + self.db.commit() def delete_entry_with_ip(self, ip) -> None: """Delete report from database with given ip @@ -283,11 +271,11 @@ def delete_entry_with_ip(self, ip) -> None: """ logger.debug("Delete entry with ip: %s", ip) self.cursor.execute("DELETE FROM Report WHERE host=?", (ip,)) - self.con_db.isolation_level = None + self.db.isolation_level = None self.cursor.execute("VACUUM") - self.con_db.isolation_level = "" # see: https://github.com/CxAalto/gtfspy/commit/8d05c3c94a6d4ca3ed675d88af93def7d5053bfe # pylint: disable=line-too-long # noqa: E501 + self.db.isolation_level = "" # see: https://github.com/CxAalto/gtfspy/commit/8d05c3c94a6d4ca3ed675d88af93def7d5053bfe # pylint: disable=line-too-long # noqa: E501 # Save the changes - self.con_db.commit() + self.db.commit() def delete_older_entries(self, days): """Delete reports from database older than given days @@ -303,7 +291,7 @@ def delete_older_entries(self, days): self.cursor.execute("VACUUM") # Save the changes - self.con_db.commit() + self.db.commit() def has_entries(self, pending: bool) -> Any: """Return number of instance entries @@ -435,7 +423,7 @@ def add_instance(self, pending) -> None: ) # Save the changes - self.con_db.commit() + self.db.commit() def get_oldest_pending_entries(self, number) -> list[Any]: """Return the oldest last entries of pending entries from database @@ -467,7 +455,7 @@ def update_pending_status(self, date, pending) -> None: ) # Save the changes - self.con_db.commit() + self.db.commit() def delete_instance(self, pid=None) -> None: """Delete instance from database @@ -486,13 +474,13 @@ def delete_instance(self, pid=None) -> None: self.cursor.execute("DELETE FROM Instance WHERE pid=?", (pid,)) # Save the changes - self.con_db.commit() + self.db.commit() def clean_orphaned_instances(self) -> None: """Delete non existing instance entries - This method check whether a pid exist on the os and if not then delete - the orphaned entry from database. + This method checks whether a pid exists on the os and if not then + delete the orphaned entry from database. """ self.cursor.execute("SELECT pid FROM Instance") @@ -581,12 +569,14 @@ def ping(gmp: Gmp, report_manager: ReportManager): if "200" in version_status: end_session( - report_manager, "GMP OK: Ping successful", NagiosStatus.NAGIOS_OK + report_manager, + f"{GmpError.GMP_OK.value}: Ping successful", + NagiosStatus.NAGIOS_OK, ) else: end_session( report_manager, - "GMP CRITICAL: Machine dead?", + f"{GmpError.GMP_CRITICAL.value}: Machine dead?", NagiosStatus.NAGIOS_CRITICAL, ) @@ -621,14 +611,14 @@ def status(gmp: Gmp, report_manager: ReportManager, script_args: Namespace): print("Getting task with filter_string='{filter_string}'") if script_args.task: - task = gmp.get_tasks(filter_string=filter_string) + task: str = gmp.get_tasks(filter_string=filter_string) if script_args.trend: trend = task.xpath("task/trend/text()") if not trend: end_session( report_manager, - "GMP UNKNOWN: Trend is not available.", + f"{GmpError.GMP_OK.value}: Trend is not available.", NagiosStatus.NAGIOS_UNKNOWN, ) @@ -637,33 +627,33 @@ def status(gmp: Gmp, report_manager: ReportManager, script_args: Namespace): if trend in ["up", "more"]: end_session( report_manager, - f"GMP CRITICAL: Trend is {trend}.", + f"{GmpError.GMP_CRITICAL.value}: Trend is {trend}.", NagiosStatus.NAGIOS_CRITICAL, ) elif trend in ["down", "same", "less"]: end_session( report_manager, - f"GMP OK: Trend is {trend}.", + f"{GmpError.GMP_OK.value}: Trend is {trend}.", NagiosStatus.NAGIOS_OK, ) else: end_session( report_manager, - f"GMP UNKNOWN: Trend is unknown: {trend}", + f"{GmpError.GMP_OK.value}: Trend is unknown: {trend}", NagiosStatus.NAGIOS_UNKNOWN, ) else: - last_report_id = task.xpath("task/last_report/report/@id") + last_report_id: str = task.xpath("task/last_report/report/@id") if not last_report_id: end_session( report_manager, - "GMP UNKNOWN: Report is not available", + f"{GmpError.GMP_OK.value}: Report is not available", NagiosStatus.NAGIOS_UNKNOWN, ) last_report_id = last_report_id[0] - last_scan_end = task.xpath( + last_scan_end: str = task.xpath( "task/last_report/report/scan_end/text()" ) @@ -702,7 +692,9 @@ def status(gmp: Gmp, report_manager: ReportManager, script_args: Namespace): ) -def filter_report(report_manager: ReportManager, report, script_args): +def filter_report( + report_manager: ReportManager, report: ElementTree, script_args +): """Filter out the information in a report This function filters the results of a given report. @@ -717,12 +709,13 @@ def filter_report(report_manager: ReportManager, report, script_args): if not results: end_session( report_manager, - "GMP UNKNOWN: Failed to get results list", + f"{GmpError.GMP_OK.value}: Failed to get results list", NagiosStatus.NAGIOS_UNKNOWN, ) results = results[0] # Init variables + ## make this an class NVTs()? any_found = False high_count = 0 medium_count = 0 @@ -740,7 +733,7 @@ def filter_report(report_manager: ReportManager, report, script_args): if not host: end_session( report_manager, - "GMP UNKNOWN: Failed to parse result host", + f"{GmpError.GMP_OK.value}: Failed to parse result host", NagiosStatus.NAGIOS_UNKNOWN, ) @@ -752,7 +745,7 @@ def filter_report(report_manager: ReportManager, report, script_args): if not threat: end_session( report_manager, - "GMP UNKNOWN: Failed to parse result threat.", + f"{GmpError.GMP_OK.value}: Failed to parse result threat.", NagiosStatus.NAGIOS_UNKNOWN, ) @@ -776,7 +769,7 @@ def filter_report(report_manager: ReportManager, report, script_args): else: end_session( report_manager, - f"GMP UNKNOWN: Unknown result threat: {threat}", + f"{GmpError.GMP_OK.value}: Unknown result threat: {threat}", NagiosStatus.NAGIOS_UNKNOWN, ) @@ -794,14 +787,14 @@ def filter_report(report_manager: ReportManager, report, script_args): ret = 0 if high_count > 0: - ret = NagiosStatus.NAGIOS_CRITICAL + ret = NagiosStatus.NAGIOS_CRITICAL.value elif medium_count > 0: - ret = NagiosStatus.NAGIOS_WARNING + ret = NagiosStatus.NAGIOS_WARNING.value if script_args.empty_as_unknown and ( not all_results or (not any_found and script_args.hostaddress) ): - ret = NagiosStatus.NAGIOS_UNKNOWN + ret = NagiosStatus.NAGIOS_UNKNOWN.value print( f"GMP {NAGIOS_MSG[ret]}: " @@ -1248,7 +1241,9 @@ def _parse_args(args: Namespace) -> Namespace: ) parser.add_argument( - "--clean", action="store_true", help="Activate to clean the database." + "--clean", + action="store_true", + help="Activate to clean the database. Default: False", ) parser.add_argument( @@ -1272,59 +1267,65 @@ def _parse_args(args: Namespace) -> Namespace: ) parser.add_argument( - "--apply-overrides", action="store_true", help="Apply overrides." + "--apply-overrides", + action="store_true", + help="Apply overrides. Default: False", ) parser.add_argument( - "--overrides", action="store_true", help="Include overrides." + "--overrides", + action="store_true", + help="Include overrides. Default: False", ) parser.add_argument( "-d", "--details", action="store_true", - help="Include connection details in output.", + help="Include connection details in output. Default: False", ) parser.add_argument( "-l", "--report-link", action="store_true", - help="Include URL of report in output.", + help="Include URL of report in output. Default: False", ) parser.add_argument( "--dfn", action="store_true", - help="Include DFN-CERT IDs on vulnerabilities in output.", + help="Include DFN-CERT IDs on vulnerabilities in output. Default: False", ) parser.add_argument( "--oid", action="store_true", - help="Include OIDs of NVTs finding vulnerabilities in output.", + help="Include OIDs of NVTs finding vulnerabilities in output. Default: False", ) parser.add_argument( "--descr", action="store_true", - help="Include descriptions of NVTs finding vulnerabilities in output.", + help="Include descriptions of NVTs finding vulnerabilities in output. Default: False", ) parser.add_argument( - "--showlog", action="store_true", help="Include log messages in output." + "--showlog", + action="store_true", + help="Include log messages in output. Default: False", ) parser.add_argument( "--show-ports", action="store_true", - help="Include port of given vulnerable nvt in output.", + help="Include port of given vulnerable nvt in output. Default: False", ) parser.add_argument( "--scanend", action="store_true", - help="Include timestamp of scan end in output.", + help="Include timestamp of scan end in output. Default: False", ) parser.add_argument( @@ -1340,7 +1341,7 @@ def _parse_args(args: Namespace) -> Namespace: "-e", "--empty-as-unknown", action="store_true", - help="Respond with UNKNOWN on empty results.", + help="Respond with UNKNOWN on empty results. Default: False", ) parser.add_argument( @@ -1348,10 +1349,15 @@ def _parse_args(args: Namespace) -> Namespace: "--max-running-instances", default=10, type=int, - help="Set the maximum simultaneous processes of check-gmp", + help="Set the maximum simultaneous processes of check-gmp. Default: 10", ) - parser.add_argument("--hostname", nargs="?", required=False) + parser.add_argument( + "--hostname", + nargs="?", + required=False, + help="The appliance I guess ...", + ) group = parser.add_mutually_exclusive_group(required=False) group.add_argument( @@ -1366,7 +1372,7 @@ def _parse_args(args: Namespace) -> Namespace: group.add_argument( "--days", type=int, - help="Delete database entries that are older than" " given days.", + help="Delete database entries that are older than given days.", ) group.add_argument("--ip", help="Delete database entry for given ip.") @@ -1415,7 +1421,7 @@ def main(gmp: Gmp, args: Namespace) -> None: sys.exit(1) # Set the host - report_manager.set_host(script_args.hostaddress) + report_manager.host = script_args.hostaddress # Check if no more than 10 instances of check-gmp runs simultaneously report_manager.check_instances() @@ -1425,7 +1431,7 @@ def main(gmp: Gmp, args: Namespace) -> None: except Exception as e: # pylint: disable=broad-except end_session( report_manager, - f"GMP CRITICAL: {str(e)}", + f"{GmpError.GMP_CRITICAL.value}: {str(e)}", NagiosStatus.NAGIOS_CRITICAL, )