Skip to content

Commit

Permalink
Clickhouse integration (georgia-tech-db#1281)
Browse files Browse the repository at this point in the history
  • Loading branch information
Preethi1609 authored Oct 13, 2023
1 parent 65c6cb9 commit 6a38d27
Show file tree
Hide file tree
Showing 8 changed files with 389 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/_toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ parts:
- file: source/reference/databases/sqlite
- file: source/reference/databases/mysql
- file: source/reference/databases/mariadb
- file: source/reference/databases/clickhouse
- file: source/reference/databases/github

- file: source/reference/vector_databases/index
Expand Down
37 changes: 37 additions & 0 deletions docs/source/reference/databases/clickhouse.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
Clickhouse
==========

The connection to Clickhouse is based on the `clickhouse-sqlalchemy <https://pypi.org/project/clickhouse-sqlalchemy/>`_ library.

Dependency
----------

* clickhouse-sqlalchemy



Parameters
----------

Required:

* `user` is the database user.
* `password` is the database password.
* `host` is the host name or IP address.
* `port` is the port used to make the TCP/IP connection to the Clickhouse server. It must match the chosen `protocol`.
* `database` is the database name.

Optional:

* `protocol` is the connection protocol. Supported values are `native`, `http` and `https`. Defaults to `native`.


Create Connection
-----------------

.. code-block:: text

   CREATE DATABASE clickhouse_data WITH ENGINE = 'clickhouse', PARAMETERS = {
        "user": "eva",
        "password": "password",
        "host": "localhost",
        "port": "9000",
        "database": "evadb"
   };
15 changes: 15 additions & 0 deletions evadb/third_party/databases/clickhouse/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Clickhouse integrations"""
174 changes: 174 additions & 0 deletions evadb/third_party/databases/clickhouse/clickhouse_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pandas as pd
from sqlalchemy import create_engine

from evadb.third_party.databases.types import (
DBHandler,
DBHandlerResponse,
DBHandlerStatus,
)


class ClickHouseHandler(DBHandler):
    """
    Database handler that talks to a ClickHouse server through SQLAlchemy.

    The connection URL is assembled from the keyword arguments given to the
    constructor and uses a ``clickhouse+<driver>`` SQLAlchemy scheme.
    """

    def __init__(self, name: str, **kwargs):
        """
        Initialize the handler.
        Args:
            name (str): name of the DB handler instance
            **kwargs: arbitrary keyword arguments for establishing the connection.
                Keys read here: ``host``, ``port``, ``user``, ``password``,
                ``database`` and ``protocol`` (``native``, ``http`` or
                ``https``; defaults to ``native``).
        """
        super().__init__(name)
        self.host = kwargs.get("host")
        self.port = kwargs.get("port")
        self.user = kwargs.get("user")
        self.password = kwargs.get("password")
        self.database = kwargs.get("database")
        protocols_map = {
            "native": "clickhouse+native",
            "http": "clickhouse+http",
            "https": "clickhouse+https",
        }
        # Fall back to "native" when no protocol is supplied; otherwise the
        # URL scheme built in connect() would be the literal string "None".
        protocol = kwargs.get("protocol") or "native"
        # Values outside the map are kept untouched so that a caller can pass
        # a complete SQLAlchemy driver name directly.
        self.protocol = protocols_map.get(protocol, protocol)
        # Defined up front so disconnect()/check_connection() and the query
        # methods can be called safely before connect().
        self.connection = None

    def connect(self):
        """
        Set up the connection required by the handler.
        Returns:
            DBHandlerStatus
        """
        try:
            driver = self.protocol
            query = ""
            if driver == "clickhouse+https":
                # HTTPS is served by the HTTP driver and selected through the
                # ``protocol`` query argument of the URL.
                driver = "clickhouse+http"
                query = "?protocol=https"

            # NOTE: credentials are interpolated verbatim, so special
            # characters (e.g. "@" or "/") must already be URL-escaped.
            url = (
                f"{driver}://{self.user}:{self.password}"
                f"@{self.host}:{self.port}/{self.database}{query}"
            )

            engine = create_engine(url)
            self.connection = engine.raw_connection()
            return DBHandlerStatus(status=True)
        except Exception as e:
            return DBHandlerStatus(status=False, error=str(e))

    def disconnect(self):
        """
        Close any existing connections.
        """
        if self.connection:
            # Close the underlying connection (the previous implementation
            # called disconnect() on itself and recursed without end).
            self.connection.close()
            self.connection = None

    def check_connection(self) -> DBHandlerStatus:
        """
        Check connection to the handler.
        Returns:
            DBHandlerStatus
        """
        if self.connection:
            return DBHandlerStatus(status=True)
        return DBHandlerStatus(status=False, error="Not connected to the database.")

    def get_tables(self) -> DBHandlerResponse:
        """
        Return the list of tables in the database.
        Returns:
            DBHandlerResponse
        """
        if not self.connection:
            return DBHandlerResponse(data=None, error="Not connected to the database.")

        try:
            # The database name is stored on the handler by __init__.
            query = f"SHOW TABLES FROM {self.database}"
            tables_df = pd.read_sql_query(query, self.connection)
            return DBHandlerResponse(data=tables_df)
        except Exception as e:
            return DBHandlerResponse(data=None, error=str(e))

    def get_columns(self, table_name: str) -> DBHandlerResponse:
        """
        Returns the list of columns for the given table.
        Args:
            table_name (str): name of the table whose columns are to be retrieved.
        Returns:
            DBHandlerResponse
        """
        if not self.connection:
            return DBHandlerResponse(data=None, error="Not connected to the database.")

        try:
            query = f"DESCRIBE {table_name}"
            columns_df = pd.read_sql_query(query, self.connection)
            # ClickHouse's DESCRIBE reports the column type under "type";
            # expose it under "dtype", the column converted below.
            if "dtype" not in columns_df.columns and "type" in columns_df.columns:
                columns_df = columns_df.rename(columns={"type": "dtype"})
            columns_df["dtype"] = columns_df["dtype"].apply(
                self._clickhouse_to_python_types
            )
            return DBHandlerResponse(data=columns_df)
        except Exception as e:
            return DBHandlerResponse(data=None, error=str(e))

    def _fetch_results_as_df(self, cursor):
        """
        Convert the rows pending on ``cursor`` into a DataFrame.

        Statements that yield no rows produce a one-row ``status`` frame
        containing ``"success"``.
        """
        try:
            res = cursor.fetchall()
            if not res:
                return pd.DataFrame({"status": ["success"]})
            return pd.DataFrame(res, columns=[desc[0] for desc in cursor.description])
        except Exception as e:
            if str(e) == "no results to fetch":
                return pd.DataFrame({"status": ["success"]})
            raise

    def execute_native_query(self, query_string: str) -> DBHandlerResponse:
        """
        Executes the native query on the database.
        Args:
            query_string (str): query in native format
        Returns:
            DBHandlerResponse
        """
        if not self.connection:
            return DBHandlerResponse(data=None, error="Not connected to the database.")

        try:
            cursor = self.connection.cursor()
            try:
                cursor.execute(query_string)
                return DBHandlerResponse(data=self._fetch_results_as_df(cursor))
            finally:
                # Release the cursor whether or not the query succeeded.
                cursor.close()
        except Exception as e:
            return DBHandlerResponse(data=None, error=str(e))

    def _clickhouse_to_python_types(self, clickhouse_type: str):
        """
        Map a ClickHouse column type name to the matching Python type.

        The lookup is case-insensitive, ignores ``Nullable(...)`` and
        ``LowCardinality(...)`` wrappers and drops type parameters such as
        the length in ``FixedString(10)``.
        Args:
            clickhouse_type (str): type name as reported for the column.
        Returns:
            The Python type (``str``, ``bool``, ``int`` or ``float``).
        Raises:
            Exception: if the type has no known Python equivalent.
        """
        mapping = {
            "char": str,
            "varchar": str,
            "text": str,
            "string": str,
            "fixedstring": str,
            "boolean": bool,
            "bool": bool,
            "integer": int,
            "int": int,
            "float": float,
            "double": float,
            "float32": float,
            "float64": float,
            # Add more mappings as needed
        }
        # Sized signed/unsigned integers: Int8 ... Int256, UInt8 ... UInt256.
        for bits in (8, 16, 32, 64, 128, 256):
            mapping[f"int{bits}"] = int
            mapping[f"uint{bits}"] = int

        type_name = str(clickhouse_type).strip()
        # Peel off wrapper types, which may be nested, to reach the base type.
        wrappers = ("nullable(", "lowcardinality(")
        unwrapped = True
        while unwrapped:
            unwrapped = False
            lowered = type_name.lower()
            for wrapper in wrappers:
                if lowered.startswith(wrapper) and lowered.endswith(")"):
                    type_name = type_name[len(wrapper) : -1].strip()
                    unwrapped = True
                    break

        # Drop parameters, e.g. "FixedString(10)" -> "fixedstring".
        base_name = type_name.split("(", 1)[0].strip().lower()
        if base_name in mapping:
            return mapping[base_name]
        raise Exception(
            f"Unsupported column {clickhouse_type} encountered in the clickhouse table. Please raise a feature request!"
        )
1 change: 1 addition & 0 deletions evadb/third_party/databases/clickhouse/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sqlalchemy
clickhouse-sqlalchemy
2 changes: 2 additions & 0 deletions evadb/third_party/databases/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ def _get_database_handler(engine: str, **kwargs):
return mod.MysqlHandler(engine, **kwargs)
elif engine == "mariadb":
return mod.MariaDbHandler(engine, **kwargs)
elif engine == "clickhouse":
return mod.ClickHouseHandler(engine, **kwargs)
elif engine == "github":
return mod.GithubHandler(engine, **kwargs)
else:
Expand Down
19 changes: 19 additions & 0 deletions test/third_party_tests/test_native_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,25 @@ def test_should_run_query_in_mariadb(self):
self._execute_native_query()
self._execute_evadb_query()

def test_should_run_query_in_clickhouse(self):
# Registers a ClickHouse-backed data source named `test_data_source` and
# runs the shared native-query and EvaDB-query checks against it.
# Create database.
# No `protocol` key is supplied here.
# NOTE(review): port 9000 is presumably ClickHouse's native TCP port —
# confirm against the test server setup.
params = {
"user": "eva",
"password": "password",
"host": "localhost",
"port": "9000",
"database": "evadb",
}
# The params dict is interpolated into the statement via its Python repr.
query = f"""CREATE DATABASE test_data_source
WITH ENGINE = "clickhouse",
PARAMETERS = {params};"""
execute_query_fetch_all(self.evadb, query)

# Test executions.
# Both helpers are defined elsewhere in this test class.
self._execute_native_query()
self._execute_evadb_query()


def test_should_run_query_in_sqlite(self):
# Create database.
import os
Expand Down
Loading

0 comments on commit 6a38d27

Please sign in to comment.