From 539c33e498e90356b302623e052d27bc10e0802c Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 25 Jun 2025 11:24:10 -0700 Subject: [PATCH 1/2] Fixup sasl message handling for py2 --- kafka/sasl/gssapi.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka/sasl/gssapi.py b/kafka/sasl/gssapi.py index c8e4f7cac..4785b1b75 100644 --- a/kafka/sasl/gssapi.py +++ b/kafka/sasl/gssapi.py @@ -68,10 +68,10 @@ def receive(self, auth_bytes): # simply set QoP to 'auth' only (first octet). We reuse the max message size proposed # by the server client_flags = self.SASL_QOP_AUTH - server_flags = msg[0] + server_flags = struct.Struct('>b').unpack(msg[0:1])[0] message_parts = [ struct.Struct('>b').pack(client_flags & server_flags), - msg[1:], + msg[1:], # always agree to max message size from server self.auth_id.encode('utf-8'), ] # add authorization identity to the response, and GSS-wrap From 14a9e1352313b575ff1c6990445d5f88c3ced70b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 25 Jun 2025 11:24:23 -0700 Subject: [PATCH 2/2] fetcher py2 fix: no deque.copy() --- kafka/consumer/fetcher.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 1888d38bf..1689b23f1 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -613,7 +613,8 @@ def _fetchable_partitions(self): fetchable = self._subscriptions.fetchable_partitions() # do not fetch a partition if we have a pending fetch response to process # use copy.copy to avoid runtimeerror on mutation from different thread - discard = {fetch.topic_partition for fetch in self._completed_fetches.copy()} + # TODO: switch to deque.copy() with py3 + discard = {fetch.topic_partition for fetch in copy.copy(self._completed_fetches)} current = self._next_partition_records if current: discard.add(current.topic_partition)