From 48ce9be532b100609a79bbc7610edb5144a83e4b Mon Sep 17 00:00:00 2001 From: Andre Kahles Date: Mon, 18 Nov 2024 16:55:59 +0100 Subject: [PATCH] Improve inference of archive name - add list of allowed file name extensions - allow to provide archive name explicitly on the command line --- archiver/constants.py | 2 ++ archiver/helpers.py | 31 ++++++++++++++++++++++++++----- archiver/integrity.py | 13 ++++++++----- archiver/main.py | 3 ++- 4 files changed, 38 insertions(+), 11 deletions(-) diff --git a/archiver/constants.py b/archiver/constants.py index b9b4dfc..37c3f71 100644 --- a/archiver/constants.py +++ b/archiver/constants.py @@ -12,5 +12,7 @@ ENCRYPTION_ALGORITHM = "AES256" ENV_VAR_MAPPER_MAX_CPUS = "ARCHIVER_MAX_CPUS_ENV_VAR" DEFAULT_COMPRESSION_LEVEL = 6 +ALLOWED_SUFFIXES = ['.part[0-9]+', '.tar', '.md5', '.lz', '.gpg', '.lst'] +ALLOWED_SUFFIXES_REG = '(' + ')|('.join(ALLOWED_SUFFIXES) + ')' MD5_LINE_REGEX = re.compile(r'(\S+)\s+(\S.*)') diff --git a/archiver/helpers.py b/archiver/helpers.py index d83bad4..3e37c2b 100644 --- a/archiver/helpers.py +++ b/archiver/helpers.py @@ -11,7 +11,8 @@ import unicodedata from .constants import READ_CHUNK_BYTE_SIZE, COMPRESSED_ARCHIVE_SUFFIX, \ - ENCRYPTED_ARCHIVE_SUFFIX, ENV_VAR_MAPPER_MAX_CPUS, MD5_LINE_REGEX + ENCRYPTED_ARCHIVE_SUFFIX, ENV_VAR_MAPPER_MAX_CPUS, MD5_LINE_REGEX, \ + ALLOWED_SUFFIXES_REG def get_files_with_type_in_directory_or_terminate(directory, file_type): @@ -339,15 +340,35 @@ def file_is_valid_archive_or_terminate(file_path): def filename_without_extensions(path): - """Removes every suffix, including .partX""" - suffixes_string = "".join(path.suffixes) + """Removes every allowed suffix, including .partX""" + suffixes = path.suffixes + if len(suffixes) > 0: + allowed_suffixes = [] + for s in suffixes[::-1]: + if re.match(ALLOWED_SUFFIXES_REG, s.lower()): + allowed_suffixes.append(s) + else: + break + suffixes = allowed_suffixes[::-1] + + suffixes_string = "".join(suffixes) return path.name[:-len(suffixes_string)] def filepath_without_extensions(path:Path) -> Path: """Removes every suffix, including .partX""" - suffixes_string = "".join(path.suffixes) + suffixes = path.suffixes + if len(suffixes) > 0: + allowed_suffixes = [] + for s in suffixes[::-1]: + if re.match(ALLOWED_SUFFIXES_REG, s.lower()): + allowed_suffixes.append(s) + else: + break + suffixes = allowed_suffixes[::-1] + + suffixes_string = "".join(suffixes) return path.parent / path.name[:-len(suffixes_string)] @@ -362,7 +383,7 @@ def infer_source_name(source_path: Path) -> Path: if len(unique_names) == 0: terminate_with_message('There are no archive files present') elif len(unique_names) > 1: - terminate_with_message(f'More than one possible archive name detected: {str(unique_names)}') + terminate_with_message(f'Automatic archive name detection has failed. More than one possible archive name detected: {str(unique_names)}\n optionally use --archive_name to specific archive name.') return unique_names[0] diff --git a/archiver/integrity.py b/archiver/integrity.py index e765a58..7656b23 100644 --- a/archiver/integrity.py +++ b/archiver/integrity.py @@ -10,7 +10,7 @@ from .listing import parse_tar_listing -def check_integrity(source_path, deep_flag=False, threads=None, work_dir=None): +def check_integrity(source_path, deep_flag=False, threads=None, work_dir=None, archive_name=None): archives_with_hashes = get_archives_with_hashes_from_path(source_path) is_encrypted = helpers.path_target_is_encrypted(source_path) @@ -20,10 +20,10 @@ def check_integrity(source_path, deep_flag=False, threads=None, work_dir=None): check_result = shallow_integrity_check(archives_with_hashes, workers=threads) if source_path.is_dir(): - integrity_result = check_archive_list_integrity(source_path) + integrity_result = check_archive_list_integrity(source_path, archive_name) else: file_path = source_path.parent / Path(helpers.filename_without_archive_extensions(source_path)) - integrity_result = check_archive_part_integrity(file_path) + integrity_result = check_archive_part_integrity(file_path, archive_name) if not integrity_result: logging.error( @@ -74,10 +74,13 @@ def check_archive_part_integrity(source_name: Path) -> bool: return check_result -def check_archive_list_integrity(source_path: Path) -> bool: +def check_archive_list_integrity(source_path: Path, archive_name: str = None) -> bool: parts = helpers.get_parts(source_path) - source_name = helpers.infer_source_name(source_path) + if archive_name is None: + source_name = helpers.infer_source_name(source_path) + else: + source_name = source_path / Path(archive_name) logging.info(f'Found {parts} parts in archive {source_path.as_posix()}') check_result = True diff --git a/archiver/main.py b/archiver/main.py index 08c397b..05e4a5a 100644 --- a/archiver/main.py +++ b/archiver/main.py @@ -160,6 +160,7 @@ def parse_arguments(args): parser_check.add_argument("archive_dir", type=str, help="Select source archive directory or .tar.lz file") parser_check.add_argument("-d", "--deep", action="store_true", help="Verify integrity by unpacking archive and hashing each file") parser_check.add_argument("-n", "--threads", type=int, help=thread_help) + parser_check.add_argument("--archive_name", type=str, help="Provide explicit source name of the archive (if automatic detection fails") parser_check.set_defaults(func=handle_check) # Preparation checks @@ -285,7 +286,7 @@ def handle_check(args): source_path = Path(args.archive_dir) threads = helpers.get_threads_from_args_or_environment(args.threads) - if not check_integrity(source_path, args.deep, threads, args.work_dir): + if not check_integrity(source_path, args.deep, threads, args.work_dir, args.archive_name): # return a different error code to the default code of 1 to be able to distinguish # general errors from a successful run of the program with an unsuccessful outcome # not taking 2, as it usually stands for command line argument errors