Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 136 additions & 57 deletions splitio/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,23 @@ def _get_headers(self, extra_headers, sdk_key):
headers.update(extra_headers)
return headers

def _record_telemetry(self, status_code, elapsed):
"""
Record Telemetry info

:param status_code: http request status code
:type status_code: int

:param elapsed: response time elapsed.
:type status_code: int
"""
self._telemetry_runtime_producer.record_sync_latency(self._metric_name, elapsed)
if 200 <= status_code < 300:
self._telemetry_runtime_producer.record_successful_sync(self._metric_name, get_current_epoch_time_ms())
return

self._telemetry_runtime_producer.record_sync_error(self._metric_name, status_code)

class HttpClient(HttpClientBase):
"""HttpClient wrapper."""

Expand All @@ -140,7 +157,6 @@ def __init__(self, timeout=None, sdk_url=None, events_url=None, auth_url=None, t
_LOGGER.debug("Initializing httpclient")
self._timeout = timeout/1000 if timeout else None # Convert ms to seconds.
self._urls = _construct_urls(sdk_url, events_url, auth_url, telemetry_url)
self._lock = threading.RLock()

def get(self, server, path, sdk_key, query=None, extra_headers=None): # pylint: disable=too-many-arguments
"""
Expand Down Expand Up @@ -208,23 +224,6 @@ def post(self, server, path, sdk_key, body, query=None, extra_headers=None): #
except Exception as exc: # pylint: disable=broad-except
raise HttpClientException(_EXC_MSG.format(source='request')) from exc

def _record_telemetry(self, status_code, elapsed):
"""
Record Telemetry info

:param status_code: http request status code
:type status_code: int

:param elapsed: response time elapsed.
:type status_code: int
"""
self._telemetry_runtime_producer.record_sync_latency(self._metric_name, elapsed)
if 200 <= status_code < 300:
self._telemetry_runtime_producer.record_successful_sync(self._metric_name, get_current_epoch_time_ms())
return

self._telemetry_runtime_producer.record_sync_error(self._metric_name, status_code)

class HttpClientAsync(HttpClientBase):
"""HttpClientAsync wrapper."""

Expand Down Expand Up @@ -350,7 +349,7 @@ async def close_session(self):
if not self._session.closed:
await self._session.close()

class HttpClientKerberos(HttpClient):
class HttpClientKerberos(HttpClientBase):
"""HttpClient wrapper."""

def __init__(self, timeout=None, sdk_url=None, events_url=None, auth_url=None, telemetry_url=None, authentication_scheme=None, authentication_params=None):
Expand All @@ -367,11 +366,22 @@ def __init__(self, timeout=None, sdk_url=None, events_url=None, auth_url=None, t
:type auth_url: str
:param telemetry_url: Optional alternative telemetry URL.
:type telemetry_url: str
:param authentication_scheme: Optional authentication scheme to use.
:type authentication_scheme: splitio.client.config.AuthenticateScheme
:param authentication_params: Optional authentication username and password to use.
:type authentication_params: [str, str]
"""
_LOGGER.debug("Initializing httpclient for Kerberos auth")
HttpClient.__init__(self, timeout=timeout, sdk_url=sdk_url, events_url=events_url, auth_url=auth_url, telemetry_url=telemetry_url)
self._timeout = timeout/1000 if timeout else None # Convert ms to seconds.
self._urls = _construct_urls(sdk_url, events_url, auth_url, telemetry_url)
self._authentication_scheme = authentication_scheme
self._authentication_params = authentication_params
self._lock = threading.RLock()
self._sessions = {'sdk': requests.Session(),
'events': requests.Session(),
'auth': requests.Session(),
'telemetry': requests.Session()}
self._set_authentication()

def get(self, server, path, sdk_key, query=None, extra_headers=None): # pylint: disable=too-many-arguments
"""
Expand All @@ -392,21 +402,49 @@ def get(self, server, path, sdk_key, query=None, extra_headers=None): # pylint:
"""
with self._lock:
start = get_current_epoch_time_ms()
with requests.Session() as session:
self._set_authentication(session)
try:
return self._do_get(server, path, sdk_key, query, extra_headers, start)

except requests.exceptions.ProxyError as exc:
_LOGGER.debug("Proxy Exception caught, resetting the http session")
self._sessions[server].close()
self._sessions[server] = requests.Session()
self._set_authentication(server_name=server)
try:
response = session.get(
_build_url(server, path, self._urls),
headers=self._get_headers(extra_headers, sdk_key),
params=query,
timeout=self._timeout
)
self._record_telemetry(response.status_code, get_current_epoch_time_ms() - start)
return HttpResponse(response.status_code, response.text, response.headers)

except Exception as exc: # pylint: disable=broad-except
return self._do_get(server, path, sdk_key, query, extra_headers, start)

except Exception as exc:
raise HttpClientException(_EXC_MSG.format(source='request')) from exc

except Exception as exc: # pylint: disable=broad-except
raise HttpClientException(_EXC_MSG.format(source='request')) from exc

def _do_get(self, server, path, sdk_key, query, extra_headers, start):
"""
Issue a get request.
:param server: Whether the request is for SDK server, Events server or Auth server.
:typee server: str
:param path: path to append to the host url.
:type path: str
:param sdk_key: sdk key.
:type sdk_key: str
:param query: Query string passed as dictionary.
:type query: dict
:param extra_headers: key/value pairs of possible extra headers.
:type extra_headers: dict

:return: Tuple of status_code & response text
:rtype: HttpResponse
"""
with self._sessions[server].get(
_build_url(server, path, self._urls),
headers=self._get_headers(extra_headers, sdk_key),
params=query,
timeout=self._timeout
) as response:
self._record_telemetry(response.status_code, get_current_epoch_time_ms() - start)
return HttpResponse(response.status_code, response.text, response.headers)

def post(self, server, path, sdk_key, body, query=None, extra_headers=None): # pylint: disable=too-many-arguments
"""
Issue a POST request.
Expand All @@ -429,31 +467,72 @@ def post(self, server, path, sdk_key, body, query=None, extra_headers=None): #
"""
with self._lock:
start = get_current_epoch_time_ms()
with requests.Session() as session:
self._set_authentication(session)
try:
return self._do_post(server, path, sdk_key, query, extra_headers, body, start)

except requests.exceptions.ProxyError as exc:
_LOGGER.debug("Proxy Exception caught, resetting the http session")
self._sessions[server].close()
self._sessions[server] = requests.Session()
self._set_authentication(server_name=server)
try:
response = session.post(
_build_url(server, path, self._urls),
params=query,
headers=self._get_headers(extra_headers, sdk_key),
json=body,
timeout=self._timeout,
)
self._record_telemetry(response.status_code, get_current_epoch_time_ms() - start)
return HttpResponse(response.status_code, response.text, response.headers)
except Exception as exc: # pylint: disable=broad-except
return self._do_post(server, path, sdk_key, query, extra_headers, body, start)

except Exception as exc:
raise HttpClientException(_EXC_MSG.format(source='request')) from exc

def _set_authentication(self, session):
if self._authentication_scheme == AuthenticateScheme.KERBEROS_SPNEGO:
_LOGGER.debug("Using Kerberos Spnego Authentication")
if self._authentication_params != [None, None]:
session.auth = HTTPKerberosAuth(principal=self._authentication_params[0], password=self._authentication_params[1], mutual_authentication=OPTIONAL)
else:
session.auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL)
elif self._authentication_scheme == AuthenticateScheme.KERBEROS_PROXY:
_LOGGER.debug("Using Kerberos Proxy Authentication")
if self._authentication_params != [None, None]:
session.mount('https://', HTTPAdapterWithProxyKerberosAuth(principal=self._authentication_params[0], password=self._authentication_params[1]))
else:
session.mount('https://', HTTPAdapterWithProxyKerberosAuth())
except Exception as exc: # pylint: disable=broad-except
raise HttpClientException(_EXC_MSG.format(source='request')) from exc

