diff --git a/CHANGES.md b/CHANGES.md index 3db9c11..24d263c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -3,6 +3,7 @@ ## In progress - Add support for container types `ARRAY`, `OBJECT`, and `FLOAT_VECTOR`. - Improve write operations to be closer to `target-postgres`. +- Switch to new SQLAlchemy dialect for CrateDB. ## 2023-12-08 v0.0.1 - Make it work. It can run the canonical Meltano GitHub -> DB example. diff --git a/pyproject.toml b/pyproject.toml index 6def6af..8f36b3b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -92,10 +92,10 @@ dynamic = [ "version", ] dependencies = [ - "crate[sqlalchemy]", "cratedb-toolkit", 'importlib-resources; python_version < "3.9"', # "meltanolabs-target-postgres==0.0.9", "meltanolabs-target-postgres@ git+https://github.com/singer-contrib/meltanolabs-target-postgres.git@pgvector", + "sqlalchemy-cratedb[vector]@ git+https://github.com/crate-workbench/sqlalchemy-cratedb@amo/type-float-vector", ] [project.optional-dependencies] all = [ diff --git a/target_cratedb/connector.py b/target_cratedb/connector.py index 9f7e355..4f2e02b 100644 --- a/target_cratedb/connector.py +++ b/target_cratedb/connector.py @@ -6,13 +6,14 @@ from datetime import datetime import sqlalchemy as sa -from crate.client.sqlalchemy.types import ObjectType, ObjectTypeImpl, _ObjectArray from singer_sdk import typing as th from singer_sdk.helpers._typing import is_array_type, is_boolean_type, is_integer_type, is_number_type, is_object_type +from sqlalchemy_cratedb.type import FloatVector, ObjectType +from sqlalchemy_cratedb.type.array import _ObjectArray +from sqlalchemy_cratedb.type.object import ObjectTypeImpl from target_postgres.connector import NOTYPE, PostgresConnector from target_cratedb.sqlalchemy.patch import polyfill_refresh_after_dml_engine -from target_cratedb.sqlalchemy.vector import FloatVector class CrateDBConnector(PostgresConnector): @@ -225,6 +226,9 @@ def _get_type_sort_key( if isinstance(sql_type, NOTYPE): return 0, _len + if not hasattr(sql_type, "python_type"): + raise TypeError(f"Resolving type for sort key failed: {sql_type}") + _pytype = t.cast(type, sql_type.python_type) if issubclass(_pytype, (str, bytes)): return 900, _len diff --git a/target_cratedb/sqlalchemy/patch.py b/target_cratedb/sqlalchemy/patch.py index 3dbba6c..6f56d67 100644 --- a/target_cratedb/sqlalchemy/patch.py +++ b/target_cratedb/sqlalchemy/patch.py @@ -3,9 +3,8 @@ import sqlalchemy as sa from _decimal import Decimal from crate.client.http import CrateJsonEncoder -from crate.client.sqlalchemy.dialect import ARRAY, TYPES_MAP, DateTime -from crate.client.sqlalchemy.types import _ObjectArray -from sqlalchemy.sql import sqltypes +from sqlalchemy_cratedb.dialect import TYPES_MAP, DateTime +from sqlalchemy_cratedb.type.array import _ObjectArray def patch_sqlalchemy(): @@ -19,20 +18,21 @@ def patch_types(): TODO: Upstream to crate-python. """ - TYPES_MAP["bigint"] = sqltypes.BIGINT - TYPES_MAP["bigint_array"] = ARRAY(sqltypes.BIGINT) - TYPES_MAP["long"] = sqltypes.BIGINT - TYPES_MAP["long_array"] = ARRAY(sqltypes.BIGINT) - TYPES_MAP["real"] = sqltypes.DOUBLE - TYPES_MAP["real_array"] = ARRAY(sqltypes.DOUBLE) - TYPES_MAP["timestamp without time zone"] = sqltypes.TIMESTAMP - TYPES_MAP["timestamp with time zone"] = sqltypes.TIMESTAMP + # abc() + TYPES_MAP["bigint"] = sa.BIGINT + TYPES_MAP["bigint_array"] = sa.ARRAY(sa.BIGINT) + TYPES_MAP["long"] = sa.BIGINT + TYPES_MAP["long_array"] = sa.ARRAY(sa.BIGINT) + TYPES_MAP["real"] = sa.DOUBLE + TYPES_MAP["real_array"] = sa.ARRAY(sa.DOUBLE) + TYPES_MAP["timestamp without time zone"] = sa.TIMESTAMP + TYPES_MAP["timestamp with time zone"] = sa.TIMESTAMP # TODO: Can `ARRAY` be inherited from PostgreSQL's # `ARRAY`, to make type checking work? def as_generic(self): - return sqltypes.ARRAY + return sa.ARRAY _ObjectArray.as_generic = as_generic diff --git a/target_cratedb/sqlalchemy/vector.py b/target_cratedb/sqlalchemy/vector.py deleted file mode 100644 index 1fc1287..0000000 --- a/target_cratedb/sqlalchemy/vector.py +++ /dev/null @@ -1,140 +0,0 @@ -# TODO: Refactor to CrateDB SQLAlchemy dialect. -import typing as t - -import numpy as np -import numpy.typing as npt -import sqlalchemy as sa -from crate.client.sqlalchemy.compiler import CrateTypeCompiler -from crate.client.sqlalchemy.dialect import TYPES_MAP -from sqlalchemy import TypeDecorator -from sqlalchemy.sql import sqltypes - -__all__ = ["FloatVector"] - - -def from_db(value: t.Iterable) -> t.Optional[npt.ArrayLike]: - # from `pgvector.utils` - # could be ndarray if already cast by lower-level driver - if value is None or isinstance(value, np.ndarray): - return value - - return np.array(value, dtype=np.float32) - - -def to_db(value: t.Any, dim: t.Optional[int] = None) -> t.Optional[t.List]: - # from `pgvector.utils` - if value is None: - return value - - if isinstance(value, np.ndarray): - if value.ndim != 1: - raise ValueError("expected ndim to be 1") - - if not np.issubdtype(value.dtype, np.integer) and not np.issubdtype(value.dtype, np.floating): - raise ValueError("dtype must be numeric") - - value = value.tolist() - - if dim is not None and len(value) != dim: - raise ValueError("expected %d dimensions, not %d" % (dim, len(value))) - - return value - - -class FloatVector(TypeDecorator[t.Sequence[float]]): - - """ - An improved implementation of the `FloatVector` data type for CrateDB, - compared to the previous implementation on behalf of the LangChain adapter. - - https://crate.io/docs/crate/reference/en/master/general/ddl/data-types.html#float-vector - https://crate.io/docs/crate/reference/en/master/general/builtins/scalar-functions.html#scalar-knn-match - - The previous implementation, based on SQLAlchemy's `UserDefinedType`, didn't - respect the `python_type` property on backward/reverse resolution of types. - This was observed on Meltano's database connector machinery doing a - type cast, which led to a `NotImplementedError`. - - typing.cast(type, sql_type.python_type) => NotImplementedError - - The `UserDefinedType` approach is easier to implement, because it doesn't - need compiler support. - - To get full SQLAlchemy type support, including support for forward- and - backward resolution / type casting, the custom data type should derive - from SQLAlchemy's `TypeEngine` base class instead. - - When deriving from `TypeEngine`, you will need to set the `__visit_name__` - attribute, and add a corresponding visitor method to the `CrateTypeCompiler`, - in this case, `visit_FLOAT_VECTOR`. - - Now, rendering a DDL succeeds. However, when reflecting the DDL schema back, - it doesn't work until you will establish a corresponding reverse type mapping. - - By invoking `SELECT DISTINCT(data_type) FROM information_schema.columns;`, - you will find out that the internal type name is `float_vector`, so you - announce it to the dialect using `TYPES_MAP["float_vector"] = FloatVector`. - - Still not there: `NotImplementedError: Default TypeEngine.as_generic() heuristic - method was unsuccessful for target_cratedb.sqlalchemy.vector.FloatVector. A - custom as_generic() method must be implemented for this type class.` - - So, as it signals that the type implementation also needs an `as_generic` - property, let's supply one, returning `sqltypes.ARRAY`. - - It looks like, in exchange to those improvements, the `get_col_spec` - method is not needed any longer. - - TODO: Would it be a good idea to derive from SQLAlchemy's - `ARRAY` right away, to get a few of the features without - the need to redefine them? - - Please note the outcome of this analysis and the corresponding implementation - has been derived from empirical observations, and from the feeling that we also - lack corresponding support on the other special data types of CrateDB (ARRAY and - OBJECT) within the SQLAlchemy dialect, i.e. "that something must be wrong or - incomplete". In this spirit, it is advisable to review and improve their - implementations correspondingly. - """ - - cache_ok = False - - __visit_name__ = "FLOAT_VECTOR" - - _is_array = True - - zero_indexes = False - - impl = sa.ARRAY - - def __init__(self, dimensions: int = None): - super().__init__(sa.FLOAT, dimensions=dimensions) - - def as_generic(self): - return sqltypes.ARRAY - - def bind_processor(self, dialect: sa.Dialect) -> t.Callable: - def process(value: t.Iterable) -> t.Optional[t.List]: - return to_db(value, self.dimensions) - - return process - - def result_processor(self, dialect: sa.Dialect, coltype: t.Any) -> t.Callable: - def process(value: t.Any) -> t.Optional[npt.ArrayLike]: - return from_db(value) - - return process - - -# Accompanies the type definition for reverse type lookups. -TYPES_MAP["float_vector"] = FloatVector - - -def visit_FLOAT_VECTOR(self, type_, **kw): - dimensions = type_.dimensions - if dimensions is None: - raise ValueError("FloatVector must be initialized with dimension size") - return f"FLOAT_VECTOR({dimensions})" - - -CrateTypeCompiler.visit_FLOAT_VECTOR = visit_FLOAT_VECTOR diff --git a/target_cratedb/tests/test_standard_target.py b/target_cratedb/tests/test_standard_target.py index 6849cf5..8e796a7 100644 --- a/target_cratedb/tests/test_standard_target.py +++ b/target_cratedb/tests/test_standard_target.py @@ -8,9 +8,10 @@ import jsonschema import pytest import sqlalchemy as sa -from crate.client.sqlalchemy.types import ObjectTypeImpl from singer_sdk.exceptions import MissingKeyPropertiesError from singer_sdk.testing import sync_end_to_end +from sqlalchemy_cratedb.type import FloatVector +from sqlalchemy_cratedb.type.object import ObjectTypeImpl from target_postgres.tests.samples.aapl.aapl import Fundamentals from target_postgres.tests.samples.sample_tap_countries.countries_tap import ( SampleTapCountries, @@ -20,7 +21,6 @@ from target_cratedb.connector import CrateDBConnector from target_cratedb.sinks import MELTANO_CRATEDB_STRATEGY_DIRECT from target_cratedb.sqlalchemy.patch import polyfill_refresh_after_dml_engine -from target_cratedb.sqlalchemy.vector import FloatVector from target_cratedb.target import TargetCrateDB try: