From 109f89dd4e831600e4c553c4fad3abc1362e5a00 Mon Sep 17 00:00:00 2001 From: Stanislav Lysikov Date: Mon, 20 Mar 2023 15:54:09 +0300 Subject: [PATCH 1/5] check reselt for every query in multiquery --- dbt/adapters/vertica/connections.py | 52 +++++++++++++++++++++++++---- 1 file changed, 45 insertions(+), 7 deletions(-) diff --git a/dbt/adapters/vertica/connections.py b/dbt/adapters/vertica/connections.py index 5c1a916..94513a6 100644 --- a/dbt/adapters/vertica/connections.py +++ b/dbt/adapters/vertica/connections.py @@ -55,7 +55,7 @@ class verticaCredentials(Credentials): # backup_server_node: Optional[str] = None # additional_info = { - # 'password': str, + # 'password': str, # 'backup_server_node': list# invalid value to be set in a connection string # } @@ -97,9 +97,9 @@ def open(cls, connection): 'connection_load_balance':credentials.connection_load_balance, 'session_label': f'dbt_{credentials.username}', 'retries': credentials.retries, - + 'backup_server_node':credentials.backup_server_node, - + } # if credentials.ssl.lower() in {'true', 'yes', 'please'}: @@ -119,7 +119,7 @@ def open(cls, connection): context = ssl.create_default_context() conn_info['ssl'] = context logger.debug(f'SSL is on') - + def connect(): handle = vertica_python.connect(**conn_info) logger.debug(f':P Connection work {handle}') @@ -127,8 +127,8 @@ def connect(): connection.handle = handle logger.debug(f':P Connected to database: {credentials.database} at {credentials.host} at {handle}') return handle - - + + except Exception as exc: @@ -184,6 +184,45 @@ def cancel(self, connection): logger.debug(':P Cancel query') connection.handle.cancel() + @classmethod + def get_result_from_cursor(cls, cursor: Any) -> agate.Table: + data: List[Any] = [] + column_names: List[str] = [] + + if cursor.description is not None: + column_names = [col[0] for col in cursor.description] + rows = cursor.fetchall() + + # check result for every query if there are some queries with ; separator + while cursor.nextset(): + check = cursor._message + if isinstance(check, ErrorResponse): + logger.debug(f'Cursor message is: {check}') + self.release() + raise dbt.exceptions.DatabaseException(str(check)) + + data = cls.process_results(column_names, rows) + + return dbt.clients.agate_helper.table_from_data_flat(data, column_names) + + def execute( + self, sql: str, auto_begin: bool = False, fetch: bool = False + ) -> Tuple[AdapterResponse, agate.Table]: + sql = self._add_query_comment(sql) + _, cursor = self.add_query(sql, auto_begin) + response = self.get_response(cursor) + if fetch: + table = self.get_result_from_cursor(cursor) + else: + table = dbt.clients.agate_helper.empty_table() + while cursor.nextset(): + check = cursor._message + if isinstance(check, vertica_python.vertica.messages.ErrorResponse): + logger.debug(f'Cursor message is: {check}') + self.release() + raise dbt.exceptions.DatabaseException(str(check)) + return response, table + @contextmanager def exception_handler(self, sql): @@ -197,4 +236,3 @@ def exception_handler(self, sql): logger.debug(f':P Error: {exc}') self.release() raise dbt.exceptions.RuntimeException(str(exc)) - From 39684f219218cafdd4bc6c4be974a0d14a5f527d Mon Sep 17 00:00:00 2001 From: Stanislav Lysikov Date: Wed, 22 Mar 2023 14:18:52 +0300 Subject: [PATCH 2/5] remove additional table params for temp tables --- .../models/table/create_table_as.sql | 47 ++++++++++--------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/dbt/include/vertica/macros/materializations/models/table/create_table_as.sql b/dbt/include/vertica/macros/materializations/models/table/create_table_as.sql index a2567ec..02e7c7f 100644 --- a/dbt/include/vertica/macros/materializations/models/table/create_table_as.sql +++ b/dbt/include/vertica/macros/materializations/models/table/create_table_as.sql @@ -9,7 +9,7 @@ {%- set partition_by_string = config.get('partition_by_string', default=none) -%} {%- set partition_by_group_by_string = config.get('partition_by_group_by_string', default=none) -%} {%- set partition_by_active_count = config.get('partition_by_active_count', default=none) -%} - + create {% if temporary: -%}local temporary{%- endif %} table {{ relation.include(database=(not temporary), schema=(not temporary)) }} {% if temporary: -%}on commit preserve rows{%- endif %} @@ -17,31 +17,32 @@ {{ sql }} ) - {% if order_by is not none -%} - order by {{ order_by }} - {% endif -%} + {% if not temporary: %} + {% if order_by is not none -%} + order by {{ order_by }} + {% endif -%} - {% if segmented_by_string is not none -%} - segmented BY {{ segmented_by_string }} {% if segmented_by_all_nodes %} ALL NODES {% endif %} - {% endif %} + {% if segmented_by_string is not none -%} + segmented BY {{ segmented_by_string }} {% if segmented_by_all_nodes %} ALL NODES {% endif %} + {% endif %} - {% if no_segmentation =='True' or no_segmentation=='true' -%} - UNSEGMENTED ALL NODES - {% endif -%} + {% if no_segmentation =='True' or no_segmentation=='true' -%} + UNSEGMENTED ALL NODES + {% endif -%} - {% if ksafe is not none -%} - ksafe {{ ksafe }} - {% endif -%} - - {% if partition_by_string is not none -%} - ; alter table {{ relation.include(database=(not temporary), schema=(not temporary)) }} partition BY {{ partition_by_string }} - {% if partition_by_string is not none and partition_by_group_by_string is not none -%} - group by {{ partition_by_group_by_string }} - {% endif %} - {% if partition_by_string is not none and partition_by_active_count is not none %} - SET ACTIVEPARTITIONCOUNT {{ partition_by_active_count }} + {% if ksafe is not none -%} + ksafe {{ ksafe }} + {% endif -%} + + {% if partition_by_string is not none -%} + ; alter table {{ relation.include(database=(not temporary), schema=(not temporary)) }} partition BY {{ partition_by_string }} + {% if partition_by_string is not none and partition_by_group_by_string is not none -%} + group by {{ partition_by_group_by_string }} + {% endif %} + {% if partition_by_string is not none and partition_by_active_count is not none %} + SET ACTIVEPARTITIONCOUNT {{ partition_by_active_count }} + {% endif %} {% endif %} - {% endif %} + {% endif %} ; {% endmacro %} - From 18abf3888407832c02ed334c4d2967c652ad68df Mon Sep 17 00:00:00 2001 From: Stanislav Lysikov Date: Wed, 22 Mar 2023 20:17:20 +0300 Subject: [PATCH 3/5] feature: add freshness --- .../vertica/macros/adapters/freshness.sql | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/dbt/include/vertica/macros/adapters/freshness.sql b/dbt/include/vertica/macros/adapters/freshness.sql index 0f90672..54ed792 100644 --- a/dbt/include/vertica/macros/adapters/freshness.sql +++ b/dbt/include/vertica/macros/adapters/freshness.sql @@ -3,7 +3,15 @@ {%- endmacro %} -{% macro vertica__collect_freshness() -%} - {{ exceptions.raise_not_implemented( - 'collect_freshness macro not implemented for adapter '+adapter.type()) }} -{%- endmacro %} \ No newline at end of file +{% macro vertica__collect_freshness(source, loaded_at_field, filter) -%} + {% call statement('collect_freshness', fetch_result=True, auto_begin=False) -%} + select + max({{ loaded_at_field }}) as max_loaded_at, + {{ current_timestamp() }} as snapshotted_at + from {{ source }} + {% if filter %} + where {{ filter }} + {% endif %} + {% endcall %} + {{ return(load_result('collect_freshness')) }} +{%- endmacro %} From 6ba39d5c910692991bb3cdb976f4888c31590800 Mon Sep 17 00:00:00 2001 From: Stanislav Lysikov Date: Wed, 22 Mar 2023 20:54:30 +0300 Subject: [PATCH 4/5] feature: add table to return --- dbt/include/vertica/macros/adapters/freshness.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/include/vertica/macros/adapters/freshness.sql b/dbt/include/vertica/macros/adapters/freshness.sql index 54ed792..e145219 100644 --- a/dbt/include/vertica/macros/adapters/freshness.sql +++ b/dbt/include/vertica/macros/adapters/freshness.sql @@ -13,5 +13,5 @@ where {{ filter }} {% endif %} {% endcall %} - {{ return(load_result('collect_freshness')) }} + {{ return(load_result('collect_freshness').table) }} {%- endmacro %} From e97d50c48c47abeecd7ba8993e58abc799ab8c37 Mon Sep 17 00:00:00 2001 From: Stanislav Lysikov Date: Fri, 16 Jun 2023 15:27:28 +0300 Subject: [PATCH 5/5] add reorganize for partitioned tables, remove doubles from if check --- .../materializations/models/table/create_table_as.sql | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/dbt/include/vertica/macros/materializations/models/table/create_table_as.sql b/dbt/include/vertica/macros/materializations/models/table/create_table_as.sql index 02e7c7f..6c772ec 100644 --- a/dbt/include/vertica/macros/materializations/models/table/create_table_as.sql +++ b/dbt/include/vertica/macros/materializations/models/table/create_table_as.sql @@ -35,13 +35,14 @@ {% endif -%} {% if partition_by_string is not none -%} - ; alter table {{ relation.include(database=(not temporary), schema=(not temporary)) }} partition BY {{ partition_by_string }} - {% if partition_by_string is not none and partition_by_group_by_string is not none -%} - group by {{ partition_by_group_by_string }} + ; ALTER TABLE {{ relation.include(database=(not temporary), schema=(not temporary)) }} PARTITION BY {{ partition_by_string }} + {% if partition_by_group_by_string is not none -%} + GROUP BY {{ partition_by_group_by_string }} {% endif %} - {% if partition_by_string is not none and partition_by_active_count is not none %} + {% if partition_by_active_count is not none %} SET ACTIVEPARTITIONCOUNT {{ partition_by_active_count }} {% endif %} + ; ALTER TABLE {{ relation.include(database=(not temporary), schema=(not temporary)) }} REORGANIZE; {% endif %} {% endif %} ;