From 7d1eb7e2c9a79e95cf586c5f7253ac511c390718 Mon Sep 17 00:00:00 2001
From: William Barnhart
Date: Mon, 8 Jul 2024 13:19:23 -0400
Subject: [PATCH] Fix errors raised by new version of Pylint so tests pass again

---
 kafka/admin/client.py           | 5 +++++
 kafka/record/default_records.py | 8 +++++---
 kafka/record/legacy_records.py  | 2 ++
 3 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 5b01f8fe6..f74e09a80 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -503,6 +503,8 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
                 topics=topics,
                 allow_auto_topic_creation=auto_topic_creation
             )
+        else:
+            raise IncompatibleBrokerVersion(f"MetadataRequest for {version} is not supported")
 
         future = self._send_request_to_node(
             self._client.least_loaded_node(),
@@ -1010,6 +1012,7 @@ def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id,
     def _describe_consumer_groups_process_response(self, response):
         """Process a DescribeGroupsResponse into a group description."""
         if response.API_VERSION <= 3:
+            group_description = None
             assert len(response.groups) == 1
             for response_field, response_name in zip(response.SCHEMA.fields, response.SCHEMA.names):
                 if isinstance(response_field, Array):
@@ -1045,6 +1048,8 @@ def _describe_consumer_groups_process_response(self, response):
                 if response.API_VERSION <=2:
                     described_group_information_list.append(None)
                 group_description = GroupInformation._make(described_group_information_list)
+            if group_description is None:
+                raise Errors.BrokerResponseError("No group description received")
             error_code = group_description.error_code
             error_type = Errors.for_code(error_code)
             # Java has the note: KAFKA-6789, we can retry based on the error code
diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py
index 8b630cc8b..06be57621 100644
--- a/kafka/record/default_records.py
+++ b/kafka/record/default_records.py
@@ -187,12 +187,14 @@ def _maybe_uncompress(self) -> None:
                 data = memoryview(self._buffer)[self._pos:]
                 if compression_type == self.CODEC_GZIP:
                     uncompressed = gzip_decode(data)
-                if compression_type == self.CODEC_SNAPPY:
+                elif compression_type == self.CODEC_SNAPPY:
                     uncompressed = snappy_decode(data.tobytes())
-                if compression_type == self.CODEC_LZ4:
+                elif compression_type == self.CODEC_LZ4:
                     uncompressed = lz4_decode(data.tobytes())
-                if compression_type == self.CODEC_ZSTD:
+                elif compression_type == self.CODEC_ZSTD:
                     uncompressed = zstd_decode(data.tobytes())
+                else:
+                    raise NotImplementedError(f"Compression type {compression_type} is not supported")
                 self._buffer = bytearray(uncompressed)
                 self._pos = 0
         self._decompressed = True
diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py
index 4439462f6..44b365b06 100644
--- a/kafka/record/legacy_records.py
+++ b/kafka/record/legacy_records.py
@@ -461,6 +461,8 @@ def _maybe_compress(self) -> bool:
                     compressed = lz4_encode_old_kafka(data)
                 else:
                     compressed = lz4_encode(data)
+            else:
+                raise NotImplementedError(f"Compression type {self._compression_type} is not supported")
             size = self.size_in_bytes(
                 0, timestamp=0, key=None, value=compressed)
             # We will try to reuse the same buffer if we have enough space
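
Note on the kafka/admin/client.py change: newer Pylint flags `request` as
possibly-used-before-assignment because no branch of the API-version dispatch
was guaranteed to bind it. The added `else` makes the dispatch exhaustive:
every path either builds a request or raises. A minimal sketch of the pattern,
with hypothetical version cutoffs standing in for the real MetadataRequest
schema selection:

    class IncompatibleBrokerVersion(Exception):
        """Stand-in for kafka.errors.IncompatibleBrokerVersion."""

    def build_metadata_request(version, topics):
        # Hypothetical cutoffs; the real code indexes MetadataRequest[version].
        if version <= 3:
            request = {"topics": topics}
        elif version <= 5:
            request = {"topics": topics, "allow_auto_topic_creation": True}
        else:
            # Exhaustive dispatch: `request` is bound on every path that
            # reaches the return, so Pylint has nothing to flag.
            raise IncompatibleBrokerVersion(f"MetadataRequest for {version} is not supported")
        return request

    print(build_metadata_request(4, ["my-topic"]))   # builds a request
    try:
        build_metadata_request(99, ["my-topic"])     # unsupported version
    except IncompatibleBrokerVersion as exc:
        print(exc)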
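
Note on `_describe_consumer_groups_process_response`: `group_description` is
only assigned inside the loop over the response schema, so Pylint cannot prove
it is bound by the time execution reaches `group_description.error_code`.
Initializing a `None` sentinel before the loop and checking it afterward both
silences the warning and turns a malformed response into a clear broker error
instead of a NameError. A sketch of the sentinel pattern, with made-up
response data:

    def process_groups(groups):
        # Sentinel initialized up front so the name is always bound.
        group_description = None
        for group in groups:
            # Stand-in for GroupInformation._make(described_group_information_list).
            group_description = ("stable", group)
        if group_description is None:
            # Loop never ran: fail loudly here rather than with a confusing
            # error a few lines later.
            raise ValueError("No group description received")
        return group_description

    print(process_groups([{"group_id": "my-group"}]))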
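
Note on the record-codec changes: converting the chained `if`s in
`_maybe_uncompress` to `if/elif` means exactly one decoder runs, and the new
`else` in both files turns an unknown codec into an immediate
NotImplementedError. Previously, an unrecognized compression type fell
through every branch, left `uncompressed` (or `compressed`) unbound, and the
failure surfaced later as an UnboundLocalError. A standalone sketch with
hypothetical codec constants standing in for the real CODEC_* attributes:

    # Hypothetical constants; the real values live on the record batch classes.
    CODEC_GZIP, CODEC_SNAPPY, CODEC_LZ4, CODEC_ZSTD = 1, 2, 3, 4

    def uncompress(compression_type, data):
        if compression_type == CODEC_GZIP:
            uncompressed = data[::-1]   # stand-in for gzip_decode(data)
        elif compression_type == CODEC_SNAPPY:
            uncompressed = data[::-1]   # stand-in for snappy_decode(...)
        elif compression_type == CODEC_LZ4:
            uncompressed = data[::-1]   # stand-in for lz4_decode(...)
        elif compression_type == CODEC_ZSTD:
            uncompressed = data[::-1]   # stand-in for zstd_decode(...)
        else:
            # Without this branch an unknown codec would fall through and the
            # next use of `uncompressed` would raise UnboundLocalError.
            raise NotImplementedError(f"Compression type {compression_type} is not supported")
        return uncompressed

    print(uncompress(CODEC_GZIP, b"payload"))
    try:
        uncompress(99, b"payload")
    except NotImplementedError as exc:
        print(exc)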