
Commit cb19b61

kevinjqliu and mnzpk authored and committed
Fix kerberized hive client (#1941)
Closes #1744 (second try). The first attempt (#1747) did not fully resolve the issue; see #1747 (review). Yes, there are user-facing changes.

Co-authored-by: mnzpk <[email protected]>
1 parent 68a328e commit cb19b61

File tree

6 files changed: +2100 additions, −1513 deletions

.github/workflows/python-ci.yml

Lines changed: 2 additions & 0 deletions
@@ -58,6 +58,8 @@ jobs:
           python-version: ${{ matrix.python }}
           cache: poetry
           cache-dependency-path: ./poetry.lock
+      - name: Install system dependencies
+        run: sudo apt-get update && sudo apt-get install -y libkrb5-dev # for kerberos
       - name: Install
         run: make install-dependencies
       - name: Linters

.github/workflows/python-integration.yml

Lines changed: 2 additions & 0 deletions
@@ -50,6 +50,8 @@ jobs:
       - uses: actions/checkout@v4
         with:
           fetch-depth: 2
+      - name: Install system dependencies
+        run: sudo apt-get update && sudo apt-get install -y libkrb5-dev # for kerberos
       - name: Install
         run: make install
      - name: Run integration tests
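
Both workflows install libkrb5-dev before the Python dependencies because the new kerberos package (added to pyproject.toml below) is a C extension that builds against the MIT Kerberos headers. A hypothetical post-install sanity check, not part of this PR, could confirm the extension imports and exposes the GSSAPI entry points that puresasl drives during SASL negotiation:

    # Hypothetical check, not part of this PR: if libkrb5-dev is missing, the
    # "kerberos" package fails to build, and this import (or the build) fails.
    import kerberos

    # puresasl's GSSAPI mechanism calls these functions during SASL negotiation.
    for name in ("authGSSClientInit", "authGSSClientStep", "authGSSClientResponse"):
        assert callable(getattr(kerberos, name)), name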

poetry.lock

Lines changed: 1989 additions & 1508 deletions
Some generated files are not rendered by default.

pyiceberg/catalog/hive.py

Lines changed: 2 additions & 4 deletions
@@ -18,7 +18,6 @@
 import logging
 import socket
 import time
-from functools import cached_property
 from types import TracebackType
 from typing import (
     TYPE_CHECKING,
@@ -159,7 +158,6 @@ def _init_thrift_transport(self) -> TTransport:
         else:
             return TTransport.TSaslClientTransport(socket, host=url_parts.hostname, service="hive")

-    @cached_property
     def _client(self) -> Client:
         protocol = TBinaryProtocol.TBinaryProtocol(self._transport)
         client = Client(protocol)
@@ -172,11 +170,11 @@ def __enter__(self) -> Client:
         if not self._transport.isOpen():
             try:
                 self._transport.open()
-            except TTransport.TTransportException:
+            except (TypeError, TTransport.TTransportException):
                 # reinitialize _transport
                 self._transport = self._init_thrift_transport()
                 self._transport.open()
-        return self._client
+        return self._client()  # recreate the client

     def __exit__(
         self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
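
The substance of the fix is in this file: _client was previously a cached_property, so once __enter__ had to reinitialize the SASL transport, the cached Thrift client kept wrapping the old, dead transport. Making _client a plain method and calling self._client() on every __enter__ (and treating a TypeError from open() the same way as a transport exception) means the client is always built over the current transport. A minimal sketch of the stale-cache pitfall, using a toy class rather than the real _HiveClient and Thrift types:

    from functools import cached_property


    class ToyClient:
        """Illustration only: stands in for _HiveClient to show the caching pitfall."""

        def __init__(self) -> None:
            self._transport = object()  # pretend this is a Thrift transport

        @cached_property
        def client_old(self) -> tuple:  # old behaviour: bound to the first transport forever
            return ("client", self._transport)

        def client_new(self) -> tuple:  # new behaviour: rebuilt on each __enter__
            return ("client", self._transport)


    conn = ToyClient()
    _ = conn.client_old                 # populate the cache
    conn._transport = object()          # simulate reinitializing the transport
    assert conn.client_old[1] is not conn._transport   # cached client still holds the old transport
    assert conn.client_new()[1] is conn._transport     # plain method tracks the new one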

pyproject.toml

Lines changed: 2 additions & 1 deletion
@@ -82,6 +82,7 @@ cachetools = "^5.5.0"
 pyiceberg-core = { version = "^0.4.0", optional = true }
 polars = { version = "^1.21.0", optional = true }
 thrift-sasl = { version = ">=0.4.3", optional = true }
+kerberos = {version = "^1.3.1", optional = true}

 [tool.poetry.group.dev.dependencies]
 pytest = "7.4.4"
@@ -294,7 +295,7 @@ daft = ["getdaft"]
 polars = ["polars"]
 snappy = ["python-snappy"]
 hive = ["thrift"]
-hive-kerberos = ["thrift", "thrift_sasl"]
+hive-kerberos = ["thrift", "thrift_sasl", "kerberos"]
 s3fs = ["s3fs"]
 glue = ["boto3", "mypy-boto3-glue"]
 adlfs = ["adlfs"]

tests/catalog/test_hive.py

Lines changed: 103 additions & 0 deletions
@@ -15,12 +15,18 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint: disable=protected-access,redefined-outer-name
+import base64
 import copy
+import struct
+import threading
 import uuid
+from collections.abc import Generator
 from copy import deepcopy
+from typing import Optional
 from unittest.mock import MagicMock, call, patch

 import pytest
+import thrift.transport.TSocket
 from hive_metastore.ttypes import (
     AlreadyExistsException,
     FieldSchema,
@@ -38,11 +44,13 @@

 from pyiceberg.catalog import PropertiesUpdateSummary
 from pyiceberg.catalog.hive import (
+    HIVE_KERBEROS_AUTH,
     LOCK_CHECK_MAX_WAIT_TIME,
     LOCK_CHECK_MIN_WAIT_TIME,
     LOCK_CHECK_RETRIES,
     HiveCatalog,
     _construct_hive_storage_descriptor,
+    _HiveClient,
 )
 from pyiceberg.exceptions import (
     NamespaceAlreadyExistsError,
@@ -183,6 +191,59 @@ def hive_database(tmp_path_factory: pytest.TempPathFactory) -> HiveDatabase:
     )


+class SaslServer(threading.Thread):
+    def __init__(self, socket: thrift.transport.TSocket.TServerSocket, response: bytes) -> None:
+        super().__init__()
+        self.daemon = True
+        self._socket = socket
+        self._response = response
+        self._port = None
+        self._port_bound = threading.Event()
+
+    def run(self) -> None:
+        self._socket.listen()
+
+        try:
+            address = self._socket.handle.getsockname()
+            # AF_INET addresses are 2-tuples (host, port) and AF_INET6 are
+            # 4-tuples (host, port, ...), i.e. port is always at index 1.
+            _host, self._port, *_ = address
+        finally:
+            self._port_bound.set()
+
+        # Accept connections and respond to each connection with the same message.
+        # The responsibility for closing the connection is on the client
+        while True:
+            try:
+                client = self._socket.accept()
+                if client:
+                    client.write(self._response)
+                    client.flush()
+            except Exception:
+                pass
+
+    @property
+    def port(self) -> Optional[int]:
+        self._port_bound.wait()
+        return self._port
+
+    def close(self) -> None:
+        self._socket.close()
+
+
+@pytest.fixture(scope="session")
+def kerberized_hive_metastore_fake_url() -> Generator[str, None, None]:
+    server = SaslServer(
+        # Port 0 means pick any available port.
+        socket=thrift.transport.TSocket.TServerSocket(port=0),
+        # Always return a message with status 5 (COMPLETE).
+        response=struct.pack(">BI", 5, 0),
+    )
+    server.start()
+    yield f"thrift://localhost:{server.port}"
+    server.close()
+
+
 def test_no_uri_supplied() -> None:
     with pytest.raises(KeyError):
         HiveCatalog("production")
@@ -1239,3 +1300,45 @@ def test_create_hive_client_failure() -> None:
     with pytest.raises(Exception, match="Connection failed"):
         HiveCatalog._create_hive_client(properties)
     assert mock_hive_client.call_count == 2
+
+
+def test_create_hive_client_with_kerberos(
+    kerberized_hive_metastore_fake_url: str,
+) -> None:
+    properties = {
+        "uri": kerberized_hive_metastore_fake_url,
+        "ugi": "user",
+        HIVE_KERBEROS_AUTH: "true",
+    }
+    client = HiveCatalog._create_hive_client(properties)
+    assert client is not None
+
+
+def test_create_hive_client_with_kerberos_using_context_manager(
+    kerberized_hive_metastore_fake_url: str,
+) -> None:
+    client = _HiveClient(
+        uri=kerberized_hive_metastore_fake_url,
+        kerberos_auth=True,
+    )
+    with (
+        patch(
+            "puresasl.mechanisms.kerberos.authGSSClientStep",
+            return_value=None,
+        ),
+        patch(
+            "puresasl.mechanisms.kerberos.authGSSClientResponse",
+            return_value=base64.b64encode(b"Some Response"),
+        ),
+        patch(
+            "puresasl.mechanisms.GSSAPIMechanism.complete",
+            return_value=True,
+        ),
+    ):
+        with client as open_client:
+            assert open_client._iprot.trans.isOpen()
+
+        # Use the context manager a second time to see if
+        # closing and re-opening work as expected.
+        with client as open_client:
+            assert open_client._iprot.trans.isOpen()
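
The fake server's canned reply is the interesting part of the fixture: struct.pack(">BI", 5, 0) is a 5-byte SASL negotiation frame — one status byte (5, COMPLETE, per the fixture's comment) followed by a big-endian 4-byte payload length of zero — which is enough to satisfy the client-side handshake once the GSSAPI calls are patched out. A small sketch of how such frames pack and unpack (the helper names are illustrative, not from this PR):

    import struct

    COMPLETE = 5  # the status byte the fixture always answers with


    def encode_frame(status: int, payload: bytes = b"") -> bytes:
        # 1-byte status + 4-byte big-endian payload length, then the payload itself.
        return struct.pack(">BI", status, len(payload)) + payload


    def decode_frame(frame: bytes) -> tuple:
        status, length = struct.unpack(">BI", frame[:5])
        return status, frame[5 : 5 + length]


    frame = encode_frame(COMPLETE)  # b"\x05\x00\x00\x00\x00", the same bytes the fixture sends
    assert decode_frame(frame) == (COMPLETE, b"")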
