feat(ibis): introduce GCS file connector (#1053)
goldmedal authored Feb 5, 2025
1 parent fe58251 commit 50d1240
Showing 10 changed files with 592 additions and 5 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ibis-ci.yml
@@ -71,7 +71,7 @@ jobs:
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
AWS_REGION: ${{ secrets.AWS_REGION }}
AWS_S3_BUCKET: ${{ secrets.AWS_S3_BUCKET }}
-run: poetry run pytest -m "not bigquery and not snowflake and not canner and not s3_file"
+run: poetry run pytest -m "not bigquery and not snowflake and not canner and not s3_file and not gcs_file"
- name: Test bigquery if need
if: contains(github.event.pull_request.labels.*.name, 'bigquery')
env:
1 change: 1 addition & 0 deletions ibis-server/app/mdl/rewriter.py
@@ -76,6 +76,7 @@ def _get_write_dialect(cls, data_source: DataSource) -> str:
DataSource.local_file,
DataSource.s3_file,
DataSource.minio_file,
DataSource.gcs_file,
}:
return "duckdb"
return data_source.name
18 changes: 17 additions & 1 deletion ibis-server/app/model/__init__.py
@@ -64,6 +64,10 @@ class QueryMinioFileDTO(QueryDTO):
connection_info: MinioFileConnectionInfo = connection_info_field


class QueryGcsFileDTO(QueryDTO):
connection_info: GcsFileConnectionInfo = connection_info_field


class BigQueryConnectionInfo(BaseModel):
project_id: SecretStr
dataset_id: SecretStr
@@ -167,7 +171,7 @@ class S3FileConnectionInfo(BaseModel):


class MinioFileConnectionInfo(BaseModel):
-url: SecretStr = Field(description="the root path of the s3 bucket", default="/")
+url: SecretStr = Field(description="the root path of the minio bucket", default="/")
format: str = Field(
description="File format", default="csv", examples=["csv", "parquet", "json"]
)
@@ -180,6 +184,17 @@ class MinioFileConnectionInfo(BaseModel):
secret_key: SecretStr


class GcsFileConnectionInfo(BaseModel):
url: SecretStr = Field(description="the root path of the gcs bucket", default="/")
format: str = Field(
description="File format", default="csv", examples=["csv", "parquet", "json"]
)
bucket: SecretStr
key_id: SecretStr
secret_key: SecretStr
credentials: SecretStr = Field(description="Base64-encoded `credentials.json`")


ConnectionInfo = (
BigQueryConnectionInfo
| CannerConnectionInfo
Expand All @@ -192,6 +207,7 @@ class MinioFileConnectionInfo(BaseModel):
| LocalFileConnectionInfo
| S3FileConnectionInfo
| MinioFileConnectionInfo
| GcsFileConnectionInfo
)


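For reference, a minimal sketch of constructing the new model (field names come from the class above; all values are hypothetical placeholders, and the key pair is presumably a GCS HMAC key, given the DuckDB secret created in utils.py below):

from app.model import GcsFileConnectionInfo

info = GcsFileConnectionInfo(
    url="/",                             # root path inside the bucket (default "/")
    format="parquet",                    # "csv", "parquet", or "json"
    bucket="my-bucket",                  # hypothetical bucket name
    key_id="GOOG1E...",                  # placeholder HMAC key id
    secret_key="...",                    # placeholder HMAC secret
    credentials="eyJ0eXBlIjogLi4ufQ==",  # base64-encoded credentials.json
)
assert info.bucket.get_secret_value() == "my-bucket"  # every field is a SecretStr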
6 changes: 5 additions & 1 deletion ibis-server/app/model/connector.py
@@ -18,13 +18,14 @@

from app.model import (
ConnectionInfo,
GcsFileConnectionInfo,
MinioFileConnectionInfo,
S3FileConnectionInfo,
UnknownIbisError,
UnprocessableEntityError,
)
from app.model.data_source import DataSource
-from app.model.utils import init_duckdb_minio, init_duckdb_s3
+from app.model.utils import init_duckdb_gcs, init_duckdb_minio, init_duckdb_s3

# Override datatypes of ibis
importlib.import_module("app.custom_ibis.backends.sql.datatypes")
@@ -42,6 +43,7 @@ def __init__(self, data_source: DataSource, connection_info: ConnectionInfo):
DataSource.local_file,
DataSource.s3_file,
DataSource.minio_file,
DataSource.gcs_file,
}:
self._connector = DuckDBConnector(connection_info)
else:
@@ -167,6 +169,8 @@ def __init__(self, connection_info: ConnectionInfo):
init_duckdb_s3(self.connection, connection_info)
if isinstance(connection_info, MinioFileConnectionInfo):
init_duckdb_minio(self.connection, connection_info)
if isinstance(connection_info, GcsFileConnectionInfo):
init_duckdb_gcs(self.connection, connection_info)

def query(self, sql: str, limit: int) -> pd.DataFrame:
try:
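A short usage sketch of the dispatch above, reusing the hypothetical `info` from the earlier sketch (bucket and file names are placeholders; the `gs://` read works because `__init__` detects a `GcsFileConnectionInfo` and calls `init_duckdb_gcs`):

from app.model.connector import DuckDBConnector

conn = DuckDBConnector(info)  # registers the GCS secret on its DuckDB connection
df = conn.query("SELECT * FROM read_csv('gs://my-bucket/data.csv')", limit=10)
print(df.head())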
3 changes: 3 additions & 0 deletions ibis-server/app/model/data_source.py
@@ -22,6 +22,7 @@
QueryCannerDTO,
QueryClickHouseDTO,
QueryDTO,
QueryGcsFileDTO,
QueryLocalFileDTO,
QueryMinioFileDTO,
QueryMSSqlDTO,
@@ -48,6 +49,7 @@ class DataSource(StrEnum):
local_file = auto()
s3_file = auto()
minio_file = auto()
gcs_file = auto()

def get_connection(self, info: ConnectionInfo) -> BaseBackend:
try:
@@ -74,6 +76,7 @@ class DataSourceExtension(Enum):
local_file = QueryLocalFileDTO
s3_file = QueryS3FileDTO
minio_file = QueryMinioFileDTO
gcs_file = QueryGcsFileDTO

def __init__(self, dto: QueryDTO):
self.dto = dto
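Because `DataSource` is a `StrEnum` built with `auto()`, the new member stringifies to its lowercase name, and `DataSourceExtension`'s `__init__` exposes the DTO class — a quick illustration:

from app.model import QueryGcsFileDTO
from app.model.data_source import DataSource, DataSourceExtension

assert DataSource.gcs_file == "gcs_file"                    # auto() lowercases the member name
assert DataSourceExtension.gcs_file.dto is QueryGcsFileDTO  # set by the Enum __init__ above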
2 changes: 2 additions & 0 deletions ibis-server/app/model/metadata/factory.py
@@ -6,6 +6,7 @@
from app.model.metadata.mssql import MSSQLMetadata
from app.model.metadata.mysql import MySQLMetadata
from app.model.metadata.object_storage import (
GcsFileMetadata,
LocalFileMetadata,
MinioFileMetadata,
S3FileMetadata,
@@ -26,6 +27,7 @@
DataSource.local_file: LocalFileMetadata,
DataSource.s3_file: S3FileMetadata,
DataSource.minio_file: MinioFileMetadata,
DataSource.gcs_file: GcsFileMetadata,
}


33 changes: 32 additions & 1 deletion ibis-server/app/model/metadata/object_storage.py
@@ -5,6 +5,7 @@
from loguru import logger

from app.model import (
GcsFileConnectionInfo,
LocalFileConnectionInfo,
MinioFileConnectionInfo,
S3FileConnectionInfo,
@@ -17,7 +18,7 @@
TableProperties,
)
from app.model.metadata.metadata import Metadata
-from app.model.utils import init_duckdb_minio, init_duckdb_s3
+from app.model.utils import init_duckdb_gcs, init_duckdb_minio, init_duckdb_s3


class ObjectStorageMetadata(Metadata):
@@ -240,3 +241,33 @@ def _get_full_path(self, path):
path = path[1:]

return f"s3://{self.connection_info.bucket.get_secret_value()}/{path}"


class GcsFileMetadata(ObjectStorageMetadata):
def __init__(self, connection_info: GcsFileConnectionInfo):
super().__init__(connection_info)

def get_version(self):
return "GCS"

def _get_connection(self):
conn = duckdb.connect()
init_duckdb_gcs(conn, self.connection_info)
logger.debug("Initialized duckdb gcs")
return conn

def _get_dal_operator(self):
info: GcsFileConnectionInfo = self.connection_info

return opendal.Operator(
"gcs",
root=info.url.get_secret_value(),
bucket=info.bucket.get_secret_value(),
credential=info.credentials.get_secret_value(),
)

def _get_full_path(self, path):
if path.startswith("/"):
path = path[1:]

return f"gs://{self.connection_info.bucket.get_secret_value()}/{path}"
24 changes: 23 additions & 1 deletion ibis-server/app/model/utils.py
@@ -1,6 +1,10 @@
from duckdb import DuckDBPyConnection, HTTPException

-from app.model import MinioFileConnectionInfo, S3FileConnectionInfo
+from app.model import (
+    GcsFileConnectionInfo,
+    MinioFileConnectionInfo,
+    S3FileConnectionInfo,
+)


def init_duckdb_s3(
@@ -44,3 +48,21 @@ def init_duckdb_minio(
connection.execute("SET s3_use_ssl=?", [connection_info.ssl_enabled])
except HTTPException as e:
raise Exception("Failed to create secret", e)


def init_duckdb_gcs(
connection: DuckDBPyConnection, connection_info: GcsFileConnectionInfo
):
create_secret = f"""
CREATE SECRET wren_gcs (
TYPE GCS,
KEY_ID '{connection_info.key_id.get_secret_value()}',
SECRET '{connection_info.secret_key.get_secret_value()}'
)
"""
try:
result = connection.execute(create_secret).fetchone()
if result is None or not result[0]:
raise Exception("Failed to create secret")
except HTTPException as e:
raise Exception("Failed to create secret", e)
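A minimal sketch of the helper in isolation, assuming the hypothetical `info` and paths from the earlier sketches; recent DuckDB builds autoload the httpfs extension, which serves `gs://` URLs once the secret exists:

import duckdb

from app.model.utils import init_duckdb_gcs

conn = duckdb.connect()
init_duckdb_gcs(conn, info)  # registers the `wren_gcs` secret shown above
rows = conn.execute(
    "SELECT count(*) FROM read_parquet('gs://my-bucket/orders/2024.parquet')"
).fetchone()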
1 change: 1 addition & 0 deletions ibis-server/pyproject.toml
@@ -67,6 +67,7 @@ markers = [
"local_file: mark a test as a local file test",
"s3_file: mark a test as a s3 file test",
"minio_file: mark a test as a minio file test",
"gcs_file: mark a test as a gcs file test",
"beta: mark a test as a test for beta versions of the engine",
]
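With the marker registered, the new tests can be selected or excluded locally, e.g. `poetry run pytest -m gcs_file` or `poetry run pytest -m "not gcs_file"`, mirroring the CI filter in ibis-ci.yml above.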
