1 change: 1 addition & 0 deletions databend_sqlalchemy/__init__.py
@@ -19,6 +19,7 @@
NDJSONFormat,
ParquetFormat,
ORCFormat,
AVROFormat,
AmazonS3,
AzureBlobStorage,
GoogleCloudStorage,
3 changes: 3 additions & 0 deletions databend_sqlalchemy/connector.py
@@ -5,6 +5,7 @@
# Many docstrings in this file are based on the PEP, which is in the public domain.
import decimal
import re
import json
from datetime import datetime, date, time, timedelta
from databend_sqlalchemy.errors import Error, NotSupportedError

@@ -56,6 +57,8 @@ def escape_item(self, item):
return self.escape_string(item.strftime("%Y-%m-%d %H:%M:%S.%f")) + "::timestamp"
elif isinstance(item, date):
return self.escape_string(item.strftime("%Y-%m-%d")) + "::date"
elif isinstance(item, dict):
return self.escape_string(f'parse_json({json.dumps(item)})')
else:
return self.escape_string(item)

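The new `dict` branch turns a Python dict parameter into a Databend `parse_json()` call, so the value arrives as a VARIANT rather than a plain string. A minimal sketch of the idea (hypothetical helper; the real path runs through `escape_item`/`escape_string`, which also handles quoting of embedded characters):

```python
import json

def render_variant(item: dict) -> str:
    # Serialize the dict to JSON and wrap it in parse_json() so
    # Databend parses it as a VARIANT value. The real implementation
    # routes this through escape_string(), which handles escaping.
    return f"parse_json('{json.dumps(item)}')"

print(render_variant({"a": 1, "tags": ["x", "y"]}))
# parse_json('{"a": 1, "tags": ["x", "y"]}')
```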
47 changes: 45 additions & 2 deletions databend_sqlalchemy/databend_dialect.py
@@ -80,6 +80,7 @@
"Comment",
"CommentBlock",
"Ident",
"IdentVariable",
"ColumnPosition",
"LiteralString",
"LiteralCodeString",
@@ -119,6 +120,7 @@
"Caret",
"LBrace",
"RBrace",
"Dollar",
"RArrow",
"LongRArrow",
"FatRArrow",
@@ -187,8 +189,10 @@
"BY",
"BROTLI",
"BZ2",
"BLOCK",
"CALL",
"CASE",
"CASE_SENSITIVE",
"CAST",
"CATALOG",
"CATALOGS",
@@ -205,6 +209,7 @@
"CONTINUE",
"CHAR",
"COLUMN",
"COLUMN_MATCH_MODE",
"COLUMNS",
"CHARACTER",
"CONFLICT",
@@ -223,6 +228,7 @@
"DATA",
"DATE",
"DATE_ADD",
"DATE_DIFF",
"DATE_PART",
"DATE_SUB",
"DATE_TRUNC",
@@ -266,6 +272,7 @@
"ENGINE",
"ENGINES",
"EPOCH",
"MICROSECOND",
"ERROR_ON_COLUMN_COUNT_MISMATCH",
"ESCAPE",
"EXCEPTION_BACKTRACE",
@@ -295,6 +302,7 @@
"FORMAT_NAME",
"FORMATS",
"FRAGMENTS",
"FRIDAY",
"FROM",
"FULL",
"FUNCTION",
@@ -305,12 +313,14 @@
"GET",
"GENERATED",
"GEOMETRY",
"GEOGRAPHY",
"GLOBAL",
"GRAPH",
"GROUP",
"GZIP",
"HAVING",
"HIGH",
"HILBERT",
"HISTORY",
"HIVE",
"HOUR",
@@ -321,6 +331,7 @@
"IDENTIFIER",
"IF",
"IN",
"INCLUDE_QUERY_ID",
"INCREMENTAL",
"INDEX",
"INFORMATION",
@@ -336,6 +347,9 @@
"INTERVAL",
"INTO",
"INVERTED",
"PREVIOUS_DAY",
"PROCEDURE",
"PROCEDURES",
"IMMEDIATE",
"IS",
"ISODOW",
@@ -346,7 +360,9 @@
"JWT",
"KEY",
"KILL",
"LAST_DAY",
"LATERAL",
"LINEAR",
"LOCATION_PREFIX",
"LOCKS",
"LOGICAL",
@@ -378,6 +394,7 @@
"MODIFY",
"MATERIALIZED",
"MUST_CHANGE_PASSWORD",
"NEXT_DAY",
"NON_DISPLAY",
"NATURAL",
"NETWORK",
@@ -430,6 +447,7 @@
"PRIORITY",
"PURGE",
"PUT",
"PARTIAL",
"QUARTER",
"QUERY",
"QUOTE",
@@ -445,6 +463,7 @@
"REPLACE",
"RETURN_FAILED_ONLY",
"REVERSE",
"SAMPLE",
"MERGE",
"MATCHED",
"MISSING_FIELD_AS",
@@ -475,6 +494,8 @@
"RLIKE",
"RAW",
"OPTIMIZED",
"DECORRELATED",
"SATURDAY",
"SCHEMA",
"SCHEMAS",
"SECOND",
@@ -487,6 +508,7 @@
"UNSET",
"SESSION",
"SETTINGS",
"VARIABLES",
"STAGES",
"STATISTIC",
"SUMMARY",
@@ -497,6 +519,7 @@
"SINGLE",
"SIZE_LIMIT",
"MAX_FILES",
"MONDAY",
"SKIP_HEADER",
"SMALLINT",
"SNAPPY",
@@ -505,6 +528,7 @@
"STAGE",
"SYNTAX",
"USAGE",
"USE_RAW_PATH",
"UPDATE",
"UPLOAD",
"SEQUENCE",
@@ -534,6 +558,7 @@
"TENANTS",
"TENANT",
"THEN",
"THURSDAY",
"TIMESTAMP",
"TIMEZONE_HOUR",
"TIMEZONE_MINUTE",
@@ -548,6 +573,7 @@
"TRUNCATE",
"TRY_CAST",
"TSV",
"TUESDAY",
"TUPLE",
"TYPE",
"UNBOUNDED",
@@ -567,11 +593,12 @@
"USING",
"VACUUM",
"VALUES",
"VALIDATION_MODE",
"VARBINARY",
"VARCHAR",
"VARIANT",
"VARIABLE",
"VERBOSE",
"GRAPHICAL",
"VIEW",
"VIEWS",
"VIRTUAL",
@@ -605,6 +632,7 @@
"UDF",
"HANDLER",
"LANGUAGE",
"STATE",
"TASK",
"TASKS",
"TOP",
@@ -620,6 +648,7 @@
"INTEGRATION",
"ENABLED",
"WEBHOOK",
"WEDNESDAY",
"ERROR_INTEGRATION",
"AUTO_INGEST",
"PIPE_EXECUTION_PAUSED",
@@ -632,8 +661,21 @@
"ABORT",
"ROLLBACK",
"TEMPORARY",
"TEMP",
"SECONDS",
"DAYS",
"DICTIONARY",
"DICTIONARIES",
"PRIMARY",
"SOURCE",
"SQL",
"SUNDAY",
"WAREHOUSES",
"INSPECT",
"ASSIGN",
"NODES",
"UNASSIGN",
"ONLINE",
}
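The additions above extend the dialect's keyword set; identifiers that collide with a reserved word get quoted when SQL is rendered. A small sketch of that effect, assuming the set feeds SQLAlchemy's `IdentifierPreparer` in the usual way (the demo class and names are illustrative, not the dialect's actual preparer):

```python
from sqlalchemy.engine import default
from sqlalchemy.sql.compiler import IdentifierPreparer

class DemoPreparer(IdentifierPreparer):
    # Stand-in preparer; entries are lower-case, mirroring a few of
    # the newly added keywords.
    reserved_words = {"sample", "variables", "tuesday"}

preparer = DemoPreparer(default.DefaultDialect())
print(preparer.quote("sample"))   # "sample"  (quoted: reserved)
print(preparer.quote("payload"))  # payload   (left bare)
```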


@@ -998,7 +1040,8 @@ def visit_copy_into(self, copy_into, **kw):

result = f"COPY INTO {target}" f" FROM {source}"
if hasattr(copy_into, "files") and isinstance(copy_into.files, list):
result += f"FILES = {', '.join([f for f in copy_into.files])}"
quoted_files = [f"'{f}'" for f in copy_into.files]
result += f" FILES = ({', '.join(quoted_files)})"
if hasattr(copy_into, "pattern") and copy_into.pattern:
result += f" PATTERN = '{copy_into.pattern}'"
if not isinstance(copy_into.file_format, NoneType):
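The old f-string dropped the leading space, the quotes, and the parentheses that Databend's `COPY INTO ... FILES = (...)` clause requires. A quick before/after sketch with hypothetical file names:

```python
files = ["one", "two", "three"]

old = f"FILES = {', '.join([f for f in files])}"
# 'FILES = one, two, three'  -- unquoted and glued onto the
# preceding clause because the leading space was missing

quoted_files = [f"'{f}'" for f in files]
new = f" FILES = ({', '.join(quoted_files)})"
# " FILES = ('one', 'two', 'three')"
```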
38 changes: 38 additions & 0 deletions databend_sqlalchemy/dml.py
@@ -250,6 +250,7 @@ class Compression(Enum):
DEFLATE = "DEFLATE"
RAW_DEFLATE = "RAW_DEFLATE"
XZ = "XZ"
SNAPPY = "SNAPPY"


class CopyFormat(ClauseElement):
@@ -401,6 +402,30 @@ def __init__(
class ParquetFormat(CopyFormat):
format_type = "PARQUET"

def __init__(
self,
*,
missing_field_as: str = None,
compression: Compression = None,
):
super().__init__()
if missing_field_as:
if missing_field_as not in ["ERROR", "FIELD_DEFAULT"]:
raise TypeError(
'Missing Field As should be "ERROR" or "FIELD_DEFAULT".'
)
self.options["MISSING_FIELD_AS"] = f"{missing_field_as}"
if compression:
if compression not in [Compression.ZSTD, Compression.SNAPPY]:
raise TypeError(
'Compression should be None, ZStd, or Snappy.'
)
self.options["COMPRESSION"] = compression.value


class AVROFormat(CopyFormat):
format_type = "AVRO"

def __init__(
self,
*,
@@ -418,6 +443,19 @@ def __init__(
class ORCFormat(CopyFormat):
format_type = "ORC"

def __init__(
self,
*,
missing_field_as: str = None,
):
super().__init__()
if missing_field_as:
if missing_field_as not in ["ERROR", "FIELD_DEFAULT"]:
raise TypeError(
'Missing Field As should be "ERROR" or "FIELD_DEFAULT".'
)
self.options["MISSING_FIELD_AS"] = f"{missing_field_as}"


class StageClause(ClauseElement, FromClauseRole):
"""Stage Clause"""
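Taken together, the `dml.py` changes give `ParquetFormat` and `ORCFormat` the `MISSING_FIELD_AS` option (plus `COMPRESSION` for Parquet) and introduce `AVROFormat`. A hedged usage sketch (import paths assumed from this package; `AVROFormat`'s option list is truncated in the hunk above, so only the bare form is shown):

```python
from databend_sqlalchemy.dml import (
    AVROFormat, Compression, ORCFormat, ParquetFormat,
)

# Parquet now accepts MISSING_FIELD_AS plus ZSTD or SNAPPY compression.
parquet = ParquetFormat(
    missing_field_as="FIELD_DEFAULT",
    compression=Compression.SNAPPY,
)

# ORC gains the same MISSING_FIELD_AS option.
orc = ORCFormat(missing_field_as="ERROR")

# New AVRO format type; assuming its keyword options default like the
# other formats', the bare constructor is valid.
avro = AVROFormat()
```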
34 changes: 33 additions & 1 deletion tests/test_copy_into.py
@@ -162,6 +162,38 @@ def test_copy_into_table_sub_select_column_clauses(self):
checkparams={"1_1": "xyz", "IF_1": "NULL", "IF_2": "NOTNULL"},
)

def test_copy_into_table_files(self):
m = MetaData()
tbl = Table(
"atable",
m,
Column("id", Integer),
schema="test_schema",
)

copy_into = CopyIntoTable(
target=tbl,
from_=GoogleCloudStorage(
uri="gcs://some-bucket/a/path/to/files",
credentials="XYZ",
),
files=['one','two','three'],
file_format=CSVFormat(),
)

self.assert_compile(
copy_into,
(
"COPY INTO test_schema.atable"
" FROM 'gcs://some-bucket/a/path/to/files' "
"CONNECTION = ("
" ENDPOINT_URL = 'https://storage.googleapis.com' "
" CREDENTIAL = 'XYZ' "
") FILES = ('one', 'two', 'three')"
" FILE_FORMAT = (TYPE = CSV)"
),
)


class CopyIntoResultTest(fixtures.TablesTest):
run_create_tables = "each"
@@ -204,7 +236,7 @@ def test_copy_into_stage_and_table(self, connection):
eq_(r.rowcount, 1000)
copy_into_results = r.context.copy_into_location_results()
eq_(copy_into_results['rows_unloaded'], 1000)
eq_(copy_into_results['input_bytes'], 16250)
# eq_(copy_into_results['input_bytes'], 16250) # input bytes will differ, the table is random
# eq_(copy_into_results['output_bytes'], 4701) # output bytes differs

# now copy into table
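For completeness, a hedged sketch of executing a `CopyIntoTable` like the one in `test_copy_into_table_files` against a live engine (URL, credentials, and database are placeholders):

```python
from sqlalchemy import create_engine

engine = create_engine("databend://user:pass@localhost:8000/default")
with engine.begin() as conn:
    result = conn.execute(copy_into)  # the CopyIntoTable built above
    print(result.rowcount)            # rows loaded, as in the tests
```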