Skip to content

Added functionality for export of failure logs #591

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 48 commits into
base: telemetry
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
65a75f4
added functionality for export of failure logs
saishreeeee Jun 10, 2025
5305308
changed logger.error to logger.debug in exc.py
saishreeeee Jun 11, 2025
ba83c33
Fix telemetry loss during Python shutdown
saishreeeee Jun 11, 2025
131db92
unit tests for export_failure_log
saishreeeee Jun 12, 2025
3abc40d
try-catch blocks to make telemetry failures non-blocking for connecto…
saishreeeee Jun 12, 2025
ffa4787
removed redundant try/catch blocks, added try/catch block to initiali…
saishreeeee Jun 12, 2025
cc077f3
skip null fields in telemetry request
saishreeeee Jun 12, 2025
2c6fd44
removed dup import, renamed func, changed a filter_null_values to lambda
saishreeeee Jun 12, 2025
89540a1
removed unnecessary class variable and a redundant try/except block
saishreeeee Jun 12, 2025
52a1152
public functions defined at interface level
saishreeeee Jun 12, 2025
3dcdcfa
changed export_event and flush to private functions
saishreeeee Jun 13, 2025
b2714c9
formatting
saishreeeee Jun 13, 2025
377a87b
changed connection_uuid to thread local in thrift backend
saishreeeee Jun 13, 2025
c9376b8
made errors more specific
saishreeeee Jun 13, 2025
bbfadf2
revert change to connection_uuid
saishreeeee Jun 13, 2025
9bce26b
reverting change in close in telemetry client
saishreeeee Jun 13, 2025
ef4514d
JsonSerializableMixin
saishreeeee Jun 13, 2025
8924835
isdataclass check in JsonSerializableMixin
saishreeeee Jun 13, 2025
65361e7
convert TelemetryClientFactory to module-level functions, replace Noo…
saishreeeee Jun 16, 2025
1722a77
renamed connection_uuid as session_id_hex
saishreeeee Jun 16, 2025
e841434
added NotImplementedError to abstract class, added unit tests
saishreeeee Jun 16, 2025
2f89266
formatting
saishreeeee Jun 16, 2025
5564bbb
added PEP-249 link, changed NoopTelemetryClient implementation
saishreeeee Jun 17, 2025
1e4e8cf
removed unused import
saishreeeee Jun 17, 2025
55b29bc
made telemetry client close a module-level function
saishreeeee Jun 17, 2025
93bf170
unit tests verbose
saishreeeee Jun 17, 2025
45f5ccf
debug logs in unit tests
saishreeeee Jun 17, 2025
8ff1c1f
debug logs in unit tests
saishreeeee Jun 17, 2025
8bdd324
removed ABC from mixin, added try/catch block around executor shutdown
saishreeeee Jun 17, 2025
f99f7ea
checking stuff
saishreeeee Jun 17, 2025
b972c8a
finding out
saishreeeee Jun 17, 2025
7ca3636
finding out more
saishreeeee Jun 17, 2025
0ac8ed2
more more finding out more nice
saishreeeee Jun 17, 2025
c457a09
locks are useless anyways
saishreeeee Jun 17, 2025
5f07a84
haha
saishreeeee Jun 17, 2025
1115e25
normal
saishreeeee Jun 17, 2025
de1ed87
:= looks like walrus horizontally
saishreeeee Jun 17, 2025
554aeaf
one more
saishreeeee Jun 17, 2025
fffac5f
walrus again
saishreeeee Jun 17, 2025
b77208a
old stuff without walrus seems to fail
saishreeeee Jun 17, 2025
733c288
manually do the walrussing
saishreeeee Jun 17, 2025
ca8b958
change 3.13t, v2
saishreeeee Jun 17, 2025
3eabac9
formatting, added walrus
saishreeeee Jun 17, 2025
fb9ef43
formatting
saishreeeee Jun 17, 2025
1e795aa
removed walrus, removed test before stalling test
saishreeeee Jun 17, 2025
2c293a5
changed order of stalling test
saishreeeee Jun 18, 2025
d237255
removed debugging, added TelemetryClientFactory
saishreeeee Jun 18, 2025
f101b19
remove more debugging
saishreeeee Jun 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 72 additions & 29 deletions src/databricks/sql/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
OperationalError,
SessionAlreadyClosedError,
CursorAlreadyClosedError,
InterfaceError,
NotSupportedError,
ProgrammingError,
)
from databricks.sql.thrift_api.TCLIService import ttypes
from databricks.sql.thrift_backend import ThriftBackend
Expand Down Expand Up @@ -50,8 +53,8 @@
TOperationState,
)
from databricks.sql.telemetry.telemetry_client import (
TelemetryClientFactory,
TelemetryHelper,
TelemetryClientFactory,
)
from databricks.sql.telemetry.models.enums import DatabricksClientType
from databricks.sql.telemetry.models.event import (
Expand Down Expand Up @@ -305,13 +308,13 @@ def read(self) -> Optional[OAuthToken]:

TelemetryClientFactory.initialize_telemetry_client(
telemetry_enabled=self.telemetry_enabled,
connection_uuid=self.get_session_id_hex(),
session_id_hex=self.get_session_id_hex(),
auth_provider=auth_provider,
host_url=self.host,
)

self._telemetry_client = TelemetryClientFactory.get_telemetry_client(
connection_uuid=self.get_session_id_hex()
session_id_hex=self.get_session_id_hex()
)

driver_connection_params = DriverConnectionParameters(
Expand Down Expand Up @@ -421,7 +424,10 @@ def cursor(
Will throw an Error if the connection has been closed.
"""
if not self.open:
raise Error("Cannot create cursor from closed connection")
raise InterfaceError(
"Cannot create cursor from closed connection",
session_id_hex=self.get_session_id_hex(),
)

cursor = Cursor(
self,
Expand Down Expand Up @@ -464,14 +470,17 @@ def _close(self, close_cursors=True) -> None:

self.open = False

self._telemetry_client.close()
TelemetryClientFactory.close(self.get_session_id_hex())

def commit(self):
"""No-op because Databricks does not support transactions"""
pass

def rollback(self):
raise NotSupportedError("Transactions are not supported on Databricks")
raise NotSupportedError(
"Transactions are not supported on Databricks",
session_id_hex=self.get_session_id_hex(),
)


class Cursor:
Expand Down Expand Up @@ -523,7 +532,10 @@ def __iter__(self):
for row in self.active_result_set:
yield row
else:
raise Error("There is no active result set")
raise ProgrammingError(
"There is no active result set",
session_id_hex=self.connection.get_session_id_hex(),
)

def _determine_parameter_approach(
self, params: Optional[TParameterCollection]
Expand Down Expand Up @@ -660,7 +672,10 @@ def _close_and_clear_active_result_set(self):

def _check_not_closed(self):
if not self.open:
raise Error("Attempting operation on closed cursor")
raise InterfaceError(
"Attempting operation on closed cursor",
session_id_hex=self.connection.get_session_id_hex(),
)

def _handle_staging_operation(
self, staging_allowed_local_path: Union[None, str, List[str]]
Expand All @@ -677,8 +692,9 @@ def _handle_staging_operation(
elif isinstance(staging_allowed_local_path, type(list())):
_staging_allowed_local_paths = staging_allowed_local_path
else:
raise Error(
"You must provide at least one staging_allowed_local_path when initialising a connection to perform ingestion commands"
raise ProgrammingError(
"You must provide at least one staging_allowed_local_path when initialising a connection to perform ingestion commands",
session_id_hex=self.connection.get_session_id_hex(),
)

abs_staging_allowed_local_paths = [
Expand Down Expand Up @@ -706,8 +722,9 @@ def _handle_staging_operation(
else:
continue
if not allow_operation:
raise Error(
"Local file operations are restricted to paths within the configured staging_allowed_local_path"
raise ProgrammingError(
"Local file operations are restricted to paths within the configured staging_allowed_local_path",
session_id_hex=self.connection.get_session_id_hex(),
)

# May be real headers, or could be json string
Expand Down Expand Up @@ -735,9 +752,10 @@ def _handle_staging_operation(
handler_args.pop("local_file")
return self._handle_staging_remove(**handler_args)
else:
raise Error(
raise ProgrammingError(
f"Operation {row.operation} is not supported. "
+ "Supported operations are GET, PUT, and REMOVE"
+ "Supported operations are GET, PUT, and REMOVE",
session_id_hex=self.connection.get_session_id_hex(),
)

def _handle_staging_put(
Expand All @@ -749,7 +767,10 @@ def _handle_staging_put(
"""

if local_file is None:
raise Error("Cannot perform PUT without specifying a local_file")
raise ProgrammingError(
"Cannot perform PUT without specifying a local_file",
session_id_hex=self.connection.get_session_id_hex(),
)

with open(local_file, "rb") as fh:
r = requests.put(url=presigned_url, data=fh, headers=headers)
Expand All @@ -765,8 +786,9 @@ def _handle_staging_put(
# fmt: on

if r.status_code not in [OK, CREATED, NO_CONTENT, ACCEPTED]:
raise Error(
f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}"
raise OperationalError(
f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}",
session_id_hex=self.connection.get_session_id_hex(),
)

if r.status_code == ACCEPTED:
Expand All @@ -784,15 +806,19 @@ def _handle_staging_get(
"""

if local_file is None:
raise Error("Cannot perform GET without specifying a local_file")
raise ProgrammingError(
"Cannot perform GET without specifying a local_file",
session_id_hex=self.connection.get_session_id_hex(),
)

r = requests.get(url=presigned_url, headers=headers)

# response.ok verifies the status code is not between 400-600.
# Any 2xx or 3xx will evaluate r.ok == True
if not r.ok:
raise Error(
f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}"
raise OperationalError(
f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}",
session_id_hex=self.connection.get_session_id_hex(),
)

with open(local_file, "wb") as fp:
Expand All @@ -806,8 +832,9 @@ def _handle_staging_remove(
r = requests.delete(url=presigned_url, headers=headers)

if not r.ok:
raise Error(
f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}"
raise OperationalError(
f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}",
session_id_hex=self.connection.get_session_id_hex(),
)

def execute(
Expand Down Expand Up @@ -1005,8 +1032,9 @@ def get_async_execution_result(self):

return self
else:
raise Error(
f"get_execution_result failed with Operation status {operation_state}"
raise OperationalError(
f"get_execution_result failed with Operation status {operation_state}",
session_id_hex=self.connection.get_session_id_hex(),
)

def executemany(self, operation, seq_of_parameters):
Expand Down Expand Up @@ -1156,7 +1184,10 @@ def fetchall(self) -> List[Row]:
if self.active_result_set:
return self.active_result_set.fetchall()
else:
raise Error("There is no active result set")
raise ProgrammingError(
"There is no active result set",
session_id_hex=self.connection.get_session_id_hex(),
)

def fetchone(self) -> Optional[Row]:
"""
Expand All @@ -1170,7 +1201,10 @@ def fetchone(self) -> Optional[Row]:
if self.active_result_set:
return self.active_result_set.fetchone()
else:
raise Error("There is no active result set")
raise ProgrammingError(
"There is no active result set",
session_id_hex=self.connection.get_session_id_hex(),
)

def fetchmany(self, size: int) -> List[Row]:
"""
Expand All @@ -1192,21 +1226,30 @@ def fetchmany(self, size: int) -> List[Row]:
if self.active_result_set:
return self.active_result_set.fetchmany(size)
else:
raise Error("There is no active result set")
raise ProgrammingError(
"There is no active result set",
session_id_hex=self.connection.get_session_id_hex(),
)

def fetchall_arrow(self) -> "pyarrow.Table":
self._check_not_closed()
if self.active_result_set:
return self.active_result_set.fetchall_arrow()
else:
raise Error("There is no active result set")
raise ProgrammingError(
"There is no active result set",
session_id_hex=self.connection.get_session_id_hex(),
)

def fetchmany_arrow(self, size) -> "pyarrow.Table":
self._check_not_closed()
if self.active_result_set:
return self.active_result_set.fetchmany_arrow(size)
else:
raise Error("There is no active result set")
raise ProgrammingError(
"There is no active result set",
session_id_hex=self.connection.get_session_id_hex(),
)

def cancel(self) -> None:
"""
Expand Down
15 changes: 13 additions & 2 deletions src/databricks/sql/exc.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,32 @@
import json
import logging

logger = logging.getLogger(__name__)
from databricks.sql.telemetry.telemetry_client import TelemetryClientFactory

logger = logging.getLogger(__name__)

### PEP-249 Mandated ###
# https://peps.python.org/pep-0249/#exceptions
class Error(Exception):
"""Base class for DB-API2.0 exceptions.
`message`: An optional user-friendly error message. It should be short, actionable and stable
`context`: Optional extra context about the error. MUST be JSON serializable
"""

def __init__(self, message=None, context=None, *args, **kwargs):
def __init__(
self, message=None, context=None, session_id_hex=None, *args, **kwargs
):
super().__init__(message, *args, **kwargs)
self.message = message
self.context = context or {}

error_name = self.__class__.__name__
if session_id_hex:
telemetry_client = TelemetryClientFactory.get_telemetry_client(
session_id_hex
)
telemetry_client.export_failure_log(error_name, self.message)

def __str__(self):
return self.message

Expand Down
Loading
Loading