def _do_post(self, server, path, sdk_key, query, extra_headers, body, start):
"""
Issue a POST request.

:param server: Whether the request is for SDK server or Events server.
:typee server: str
:param path: path to append to the host url.
:type path: str
:param sdk_key: sdk key.
:type sdk_key: str
:param body: body sent in the request.
:type body: str
:param query: Query string passed as dictionary.
:type query: dict
:param extra_headers: key/value pairs of possible extra headers.
:type extra_headers: dict

:return: Tuple of status_code & response text
:rtype: HttpResponse
"""
with self._sessions[server].post(
_build_url(server, path, self._urls),
params=query,
headers=self._get_headers(extra_headers, sdk_key),
json=body,
timeout=self._timeout,
) as response:
self._record_telemetry(response.status_code, get_current_epoch_time_ms() - start)
return HttpResponse(response.status_code, response.text, response.headers)

def _set_authentication(self, server_name=None):
"""
Set the authentication for all self._sessions variables based on authentication scheme.

:param server: If set, will only add the auth for its session variable, otherwise will set all sessions.
:typee server: str
"""
for server in ['sdk', 'events', 'auth', 'telemetry']:
if server_name is not None and server_name != server:
continue
if self._authentication_scheme == AuthenticateScheme.KERBEROS_SPNEGO:
_LOGGER.debug("Using Kerberos Spnego Authentication")
if self._authentication_params != [None, None]:
self._sessions[server].auth = HTTPKerberosAuth(principal=self._authentication_params[0], password=self._authentication_params[1], mutual_authentication=OPTIONAL)
else:
self._sessions[server].auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL)
elif self._authentication_scheme == AuthenticateScheme.KERBEROS_PROXY:
_LOGGER.debug("Using Kerberos Proxy Authentication")
if self._authentication_params != [None, None]:
self._sessions[server].mount('https://', HTTPAdapterWithProxyKerberosAuth(principal=self._authentication_params[0], password=self._authentication_params[1]))
else:
self._sessions[server].mount('https://', HTTPAdapterWithProxyKerberosAuth())
Loading
Loading