From ccbffd286850d6c9a6775dfae8306e0ba68324a0 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 2 Oct 2024 22:43:38 +0200 Subject: [PATCH] Add `BulkResponse` wrapper for improved decoding of HTTP bulk responses CrateDB HTTP bulk responses include `rowcount=` items, either signalling if a bulk operation succeeded or failed. - success means `rowcount=1` - failure means `rowcount=-2` https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling --- CHANGES.txt | 2 + src/crate/client/result.py | 66 +++++++ src/crate/client/test_result.py | 48 +++++ src/crate/client/test_support.py | 273 ++++++++++++++++++++++++++++ src/crate/client/tests.py | 300 ++----------------------------- 5 files changed, 405 insertions(+), 284 deletions(-) create mode 100644 src/crate/client/result.py create mode 100644 src/crate/client/test_result.py create mode 100644 src/crate/client/test_support.py diff --git a/CHANGES.txt b/CHANGES.txt index 4a0f0a48..781f84de 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -12,6 +12,8 @@ Unreleased "Threads may share the module, but not connections." - Added ``error_trace`` to string representation of an Error to relay server stacktraces into exception messages. +- Added ``BulkResponse`` wrapper for improved decoding of CrateDB HTTP bulk + responses including ``rowcount=`` items. .. _Migrate from crate.client to sqlalchemy-cratedb: https://cratedb.com/docs/sqlalchemy-cratedb/migrate-from-crate-client.html .. _sqlalchemy-cratedb: https://pypi.org/project/sqlalchemy-cratedb/ diff --git a/src/crate/client/result.py b/src/crate/client/result.py new file mode 100644 index 00000000..55ecb19e --- /dev/null +++ b/src/crate/client/result.py @@ -0,0 +1,66 @@ +import typing as t +from functools import cached_property + + +class BulkResultItem(t.TypedDict): + """ + Define the shape of a CrateDB bulk request response item. + """ + + rowcount: int + + +class BulkResponse: + """ + Manage CrateDB bulk request responses. + Accepts a list of bulk arguments (parameter list) and a list of bulk response items. + + https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations + """ + + def __init__( + self, + records: t.Union[t.Iterable[t.Dict[str, t.Any]], None], + results: t.Union[t.Iterable[BulkResultItem], None]): + self.records = records + self.results = results + + @cached_property + def failed_records(self) -> t.List[t.Dict[str, t.Any]]: + """ + Compute list of failed records. + + CrateDB signals failed inserts using `rowcount=-2`. + + https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling + """ + if self.records is None or self.results is None: + return [] + errors: t.List[t.Dict[str, t.Any]] = [] + for record, status in zip(self.records, self.results): + if status["rowcount"] == -2: + errors.append(record) + return errors + + @cached_property + def record_count(self) -> int: + """ + Compute bulk size / length of parameter list. + """ + if not self.records: + return 0 + return len(self.records) + + @cached_property + def success_count(self) -> int: + """ + Compute number of succeeding records within a batch. + """ + return self.record_count - self.failed_count + + @cached_property + def failed_count(self) -> int: + """ + Compute number of failed records within a batch. + """ + return len(self.failed_records) diff --git a/src/crate/client/test_result.py b/src/crate/client/test_result.py new file mode 100644 index 00000000..b342d632 --- /dev/null +++ b/src/crate/client/test_result.py @@ -0,0 +1,48 @@ +import sys +import unittest + +from crate import client +from crate.client.test_support import setUpCrateLayerBaseline, tearDownDropEntitiesBaseline +from crate.testing.settings import crate_host + + +class BulkOperationTest(unittest.TestCase): + + def setUp(self): + setUpCrateLayerBaseline(self) + + def tearDown(self): + tearDownDropEntitiesBaseline(self) + + @unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher") + def test_executemany_with_bulk_response(self): + + # Import at runtime is on purpose, to permit skipping. + from crate.client.result import BulkResponse + + connection = client.connect(crate_host) + cursor = connection.cursor() + + # Run SQL DDL and a first valid INSERT operation. + cursor.execute("CREATE TABLE foobar (id INTEGER);") + _ = cursor.executemany("INSERT INTO foobar (id) VALUES (?)", [(1, ), (2, ), (3, )]) + cursor.execute("REFRESH TABLE foobar;") + + # Run an invalid INSERT operation that will fail. + invalid_records = [(4, ), ("Hotzenplotz", )] + result = cursor.executemany("INSERT INTO foobar (id) VALUES (?)", invalid_records) + + # Verify CrateDB response. + self.assertEqual(result, [{"rowcount": -2}, {"rowcount": -2}]) + + # Verify decoded response. + bulk_response = BulkResponse(invalid_records, result) + self.assertEqual(bulk_response.failed_records, invalid_records) + + cursor.execute("REFRESH TABLE foobar;") + cursor.execute("SELECT * FROM foobar;") + result = cursor.fetchall() + self.assertEqual(sorted(result), [[1], [2], [3]]) + + cursor.close() + connection.close() diff --git a/src/crate/client/test_support.py b/src/crate/client/test_support.py new file mode 100644 index 00000000..f9d5b7ff --- /dev/null +++ b/src/crate/client/test_support.py @@ -0,0 +1,273 @@ +# -*- coding: utf-8; -*- +# +# Licensed to CRATE Technology GmbH ("Crate") under one or more contributor +# license agreements. See the NOTICE file distributed with this work for +# additional information regarding copyright ownership. Crate licenses +# this file to you under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. You may +# obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# However, if you have executed another commercial license agreement +# with Crate these terms will supersede the license and you may use the +# software solely pursuant to the terms of the relevant commercial agreement. + +from __future__ import absolute_import + +import json +import os +import socket +import unittest +from pprint import pprint +from http.server import HTTPServer, BaseHTTPRequestHandler +import ssl +import time +import threading +import logging + +import stopit + +from crate.testing.layer import CrateLayer +from crate.testing.settings import \ + crate_host, crate_path, crate_port, \ + crate_transport_port, docs_path, localhost +from crate.client import connect + + +makeSuite = unittest.TestLoader().loadTestsFromTestCase + +log = logging.getLogger('crate.testing.layer') +ch = logging.StreamHandler() +ch.setLevel(logging.ERROR) +log.addHandler(ch) + + +def cprint(s): + if isinstance(s, bytes): + s = s.decode('utf-8') + print(s) + + +settings = { + 'udc.enabled': 'false', + 'lang.js.enabled': 'true', + 'auth.host_based.enabled': 'true', + 'auth.host_based.config.0.user': 'crate', + 'auth.host_based.config.0.method': 'trust', + 'auth.host_based.config.98.user': 'trusted_me', + 'auth.host_based.config.98.method': 'trust', + 'auth.host_based.config.99.user': 'me', + 'auth.host_based.config.99.method': 'password', +} +crate_layer = None + + +def ensure_cratedb_layer(): + """ + In order to skip individual tests by manually disabling them within + `def test_suite()`, it is crucial make the test layer not run on each + and every occasion. So, things like this will be possible:: + + ./bin/test -vvvv --ignore_dir=testing + + TODO: Through a subsequent patch, the possibility to individually + unselect specific tests might be added to `def test_suite()` + on behalf of environment variables. + A blueprint for this kind of logic can be found at + https://github.com/crate/crate/commit/414cd833. + """ + global crate_layer + + if crate_layer is None: + crate_layer = CrateLayer('crate', + crate_home=crate_path(), + port=crate_port, + host=localhost, + transport_port=crate_transport_port, + settings=settings) + return crate_layer + + +def setUpCrateLayerBaseline(test): + if hasattr(test, "globs"): + test.globs['crate_host'] = crate_host + test.globs['pprint'] = pprint + test.globs['print'] = cprint + + with connect(crate_host) as conn: + cursor = conn.cursor() + + with open(docs_path('testing/testdata/mappings/locations.sql')) as s: + stmt = s.read() + cursor.execute(stmt) + stmt = ("select count(*) from information_schema.tables " + "where table_name = 'locations'") + cursor.execute(stmt) + assert cursor.fetchall()[0][0] == 1 + + data_path = docs_path('testing/testdata/data/test_a.json') + # load testing data into crate + cursor.execute("copy locations from ?", (data_path,)) + # refresh location table so imported data is visible immediately + cursor.execute("refresh table locations") + # create blob table + cursor.execute("create blob table myfiles clustered into 1 shards " + + "with (number_of_replicas=0)") + + # create users + cursor.execute("CREATE USER me WITH (password = 'my_secret_pw')") + cursor.execute("CREATE USER trusted_me") + + cursor.close() + + +def tearDownDropEntitiesBaseline(test): + """ + Drop all tables, views, and users created by `setUpWithCrateLayer*`. + """ + ddl_statements = [ + "DROP TABLE foobar", + "DROP TABLE locations", + "DROP BLOB TABLE myfiles", + "DROP USER me", + "DROP USER trusted_me", + ] + _execute_statements(ddl_statements) + + +class HttpsTestServerLayer: + PORT = 65534 + HOST = "localhost" + CERT_FILE = os.path.abspath(os.path.join(os.path.dirname(__file__), + "pki/server_valid.pem")) + CACERT_FILE = os.path.abspath(os.path.join(os.path.dirname(__file__), + "pki/cacert_valid.pem")) + + __name__ = "httpsserver" + __bases__ = tuple() + + class HttpsServer(HTTPServer): + def get_request(self): + + # Prepare SSL context. + context = ssl._create_unverified_context( + protocol=ssl.PROTOCOL_TLS_SERVER, + cert_reqs=ssl.CERT_OPTIONAL, + check_hostname=False, + purpose=ssl.Purpose.CLIENT_AUTH, + certfile=HttpsTestServerLayer.CERT_FILE, + keyfile=HttpsTestServerLayer.CERT_FILE, + cafile=HttpsTestServerLayer.CACERT_FILE) + + # Set minimum protocol version, TLSv1 and TLSv1.1 are unsafe. + context.minimum_version = ssl.TLSVersion.TLSv1_2 + + # Wrap TLS encryption around socket. + socket, client_address = HTTPServer.get_request(self) + socket = context.wrap_socket(socket, server_side=True) + + return socket, client_address + + class HttpsHandler(BaseHTTPRequestHandler): + + payload = json.dumps({"name": "test", "status": 200, }) + + def do_GET(self): + self.send_response(200) + payload = self.payload.encode('UTF-8') + self.send_header("Content-Length", len(payload)) + self.send_header("Content-Type", "application/json; charset=UTF-8") + self.end_headers() + self.wfile.write(payload) + + def setUp(self): + self.server = self.HttpsServer( + (self.HOST, self.PORT), + self.HttpsHandler + ) + thread = threading.Thread(target=self.serve_forever) + thread.daemon = True # quit interpreter when only thread exists + thread.start() + self.waitForServer() + + def serve_forever(self): + print("listening on", self.HOST, self.PORT) + self.server.serve_forever() + print("server stopped.") + + def tearDown(self): + self.server.shutdown() + self.server.server_close() + + def isUp(self): + """ + Test if a host is up. + """ + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + ex = s.connect_ex((self.HOST, self.PORT)) + s.close() + return ex == 0 + + def waitForServer(self, timeout=5): + """ + Wait for the host to be available. + """ + with stopit.ThreadingTimeout(timeout) as to_ctx_mgr: + while True: + if self.isUp(): + break + time.sleep(0.001) + + if not to_ctx_mgr: + raise TimeoutError("Could not properly start embedded webserver " + "within {} seconds".format(timeout)) + + +def setUpWithHttps(test): + test.globs['crate_host'] = "https://{0}:{1}".format( + HttpsTestServerLayer.HOST, HttpsTestServerLayer.PORT + ) + test.globs['pprint'] = pprint + test.globs['print'] = cprint + + test.globs['cacert_valid'] = os.path.abspath( + os.path.join(os.path.dirname(__file__), "pki/cacert_valid.pem") + ) + test.globs['cacert_invalid'] = os.path.abspath( + os.path.join(os.path.dirname(__file__), "pki/cacert_invalid.pem") + ) + test.globs['clientcert_valid'] = os.path.abspath( + os.path.join(os.path.dirname(__file__), "pki/client_valid.pem") + ) + test.globs['clientcert_invalid'] = os.path.abspath( + os.path.join(os.path.dirname(__file__), "pki/client_invalid.pem") + ) + + +def _execute_statements(statements, on_error="ignore"): + with connect(crate_host) as conn: + cursor = conn.cursor() + for stmt in statements: + _execute_statement(cursor, stmt, on_error=on_error) + cursor.close() + + +def _execute_statement(cursor, stmt, on_error="ignore"): + try: + cursor.execute(stmt) + except Exception: # pragma: no cover + # FIXME: Why does this croak on statements like ``DROP TABLE cities``? + # Note: When needing to debug the test environment, you may want to + # enable this logger statement. + # log.exception("Executing SQL statement failed") + if on_error == "ignore": + pass + elif on_error == "raise": + raise diff --git a/src/crate/client/tests.py b/src/crate/client/tests.py index 2f6be428..0e6a73a9 100644 --- a/src/crate/client/tests.py +++ b/src/crate/client/tests.py @@ -1,288 +1,14 @@ -# -*- coding: utf-8; -*- -# -# Licensed to CRATE Technology GmbH ("Crate") under one or more contributor -# license agreements. See the NOTICE file distributed with this work for -# additional information regarding copyright ownership. Crate licenses -# this file to you under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. You may -# obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# -# However, if you have executed another commercial license agreement -# with Crate these terms will supersede the license and you may use the -# software solely pursuant to the terms of the relevant commercial agreement. - -from __future__ import absolute_import - -import json -import os -import socket -import unittest import doctest -from pprint import pprint -from http.server import HTTPServer, BaseHTTPRequestHandler -import ssl -import time -import threading -import logging - -import stopit - -from crate.testing.layer import CrateLayer -from crate.testing.settings import \ - crate_host, crate_path, crate_port, \ - crate_transport_port, docs_path, localhost -from crate.client import connect - -from .test_cursor import CursorTest -from .test_connection import ConnectionTest -from .test_http import ( - HttpClientTest, - ThreadSafeHttpClientTest, - KeepAliveClientTest, - ParamsTest, - RetryOnTimeoutServerTest, - RequestsCaBundleTest, - TestUsernameSentAsHeader, - TestCrateJsonEncoder, - TestDefaultSchemaHeader, -) - -makeSuite = unittest.TestLoader().loadTestsFromTestCase - -log = logging.getLogger('crate.testing.layer') -ch = logging.StreamHandler() -ch.setLevel(logging.ERROR) -log.addHandler(ch) - - -def cprint(s): - if isinstance(s, bytes): - s = s.decode('utf-8') - print(s) - - -settings = { - 'udc.enabled': 'false', - 'lang.js.enabled': 'true', - 'auth.host_based.enabled': 'true', - 'auth.host_based.config.0.user': 'crate', - 'auth.host_based.config.0.method': 'trust', - 'auth.host_based.config.98.user': 'trusted_me', - 'auth.host_based.config.98.method': 'trust', - 'auth.host_based.config.99.user': 'me', - 'auth.host_based.config.99.method': 'password', -} -crate_layer = None - - -def ensure_cratedb_layer(): - """ - In order to skip individual tests by manually disabling them within - `def test_suite()`, it is crucial make the test layer not run on each - and every occasion. So, things like this will be possible:: - - ./bin/test -vvvv --ignore_dir=testing - - TODO: Through a subsequent patch, the possibility to individually - unselect specific tests might be added to `def test_suite()` - on behalf of environment variables. - A blueprint for this kind of logic can be found at - https://github.com/crate/crate/commit/414cd833. - """ - global crate_layer - - if crate_layer is None: - crate_layer = CrateLayer('crate', - crate_home=crate_path(), - port=crate_port, - host=localhost, - transport_port=crate_transport_port, - settings=settings) - return crate_layer - - -def setUpCrateLayerBaseline(test): - test.globs['crate_host'] = crate_host - test.globs['pprint'] = pprint - test.globs['print'] = cprint - - with connect(crate_host) as conn: - cursor = conn.cursor() - - with open(docs_path('testing/testdata/mappings/locations.sql')) as s: - stmt = s.read() - cursor.execute(stmt) - stmt = ("select count(*) from information_schema.tables " - "where table_name = 'locations'") - cursor.execute(stmt) - assert cursor.fetchall()[0][0] == 1 - - data_path = docs_path('testing/testdata/data/test_a.json') - # load testing data into crate - cursor.execute("copy locations from ?", (data_path,)) - # refresh location table so imported data is visible immediately - cursor.execute("refresh table locations") - # create blob table - cursor.execute("create blob table myfiles clustered into 1 shards " + - "with (number_of_replicas=0)") - - # create users - cursor.execute("CREATE USER me WITH (password = 'my_secret_pw')") - cursor.execute("CREATE USER trusted_me") - - cursor.close() - - -def tearDownDropEntitiesBaseline(test): - """ - Drop all tables, views, and users created by `setUpWithCrateLayer*`. - """ - ddl_statements = [ - "DROP TABLE locations", - "DROP BLOB TABLE myfiles", - "DROP USER me", - "DROP USER trusted_me", - ] - _execute_statements(ddl_statements) - - -class HttpsTestServerLayer: - PORT = 65534 - HOST = "localhost" - CERT_FILE = os.path.abspath(os.path.join(os.path.dirname(__file__), - "pki/server_valid.pem")) - CACERT_FILE = os.path.abspath(os.path.join(os.path.dirname(__file__), - "pki/cacert_valid.pem")) - - __name__ = "httpsserver" - __bases__ = tuple() - - class HttpsServer(HTTPServer): - def get_request(self): - - # Prepare SSL context. - context = ssl._create_unverified_context( - protocol=ssl.PROTOCOL_TLS_SERVER, - cert_reqs=ssl.CERT_OPTIONAL, - check_hostname=False, - purpose=ssl.Purpose.CLIENT_AUTH, - certfile=HttpsTestServerLayer.CERT_FILE, - keyfile=HttpsTestServerLayer.CERT_FILE, - cafile=HttpsTestServerLayer.CACERT_FILE) - - # Set minimum protocol version, TLSv1 and TLSv1.1 are unsafe. - context.minimum_version = ssl.TLSVersion.TLSv1_2 - - # Wrap TLS encryption around socket. - socket, client_address = HTTPServer.get_request(self) - socket = context.wrap_socket(socket, server_side=True) - - return socket, client_address - - class HttpsHandler(BaseHTTPRequestHandler): - - payload = json.dumps({"name": "test", "status": 200, }) - - def do_GET(self): - self.send_response(200) - payload = self.payload.encode('UTF-8') - self.send_header("Content-Length", len(payload)) - self.send_header("Content-Type", "application/json; charset=UTF-8") - self.end_headers() - self.wfile.write(payload) - - def setUp(self): - self.server = self.HttpsServer( - (self.HOST, self.PORT), - self.HttpsHandler - ) - thread = threading.Thread(target=self.serve_forever) - thread.daemon = True # quit interpreter when only thread exists - thread.start() - self.waitForServer() - - def serve_forever(self): - print("listening on", self.HOST, self.PORT) - self.server.serve_forever() - print("server stopped.") - - def tearDown(self): - self.server.shutdown() - self.server.server_close() - - def isUp(self): - """ - Test if a host is up. - """ - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - ex = s.connect_ex((self.HOST, self.PORT)) - s.close() - return ex == 0 - - def waitForServer(self, timeout=5): - """ - Wait for the host to be available. - """ - with stopit.ThreadingTimeout(timeout) as to_ctx_mgr: - while True: - if self.isUp(): - break - time.sleep(0.001) - - if not to_ctx_mgr: - raise TimeoutError("Could not properly start embedded webserver " - "within {} seconds".format(timeout)) - - -def setUpWithHttps(test): - test.globs['crate_host'] = "https://{0}:{1}".format( - HttpsTestServerLayer.HOST, HttpsTestServerLayer.PORT - ) - test.globs['pprint'] = pprint - test.globs['print'] = cprint - - test.globs['cacert_valid'] = os.path.abspath( - os.path.join(os.path.dirname(__file__), "pki/cacert_valid.pem") - ) - test.globs['cacert_invalid'] = os.path.abspath( - os.path.join(os.path.dirname(__file__), "pki/cacert_invalid.pem") - ) - test.globs['clientcert_valid'] = os.path.abspath( - os.path.join(os.path.dirname(__file__), "pki/client_valid.pem") - ) - test.globs['clientcert_invalid'] = os.path.abspath( - os.path.join(os.path.dirname(__file__), "pki/client_invalid.pem") - ) - - -def _execute_statements(statements, on_error="ignore"): - with connect(crate_host) as conn: - cursor = conn.cursor() - for stmt in statements: - _execute_statement(cursor, stmt, on_error=on_error) - cursor.close() - +import unittest -def _execute_statement(cursor, stmt, on_error="ignore"): - try: - cursor.execute(stmt) - except Exception: # pragma: no cover - # FIXME: Why does this croak on statements like ``DROP TABLE cities``? - # Note: When needing to debug the test environment, you may want to - # enable this logger statement. - # log.exception("Executing SQL statement failed") - if on_error == "ignore": - pass - elif on_error == "raise": - raise +from crate.client.test_connection import ConnectionTest +from crate.client.test_cursor import CursorTest +from crate.client.test_http import HttpClientTest, KeepAliveClientTest, ThreadSafeHttpClientTest, ParamsTest, \ + RetryOnTimeoutServerTest, RequestsCaBundleTest, TestUsernameSentAsHeader, TestCrateJsonEncoder, \ + TestDefaultSchemaHeader +from crate.client.test_result import BulkOperationTest +from crate.client.test_support import makeSuite, setUpWithHttps, HttpsTestServerLayer, setUpCrateLayerBaseline, \ + tearDownDropEntitiesBaseline, ensure_cratedb_layer def test_suite(): @@ -324,6 +50,12 @@ def test_suite(): suite.addTest(s) # Integration tests. + layer = ensure_cratedb_layer() + + s = makeSuite(BulkOperationTest) + s.layer = layer + suite.addTest(s) + s = doctest.DocFileSuite( 'docs/by-example/http.rst', 'docs/by-example/client.rst', @@ -334,7 +66,7 @@ def test_suite(): optionflags=flags, encoding='utf-8' ) - s.layer = ensure_cratedb_layer() + s.layer = layer suite.addTest(s) return suite