Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ class KafkaAdminClient(object):
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_kerberos_name (str or gssapi.Name): Constructed gssapi.Name for use with
sasl mechanism handshake. If provided, sasl_kerberos_service_name and
sasl_kerberos_domain name are ignored. Default: None.
sasl_kerberos_service_name (str): Service name to include in GSSAPI
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
Expand Down Expand Up @@ -181,6 +184,7 @@ class KafkaAdminClient(object):
'sasl_mechanism': None,
'sasl_plain_username': None,
'sasl_plain_password': None,
'sasl_kerberos_name': None,
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None,
Expand Down
4 changes: 4 additions & 0 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ class KafkaClient(object):
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_kerberos_name (str or gssapi.Name): Constructed gssapi.Name for use with
sasl mechanism handshake. If provided, sasl_kerberos_service_name and
sasl_kerberos_domain name are ignored. Default: None.
sasl_kerberos_service_name (str): Service name to include in GSSAPI
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
Expand Down Expand Up @@ -206,6 +209,7 @@ class KafkaClient(object):
'sasl_mechanism': None,
'sasl_plain_username': None,
'sasl_plain_password': None,
'sasl_kerberos_name': None,
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None
Expand Down
4 changes: 4 additions & 0 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ class BrokerConnection(object):
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_kerberos_name (str or gssapi.Name): Constructed gssapi.Name for use with
sasl mechanism handshake. If provided, sasl_kerberos_service_name and
sasl_kerberos_domain name are ignored. Default: None.
sasl_kerberos_service_name (str): Service name to include in GSSAPI
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
Expand Down Expand Up @@ -216,6 +219,7 @@ class BrokerConnection(object):
'sasl_mechanism': None,
'sasl_plain_username': None,
'sasl_plain_password': None,
'sasl_kerberos_name': None,
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None
Expand Down
4 changes: 4 additions & 0 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,9 @@ class KafkaConsumer(six.Iterator):
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_kerberos_name (str or gssapi.Name): Constructed gssapi.Name for use with
sasl mechanism handshake. If provided, sasl_kerberos_service_name and
sasl_kerberos_domain name are ignored. Default: None.
sasl_kerberos_service_name (str): Service name to include in GSSAPI
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
Expand Down Expand Up @@ -317,6 +320,7 @@ class KafkaConsumer(six.Iterator):
'sasl_mechanism': None,
'sasl_plain_username': None,
'sasl_plain_password': None,
'sasl_kerberos_name': None,
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None,
Expand Down
4 changes: 4 additions & 0 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,9 @@ class KafkaProducer(object):
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_kerberos_name (str or gssapi.Name): Constructed gssapi.Name for use with
sasl mechanism handshake. If provided, sasl_kerberos_service_name and
sasl_kerberos_domain name are ignored. Default: None.
sasl_kerberos_service_name (str): Service name to include in GSSAPI
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
Expand Down Expand Up @@ -347,6 +350,7 @@ class KafkaProducer(object):
'sasl_mechanism': None,
'sasl_plain_username': None,
'sasl_plain_password': None,
'sasl_kerberos_name': None,
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None,
Expand Down
17 changes: 12 additions & 5 deletions kafka/sasl/gssapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,19 @@ class SaslMechanismGSSAPI(SaslMechanism):

def __init__(self, **config):
assert gssapi is not None, 'GSSAPI lib not available'
assert config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl'
if 'sasl_kerberos_name' not in config and 'sasl_kerberos_service_name' not in config:
raise ValueError('sasl_kerberos_service_name or sasl_kerberos_name required for GSSAPI sasl configuration')
self._is_done = False
self._is_authenticated = False
self.kerberos_damin_name = config['sasl_kerberos_domain_name'] or config['host']
self.auth_id = config['sasl_kerberos_service_name'] + '@' + kerberos_damin_name
self.gssapi_name = gssapi.Name(auth_id, name_type=gssapi.NameType.hostbased_service).canonicalize(gssapi.MechType.kerberos)
if config.get('sasl_kerberos_name', None) is not None:
self.auth_id = str(config['sasl_kerberos_name'])
else:
kerberos_domain_name = config.get('sasl_kerberos_domain_name', '') or config.get('host', '')
self.auth_id = config['sasl_kerberos_service_name'] + '@' + kerberos_domain_name
if isinstance(config.get('sasl_kerberos_name', None), gssapi.Name):
self.gssapi_name = config['sasl_kerberos_name']
else:
self.gssapi_name = gssapi.Name(self.auth_id, name_type=gssapi.NameType.hostbased_service).canonicalize(gssapi.MechType.kerberos)
self._client_ctx = gssapi.SecurityContext(name=self.gssapi_name, usage='initiate')
self._next_token = self._client_ctx.step(None)

Expand All @@ -54,7 +61,7 @@ def receive(self, auth_bytes):
raise ValueError("Unexpected receive auth_bytes after sasl/gssapi completion")
else:
# unwraps message containing supported protection levels and msg size
msg = client_ctx.unwrap(received_token).message
msg = self._client_ctx.unwrap(auth_bytes).message
# Kafka currently doesn't support integrity or confidentiality security layers, so we
# simply set QoP to 'auth' only (first octet). We reuse the max message size proposed
# by the server
Expand Down
2 changes: 1 addition & 1 deletion kafka/sasl/oauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
class SaslMechanismOAuth(SaslMechanism):

def __init__(self, **config):
assert 'sasl_oauth_token_provider' in config, 'sasl_oauth_token_provider required for OAUTHBEARER sasl'
self.token_provider = config['sasl_oauth_token_provider']
assert self.token_provider is not None, 'sasl_oauth_token_provider required for OAUTHBEARER sasl'
assert callable(getattr(self.token_provider, 'token', None)), 'sasl_oauth_token_provider must implement method #token()'
self._is_done = False
self._is_authenticated = False
Expand Down
6 changes: 3 additions & 3 deletions kafka/sasl/plain.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
class SaslMechanismPlain(SaslMechanism):

def __init__(self, **config):
if config['security_protocol'] == 'SASL_PLAINTEXT':
if config.get('security_protocol', '') == 'SASL_PLAINTEXT':
log.warning('Sending username and password in the clear')
assert config['sasl_plain_username'] is not None, 'sasl_plain_username required for PLAIN sasl'
assert config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl'
assert 'sasl_plain_username' in config, 'sasl_plain_username required for PLAIN sasl'
assert 'sasl_plain_password' in config, 'sasl_plain_password required for PLAIN sasl'

self.username = config['sasl_plain_username']
self.password = config['sasl_plain_password']
Expand Down
8 changes: 4 additions & 4 deletions kafka/sasl/scram.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ def xor_bytes(left, right):


class SaslMechanismScram(SaslMechanism):

def __init__(self, **config):
assert config['sasl_plain_username'] is not None, 'sasl_plain_username required for SCRAM sasl'
assert config['sasl_plain_password'] is not None, 'sasl_plain_password required for SCRAM sasl'
if config['security_protocol'] == 'SASL_PLAINTEXT':
assert 'sasl_plain_username' in config, 'sasl_plain_username required for SCRAM sasl'
assert 'sasl_plain_password' in config, 'sasl_plain_password required for SCRAM sasl'
assert config.get('sasl_mechanism', '') in ScramClient.MECHANISMS, 'Unrecognized SCRAM mechanism'
if config.get('security_protocol', '') == 'SASL_PLAINTEXT':
log.warning('Exchanging credentials in the clear during Sasl Authentication')

self._scram_client = ScramClient(
Expand Down