diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 27ad69312..c46bc7f3a 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -149,8 +149,8 @@ class KafkaAdminClient(object): sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of bootstrap servers - sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider - instance. (See kafka.oauth.abstract). Default: None + sasl_oauth_token_provider (kafka.sasl.oauth.AbstractTokenProvider): OAuthBearer + token provider instance. Default: None kafka_client (callable): Custom class / callable for creating KafkaClient instances """ diff --git a/kafka/client_async.py b/kafka/client_async.py index b72c05dac..d9a722cef 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -171,8 +171,8 @@ class KafkaClient(object): sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of bootstrap servers - sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider - instance. (See kafka.oauth.abstract). Default: None + sasl_oauth_token_provider (kafka.sasl.oauth.AbstractTokenProvider): OAuthBearer + token provider instance. Default: None """ DEFAULT_CONFIG = { diff --git a/kafka/conn.py b/kafka/conn.py index 588b5fd86..7af7459da 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -183,8 +183,8 @@ class BrokerConnection(object): sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of bootstrap servers - sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider - instance. (See kafka.oauth.abstract). Default: None + sasl_oauth_token_provider (kafka.sasl.oauth.AbstractTokenProvider): OAuthBearer + token provider instance. Default: None """ DEFAULT_CONFIG = { diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 27be4588d..d517acf13 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -258,8 +258,8 @@ class KafkaConsumer(six.Iterator): sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of bootstrap servers - sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider - instance. (See kafka.oauth.abstract). Default: None + sasl_oauth_token_provider (kafka.sasl.oauth.AbstractTokenProvider): OAuthBearer + token provider instance. Default: None kafka_client (callable): Custom class / callable for creating KafkaClient instances Note: diff --git a/kafka/oauth/__init__.py b/kafka/oauth/__init__.py deleted file mode 100644 index 8c8349564..000000000 --- a/kafka/oauth/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from __future__ import absolute_import - -from kafka.oauth.abstract import AbstractTokenProvider diff --git a/kafka/oauth/abstract.py b/kafka/oauth/abstract.py deleted file mode 100644 index 8d89ff51d..000000000 --- a/kafka/oauth/abstract.py +++ /dev/null @@ -1,42 +0,0 @@ -from __future__ import absolute_import - -import abc - -# This statement is compatible with both Python 2.7 & 3+ -ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()}) - -class AbstractTokenProvider(ABC): - """ - A Token Provider must be used for the SASL OAuthBearer protocol. - - The implementation should ensure token reuse so that multiple - calls at connect time do not create multiple tokens. The implementation - should also periodically refresh the token in order to guarantee - that each call returns an unexpired token. A timeout error should - be returned after a short period of inactivity so that the - broker can log debugging info and retry. - - Token Providers MUST implement the token() method - """ - - def __init__(self, **config): - pass - - @abc.abstractmethod - def token(self): - """ - Returns a (str) ID/Access Token to be sent to the Kafka - client. - """ - pass - - def extensions(self): - """ - This is an OPTIONAL method that may be implemented. - - Returns a map of key-value pairs that can - be sent with the SASL/OAUTHBEARER initial client request. If - not implemented, the values are ignored. This feature is only available - in Kafka >= 2.1.0. - """ - return {} diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index e30e9b7be..1b9b12817 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -297,8 +297,8 @@ class KafkaProducer(object): sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of bootstrap servers - sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider - instance. (See kafka.oauth.abstract). Default: None + sasl_oauth_token_provider (kafka.sasl.oauth.AbstractTokenProvider): OAuthBearer + token provider instance. Default: None kafka_client (callable): Custom class / callable for creating KafkaClient instances Note: diff --git a/kafka/sasl/oauth.py b/kafka/sasl/oauth.py index d4f643d84..4041a93bd 100644 --- a/kafka/sasl/oauth.py +++ b/kafka/sasl/oauth.py @@ -1,5 +1,7 @@ from __future__ import absolute_import +import abc + from kafka.sasl.abc import SaslMechanism @@ -7,8 +9,9 @@ class SaslMechanismOAuth(SaslMechanism): def __init__(self, **config): assert 'sasl_oauth_token_provider' in config, 'sasl_oauth_token_provider required for OAUTHBEARER sasl' + assert isinstance(config['sasl_oauth_token_provider'], AbstractTokenProvider), \ + 'sasl_oauth_token_provider must implement kafka.sasl.oauth.AbstractTokenProvider' self.token_provider = config['sasl_oauth_token_provider'] - assert callable(getattr(self.token_provider, 'token', None)), 'sasl_oauth_token_provider must implement method #token()' self._is_done = False self._is_authenticated = False @@ -32,9 +35,8 @@ def _token_extensions(self): Return a string representation of the OPTIONAL key-value pairs that can be sent with an OAUTHBEARER initial request. """ - # Only run if the #extensions() method is implemented by the clients Token Provider class # Builds up a string separated by \x01 via a dict of key value pairs - extensions = getattr(self.token_provider, 'extensions', lambda: [])() + extensions = self.token_provider.extensions() msg = '\x01'.join(['{}={}'.format(k, v) for k, v in extensions.items()]) return '\x01' + msg if msg else '' @@ -42,3 +44,44 @@ def auth_details(self): if not self.is_authenticated: raise RuntimeError('Not authenticated yet!') return 'Authenticated via SASL / OAuth' + +# This statement is compatible with both Python 2.7 & 3+ +ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()}) + +class AbstractTokenProvider(ABC): + """ + A Token Provider must be used for the SASL OAuthBearer protocol. + + The implementation should ensure token reuse so that multiple + calls at connect time do not create multiple tokens. The implementation + should also periodically refresh the token in order to guarantee + that each call returns an unexpired token. A timeout error should + be returned after a short period of inactivity so that the + broker can log debugging info and retry. + + Token Providers MUST implement the token() method + """ + + def __init__(self, **config): + pass + + @abc.abstractmethod + def token(self): + """ + Returns a (str) ID/Access Token to be sent to the Kafka + client. + """ + pass + + def extensions(self): + """ + This is an OPTIONAL method that may be implemented. + + Returns a map of key-value pairs that can + be sent with the SASL/OAUTHBEARER initial client request. If + not implemented, the values are ignored. This feature is only available + in Kafka >= 2.1.0. + + All returned keys and values should be type str + """ + return {}