Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: upgrade dbt to v1.8 #7

Merged
merged 16 commits into from
May 30, 2024
46 changes: 46 additions & 0 deletions .github/workflows/incremental_test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
name: Incremental Test

on:
push:
branches: [main]
pull_request:
branches: [main]

jobs:
test:
runs-on: ubuntu-latest
services:
databend:
image: datafuselabs/databend
env:
QUERY_DEFAULT_USER: databend
QUERY_DEFAULT_PASSWORD: databend
MINIO_ENABLED: true
ports:
- 8000:8000
- 9000:9000
steps:
- name: Checkout
uses: actions/checkout@v4

- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'

- name: Pip Install
run: |
pip install pipenv
pip install pytest
pip install -r dev-requirements.txt
pipenv install --dev --skip-lock

- name: Verify Service Running
run: |
cid=$(docker ps -a | grep databend | cut -d' ' -f1)
docker logs ${cid}
curl -v http://localhost:8000/v1/health

- name: dbt databend Incremental Test Suite
run: |
python -m pytest -s tests/functional/adapter/incremental/test_incremental.py
46 changes: 46 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
name: Ci Test

on:
push:
branches: [main]
pull_request:
branches: [main]

jobs:
test:
runs-on: ubuntu-latest
services:
databend:
image: datafuselabs/databend
env:
QUERY_DEFAULT_USER: databend
QUERY_DEFAULT_PASSWORD: databend
MINIO_ENABLED: true
ports:
- 8000:8000
- 9000:9000
steps:
- name: Checkout
uses: actions/checkout@v4

- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'

- name: Pip Install
run: |
pip install pipenv
pip install pytest
pip install -r dev-requirements.txt
pipenv install --dev --skip-lock

- name: Verify Service Running
run: |
cid=$(docker ps -a | grep databend | cut -d' ' -f1)
docker logs ${cid}
curl -v http://localhost:8000/v1/health

- name: dbt databend Unit Test Suite
run: |
python -m pytest -s tests/functional/adapter/unit_testing/test_unit_testing.py
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# dbt-databend Changelog

- This file provides a full account of all changes to `dbt-databend`.
- Changes are listed under the (pre)release in which they first appear. Subsequent releases include changes from previous releases.
- "Breaking changes" listed under a version may require action from end users or external maintainers when upgrading to that version.

## Previous Releases
For information on prior major and minor releases, see their changelogs:
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ $ pip install dbt-databend-cloud
| ✅ | Sources |
| ✅ | Custom data tests |
| ✅ | Docs generate |
| | Snapshots |
| | Snapshots |
| ✅ | Connection retry |

Note:
Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/databend/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "1.4.2"
version = "1.8.1"
10 changes: 8 additions & 2 deletions dbt/adapters/databend/column.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from dataclasses import dataclass
from typing import TypeVar, Optional, Dict, Any

import dbt.exceptions
from dbt.adapters.base.column import Column
from dbt_common.exceptions import DbtRuntimeError

Self = TypeVar("Self", bound="DatabendColumn")

Expand All @@ -14,12 +14,16 @@ def quoted(self) -> str:
return '"{}"'.format(self.column)

def is_string(self) -> bool:
if self.dtype is None:
return False
return self.dtype.lower() in [
"string",
"varchar",
]

