Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ class KafkaAdminClient(object):
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
sasl mechanism handshake. Default: one of bootstrap servers
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
instance. (See kafka.oauth.abstract). Default: None
sasl_oauth_token_provider (kafka.sasl.oauth.AbstractTokenProvider): OAuthBearer
token provider instance. Default: None
kafka_client (callable): Custom class / callable for creating KafkaClient instances

"""
Expand Down
4 changes: 2 additions & 2 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ class KafkaClient(object):
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
sasl mechanism handshake. Default: one of bootstrap servers
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
instance. (See kafka.oauth.abstract). Default: None
sasl_oauth_token_provider (kafka.sasl.oauth.AbstractTokenProvider): OAuthBearer
token provider instance. Default: None
"""

DEFAULT_CONFIG = {
Expand Down
4 changes: 2 additions & 2 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ class BrokerConnection(object):
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
sasl mechanism handshake. Default: one of bootstrap servers
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
instance. (See kafka.oauth.abstract). Default: None
sasl_oauth_token_provider (kafka.sasl.oauth.AbstractTokenProvider): OAuthBearer
token provider instance. Default: None
"""

DEFAULT_CONFIG = {
Expand Down
4 changes: 2 additions & 2 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ class KafkaConsumer(six.Iterator):
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
sasl mechanism handshake. Default: one of bootstrap servers
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
instance. (See kafka.oauth.abstract). Default: None
sasl_oauth_token_provider (kafka.sasl.oauth.AbstractTokenProvider): OAuthBearer
token provider instance. Default: None
kafka_client (callable): Custom class / callable for creating KafkaClient instances

Note:
Expand Down
3 changes: 0 additions & 3 deletions kafka/oauth/__init__.py

This file was deleted.

42 changes: 0 additions & 42 deletions kafka/oauth/abstract.py

This file was deleted.

4 changes: 2 additions & 2 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,8 @@ class KafkaProducer(object):
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
sasl mechanism handshake. Default: one of bootstrap servers
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
instance. (See kafka.oauth.abstract). Default: None
sasl_oauth_token_provider (kafka.sasl.oauth.AbstractTokenProvider): OAuthBearer
token provider instance. Default: None
kafka_client (callable): Custom class / callable for creating KafkaClient instances

Note:
Expand Down
49 changes: 46 additions & 3 deletions kafka/sasl/oauth.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
from __future__ import absolute_import

import abc

from kafka.sasl.abc import SaslMechanism


class SaslMechanismOAuth(SaslMechanism):

def __init__(self, **config):
assert 'sasl_oauth_token_provider' in config, 'sasl_oauth_token_provider required for OAUTHBEARER sasl'
assert isinstance(config['sasl_oauth_token_provider'], AbstractTokenProvider), \
'sasl_oauth_token_provider must implement kafka.sasl.oauth.AbstractTokenProvider'
self.token_provider = config['sasl_oauth_token_provider']
assert callable(getattr(self.token_provider, 'token', None)), 'sasl_oauth_token_provider must implement method #token()'
self._is_done = False
self._is_authenticated = False

Expand All @@ -32,13 +35,53 @@ def _token_extensions(self):
Return a string representation of the OPTIONAL key-value pairs that can be sent with an OAUTHBEARER
initial request.
"""
# Only run if the #extensions() method is implemented by the clients Token Provider class
# Builds up a string separated by \x01 via a dict of key value pairs
extensions = getattr(self.token_provider, 'extensions', lambda: [])()
extensions = self.token_provider.extensions()
msg = '\x01'.join(['{}={}'.format(k, v) for k, v in extensions.items()])
return '\x01' + msg if msg else ''

def auth_details(self):
if not self.is_authenticated:
raise RuntimeError('Not authenticated yet!')
return 'Authenticated via SASL / OAuth'

# This statement is compatible with both Python 2.7 & 3+
ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()})

class AbstractTokenProvider(ABC):
"""
A Token Provider must be used for the SASL OAuthBearer protocol.

The implementation should ensure token reuse so that multiple
calls at connect time do not create multiple tokens. The implementation
should also periodically refresh the token in order to guarantee
that each call returns an unexpired token. A timeout error should
be returned after a short period of inactivity so that the
broker can log debugging info and retry.

Token Providers MUST implement the token() method
"""

def __init__(self, **config):
pass

@abc.abstractmethod
def token(self):
"""
Returns a (str) ID/Access Token to be sent to the Kafka
client.
"""
pass

def extensions(self):
"""
This is an OPTIONAL method that may be implemented.

Returns a map of key-value pairs that can
be sent with the SASL/OAUTHBEARER initial client request. If
not implemented, the values are ignored. This feature is only available
in Kafka >= 2.1.0.

All returned keys and values should be type str
"""
return {}