Add dialect options for engine, cluster keys and transient
rad-pat committed Apr 27, 2024
1 parent 0c8d54e commit 3a62596
Showing 4 changed files with 352 additions and 0 deletions.
49 changes: 49 additions & 0 deletions README.rst
@@ -69,3 +69,52 @@ The Merge command can be used as below::
    merge.when_matched_then_update().values(val=t2.c.newval)
    merge.when_not_matched_then_insert().values(val=t2.c.newval, status=t2.c.newstatus)
    connection.execute(merge)

Table Options
---------------------

Databend SQLAlchemy supports Databend-specific table options for Engine, Cluster Keys and Transient tables.

The table options can be used as below::

    from sqlalchemy import Table, Column, Integer, String, cast
    from sqlalchemy import MetaData, create_engine

    engine = create_engine(db.url, echo=False)

    meta = MetaData()

    # Example of Transient Table
    t_transient = Table(
        "t_transient",
        meta,
        Column("c1", Integer),
        databend_transient=True,
    )

    # Example of Engine
    t_engine = Table(
        "t_engine",
        meta,
        Column("c1", Integer),
        databend_engine='Memory',
    )

    # Examples of Table with Cluster Keys
    t_cluster_1 = Table(
        "t_cluster_1",
        meta,
        Column("c1", Integer),
        databend_cluster_by=['c1'],
    )

    c = Column("id", Integer)
    c2 = Column("Name", String)
    t_cluster_2 = Table(
        't_cluster_2',
        meta,
        c,
        c2,
        databend_cluster_by=[cast(c, String), c2],
    )

    meta.create_all(engine)
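
Engine and cluster key options are also reflected back when an existing table is loaded with ``autoload_with``, and are then available under the table's ``databend`` dialect options. A minimal sketch, reusing the ``engine`` and the ``t_engine`` table created above (the names ``meta_reflected`` and ``t_reflected`` are only illustrative; values are returned as Databend reports them, so the engine name may come back upper-cased, e.g. ``MEMORY``, and a cluster key as its expression string such as ``'id, "Name"'``)::

    meta_reflected = MetaData()
    t_reflected = Table("t_engine", meta_reflected, autoload_with=engine)
    print(t_reflected.dialect_options["databend"]["engine"])  # e.g. 'MEMORY'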
106 changes: 106 additions & 0 deletions databend_sqlalchemy/databend_dialect.py
@@ -2,6 +2,28 @@
#
# Note: parts of the file come from https://github.com/snowflakedb/snowflake-sqlalchemy
# licensed under the same Apache 2.0 License

"""
Databend Table Options
------------------------
Several options for CREATE TABLE are supported directly by the Databend
dialect in conjunction with the :class:`_schema.Table` construct:

* ``ENGINE``::

    Table("some_table", metadata, ..., databend_engine='FUSE'|'Memory'|'Random'|'Iceberg'|'Delta')

* ``CLUSTER KEY``::

    Table("some_table", metadata, ..., databend_cluster_by=str|list[expr|str])

* ``TRANSIENT``::

    Table("some_table", metadata, ..., databend_transient=True|False)
"""

import decimal
import re
import operator
@@ -369,6 +391,45 @@ def visit_drop_schema(self, drop, **kw):
        schema = self.preparer.format_schema(drop.element)
        return "DROP SCHEMA " + schema

    def visit_create_table(self, create, **kw):
        table = create.element
        db_opts = table.dialect_options["databend"]
        if "transient" in db_opts and db_opts["transient"]:
            if "transient" not in [p.lower() for p in table._prefixes]:
                table._prefixes.append("TRANSIENT")
        return super().visit_create_table(create, **kw)

    def post_create_table(self, table):
        table_opts = []
        db_opts = table.dialect_options["databend"]

        engine = db_opts.get("engine")
        if engine is not None:
            table_opts.append(f" ENGINE={engine}")

        cluster_keys = db_opts.get("cluster_by")
        if cluster_keys is not None:
            if isinstance(cluster_keys, str):
                cluster_by = cluster_keys
            elif isinstance(cluster_keys, list):
                cluster_by = ", ".join(
                    self.sql_compiler.process(
                        expr if not isinstance(expr, str) else table.c[expr],
                        include_table=False,
                        literal_binds=True,
                    )
                    for expr in cluster_keys
                )
            else:
                cluster_by = ''
            table_opts.append(
                f"\n CLUSTER BY ( {cluster_by} )"
            )

        # ToDo - Engine options

        return " ".join(table_opts)


class DatabendDialect(default.DefaultDialect):
    name = "databend"
@@ -615,6 +676,51 @@ def get_view_names(self, connection, schema=None, **kw):
        result = connection.execute(query, dict(schema_name=schema))
        return [row[0] for row in result]

    @reflection.cache
    def get_table_options(self, connection, table_name, schema=None, **kw):
        options = {}

        # transient??
        # engine: str
        # cluster_by: list[expr]
        # engine_options: dict

        # engine_regex = r'ENGINE=(\w+)'
        # cluster_key_regex = r'CLUSTER BY \((.*)\)'

        query = text(
            """
            SELECT engine_full, cluster_by, is_transient
            FROM system.tables
            WHERE database = :schema_name
            and name = :table_name
            """
        ).bindparams(
            bindparam("table_name", type_=sqltypes.Unicode),
            bindparam("schema_name", type_=sqltypes.Unicode)
        )
        if schema is None:
            schema = self.default_schema_name

        result = connection.execute(query, dict(table_name=table_name, schema_name=schema)).one_or_none()
        if not result:
            raise NoSuchTableError(
                f'{self.identifier_preparer.quote_identifier(schema)}.'
                f'{self.identifier_preparer.quote_identifier(table_name)}'
            )

        if result.engine_full:
            options["databend_engine"] = result.engine_full
        if result.cluster_by:
            cluster_by = re.match(r'\((.*)\)', result.cluster_by).group(1)
            options["databend_cluster_by"] = cluster_by
        if result.is_transient:
            options["databend_is_transient"] = result.is_transient

        # engine options

        return options

    def do_rollback(self, dbapi_connection):
        # No transactions
        pass
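
Taken together, the two compiler hooks above shape the emitted DDL: visit_create_table injects the TRANSIENT prefix and post_create_table appends the ENGINE and CLUSTER BY clauses. A rough, self-contained sketch of the result (not part of this commit; it assumes the dialect class is importable as shown, and the exact whitespace of the output may differ):

    from sqlalchemy import Table, Column, Integer, MetaData, schema
    from databend_sqlalchemy.databend_dialect import DatabendDialect

    meta = MetaData()
    tbl = Table(
        "atable",
        meta,
        Column("id", Integer),
        databend_transient=True,     # visit_create_table adds the TRANSIENT prefix
        databend_engine="Memory",    # post_create_table appends ENGINE=Memory
        databend_cluster_by=["id"],  # post_create_table appends CLUSTER BY ( id )
    )

    ddl = schema.CreateTable(tbl).compile(dialect=DatabendDialect())
    print(ddl)
    # Expected output, approximately:
    #   CREATE TRANSIENT TABLE atable (id INTEGER) ENGINE=Memory
    #    CLUSTER BY ( id )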
5 changes: 5 additions & 0 deletions databend_sqlalchemy/requirements.py
@@ -192,3 +192,8 @@ def json_type(self):
"""target platform implements a native JSON type."""

return exclusions.closed() #ToDo - This could be enabled if primary keys were supported

@property
def reflect_table_options(self):
"""Target database must support reflecting table_options."""
return exclusions.open()
192 changes: 192 additions & 0 deletions tests/test_table_options.py
@@ -0,0 +1,192 @@

from sqlalchemy.testing import config, fixture, fixtures, util
from sqlalchemy.testing.assertions import AssertsCompiledSQL
from sqlalchemy import Table, Column, Integer, String, func, MetaData, schema, cast


class CompileDatabendTableOptionsTest(fixtures.TestBase, AssertsCompiledSQL):

    __only_on__ = "databend"

    def test_create_table_transient_on(self):
        m = MetaData()
        tbl = Table(
            'atable', m, Column("id", Integer),
            databend_transient=True,
        )
        self.assert_compile(
            schema.CreateTable(tbl),
            "CREATE TRANSIENT TABLE atable (id INTEGER)")

    def test_create_table_transient_off(self):
        m = MetaData()
        tbl = Table(
            'atable', m, Column("id", Integer),
            databend_transient=False,
        )
        self.assert_compile(
            schema.CreateTable(tbl),
            "CREATE TABLE atable (id INTEGER)")

    def test_create_table_engine(self):
        m = MetaData()
        tbl = Table(
            'atable', m, Column("id", Integer),
            databend_engine='Memory',
        )
        self.assert_compile(
            schema.CreateTable(tbl),
            "CREATE TABLE atable (id INTEGER) ENGINE=Memory")

    def test_create_table_cluster_by_column_str(self):
        m = MetaData()
        tbl = Table(
            'atable', m, Column("id", Integer),
            databend_cluster_by='id',
        )
        self.assert_compile(
            schema.CreateTable(tbl),
            "CREATE TABLE atable (id INTEGER) CLUSTER BY ( id )")

    def test_create_table_cluster_by_column_strs(self):
        m = MetaData()
        tbl = Table(
            'atable', m, Column("id", Integer), Column("Name", String),
            databend_cluster_by=['id', 'Name'],
        )
        self.assert_compile(
            schema.CreateTable(tbl),
            "CREATE TABLE atable (id INTEGER, \"Name\" VARCHAR) CLUSTER BY ( id, \"Name\" )")

    def test_create_table_cluster_by_column_object(self):
        m = MetaData()
        c = Column("id", Integer)
        tbl = Table(
            'atable', m, c,
            databend_cluster_by=[c],
        )
        self.assert_compile(
            schema.CreateTable(tbl),
            "CREATE TABLE atable (id INTEGER) CLUSTER BY ( id )")

    def test_create_table_cluster_by_column_objects(self):
        m = MetaData()
        c = Column("id", Integer)
        c2 = Column("Name", String)
        tbl = Table(
            'atable', m, c, c2,
            databend_cluster_by=[c, c2],
        )
        self.assert_compile(
            schema.CreateTable(tbl),
            "CREATE TABLE atable (id INTEGER, \"Name\" VARCHAR) CLUSTER BY ( id, \"Name\" )")

    def test_create_table_cluster_by_column_expr(self):
        m = MetaData()
        c = Column("id", Integer)
        c2 = Column("Name", String)
        tbl = Table(
            'atable', m, c, c2,
            databend_cluster_by=[cast(c, String), c2],
        )
        self.assert_compile(
            schema.CreateTable(tbl),
            "CREATE TABLE atable (id INTEGER, \"Name\" VARCHAR) CLUSTER BY ( CAST(id AS VARCHAR), \"Name\" )")

    def test_create_table_cluster_by_str(self):
        m = MetaData()
        c = Column("id", Integer)
        c2 = Column("Name", String)
        tbl = Table(
            'atable', m, c, c2,
            databend_cluster_by="CAST(id AS VARCHAR), \"Name\"",
        )
        self.assert_compile(
            schema.CreateTable(tbl),
            "CREATE TABLE atable (id INTEGER, \"Name\" VARCHAR) CLUSTER BY ( CAST(id AS VARCHAR), \"Name\" )")

    # ToDo
    # def test_create_table_with_options(self):
    #     m = MetaData()
    #     tbl = Table(
    #         'atable', m, Column("id", Integer),
    #         databend_engine_options=(
    #             ("compression", "snappy"),
    #             ("storage_format", "parquet"),
    #         ))
    #     self.assert_compile(
    #         schema.CreateTable(tbl),
    #         "CREATE TABLE atable (id INTEGER)COMPRESSION=\"snappy\" STORAGE_FORMAT=\"parquet\"")


class ReflectDatabendTableOptionsTest(fixtures.TablesTest):
    __backend__ = True
    __only_on__ = "databend"

    # 'once', 'each', None
    run_inserts = None

    # 'each', None
    run_deletes = None

    @classmethod
    def define_tables(cls, metadata):
        Table(
            "t2_engine",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("Name", String),
            databend_engine="Memory",
        )
        c2 = Column("id", Integer, primary_key=True)
        Table(
            "t2_cluster_by_column",
            metadata,
            c2,
            Column("Name", String),
            databend_cluster_by=[c2, "Name"],
        )
        c3 = Column("id", Integer, primary_key=True)
        Table(
            "t3_cluster_by_expr",
            metadata,
            c3,
            Column("Name", String),
            databend_cluster_by=[cast(c3, String), "Name"],
        )
        c4 = Column("id", Integer, primary_key=True)
        Table(
            "t4_cluster_by_str",
            metadata,
            c4,
            Column("Name", String),
            databend_cluster_by='CAST(id AS STRING), "Name"',
        )

    def test_reflect_table_engine(self):
        m2 = MetaData()
        t1_ref = Table(
            "t2_engine", m2, autoload_with=config.db
        )
        assert t1_ref.dialect_options['databend']['engine'] == 'MEMORY'

    def test_reflect_table_cluster_by_column(self):
        m2 = MetaData()
        t2_ref = Table(
            "t2_cluster_by_column", m2, autoload_with=config.db
        )
        assert t2_ref.dialect_options['databend']['cluster_by'] == 'id, "Name"'

    def test_reflect_table_cluster_by_expr(self):
        m2 = MetaData()
        t3_ref = Table(
            "t3_cluster_by_expr", m2, autoload_with=config.db
        )
        assert t3_ref.dialect_options['databend']['cluster_by'] == 'CAST(id AS STRING), "Name"'

    def test_reflect_table_cluster_by_str(self):
        m2 = MetaData()
        t4_ref = Table(
            "t4_cluster_by_str", m2, autoload_with=config.db
        )
        assert t4_ref.dialect_options['databend']['cluster_by'] == 'CAST(id AS STRING), "Name"'