def is_integer(self) -> bool:
if self.dtype is None:
return False
return self.dtype.lower().startswith("int") or self.dtype.lower() in (
"tinyint",
"smallint",
Expand All @@ -30,11 +34,13 @@ def is_numeric(self) -> bool:
return False

def is_float(self) -> bool:
if self.dtype is None:
return False
return self.dtype.lower() in ("float", "double")

def string_size(self) -> int:
if not self.is_string():
raise dbt.exceptions.RuntimeException(
raise DbtRuntimeError(
"Called string_size() on non-string field!"
)

Expand Down
58 changes: 35 additions & 23 deletions dbt/adapters/databend/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,24 @@
from dataclasses import dataclass

import agate
import dbt.exceptions # noqa
from dbt.adapters.base import Credentials
import dbt_common.exceptions # noqa

from dbt.adapters.exceptions.connection import FailedToConnectError
from dbt.adapters.contracts.connection import AdapterResponse, Connection, Credentials
from dbt_common.clients.agate_helper import empty_table
from dbt.adapters.sql import SQLConnectionManager as connection_cls
from dbt.contracts.connection import AdapterResponse
from dbt.events import AdapterLogger
from dbt.adapters.events.logging import AdapterLogger # type: ignore
from dbt_common.events.functions import warn_or_error
from dbt.adapters.events.types import AdapterEventWarning
from dbt_common.ui import line_wrap_message, warning_tag
from dbt_common.clients.agate_helper import empty_table
from typing import Optional, Tuple, List, Any
from databend_sqlalchemy import connector

from dbt.exceptions import (
Exception,
from dbt_common.exceptions import (
DbtInternalError,
DbtRuntimeError,
DbtConfigError,
)

logger = AdapterLogger("databend")
Expand Down Expand Up @@ -62,7 +69,7 @@ def __post_init__(self):
# databend classifies database and schema as the same thing
self.database = None
if self.database is not None and self.database != self.schema:
raise dbt.exceptions.Exception(
raise DbtRuntimeError(
f" schema: {self.schema} \n"
f" database: {self.database} \n"
f"On Databend, database must be omitted or have the same value as"
Expand All @@ -89,6 +96,11 @@ def _connection_keys(self):
return ("host", "port", "database", "schema", "user")


@dataclass
class DatabendAdapterResponse(AdapterResponse):
query_id: str = ""


class DatabendConnectionManager(connection_cls):
TYPE = "databend"

Expand All @@ -105,7 +117,7 @@ def exception_handler(self, sql: str):
logger.debug("Error running SQL: {}".format(sql))
logger.debug("Rolling back transaction.")
self.rollback_if_open()
raise dbt.exceptions.Exception(str(e))
raise DbtRuntimeError(str(e))

# except for DML statements where explicitly defined
def add_begin_query(self, *args, **kwargs):
Expand Down Expand Up @@ -136,12 +148,6 @@ def open(cls, connection):
credentials = connection.credentials

try:
# handle = mysql.connector.connect(
# # host=credentials.host,
# # port=credentials.port,
# # user=credentials.username,
# # password=credentials.password,
# )
if credentials.secure is None:
credentials.secure = True

Expand All @@ -158,27 +164,30 @@ def open(cls, connection):
logger.debug("Error opening connection: {}".format(e))
connection.handle = None
connection.state = "fail"
raise dbt.exceptions.FailedToConnectException(str(e))
raise FailedToConnectError(str(e))
connection.state = "open"
connection.handle = handle
return connection

@classmethod
def get_response(cls, _):
return "OK"
def get_response(cls, cursor):
return DatabendAdapterResponse(
_message="{} {}".format("adapter response", cursor.rowcount),
rows_affected=cursor.rowcount,
)

def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
self, sql: str, auto_begin: bool = False, fetch: bool = False, limit: Optional[int] = None
) -> Tuple[AdapterResponse, agate.Table]:
# don't apply the query comment here
# it will be applied after ';' queries are split
_, cursor = self.add_query(sql, auto_begin)
response = self.get_response(cursor)
# table: rows, column_names=None, column_types=None, row_names=None
if fetch:
table = self.get_result_from_cursor(cursor)
table = self.get_result_from_cursor(cursor, limit)
else:
table = dbt.clients.agate_helper.empty_table()
table = dbt_common.clients.agate_helper.empty_table()
return response, table

def add_query(self, sql, auto_begin=False, bindings=None, abridge_sql_log=False):
Expand Down Expand Up @@ -231,13 +240,16 @@ def process_results(cls, column_names, rows):
return [dict(zip(column_names, row)) for row in rows]

@classmethod
def get_result_from_cursor(cls, cursor: Any) -> agate.Table:
def get_result_from_cursor(cls, cursor: Any, limit: Optional[int]) -> agate.Table:
data: List[Any] = []
column_names: List[str] = []

if cursor.description is not None:
column_names = [col[0] for col in cursor.description]
rows = cursor.fetchall()
if limit:
rows = cursor.fetchmany(limit)
else:
rows = cursor.fetchall()
data = cls.process_results(column_names, rows)

return dbt.clients.agate_helper.table_from_data_flat(data, column_names)
return dbt_common.clients.agate_helper.table_from_data_flat(data, column_names)
Loading