Skip to content

Commit b989197

Browse files
kevinjqliuFokko
authored andcommitted
Fix thrift client connection for Kerberos Hive Client (#1747)
Closes #1744. `TSaslClientTransport` cannot be reopened. This PR changes the behavior to recreate a `TSaslClientTransport` when it's already closed. Note: `_HiveClient` should be used with a context manager, but can be used without one.
1 parent 998ce7c commit b989197

File tree

1 file changed

+22
-14
lines changed

1 file changed

+22
-14
lines changed

pyiceberg/catalog/hive.py

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import logging
1919
import socket
2020
import time
21+
from functools import cached_property
2122
from types import TracebackType
2223
from typing import (
2324
TYPE_CHECKING,
@@ -142,40 +143,47 @@ class _HiveClient:
142143
"""Helper class to nicely open and close the transport."""
143144

144145
_transport: TTransport
145-
_client: Client
146146
_ugi: Optional[List[str]]
147147

148148
def __init__(self, uri: str, ugi: Optional[str] = None, kerberos_auth: Optional[bool] = HIVE_KERBEROS_AUTH_DEFAULT):
149149
self._uri = uri
150150
self._kerberos_auth = kerberos_auth
151151
self._ugi = ugi.split(":") if ugi else None
152+
self._transport = self._init_thrift_transport()
152153

153-
self._init_thrift_client()
154-
155-
def _init_thrift_client(self) -> None:
154+
def _init_thrift_transport(self) -> TTransport:
156155
url_parts = urlparse(self._uri)
157-
158156
socket = TSocket.TSocket(url_parts.hostname, url_parts.port)
159-
160157
if not self._kerberos_auth:
161-
self._transport = TTransport.TBufferedTransport(socket)
158+
return TTransport.TBufferedTransport(socket)
162159
else:
163-
self._transport = TTransport.TSaslClientTransport(socket, host=url_parts.hostname, service="hive")
160+
return TTransport.TSaslClientTransport(socket, host=url_parts.hostname, service="hive")
164161

162+
@cached_property
163+
def _client(self) -> Client:
165164
protocol = TBinaryProtocol.TBinaryProtocol(self._transport)
166-
167-
self._client = Client(protocol)
165+
client = Client(protocol)
166+
if self._ugi:
167+
client.set_ugi(*self._ugi)
168+
return client
168169

169170
def __enter__(self) -> Client:
170-
self._transport.open()
171-
if self._ugi:
172-
self._client.set_ugi(*self._ugi)
171+
"""Make sure the transport is initialized and open."""
172+
if not self._transport.isOpen():
173+
try:
174+
self._transport.open()
175+
except TTransport.TTransportException:
176+
# reinitialize _transport
177+
self._transport = self._init_thrift_transport()
178+
self._transport.open()
173179
return self._client
174180

175181
def __exit__(
176182
self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
177183
) -> None:
178-
self._transport.close()
184+
"""Close transport if it was opened."""
185+
if self._transport.isOpen():
186+
self._transport.close()
179187

180188

181189
def _construct_hive_storage_descriptor(

0 commit comments

Comments
 (0)