Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
python-version: "3.11"

- name: Pip Install
run: |
Expand Down
4 changes: 1 addition & 3 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,4 @@ Compatibility
---------------

- If databend version >= v0.9.0 or later, you need to use databend-sqlalchemy version >= v0.1.0.
- The databend-sqlalchemy use [databend-py](https://github.com/datafuselabs/databend-py) as internal driver when version < v0.4.0, but when version >= v0.4.0 it use [databend driver python binding](https://github.com/datafuselabs/bendsql/blob/main/bindings/python/README.md) as internal driver. The only difference between the two is that the connection parameters provided in the DSN are different. When using the corresponding version, you should refer to the connection parameters provided by the corresponding Driver.


- The databend-sqlalchemy use [databend-py](https://github.com/databendlabs/databend-py) as internal driver when version < v0.4.0, but when version >= v0.4.0 it use [databend driver python binding](https://github.com/databendlabs/bendsql/blob/main/bindings/python/README.md) as internal driver. The only difference between the two is that the connection parameters provided in the DSN are different. When using the corresponding version, you should refer to the connection parameters provided by the corresponding Driver.
2 changes: 1 addition & 1 deletion databend_sqlalchemy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python


VERSION = (0, 4, 8)
VERSION = (0, 5, 0)
__version__ = ".".join(str(x) for x in VERSION)
145 changes: 42 additions & 103 deletions databend_sqlalchemy/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
# Many docstrings in this file are based on the PEP, which is in the public domain.
import decimal
import re
import uuid
from datetime import datetime, date, time, timedelta
from databend_sqlalchemy.errors import Error, ServerException, NotSupportedError
from databend_sqlalchemy.errors import Error, NotSupportedError

from databend_driver import BlockingDatabendClient

Expand Down Expand Up @@ -49,7 +48,7 @@ def escape_item(self, item):
elif isinstance(item, decimal.Decimal):
return self.escape_number(item)
elif isinstance(item, timedelta):
return self.escape_string(f'{item.total_seconds()} seconds') + '::interval'
return self.escape_string(f"{item.total_seconds()} seconds") + "::interval"
elif isinstance(item, (datetime, date, time, timedelta)):
return self.escape_string(item.strftime("%Y-%m-%d %H:%M:%S"))
else:
Expand Down Expand Up @@ -106,7 +105,7 @@ def commit(self):
pass

def cursor(self):
return Cursor(self.client.get_conn())
return Cursor(self.client.cursor())

def rollback(self):
raise NotSupportedError("Transactions are not supported") # pragma: no cover
Expand All @@ -120,22 +119,8 @@ class Cursor:
visible by other cursors or connections.
"""

_STATE_NONE = None
_STATE_RUNNING = "Running"
_STATE_SUCCEEDED = "Succeeded"

def __init__(self, conn):
self._db = conn
self._reset_state()

def _reset_state(self):
"""Reset state about the previous query in preparation for running another query"""
self._uuid = None
self._rownumber = 0
# Internal helper state
self._state = self._STATE_NONE
self._rows = None
self._columns = None
self.inner = conn

@property
def rowcount(self):
Expand All @@ -159,50 +144,35 @@ def description(self):
The ``type_code`` can be interpreted by comparing it to the Type Objects specified in the
section below.
"""
# Sleep until we're done or we got the columns
if self._columns is None:
return []
return self._columns
try:
return self.inner.description
except Exception as e:
raise Error(str(e)) from e

def close(self):
self._reset_state()
try:
self.inner.close()
except Exception as e:
raise Error(str(e)) from e

def mogrify(self, query, parameters):
if parameters:
query = query % _escaper.escape_args(parameters)
return query

def execute(self, operation, parameters=None, is_response=True):
def execute(self, operation, parameters=None):
"""Prepare and execute a database operation (query or command)."""

#ToDo - Fix this, which is preventing the execution of blank DDL sunch as CREATE INDEX statements which aren't currently supported
# ToDo - Fix this, which is preventing the execution of blank DDL sunch as CREATE INDEX statements which aren't currently supported
# Seems hard to fix when statements are coming from metadata.create_all()
if operation == "":
if not operation:
return

self._reset_state()

self._state = self._STATE_RUNNING
self._uuid = uuid.uuid1()

try:
query = self.mogrify(operation, parameters)
query = query.replace('%%', '%')
if is_response:
rows = self._db.query_iter(query)
schema = rows.schema()
columns = []
for field in schema.fields():
columns.append((field.name, field.data_type))
if self._state != self._STATE_RUNNING:
raise Exception("Should be running if processing response")
self._rows = rows
self._columns = columns
self._state = self._STATE_SUCCEEDED
else:
self._db.exec(query)
query = query.replace("%%", "%")
return self.inner.execute(query)
except Exception as e:
# We have to raise dbAPI error
raise Error(str(e)) from e

def executemany(self, operation, seq_of_parameters):
Expand All @@ -224,34 +194,30 @@ def executemany(self, operation, seq_of_parameters):
m = RE_INSERT_VALUES.match(operation)
if m:
try:
q_prefix = m.group(1) #.replace('%%', '%') % ()
q_prefix = m.group(1)
q_values = m.group(2).rstrip()

for parameters in seq_of_parameters:
values_list.append(q_values % _escaper.escape_args(parameters))
query = "{} {};".format(q_prefix, ",".join(values_list))
self._db.exec(query)
return self.inner.execute(query)
except Exception as e:
# We have to raise dbAPI error
raise Error(str(e)) from e
else:
for parameters in seq_of_parameters:
self.execute(operation, parameters, is_response=False)
self.execute(operation, parameters)

def fetchone(self):
"""Fetch the next row of a query result set, returning a single sequence, or ``None`` when
no more data is available."""
if self._state == self._STATE_NONE:
raise Error("No query yet")
if not self._rows:
raise Error("No rows yet")
else:
self._rownumber += 1
try:
row = self._rows.__next__()
except StopIteration:
try:
row = self.inner.fetchone()
if row is None:
return None
return row.values()
except Exception as e:
raise Error(str(e)) from e

def fetchmany(self, size=None):
"""Fetch the next set of rows of a query result, returning a sequence of sequences (e.g. a
Expand All @@ -262,63 +228,36 @@ def fetchmany(self, size=None):
fetch as many rows as indicated by the size parameter. If this is not possible due to the
specified number of rows not being available, fewer rows may be returned.
"""
if self._state == self._STATE_NONE:
raise Error("No query yet")

if size is None:
size = 1

data = []
if self._rows:
for row in self._rows:
self._rownumber += 1
data.append(row.values())
if len(data) == size:
break
return data
try:
rows = self.inner.fetchmany(size)
return [row.values() for row in rows]
except Exception as e:
raise Error(str(e)) from e

def fetchall(self):
"""Fetch all (remaining) rows of a query result, returning them as a sequence of sequences
(e.g. a list of tuples).
"""
if self._state == self._STATE_NONE:
raise Error("No query yet")

data = []
if self._rows:
for row in self._rows:
self._rownumber += 1
data.append(row.values())
return data
try:
rows = self.inner.fetchall()
return [row.values() for row in rows]
except Exception as e:
raise Error(str(e)) from e

def __next__(self):
"""Return the next row from the currently executing SQL statement using the same semantics
as :py:meth:`fetchone`. A ``StopIteration`` exception is raised when the result set is
exhausted.
"""
if not self._rows:
raise StopIteration()
n = self._rows.__next__()
return n.values()
try:
return self.inner.__next__()
except StopIteration as e:
raise e
except Exception as e:
raise Error(str(e)) from e

next = fetchone
next = __next__

def __iter__(self):
"""Return self to make cursors compatible to the iteration protocol."""
return self

def cancel(self):
if self._state == self._STATE_NONE:
raise ServerException("No query yet")
if self._uuid is None:
if self._state != self._STATE_RUNNING:
raise ServerException("Query should be running")
return
# Replace current running query to cancel it
self._db.execute("SELECT 1")
self._state = self._STATE_SUCCEEDED
self._uuid = None
self._rows = None

def poll(self):
pass
Loading
Loading